/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JdbcSource;
import org.apache.hudi.utilities.testutils.JdbcTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.storage.StorageLevel;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestJdbcSource
extends UtilitiesTestBase {
    // Source configuration shared by every test. It is static and is mutated by setup() and by
    // the individual tests; it is handed to both JdbcTestUtils and JdbcSource.
    private static final TypedProperties PROPS = new TypedProperties();
    // Generator passed to JdbcTestUtils whenever rows are inserted or updated.
    private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator();
    // Connection to jdbc:h2:mem:test_mem; (re)assigned in setup() and closed in teardown().
    private static Connection connection;

    /**
     * One-time class initialisation: delegates to the base class to bring up the shared
     * test services, passing {@code false} for all three flags.
     *
     * @throws Exception if the base class fails to initialise the test services
     */
    @BeforeAll
    public static void beforeAll() throws Exception {
        initTestServices(false, false, false);
    }

    /**
     * Per-test initialisation: runs the base-class setup, resets the shared {@code PROPS} to the
     * baseline JDBC settings and opens a fresh connection to the in-memory H2 database.
     *
     * @throws Exception if the base-class setup or opening the connection fails
     */
    @Override
    @BeforeEach
    public void setup() throws Exception {
        super.setup();
        // PROPS is static and every test mutates it. Without this reset, keys such as the
        // password file, the fallback flag, the storage level or extra options set by one
        // test would silently carry over into the next and make results order-dependent.
        // stringPropertyNames() returns a snapshot, so removing while iterating is safe.
        for (String staleKey : PROPS.stringPropertyNames()) {
            PROPS.remove(staleKey);
        }
        PROPS.setProperty("hoodie.deltastreamer.jdbc.url", "jdbc:h2:mem:test_mem");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.driver.class", "org.h2.Driver");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.user", "test");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.password", "jdbc");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.name", "triprec");
        connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc");
    }

    /**
     * Per-test cleanup: runs the base-class teardown and then closes the JDBC connection
     * opened in {@link #setup()}.
     *
     * @throws Exception if the base-class teardown fails
     */
    @Override
    @AfterEach
    public void teardown() throws Exception {
        try {
            super.teardown();
        } finally {
            // Close the connection even when the base-class teardown throws, so a failing
            // teardown does not leave the connection open for the following tests.
            JdbcTestUtils.close(connection);
        }
    }

    /**
     * Inserts 100 rows under commit "000" and verifies that both the table and a fetch
     * without a checkpoint (source limit 100) report all 100 rows.
     */
    @Test
    public void testSingleCommit() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            int numRecords = 100;
            String commitTime = "000";
            JdbcTestUtils.clearAndInsert(commitTime, numRecords, connection, DATA_GENERATOR, PROPS);
            Assertions.assertEquals(numRecords, JdbcTestUtils.count(connection, "triprec"));
            Dataset<Row> rowDataset = this.runSource(Option.empty(), numRecords).getBatch().get();
            Assertions.assertEquals(numRecords, rowDataset.count());
        } catch (SQLException e) {
            // Pass the exception as the cause so the report keeps its stack trace.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Inserts 100 rows under commit "000", updates the first 50 of them under commit "007",
     * and verifies that a fetch returns 100 rows split evenly between the two commit times.
     */
    @Test
    public void testInsertAndUpdate() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            String commitTime = "000";
            int numRecords = 100;
            int numUpdated = numRecords / 2;
            // Update only the first half of the freshly inserted records.
            JdbcTestUtils.update(
                "007",
                JdbcTestUtils.clearAndInsert(commitTime, numRecords, connection, DATA_GENERATOR, PROPS)
                    .stream()
                    .limit(numUpdated)
                    .collect(Collectors.toList()),
                connection,
                DATA_GENERATOR,
                PROPS);
            Assertions.assertEquals(numRecords, JdbcTestUtils.count(connection, "triprec"));

            Dataset<Row> rowDataset = this.runSource(Option.empty(), numRecords).getBatch().get();
            Assertions.assertEquals(numRecords, rowDataset.count());

            Dataset<Row> firstCommit = rowDataset.where("commit_time=000");
            Assertions.assertEquals(numRecords - numUpdated, firstCommit.count());
            Dataset<Row> secondCommit = rowDataset.where("commit_time=007");
            Assertions.assertEquals(numUpdated, secondCommit.count());
        } catch (Exception e) {
            // Pass the exception as the cause so the report keeps its stack trace.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Inserts 10 rows under commit "000" and 5 more under commit "001", fetching without a
     * checkpoint after each insert, and verifies the per-commit and total row counts. A final
     * repeated fetch is expected to return the same 15 rows.
     */
    @Test
    public void testTwoCommits() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            Dataset<Row> rowDataset = this.runSource(Option.empty(), 10L).getBatch().get();
            Assertions.assertEquals(10L, rowDataset.where("commit_time=000").count());

            JdbcTestUtils.insert("001", 5, connection, DATA_GENERATOR, PROPS);
            rowDataset = this.runSource(Option.empty(), 15L).getBatch().get();
            Assertions.assertEquals(15L, rowDataset.count());
            Assertions.assertEquals(5L, rowDataset.where("commit_time=001").count());
            Assertions.assertEquals(10L, rowDataset.where("commit_time=000").count());

            // Fetching again without a checkpoint must still see every row.
            rowDataset = this.runSource(Option.empty(), 15L).getBatch().get();
            Assertions.assertEquals(15L, rowDataset.count());
        } catch (Exception e) {
            // Pass the exception as the cause so the report keeps its stack trace.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Fetches 10 rows of commit "000", inserts 10 rows of commit "001", and verifies that a
     * fetch resumed from the first batch's checkpoint returns exactly the 10 new rows.
     */
    @Test
    public void testIncrementalFetchWithCommitTime() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            InputBatch<Dataset<Row>> batch = this.runSource(Option.empty(), 10L);
            Dataset<Row> rowDataset = batch.getBatch().get();
            Assertions.assertEquals(10L, rowDataset.count());

            JdbcTestUtils.insert("001", 10, connection, DATA_GENERATOR, PROPS);
            rowDataset = this.runSource(Option.of(batch.getCheckpointForNextBatch()), 10L).getBatch().get();
            Assertions.assertEquals(10L, rowDataset.count());
            Assertions.assertEquals(10L, rowDataset.where("commit_time=001").count());
        } catch (Exception e) {
            // Pass the exception as the cause so the report keeps its stack trace.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Fetches 10 rows, then fetches again from the returned checkpoint without inserting
     * anything in between, and verifies that the second batch is empty.
     */
    @Test
    public void testIncrementalFetchWithNoMatchingRows() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            InputBatch<Dataset<Row>> batch = this.runSource(Option.empty(), 10L);
            Dataset<Row> rowDataset = batch.getBatch().get();
            Assertions.assertEquals(10L, rowDataset.count());

            rowDataset = this.runSource(Option.of(batch.getCheckpointForNextBatch()), 10L).getBatch().get();
            Assertions.assertEquals(0L, rowDataset.count());
        } catch (Exception e) {
            // Pass the exception as the cause so the report keeps its stack trace.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Uses "id" as the incremental column. After an initial fetch of 100 rows, 100 more rows
     * are inserted under commit "001"; a fetch from the checkpoint with source limit 60 must
     * return 60 rows of commit "001", and the following fetch (limit 75) the remaining 40.
     */
    @Test
    public void testIncrementalFetchWhenTableRecordsMoreThanSourceLimit() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id");
        try {
            JdbcTestUtils.clearAndInsert("000", 100, connection, DATA_GENERATOR, PROPS);
            InputBatch<Dataset<Row>> batch = this.runSource(Option.empty(), 100L);
            Dataset<Row> rowDataset = batch.getBatch().get();
            Assertions.assertEquals(100L, rowDataset.count());

            JdbcTestUtils.insert("001", 100, connection, DATA_GENERATOR, PROPS);
            batch = this.runSource(Option.of(batch.getCheckpointForNextBatch()), 60L);
            rowDataset = batch.getBatch().get();
            Assertions.assertEquals(60L, rowDataset.count());
            Assertions.assertEquals(60L, rowDataset.where("commit_time=001").count());

            // Only 40 unread rows remain, so the larger limit must not return more than that.
            rowDataset = this.runSource(Option.of(batch.getCheckpointForNextBatch()), 75L).getBatch().get();
            Assertions.assertEquals(40L, rowDataset.count());
        } catch (Exception e) {
            // Pass the exception as the cause so the report keeps its stack trace.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Uses "id" as the incremental column. Verifies that the first batch of 100 rows yields
     * checkpoint "100", and that after inserting 100 more rows a fetch from checkpoint "200"
     * returns no rows.
     */
    @Test
    public void testIncrementalFetchWhenLastCheckpointMoreThanTableRecords() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id");
        try {
            JdbcTestUtils.clearAndInsert("000", 100, connection, DATA_GENERATOR, PROPS);
            InputBatch<Dataset<Row>> batch = this.runSource(Option.empty(), 100L);
            Dataset<Row> rowDataset = batch.getBatch().get();
            Assertions.assertEquals(100L, rowDataset.count());
            Assertions.assertEquals("100", batch.getCheckpointForNextBatch());

            JdbcTestUtils.insert("001", 100, connection, DATA_GENERATOR, PROPS);
            batch = this.runSource(Option.of("200"), 50L);
            rowDataset = batch.getBatch().get();
            Assertions.assertEquals(0L, rowDataset.count());
        } catch (Exception e) {
            // Pass the exception as the cause so the report keeps its stack trace.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Points the incremental column at the non-existent "dummy_col" and verifies that an
     * incremental fetch throws {@link HoodieException}, both without the fallback flag and
     * with it set to "true" (in which case the message must contain "Failed to checkpoint").
     */
    @Test
    public void testIncrementalFetchFallbackToFullFetchWhenError() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            InputBatch<Dataset<Row>> batch = this.runSource(Option.empty(), 10L);
            Dataset<Row> rowDataset = batch.getBatch().get();
            Assertions.assertEquals(10L, rowDataset.count());

            JdbcTestUtils.insert("001", 10, connection, DATA_GENERATOR, PROPS);
            PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "dummy_col");
            Assertions.assertThrows(
                HoodieException.class,
                () -> this.runSource(Option.of(batch.getCheckpointForNextBatch()), -1L));

            PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch", "true");
            Exception exception = Assertions.assertThrows(
                HoodieException.class,
                () -> this.runSource(Option.of(batch.getCheckpointForNextBatch()), -1L));
            Assertions.assertTrue(exception.getMessage().contains("Failed to checkpoint"));
        } catch (Exception e) {
            // Pass the exception as the cause so the report keeps its stack trace.
            Assertions.fail(e.getMessage(), e);
        } finally {
            // PROPS is shared and static: do not leave the fallback flag enabled for other tests.
            PROPS.remove("hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch");
        }
    }

    /**
     * With incremental pull disabled, inserts two commits of 10 rows each and verifies that
     * fetches without a checkpoint return 10 and then 20 rows, 10 per commit time.
     */
    @Test
    public void testFullFetchWithCommitTime() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            Dataset<Row> rowDataset = this.runSource(Option.empty(), 10L).getBatch().get();
            Assertions.assertEquals(10L, rowDataset.count());

            JdbcTestUtils.insert("001", 10, connection, DATA_GENERATOR, PROPS);
            rowDataset = this.runSource(Option.empty(), 20L).getBatch().get();
            Assertions.assertEquals(20L, rowDataset.count());
            Assertions.assertEquals(10L, rowDataset.where("commit_time=000").count());
            Assertions.assertEquals(10L, rowDataset.where("commit_time=001").count());
        } catch (Exception e) {
            // Pass the exception as the cause so the report keeps its stack trace.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * With incremental pull disabled, verifies that the first batch carries an empty
     * checkpoint, then computes the maximum of the incremental column by hand and uses it as
     * the checkpoint for a second fetch, which must return only the 10 rows of commit "001".
     */
    @Test
    public void testFullFetchWithCheckpoint() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "last_insert");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            InputBatch<Dataset<Row>> batch = this.runSource(Option.empty(), 10L);
            Dataset<Row> rowDataset = batch.getBatch().get();
            Assertions.assertEquals(10L, rowDataset.count());
            Assertions.assertEquals("", batch.getCheckpointForNextBatch());

            // The batch gave no checkpoint, so derive one from the data: max(incremental column) as a string.
            Column incrementalColumn =
                rowDataset.col(PROPS.getString("hoodie.deltastreamer.jdbc.table.incr.column.name"));
            String max = rowDataset
                .agg(functions.max(incrementalColumn).cast(DataTypes.StringType))
                .first()
                .getString(0);

            JdbcTestUtils.insert("001", 10, connection, DATA_GENERATOR, PROPS);
            rowDataset = this.runSource(Option.of(max), 10L).getBatch().get();
            Assertions.assertEquals(10L, rowDataset.count());
            Assertions.assertEquals(10L, rowDataset.where("commit_time=001").count());
        } catch (Exception e) {
            // Pass the exception as the cause so the report keeps its stack trace.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Replaces the inline password property with a password-file property pointing at the
     * secret written by {@link #writeSecretToFs()} and verifies that a fetch returns all
     * 10 inserted rows.
     */
    @Test
    public void testSourceWithPasswordOnFs() {
        try {
            this.writeSecretToFs();
            PROPS.remove("hoodie.deltastreamer.jdbc.password");
            PROPS.setProperty("hoodie.deltastreamer.jdbc.password.file", "file:///tmp/hudi/config/secret");
            PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            Dataset<Row> rowDataset = this.runSource(Option.empty(), 10L).getBatch().get();
            Assertions.assertEquals(10L, rowDataset.count());
        } catch (Exception e) {
            // Pass the exception as the cause so the report keeps its stack trace.
            Assertions.fail(e.getMessage(), e);
        } finally {
            // PROPS is shared and static: a lingering password-file entry would change the
            // outcome of tests that expect no password to be configured.
            PROPS.remove("hoodie.deltastreamer.jdbc.password.file");
        }
    }

    /**
     * Removes every password setting from {@code PROPS} and verifies that running the source
     * throws {@link HoodieException}.
     */
    @Test
    public void testSourceWithNoPasswordThrowsException() {
        try {
            this.writeSecretToFs();
            PROPS.remove("hoodie.deltastreamer.jdbc.password");
            // Also drop a password-file entry another test may have left in the shared PROPS;
            // otherwise the result of this test would depend on execution order.
            PROPS.remove("hoodie.deltastreamer.jdbc.password.file");
            PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
        } catch (Exception e) {
            // A failure while preparing the fixture must not be mistaken for the expected exception.
            Assertions.fail(e.getMessage(), e);
        }
        // Only the source invocation itself is expected to throw.
        Assertions.assertThrows(HoodieException.class, () -> this.runSource(Option.empty(), 10L));
    }

    /**
     * Sets the extra JDBC option "fetchsize" to 10, disables incremental pull, removes the
     * incremental column, and verifies that a fetch with source limit 10 over a 20-row table
     * returns 10 rows.
     */
    @Test
    public void testSourceWithExtraOptions() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.extra.options.fetchsize", "10");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
        PROPS.remove("hoodie.deltastreamer.jdbc.table.incr.column.name");
        try {
            JdbcTestUtils.clearAndInsert("000", 20, connection, DATA_GENERATOR, PROPS);
            Dataset<Row> rowDataset = this.runSource(Option.empty(), 10L).getBatch().get();
            Assertions.assertEquals(10L, rowDataset.count());
        } catch (Exception e) {
            // Pass the exception as the cause so the report keeps its stack trace.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Sets the storage-level property to "NONE" and verifies that the fetched dataset has
     * 10 rows and reports {@code StorageLevel.NONE()} as its storage level.
     */
    @Test
    public void testSourceWithStorageLevel() {
        PROPS.setProperty("hoodie.deltastreamer.jdbc.storage.level", "NONE");
        PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
        try {
            JdbcTestUtils.clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
            Dataset<Row> rowDataset = this.runSource(Option.empty(), 10L).getBatch().get();
            Assertions.assertEquals(10L, rowDataset.count());
            Assertions.assertEquals(StorageLevel.NONE(), rowDataset.storageLevel());
        } catch (Exception e) {
            // Pass the exception as the cause so the report keeps its stack trace.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Writes the JDBC password ("jdbc") to {@code file:///tmp/hudi/config/secret} so tests can
     * reference it through the password-file property.
     *
     * @throws IOException if the file cannot be created or written
     */
    private void writeSecretToFs() throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        // try-with-resources closes the stream even when the write fails.
        try (FSDataOutputStream outputStream = fs.create(new Path("file:///tmp/hudi/config/secret"))) {
            outputStream.writeBytes("jdbc");
        }
    }

    /**
     * Builds a new {@link JdbcSource} from the shared {@code PROPS} and fetches one batch.
     *
     * @param lastCkptStr checkpoint to resume from, or empty for none
     * @param sourceLimit source limit forwarded to {@code fetchNewData}
     * @return the batch returned by the source
     */
    private InputBatch<Dataset<Row>> runSource(Option<String> lastCkptStr, long sourceLimit) {
        return new JdbcSource(PROPS, jsc, sparkSession, null).fetchNewData(lastCkptStr, sourceLimit);
    }
}

