/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class TestHoodieMultiTableDeltaStreamer
extends HoodieDeltaStreamerTestBase {
    private static final Logger LOG = LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);

    /**
     * Builds a multi-table streamer with hive sync and meta sync enabled and asserts that
     * the table execution context at index 1 reports the sync client tool class names
     * that {@code TestHelpers.getConfig} places on the multi-table config.
     */
    @Test
    public void testMetaSyncConfig() throws IOException {
        String configFolder = basePath + "/config";
        String sourceClassName = TestDataSource.class.getName();
        HoodieMultiTableDeltaStreamer.Config cfg =
                TestHelpers.getConfig("test-source1.properties", configFolder, sourceClassName, true, true, null);

        HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
        List<TableExecutionContext> contexts = streamer.getTableExecutionContexts();
        TableExecutionContext secondContext = contexts.get(1);

        Assertions.assertEquals(
                "com.example.DummySyncTool1,com.example.DummySyncTool2",
                secondContext.getConfig().syncClientToolClassNames);
    }

    /**
     * Expects streamer construction to throw a {@link HoodieException} whose message
     * mentions the missing meta sync table field when the
     * {@code test-invalid-hive-sync-source1.properties} file is used with sync enabled.
     */
    @Test
    public void testInvalidHiveSyncProps() throws IOException {
        String configFolder = basePath + "/config";
        HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(
                "test-invalid-hive-sync-source1.properties", configFolder, TestDataSource.class.getName(), true, true, null);

        HoodieException thrown = Assertions.assertThrows(
                HoodieException.class,
                () -> new HoodieMultiTableDeltaStreamer(cfg, jsc),
                "Should fail when hive sync table not provided with enableHiveSync flag");

        LOG.debug("Expected error when creating table execution objects", thrown);
        String message = thrown.getMessage();
        Assertions.assertTrue(message.contains("Meta sync table field not provided!"));
    }

    /**
     * Expects streamer construction to throw an {@link IllegalArgumentException} asking
     * for a valid common config file path when {@code test-invalid-props.properties}
     * is given as the props file.
     */
    @Test
    public void testInvalidPropsFilePath() throws IOException {
        String configFolder = basePath + "/config";
        HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(
                "test-invalid-props.properties", configFolder, TestDataSource.class.getName(), true, true, null);

        IllegalArgumentException thrown = Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> new HoodieMultiTableDeltaStreamer(cfg, jsc),
                "Should fail when invalid props file is provided");

        LOG.debug("Expected error when creating table execution objects", thrown);
        String message = thrown.getMessage();
        Assertions.assertTrue(message.contains("Please provide valid common config file path!"));
    }

    /**
     * Expects streamer construction to throw an {@link IllegalArgumentException} asking
     * for a valid table config file path when {@code test-invalid-table-config.properties}
     * is given as the props file.
     */
    @Test
    public void testInvalidTableConfigFilePath() throws IOException {
        String configFolder = basePath + "/config";
        HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(
                "test-invalid-table-config.properties", configFolder, TestDataSource.class.getName(), true, true, null);

        IllegalArgumentException thrown = Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> new HoodieMultiTableDeltaStreamer(cfg, jsc),
                "Should fail when invalid table config props file path is provided");

        LOG.debug("Expected error when creating table execution objects", thrown);
        String message = thrown.getMessage();
        Assertions.assertTrue(message.contains("Please provide valid table config file path!"));
    }

    /**
     * Builds a multi-table streamer from {@code test-source1.properties} with
     * {@link SchemaRegistryProvider} as schema provider and checks the per-table
     * configuration: two execution contexts are expected, and the context at index 1
     * must carry the expected target path, table name, Kafka topic, record key field,
     * key generator class, hive sync table and schema registry URL. The context at
     * index 0 is checked for its own schema registry URL.
     */
    @Test
    public void testCustomConfigProps() throws IOException {
        HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(
                "test-source1.properties", basePath + "/config", TestDataSource.class.getName(), false, false, SchemaRegistryProvider.class);
        HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
        List<TableExecutionContext> contexts = streamer.getTableExecutionContexts();

        // Assert the size before indexing so that a missing context is reported as an
        // assertion failure instead of an IndexOutOfBoundsException from get(1).
        Assertions.assertEquals(2, contexts.size());

        TableExecutionContext executionContext = contexts.get(1);
        TypedProperties props = executionContext.getProperties();
        String registryUrlKey = "hoodie.deltastreamer.schemaprovider.registry.url";

        Assertions.assertEquals(basePath + "/multi_table_dataset/uber_db/dummy_table_uber", executionContext.getConfig().targetBasePath);
        Assertions.assertEquals("uber_db.dummy_table_uber", executionContext.getConfig().targetTableName);
        Assertions.assertEquals("topic1", props.getString("hoodie.deltastreamer.source.kafka.topic"));
        Assertions.assertEquals("_row_key", props.getString(DataSourceWriteOptions.RECORDKEY_FIELD().key()));
        Assertions.assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), props.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key()));
        Assertions.assertEquals("uber_hive_dummy_table", props.getString("hoodie.datasource.hive_sync.table"));
        Assertions.assertEquals("http://localhost:8081/subjects/random-value/versions/latest", props.getString(registryUrlKey));
        Assertions.assertEquals("http://localhost:8081/subjects/topic2-value/versions/latest", contexts.get(0).getProperties().getString(registryUrlKey));
    }

    /**
     * Currently disabled. Expects that building the config and constructing the streamer
     * throws an exception whose message asks for valid table config arguments.
     */
    @Test
    @Disabled
    public void testInvalidIngestionProps() {
        Exception thrown = Assertions.assertThrows(
                Exception.class,
                () -> {
                    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(
                            "test-source1.properties", basePath + "/config", TestDataSource.class.getName(), true, true, null);
                    new HoodieMultiTableDeltaStreamer(cfg, jsc);
                },
                "Creation of execution object should fail without kafka topic");

        LOG.debug("Creation of execution object failed with error: " + thrown.getMessage(), thrown);
        Assertions.assertTrue(thrown.getMessage().contains("Please provide valid table config arguments!"));
    }

    /**
     * Runs the multi-table streamer against two Kafka topics.
     *
     * <p>Five records are published to the first topic and ten to the second, both
     * execution contexts are pointed at schema files and topics, and a sync is expected
     * to produce 5 and 10 records in the two target tables. A second batch generated via
     * {@code generateUpdatesAsPerSchema} is then published and synced with a freshly
     * constructed streamer; both tables must be reported as successful and the record
     * counts must stay at 5 and 10.
     */
    @Test
    public void testMultiTableExecutionWithKafkaSource() throws IOException {
        // Schemas used to generate the records for each topic.
        String tripSchema = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        String shortTripSchema = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";

        // Property keys overridden on each table execution context.
        String sourceSchemaFileKey = "hoodie.deltastreamer.schemaprovider.source.schema.file";
        String targetSchemaFileKey = "hoodie.deltastreamer.schemaprovider.target.schema.file";
        String partitionPathFieldKey = "hoodie.datasource.write.partitionpath.field";
        String kafkaTopicKey = "hoodie.deltastreamer.source.kafka.topic";

        // Two distinct topic names derived from the shared test counter.
        String topicName1 = "topic" + testNum++;
        String topicName2 = "topic" + testNum;
        testUtils.createTopic(topicName1, 2);
        testUtils.createTopic(topicName2, 2);

        // Initial batch: 5 records on the first topic, 10 on the second.
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        testUtils.sendMessages(
                topicName1,
                UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, tripSchema)));
        testUtils.sendMessages(
                topicName2,
                UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, shortTripSchema)));

        HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(
                "test-source1.properties", basePath + "/config", JsonKafkaSource.class.getName(), false, false, null);
        HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
        List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();

        // Context at index 1 reads the second topic.
        TableExecutionContext secondContext = executionContexts.get(1);
        TypedProperties properties = secondContext.getProperties();
        properties.setProperty(sourceSchemaFileKey, basePath + "/source_uber.avsc");
        properties.setProperty(targetSchemaFileKey, basePath + "/target_uber.avsc");
        properties.setProperty(partitionPathFieldKey, "timestamp");
        properties.setProperty(kafkaTopicKey, topicName2);
        secondContext.setProperties(properties);

        // Context at index 0 reads the first topic.
        TableExecutionContext firstContext = executionContexts.get(0);
        TypedProperties properties1 = firstContext.getProperties();
        properties1.setProperty(sourceSchemaFileKey, basePath + "/source_short_trip_uber.avsc");
        properties1.setProperty(targetSchemaFileKey, basePath + "/target_short_trip_uber.avsc");
        properties1.setProperty(partitionPathFieldKey, "timestamp");
        properties1.setProperty(kafkaTopicKey, topicName1);
        firstContext.setProperties(properties1);

        String targetBasePath1 = firstContext.getConfig().targetBasePath;
        String targetBasePath2 = secondContext.getConfig().targetBasePath;

        streamer.sync();
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5L, targetBasePath1, sqlContext);
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10L, targetBasePath2, sqlContext);

        // Second batch, produced by generateUpdatesAsPerSchema.
        testUtils.sendMessages(
                topicName1,
                UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, tripSchema)));
        testUtils.sendMessages(
                topicName2,
                UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, shortTripSchema)));

        // A new streamer is built from the same config and given the same per-table properties.
        streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
        List<TableExecutionContext> refreshedContexts = streamer.getTableExecutionContexts();
        refreshedContexts.get(1).setProperties(properties);
        List<TableExecutionContext> refreshedContextsAgain = streamer.getTableExecutionContexts();
        refreshedContextsAgain.get(0).setProperties(properties1);
        streamer.sync();

        Assertions.assertEquals(2, streamer.getSuccessTables().size());
        Assertions.assertTrue(streamer.getFailedTables().isEmpty());
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5L, targetBasePath1, sqlContext);
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10L, targetBasePath2, sqlContext);

        // Advance the shared counter once more after the test body.
        testNum++;
    }

    /**
     * Runs the multi-table streamer against two parquet source directories.
     *
     * <p>The directories are seeded with 10 and 5 records, synced and verified. Three more
     * rounds then add a randomly sized parquet file to each directory and verify that the
     * target tables hold the running totals after each sync.
     */
    @Test
    public void testMultiTableExecutionWithParquetSource() throws IOException {
        String parquetSourceRoot1 = basePath + "/parquetSrcPath1/";
        prepareParquetDFSFiles(10, parquetSourceRoot1);
        String parquetSourceRoot2 = basePath + "/parquetSrcPath2/";
        prepareParquetDFSFiles(5, parquetSourceRoot2);

        String parquetPropsFile = populateCommonPropsAndWriteToFile();
        HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(
                parquetPropsFile, basePath + "/config", ParquetDFSSource.class.getName(), false, false, false, "multi_table_parquet", null);
        HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);

        List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
        ingestPerParquetSourceProps(executionContexts, Arrays.asList(parquetSourceRoot1, parquetSourceRoot2));
        String targetBasePath1 = executionContexts.get(0).getConfig().targetBasePath;
        String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath;

        // First sync covers the seeded files only.
        syncAndVerify(streamer, targetBasePath1, targetBasePath2, 10L, 5L);

        int totalTable1Records = 10;
        int totalTable2Records = 5;
        int round = 0;
        while (round < 3) {
            int table1Records = 10 + RANDOM.nextInt(100);
            int table2Records = 15 + RANDOM.nextInt(100);
            // File names continue from "2.parquet" onwards.
            String fileName = round + 2 + ".parquet";
            prepareParquetDFSFiles(table1Records, parquetSourceRoot1, fileName, false, null, null);
            prepareParquetDFSFiles(table2Records, parquetSourceRoot2, fileName, false, null, null);

            totalTable1Records += table1Records;
            totalTable2Records += table2Records;
            syncAndVerify(streamer, targetBasePath1, targetBasePath2, totalTable1Records, totalTable2Records);
            round++;
        }
    }

    /**
     * Verifies the key generator class configured for each table execution context:
     * the table named {@code dummy_table_short_trip} must use
     * {@code TestHoodieDeltaStreamer.TestTableLevelGenerator}, every other table must use
     * {@code TestHoodieDeltaStreamer.TestGenerator}.
     */
    @Test
    public void testTableLevelProperties() throws IOException {
        HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(
                "test-source1.properties", basePath + "/config", TestDataSource.class.getName(), false, false, null);
        HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);

        // The list must be typed: with a raw List the loop element is an Object and the
        // TableExecutionContext accessors below do not resolve.
        List<TableExecutionContext> tableExecutionContexts = streamer.getTableExecutionContexts();
        for (TableExecutionContext tableExecutionContext : tableExecutionContexts) {
            String expectedKeyGeneratorClass;
            switch (tableExecutionContext.getTableName()) {
                case "dummy_table_short_trip":
                    expectedKeyGeneratorClass = TestHoodieDeltaStreamer.TestTableLevelGenerator.class.getName();
                    break;
                default:
                    expectedKeyGeneratorClass = TestHoodieDeltaStreamer.TestGenerator.class.getName();
                    break;
            }
            String actualKeyGeneratorClass =
                    tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key());
            Assertions.assertEquals(expectedKeyGeneratorClass, actualKeyGeneratorClass);
        }
    }

    /**
     * Fills a fresh {@link TypedProperties} via {@code populateCommonProps} and saves it
     * under {@code basePath} as {@code test-parquet-dfs-source.properties}.
     *
     * @return the file name (not the full path) of the saved properties file
     * @throws IOException declared for failures propagated from the calls made here
     */
    private String populateCommonPropsAndWriteToFile() throws IOException {
        final String propsFileName = "test-parquet-dfs-source.properties";
        TypedProperties sharedProps = new TypedProperties();
        populateCommonProps(sharedProps, basePath);
        String propsFilePath = basePath + "/" + propsFileName;
        UtilitiesTestBase.Helpers.savePropsToDFS(sharedProps, fs, propsFilePath);
        return propsFileName;
    }

    /**
     * Creates the properties applied to a table that reads from a parquet source.
     *
     * @param parquetSourceRoot value stored under {@code hoodie.deltastreamer.source.dfs.root}
     * @return new properties holding the include file, record key field, partition path
     *         field and source root entries
     */
    private TypedProperties getParquetProps(String parquetSourceRoot) {
        TypedProperties parquetProps = new TypedProperties();
        // Entries are set in a fixed order: include, record key, partition path, source root.
        parquetProps.setProperty("include", "base.properties");
        parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
        return parquetProps;
    }

    /**
     * Copies the parquet source properties for each source root into the execution
     * context at the same position.
     *
     * @param executionContexts contexts to update; the element at position {@code i}
     *                          receives the properties built for {@code parquetSourceRoots.get(i)}
     * @param parquetSourceRoots parquet source root paths, one per context
     */
    private void ingestPerParquetSourceProps(List<TableExecutionContext> executionContexts, List<String> parquetSourceRoots) {
        for (int position = 0; position < parquetSourceRoots.size(); position++) {
            TableExecutionContext context = executionContexts.get(position);
            TypedProperties contextProps = context.getProperties();
            // Every parquet entry is written into the context's properties as a string pair.
            getParquetProps(parquetSourceRoots.get(position))
                    .forEach((key, value) -> contextProps.setProperty(key.toString(), value.toString()));
            context.setProperties(contextProps);
        }
    }

    /**
     * Runs one sync on the streamer and then checks the record count of both target tables.
     *
     * @param streamer streamer to sync
     * @param targetBasePath1 base path of the first target table
     * @param targetBasePath2 base path of the second target table
     * @param table1ExpectedRecords record count expected at {@code targetBasePath1}
     * @param table2ExpectedRecords record count expected at {@code targetBasePath2}
     */
    private void syncAndVerify(HoodieMultiTableDeltaStreamer streamer, String targetBasePath1, String targetBasePath2, long table1ExpectedRecords, long table2ExpectedRecords) {
        streamer.sync();

        // Counts are verified only after the sync call has returned.
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table1ExpectedRecords, targetBasePath1, sqlContext);
        TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table2ExpectedRecords, targetBasePath2, sqlContext);
    }

    /**
     * Factory helpers that build {@link HoodieMultiTableDeltaStreamer.Config} instances
     * for the tests in this class.
     */
    static class TestHelpers {
        TestHelpers() {
        }

        /**
         * Builds a config with the schema provider set and {@code multi_table_dataset}
         * as the base path prefix; all other arguments are passed through to the
         * eight-argument overload.
         */
        static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync, Class<?> clazz) {
            final boolean setSchemaProvider = true;
            final String basePathPrefix = "multi_table_dataset";
            return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, enableMetaSync, setSchemaProvider, basePathPrefix, clazz);
        }

        /**
         * Builds a multi-table streamer config.
         *
         * @param fileName props file name, resolved against {@code basePath}
         * @param configFolder value for the config folder field
         * @param sourceClassName value for the source class name field
         * @param enableHiveSync value for the hive sync flag
         * @param enableMetaSync value for the meta sync flag
         * @param setSchemaProvider when {@code false} the schema provider class name is left unset
         * @param basePathPrefix directory name appended to {@code basePath} for the base path prefix
         * @param clazz schema provider class; {@code null} selects {@link FilebasedSchemaProvider}
         * @return the populated config
         */
        static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync, boolean setSchemaProvider, String basePathPrefix, Class<?> clazz) {
            HoodieMultiTableDeltaStreamer.Config result = new HoodieMultiTableDeltaStreamer.Config();

            // Locations.
            result.configFolder = configFolder;
            result.targetTableName = "dummy_table";
            result.basePathPrefix = basePath + "/" + basePathPrefix;
            result.propsFilePath = basePath + "/" + fileName;

            // Table and source settings.
            result.tableType = "COPY_ON_WRITE";
            result.sourceClassName = sourceClassName;
            result.sourceOrderingField = "timestamp";
            if (setSchemaProvider) {
                Class<?> providerClass = (clazz == null) ? FilebasedSchemaProvider.class : clazz;
                result.schemaProviderClassName = providerClass.getName();
            }

            // Sync settings.
            result.enableHiveSync = enableHiveSync;
            result.enableMetaSync = enableMetaSync;
            result.syncClientToolClassNames = "com.example.DummySyncTool1,com.example.DummySyncTool2";
            return result;
        }
    }
}

