/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.deltastreamer;

import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieSparkRecordMerger;
import org.apache.hudi.HoodieSparkUtils$;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncClient;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieIndexer;
import org.apache.hudi.utilities.deltastreamer.DeltaSync;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
import org.apache.hudi.utilities.deltastreamer.NoNewDataTerminationStrategy;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JdbcSource;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ORCDFSSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.SqlSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
import org.apache.hudi.utilities.testutils.JdbcTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF4;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

public class TestHoodieDeltaStreamer
extends HoodieDeltaStreamerTestBase {
    private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);

    /**
     * Convenience overload that builds a delta streamer using
     * {@link WriteOperationType#INSERT} as the write operation.
     *
     * @param tableBasePath base path of the target table
     * @param totalRecords  record count forwarded to the async-services configuration
     * @param asyncCluster  async clustering flag value forwarded to the async-services configuration
     * @param recordType    record type used to pick the record merger configuration
     * @return the configured delta streamer
     * @throws IOException if the delta streamer cannot be created
     */
    protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster, HoodieRecord.HoodieRecordType recordType) throws IOException {
        WriteOperationType defaultOperation = WriteOperationType.INSERT;
        return initialHoodieDeltaStreamer(tableBasePath, totalRecords, asyncCluster, recordType, defaultOperation);
    }

    /**
     * Builds a continuous-mode, COPY_ON_WRITE delta streamer whose configuration carries the
     * record merger settings, the async-services settings and the multi-writer settings.
     *
     * @param tableBasePath      base path of the target table
     * @param totalRecords       record count forwarded to the async-services configuration
     * @param asyncCluster       async clustering flag value forwarded to the async-services configuration
     * @param recordType         record type used to pick the record merger configuration
     * @param writeOperationType write operation the streamer is configured with
     * @return the configured delta streamer
     * @throws IOException if the delta streamer cannot be created
     */
    protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster, HoodieRecord.HoodieRecordType recordType, WriteOperationType writeOperationType) throws IOException {
        HoodieDeltaStreamer.Config streamerConfig = TestHelpers.makeConfig(tableBasePath, writeOperationType);
        streamerConfig.continuousMode = true;
        streamerConfig.tableType = HoodieTableType.COPY_ON_WRITE.name();

        // The order of entries appended to the config list is kept: merger, async services, multi-writer.
        TestHelpers.addRecordMerger(recordType, streamerConfig.configs);
        streamerConfig.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", asyncCluster, ""));
        streamerConfig.configs.addAll(getAllMultiWriterConfigs());

        return new HoodieDeltaStreamer(streamerConfig, jsc);
    }

    /**
     * Convenience overload that creates a clustering job without specifying the
     * "retry last failed clustering job" flag (passed on as {@code null}).
     *
     * @param tableBasePath         base path of the table to cluster
     * @param clusteringInstantTime clustering instant time handed to the job configuration
     * @param runSchedule           schedule flag handed to the job configuration
     * @param scheduleAndExecute    running mode handed to the job configuration
     * @param recordType            record type used to pick the record merger configuration
     * @return the configured clustering job
     */
    protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute, HoodieRecord.HoodieRecordType recordType) {
        Boolean retryLastFailedClusteringJob = null;
        return initialHoodieClusteringJob(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob, recordType);
    }

    /**
     * Convenience overload that creates a clustering job for the AVRO record type without
     * specifying the "retry last failed clustering job" flag (passed on as {@code null}).
     *
     * @param tableBasePath         base path of the table to cluster
     * @param clusteringInstantTime clustering instant time handed to the job configuration
     * @param runSchedule           schedule flag handed to the job configuration
     * @param scheduleAndExecute    running mode handed to the job configuration
     * @return the configured clustering job
     */
    protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute) {
        Boolean retryLastFailedClusteringJob = null;
        HoodieRecord.HoodieRecordType defaultRecordType = HoodieRecord.HoodieRecordType.AVRO;
        return initialHoodieClusteringJob(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob, defaultRecordType);
    }

    /**
     * Creates a clustering job whose configuration carries the record merger settings for
     * {@code recordType} followed by the multi-writer settings.
     *
     * @param tableBasePath                base path of the table to cluster
     * @param clusteringInstantTime        clustering instant time handed to the job configuration
     * @param runSchedule                  schedule flag handed to the job configuration
     * @param scheduleAndExecute           running mode handed to the job configuration
     * @param retryLastFailedClusteringJob retry flag handed to the job configuration; may be {@code null}
     * @param recordType                   record type used to pick the record merger configuration
     * @return the configured clustering job
     */
    protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute, Boolean retryLastFailedClusteringJob, HoodieRecord.HoodieRecordType recordType) {
        HoodieClusteringJob.Config clusteringConfig =
            buildHoodieClusteringUtilConfig(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);

        // Merger settings go in first, then the multi-writer settings.
        TestHelpers.addRecordMerger(recordType, clusteringConfig.configs);
        clusteringConfig.configs.addAll(getAllMultiWriterConfigs());

        return new HoodieClusteringJob(jsc, clusteringConfig);
    }

    /**
     * Loads {@code test-source.properties} from the test base path and checks three of the
     * values it is expected to contain.
     */
    @Test
    public void testProps() {
        Path propsFile = new Path(basePath + "/test-source.properties");
        TypedProperties props = new DFSPropertiesConfiguration(fs.getConf(), propsFile).getProps();

        int shuffleParallelism = props.getInteger("hoodie.upsert.shuffle.parallelism");
        String recordKeyField = props.getString("hoodie.datasource.write.recordkey.field");
        String keyGeneratorClass = props.getString("hoodie.datasource.write.keygenerator.class");

        Assertions.assertEquals(2, shuffleParallelism);
        Assertions.assertEquals("_row_key", recordKeyField);
        Assertions.assertEquals("org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestGenerator", keyGeneratorClass);
    }

    /**
     * Creates a fresh delta streamer config populated with the minimal target settings shared by
     * the command-line parsing test cases.
     *
     * @return a new config with target base path, table type and target table name set
     */
    private static HoodieDeltaStreamer.Config getBaseConfig() {
        HoodieDeltaStreamer.Config minimalConfig = new HoodieDeltaStreamer.Config();
        minimalConfig.targetTableName = "test";
        minimalConfig.tableType = "COPY_ON_WRITE";
        minimalConfig.targetBasePath = "s3://mybucket/blah";
        return minimalConfig;
    }

    /**
     * Supplies the arguments for {@code testSchemaEvolution}: every combination of record type
     * (AVRO, SPARK), table type (COW, MOR) and the two boolean flags, 16 cases in total.
     *
     * <p>Each tuple is laid out as {@code (tableType, useUserProvidedSchema,
     * useSchemaPostProcessor, recordType)}. Record type varies slowest and the
     * post-processor flag varies fastest.
     *
     * @return the stream of argument tuples
     */
    private static Stream<Arguments> schemaEvolArgs() {
        HoodieRecord.HoodieRecordType[] recordTypes = {HoodieRecord.HoodieRecordType.AVRO, HoodieRecord.HoodieRecordType.SPARK};
        String[] tableTypes = {DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL()};
        boolean[] flagValues = {true, false};

        List<Arguments> combinations = new ArrayList<>();
        for (HoodieRecord.HoodieRecordType recordType : recordTypes) {
            for (String tableType : tableTypes) {
                for (boolean useUserProvidedSchema : flagValues) {
                    for (boolean useSchemaPostProcessor : flagValues) {
                        combinations.add(Arguments.of(tableType, useUserProvidedSchema, useSchemaPostProcessor, recordType));
                    }
                }
            }
        }
        return combinations.stream();
    }

    /**
     * Supplies {@code (String[] args, Config expected)} pairs for
     * {@code testValidCommandLineArgs}. Each pair couples a command line with the config that
     * parsing it is expected to produce.
     *
     * @return the stream of argument pairs
     */
    private static Stream<Arguments> provideValidCliArgs() {
        final String hiveTableConf = "hoodie.datasource.hive_sync.table=test_table";
        final String recordKeyConf = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";

        // Expected configs, one per command line below.
        HoodieDeltaStreamer.Config baseOnly = getBaseConfig();

        HoodieDeltaStreamer.Config withFileFormat = getBaseConfig();
        withFileFormat.baseFileFormat = "PARQUET";

        HoodieDeltaStreamer.Config withSourceLimit = getBaseConfig();
        withSourceLimit.sourceLimit = 500L;

        HoodieDeltaStreamer.Config withHiveSync = getBaseConfig();
        withHiveSync.enableHiveSync = true;

        HoodieDeltaStreamer.Config withHiveTableConf = getBaseConfig();
        withHiveTableConf.configs = Arrays.asList(hiveTableConf);

        HoodieDeltaStreamer.Config withRecordKeyConf = getBaseConfig();
        withRecordKeyConf.configs = Arrays.asList(recordKeyConf);

        HoodieDeltaStreamer.Config withBothConfs = getBaseConfig();
        withBothConfs.configs = Arrays.asList(hiveTableConf, recordKeyConf);

        HoodieDeltaStreamer.Config withEverything = getBaseConfig();
        withEverything.baseFileFormat = "PARQUET";
        withEverything.sourceLimit = 500L;
        withEverything.enableHiveSync = true;
        withEverything.configs = Arrays.asList(hiveTableConf, recordKeyConf);

        // Command lines.
        String[] baseArgs = {
            "--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test"};
        String[] fileFormatArgs = {
            "--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test",
            "--base-file-format", "PARQUET"};
        String[] sourceLimitArgs = {
            "--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test",
            "--source-limit", "500"};
        String[] hiveSyncArgs = {
            "--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test",
            "--enable-hive-sync"};
        String[] hiveTableConfArgs = {
            "--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test",
            "--hoodie-conf", hiveTableConf};
        String[] recordKeyConfArgs = {
            "--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test",
            "--hoodie-conf", recordKeyConf};
        String[] bothConfsArgs = {
            "--target-base-path", "s3://mybucket/blah", "--table-type", "COPY_ON_WRITE", "--target-table", "test",
            "--hoodie-conf", hiveTableConf, "--hoodie-conf", recordKeyConf};
        String[] everythingArgs = {
            "--target-base-path", "s3://mybucket/blah", "--source-limit", "500", "--table-type", "COPY_ON_WRITE",
            "--target-table", "test", "--base-file-format", "PARQUET", "--enable-hive-sync",
            "--hoodie-conf", hiveTableConf, "--hoodie-conf", recordKeyConf};

        return Stream.of(
            Arguments.of(baseArgs, baseOnly),
            Arguments.of(fileFormatArgs, withFileFormat),
            Arguments.of(sourceLimitArgs, withSourceLimit),
            Arguments.of(hiveSyncArgs, withHiveSync),
            Arguments.of(hiveTableConfArgs, withHiveTableConf),
            Arguments.of(recordKeyConfArgs, withRecordKeyConf),
            Arguments.of(bothConfsArgs, withBothConfs),
            Arguments.of(everythingArgs, withEverything));
    }

    /**
     * Parses {@code args} with {@code HoodieDeltaStreamer.getConfig} and checks that the result
     * equals {@code expected}.
     *
     * @param args     command-line arguments to parse
     * @param expected config the arguments are expected to produce
     */
    @ParameterizedTest
    @MethodSource("provideValidCliArgs")
    public void testValidCommandLineArgs(String[] args, HoodieDeltaStreamer.Config expected) {
        HoodieDeltaStreamer.Config parsed = HoodieDeltaStreamer.getConfig(args);
        Assertions.assertEquals(expected, parsed);
    }

    /**
     * Writes one parquet file under a Kafka-Connect-style directory layout, creates a delta
     * streamer configured with {@code KafkaConnectHdfsProvider} as initial checkpoint provider,
     * and checks that the resulting checkpoint is {@code "kafka_topic1,0:200"}.
     *
     * @throws IOException if the test fixture cannot be written or the streamer cannot be created
     */
    @Test
    public void testKafkaConnectCheckpointProvider() throws IOException {
        String tableBasePath = basePath + "/test_table";
        String bootstrapPath = basePath + "/kafka_topic1";
        String partitionPath = bootstrapPath + "/year=2016/month=05/day=01";
        // NOTE(review): the file name presumably encodes topic+partition+startOffset+endOffset - confirm against the provider.
        String filePath = partitionPath + "/kafka_topic1+0+100+200.parquet";

        HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
        Path propsFile = new Path(basePath + "/test-source.properties");
        TypedProperties props = new DFSPropertiesConfiguration(fs.getConf(), propsFile).getProps();
        props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath);
        cfg.initialCheckpointProvider = "org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider";

        // Lay out the source directory and drop a single parquet file with 100 generated inserts into it.
        fs.mkdirs(new Path(bootstrapPath));
        fs.mkdirs(new Path(partitionPath));
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(dataGenerator.generateInserts("000", Integer.valueOf(100))),
            new Path(filePath));

        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, fs, jsc.hadoopConfiguration(), Option.ofNullable(props));
        Assertions.assertEquals("kafka_topic1,0:200", deltaStreamer.getConfig().checkpoint);
    }

    /**
     * Syncs a delta streamer configured from {@code test-invalid.properties} and expects an
     * {@link IOException} whose message mentions the key generator class that could not be loaded.
     *
     * @throws Exception declared for parity with the other tests in this class
     */
    @Test
    public void testPropsWithInvalidKeyGenerator() throws Exception {
        IOException thrown = Assertions.assertThrows(IOException.class, () -> {
            String tableBasePath = basePath + "/test_table_invalid_key_gen";
            HoodieDeltaStreamer.Config invalidCfg = TestHelpers.makeConfig(
                tableBasePath,
                WriteOperationType.BULK_INSERT,
                Collections.singletonList(TripsWithDistanceTransformer.class.getName()),
                "test-invalid.properties",
                false);
            new HoodieDeltaStreamer(invalidCfg, jsc).sync();
        }, "Should error out when setting the key generator class property to an invalid value");

        LOG.debug("Expected error during getting the key generator", thrown);
        Assertions.assertTrue(thrown.getMessage().contains("Could not load key generator class"));
    }

    /**
     * Creates a plain directory, points a delta streamer at it and expects the sync to fail with
     * {@link TableNotFoundException}.
     *
     * @throws Exception declared for parity with the other tests in this class
     */
    @Test
    public void testTableCreation() throws Exception {
        TableNotFoundException thrown = Assertions.assertThrows(TableNotFoundException.class, () -> {
            String notATablePath = basePath + "/not_a_table";
            fs.mkdirs(new Path(notATablePath));
            HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(notATablePath, WriteOperationType.BULK_INSERT);
            new HoodieDeltaStreamer(cfg, jsc).sync();
        }, "Should error out when pointed out at a dir thats not a table");

        LOG.debug("Expected error during table creation", thrown);
    }

    /**
     * Runs a bulk insert, a no-op sync (source limit 0) and an upsert against one table, then
     * exports that table as plain parquet and bootstraps a second table from the export.
     *
     * <p>Checks performed:
     * <ul>
     *   <li>record count and commit metadata after each sync (1000, 1000, 1950 records);</li>
     *   <li>the bootstrapped table holds 1950 records with 1950 distinct record keys;</li>
     *   <li>the bootstrapped table's schema has the same number of fields as the exported source
     *       and contains all Hudi meta columns and all source field names.</li>
     * </ul>
     *
     * @param recordType record type used to pick the record merger configuration
     * @throws Exception if any sync or verification step fails
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testBulkInsertsAndUpsertsWithBootstrap(HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tableBasePath = basePath + "/test_table";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        TestHelpers.addRecordMerger(recordType, cfg.configs);

        // Initial bulk insert.
        syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);

        // With a source limit of 0 the table is expected to stay unchanged.
        cfg.sourceLimit = 0L;
        syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);

        // Upsert a larger batch on top of the existing data.
        cfg.sourceLimit = 2000L;
        cfg.operation = WriteOperationType.UPSERT;
        syncAndAssertRecordCount(cfg, 1950, tableBasePath, "00001", 2);
        List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
        Assertions.assertEquals(1950L, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());

        // Export the Hudi table as plain parquet, partitioned by rider, to act as the bootstrap source.
        String bootstrapSourcePath = basePath + "/src_bootstrapped";
        Dataset<Row> sourceDf = sqlContext.read().format("org.apache.hudi").load(tableBasePath);
        sourceDf.write().format("parquet").partitionBy("rider").save(bootstrapSourcePath);

        // Bootstrap a new table from the exported parquet files.
        String newDatasetBasePath = basePath + "/test_dataset_bootstrapped";
        cfg.runBootstrap = true;
        cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", bootstrapSourcePath));
        cfg.configs.add(String.format("%s=%s", DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "rider"));
        cfg.configs.add(String.format("hoodie.bootstrap.keygen.class=%s", SimpleKeyGenerator.class.getName()));
        cfg.configs.add("hoodie.bootstrap.parallelism=5");
        cfg.targetBasePath = newDatasetBasePath;
        new HoodieDeltaStreamer(cfg, jsc).sync();

        Dataset<Row> res = sqlContext.read().format("org.apache.hudi").load(newDatasetBasePath);
        LOG.info("Schema :");
        res.printSchema();

        TestHelpers.assertRecordCount(1950L, newDatasetBasePath, sqlContext);
        // createOrReplaceTempView replaces the deprecated registerTempTable and matches the rest of this class.
        res.createOrReplaceTempView("bootstrapped");
        Assertions.assertEquals(1950L, sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count());

        StructField[] fields = res.schema().fields();
        List<String> fieldNames = Arrays.asList(res.schema().fieldNames());
        List<String> expectedFieldNames = Arrays.asList(sourceDf.schema().fieldNames());
        Assertions.assertEquals(expectedFieldNames.size(), fields.length);
        Assertions.assertTrue(fieldNames.containsAll(HoodieRecord.HOODIE_META_COLUMNS));
        Assertions.assertTrue(fieldNames.containsAll(expectedFieldNames));

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, bootstrapSourcePath);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, newDatasetBasePath);
    }

    /**
     * Verifies that a sync whose config overrides the table's record key field fails with
     * {@link HoodieException} and leaves the table at 1000 records, and that a subsequent sync
     * with an unmodified config succeeds and brings the table to 1950 records.
     *
     * @throws Exception if any sync or verification step fails unexpectedly
     */
    @Test
    public void testModifiedTableConfigs() throws Exception {
        String tableBasePath = basePath + "/test_table_modified_configs";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);

        // Initial bulk insert, followed by a sync with source limit 0 that should change nothing.
        syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
        cfg.sourceLimit = 0L;
        syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);

        // Upsert with a conflicting record key field: the sync must be rejected.
        cfg.sourceLimit = 2000L;
        cfg.operation = WriteOperationType.UPSERT;
        cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + "=differentval");
        Assertions.assertThrows(HoodieException.class, () -> syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1));
        long recordsAfterRejectedSync = TestHelpers.countsPerCommit(tableBasePath, sqlContext).stream()
            .mapToLong(row -> row.getLong(1))
            .sum();
        Assertions.assertEquals(1000L, recordsAfterRejectedSync);

        // Same upsert without the conflicting override goes through.
        HoodieDeltaStreamer.Config cleanCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        cleanCfg.sourceLimit = 2000L;
        cleanCfg.operation = WriteOperationType.UPSERT;
        syncAndAssertRecordCount(cleanCfg, 1950, tableBasePath, "00001", 2);
        long recordsAfterUpsert = TestHelpers.countsPerCommit(tableBasePath, sqlContext).stream()
            .mapToLong(row -> row.getLong(1))
            .sum();
        Assertions.assertEquals(1950L, recordsAfterUpsert);
    }

    /**
     * Runs a single sync with {@code cfg} and then asserts the table's record count, distance
     * count and commit metadata.
     *
     * @param cfg           delta streamer configuration to sync with
     * @param expected      expected record count and distance count after the sync
     * @param tableBasePath base path of the table to inspect
     * @param metadata      expected commit metadata value
     * @param totalCommits  expected total number of commits
     * @throws Exception if the sync or any assertion fails
     */
    private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, Integer expected, String tableBasePath, String metadata, Integer totalCommits) throws Exception {
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc);
        streamer.sync();

        // Unboxed only after the sync, exactly like the individual intValue() calls it replaces.
        int expectedCount = expected.intValue();
        TestHelpers.assertRecordCount(expectedCount, tableBasePath, sqlContext);
        TestHelpers.assertDistanceCount(expectedCount, tableBasePath, sqlContext);
        TestHelpers.assertCommitMetadata(metadata, tableBasePath, fs, totalCommits);
    }

    /**
     * Exercises schema evolution across three consecutive syncs against the same table:
     * <ol>
     *   <li>an INSERT with an identity transformer and {@code source.avsc} as both source and
     *       target schema (1000 records expected);</li>
     *   <li>an UPSERT with {@code TripsWithEvolvedOptionalFieldTransformer} and
     *       {@code source_evolved.avsc} as target schema (1450 records expected, 950 of them with
     *       a non-null {@code evoluted_optional_union_field});</li>
     *   <li>an UPSERT with the identity transformer again, where the target schema is only
     *       supplied when {@code useUserProvidedSchema} is set (1900 records expected).</li>
     * </ol>
     * Finally the table schema (without metadata fields) is compared with
     * {@code source_evolved.avsc}, or {@code source_evolved_post_processed.avsc} when the schema
     * post processor is enabled.
     *
     * <p>The shared {@code defaultSchemaProviderClassName} is restored in a {@code finally} block
     * so a failing run cannot leak the temporary provider into later tests, and the expected
     * schema file is read through try-with-resources so its stream is always closed.
     *
     * @param tableType              table type the streamer is configured with
     * @param useUserProvidedSchema  whether the third sync supplies an explicit target schema file
     * @param useSchemaPostProcessor whether the Spark-Avro schema post processor stays enabled
     * @param recordType             record type used to pick the record merger configuration
     * @throws Exception if any sync or verification step fails
     */
    @ParameterizedTest
    @MethodSource(value={"schemaEvolArgs"})
    public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, boolean useSchemaPostProcessor, HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tableBasePath = basePath + "/test_table_schema_evolution" + tableType + "_" + useUserProvidedSchema + "_" + useSchemaPostProcessor;
        defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
        try {
            // Sync 1: plain insert with the original schema on both sides.
            HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, Collections.singletonList(TestIdentityTransformer.class.getName()), "test-source.properties", false, true, false, null, tableType);
            TestHelpers.addRecordMerger(recordType, cfg.configs);
            cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
            cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source.avsc");
            cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
            if (!useSchemaPostProcessor) {
                cfg.configs.add("hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false");
            }
            new HoodieDeltaStreamer(cfg, jsc).sync();
            TestHelpers.assertRecordCount(1000L, tableBasePath, sqlContext);
            TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);

            // Sync 2: upsert through a transformer that adds the evolved optional field.
            cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()), "test-source.properties", false, true, false, null, tableType);
            TestHelpers.addRecordMerger(recordType, cfg.configs);
            cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
            cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
            cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
            if (!useSchemaPostProcessor) {
                cfg.configs.add("hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false");
            }
            new HoodieDeltaStreamer(cfg, jsc).sync();
            TestHelpers.assertRecordCount(1450L, tableBasePath, sqlContext);
            TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
            List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
            Assertions.assertEquals(1450L, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
            sqlContext.read().format("org.apache.hudi").load(tableBasePath).createOrReplaceTempView("tmp_trips");
            long recordCount = sqlContext.sparkSession().sql("select * from tmp_trips where evoluted_optional_union_field is not NULL").count();
            Assertions.assertEquals(950L, recordCount);

            // Sync 3: upsert with the old (identity) transformer; optionally without a user-provided target schema.
            if (!useUserProvidedSchema) {
                defaultSchemaProviderClassName = TestFileBasedSchemaProviderNullTargetSchema.class.getName();
            }
            cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TestIdentityTransformer.class.getName()), "test-source.properties", false, true, false, null, tableType);
            TestHelpers.addRecordMerger(recordType, cfg.configs);
            cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
            if (useUserProvidedSchema) {
                cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
            }
            if (!useSchemaPostProcessor) {
                cfg.configs.add("hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false");
            }
            cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
            new HoodieDeltaStreamer(cfg, jsc).sync();
            TestHelpers.assertRecordCount(1900L, tableBasePath, sqlContext);
            TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
            counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
            Assertions.assertEquals(1900L, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());

            // The table schema must match the evolved schema file for the chosen post-processor setting.
            TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
            Schema tableSchema = tableSchemaResolver.getTableAvroSchemaWithoutMetadataFields();
            Assertions.assertNotNull(tableSchema);
            String expectedSchemaFile = useSchemaPostProcessor ? "/source_evolved_post_processed.avsc" : "/source_evolved.avsc";
            Schema expectedSchema;
            try (InputStream schemaStream = fs.open(new Path(basePath + expectedSchemaFile))) {
                expectedSchema = new Schema.Parser().parse(schemaStream);
            }
            Assertions.assertEquals(expectedSchema, tableSchema);

            // Clean up the table and rewrite the common properties file.
            UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
            UtilitiesTestBase.Helpers.deleteFileFromDfs(FSUtils.getFs((String)cfg.targetBasePath, (Configuration)jsc.hadoopConfiguration()), basePath + "/" + "test-source.properties");
            TestHoodieDeltaStreamer.writeCommonPropsToFile(fs, basePath);
        } finally {
            // Always put the shared provider class name back, even when an assertion above failed.
            defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
        }
    }

    /**
     * Runs the continuous-mode upsert scenario against a COPY_ON_WRITE table stored under
     * {@code continuous_cow}.
     *
     * @param recordType record type used to pick the record merger configuration
     * @throws Exception if the scenario fails
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testUpsertsCOWContinuousMode(HoodieRecord.HoodieRecordType recordType) throws Exception {
        HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
        testUpsertsContinuousMode(tableType, "continuous_cow", recordType);
    }

    /**
     * Runs a single (non-continuous) upsert sync against a COPY_ON_WRITE table with console
     * metrics turned on, then checks the record count and that {@code Metrics} reports itself
     * as not initialized afterwards.
     *
     * @param recordType record type used to pick the record merger configuration
     * @throws Exception if the sync or a verification step fails
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testUpsertsCOW_ContinuousModeDisabled(HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tableBasePath = basePath + "/non_continuous_cow";

        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        TestHelpers.addRecordMerger(recordType, cfg.configs);
        cfg.configs.add(HoodieMetricsConfig.TURN_METRICS_ON.key() + "=true");
        cfg.configs.add(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key() + "=CONSOLE");
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.continuousMode = false;

        new HoodieDeltaStreamer(cfg, jsc).sync();

        TestHelpers.assertRecordCount(1000L, tableBasePath, sqlContext);
        Assertions.assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Runs the continuous-mode upsert scenario against a COPY_ON_WRITE table stored under
     * {@code continuous_cow}, with the graceful-shutdown variant enabled.
     *
     * @param recordType record type used to pick the record merger configuration
     * @throws Exception if the scenario fails
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testUpsertsCOWContinuousModeShutdownGracefully(HoodieRecord.HoodieRecordType recordType) throws Exception {
        boolean testShutdownGracefully = true;
        testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", testShutdownGracefully, recordType);
    }

    /**
     * Runs the continuous-mode upsert scenario against a MERGE_ON_READ table stored under
     * {@code continuous_mor}.
     *
     * @param recordType record type used to pick the record merger configuration
     * @throws Exception if the scenario fails
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testUpsertsMORContinuousMode(HoodieRecord.HoodieRecordType recordType) throws Exception {
        HoodieTableType tableType = HoodieTableType.MERGE_ON_READ;
        testUpsertsContinuousMode(tableType, "continuous_mor", recordType);
    }

    /**
     * Runs a single (non-continuous) upsert sync against a MERGE_ON_READ table with console
     * metrics turned on, then checks the record count and that {@code Metrics} reports itself
     * as not initialized afterwards.
     *
     * @param recordType record type used to pick the record merger configuration
     * @throws Exception if the sync or a verification step fails
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testUpsertsMOR_ContinuousModeDisabled(HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tableBasePath = basePath + "/non_continuous_mor";

        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
        TestHelpers.addRecordMerger(recordType, cfg.configs);
        cfg.configs.add(HoodieMetricsConfig.TURN_METRICS_ON.key() + "=true");
        cfg.configs.add(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key() + "=CONSOLE");
        cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
        cfg.continuousMode = false;

        new HoodieDeltaStreamer(cfg, jsc).sync();

        TestHelpers.assertRecordCount(1000L, tableBasePath, sqlContext);
        Assertions.assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Convenience overload that runs the continuous-mode upsert scenario without the
     * graceful-shutdown variant.
     *
     * @param tableType  table type the streamer is configured with
     * @param tempDir    directory name (under the test base path) that holds the table
     * @param recordType record type used to pick the record merger configuration
     * @throws Exception if the scenario fails
     */
    private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, HoodieRecord.HoodieRecordType recordType) throws Exception {
        boolean testShutdownGracefully = false;
        testUpsertsContinuousMode(tableType, tempDir, testShutdownGracefully, recordType);
    }

    /**
     * Shared continuous-mode upsert scenario.
     *
     * <p>Starts a delta streamer in continuous mode with auto-clean disabled and
     * polls until the expected commits and record counts are visible. When
     * {@code testShutdownGracefully} is set, a {@link NoNewDataTerminationStrategy}
     * is configured and the test source is switched to empty batches once the
     * checks pass.
     *
     * @param tableType              table type written by the streamer
     * @param tempDir                directory name (under the test base path) for the table
     * @param testShutdownGracefully whether to configure the post-write termination strategy
     * @param recordType             record type whose merger settings are added to the config
     */
    private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully, HoodieRecord.HoodieRecordType recordType) throws Exception {
        final String tablePath = basePath + "/" + tempDir;
        final int expectedRecords = 3000;

        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT);
        TestHelpers.addRecordMerger(recordType, config.configs);
        config.continuousMode = true;
        if (testShutdownGracefully) {
            config.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName();
        }
        config.tableType = tableType.name();
        config.configs.add(String.format("%s=%d", "hoodie.deltastreamer.source.test.max_unique_records", expectedRecords));
        config.configs.add(HoodieCleanConfig.AUTO_CLEAN.key() + "=false");

        final boolean mergeOnRead = tableType == HoodieTableType.MERGE_ON_READ;
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(config, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(streamer, config, ignored -> {
            if (!mergeOnRead) {
                TestHelpers.assertAtleastNCompactionCommits(5, tablePath, fs);
            } else {
                TestHelpers.assertAtleastNDeltaCommits(5, tablePath, fs);
                TestHelpers.assertAtleastNCompactionCommits(2, tablePath, fs);
            }
            TestHelpers.assertRecordCount(expectedRecords, tablePath, sqlContext);
            TestHelpers.assertDistanceCount(expectedRecords, tablePath, sqlContext);
            if (testShutdownGracefully) {
                // Flip the test source to empty batches once the checks above pass.
                TestDataSource.returnEmptyBatch = true;
            }
            return true;
        });
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Runs the streamer under the default job id {@code "single_ds_job"}.
     */
    static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
        final String defaultJobId = "single_ds_job";
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, cfg, condition, defaultJobId);
    }

    /**
     * Runs {@code ds.sync()} on a background thread, waits until {@code condition}
     * holds, and then stops the streamer.
     *
     * <p>If {@code cfg} carries a non-empty post-write termination strategy, the
     * streamer is expected to stop on its own and this method only waits for that.
     * Otherwise a graceful shutdown is requested and the background sync is joined.
     *
     * @param ds        streamer under test
     * @param cfg       streamer config; may be {@code null}
     * @param condition check handed to {@code TestHelpers.waitTillCondition}
     * @param jobId     label used in the failure log message
     * @throws Exception if waiting for the condition or joining the sync fails
     */
    static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition, String jobId) throws Exception {
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> dsFuture;
        try {
            dsFuture = executor.submit(() -> {
                try {
                    ds.sync();
                }
                catch (Exception ex) {
                    LOG.warn((Object)("DS continuous job failed, hence not proceeding with condition check for " + jobId), (Throwable)ex);
                    throw new RuntimeException(ex.getMessage(), ex);
                }
            });
        } finally {
            // Orderly shutdown: the submitted sync keeps running, but the worker
            // thread is released once it finishes instead of lingering per call.
            executor.shutdown();
        }
        TestHelpers.waitTillCondition(condition, dsFuture, 360L);
        if (cfg != null && !cfg.postWriteTerminationStrategyClass.isEmpty()) {
            TestHoodieDeltaStreamer.awaitDeltaStreamerShutdown(ds);
        } else {
            ds.shutdownGracefully();
            dsFuture.get();
        }
    }

    /**
     * Waits for the streamer's delta sync service to first report that shutdown
     * was requested and then that shutdown completed.
     *
     * <p>Each phase polls, sleeps 500 ms, and adds 500 to one elapsed counter
     * shared by both phases; the test fails once that counter exceeds 120000.
     *
     * @param ds streamer whose delta sync service is polled
     * @throws InterruptedException if a sleep is interrupted
     */
    static void awaitDeltaStreamerShutdown(HoodieDeltaStreamer ds) throws InterruptedException {
        final long pollMillis = 500L;
        final int timeoutMillis = 120000;
        int elapsedMillis = 0;

        boolean requested;
        do {
            requested = ds.getDeltaSyncService().isShutdownRequested();
            Thread.sleep(pollMillis);
            elapsedMillis += 500;
            if (elapsedMillis > timeoutMillis) {
                Assertions.fail("Deltastreamer should have shutdown by now");
            }
        } while (!requested);

        boolean completed;
        do {
            completed = ds.getDeltaSyncService().isShutdown();
            Thread.sleep(pollMillis);
            elapsedMillis += 500;
            if (elapsedMillis > timeoutMillis) {
                Assertions.fail("Deltastreamer should have shutdown by now");
            }
        } while (!completed);
    }

    /**
     * Runs the streamer without a config, i.e. passes {@code null} for it.
     */
    static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function<Boolean, Boolean> condition) throws Exception {
        final HoodieDeltaStreamer.Config noConfig = null;
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, noConfig, condition);
    }

    /**
     * Continuous-mode upserts into a MERGE_ON_READ table with inline clustering
     * enabled; waits until at least two commits and one replace commit are seen.
     *
     * @param preserveCommitMetadata value written for the preserve-commit-metadata clustering config
     * @param recordType             record type whose merger settings are added to the config
     */
    @ParameterizedTest
    @CsvSource(value={"true, AVRO", "true, SPARK", "false, AVRO", "false, SPARK"})
    public void testInlineClustering(String preserveCommitMetadata, HoodieRecord.HoodieRecordType recordType) throws Exception {
        final String tablePath = basePath + "/inlineClustering";
        final int recordCount = 3000;

        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT);
        TestHelpers.addRecordMerger(recordType, config.configs);
        config.continuousMode = true;
        config.tableType = HoodieTableType.MERGE_ON_READ.name();
        List<String> clusteringConfigs = this.getAsyncServicesConfigs(recordCount, "false", "true", "2", "", "", preserveCommitMetadata);
        config.configs.addAll(clusteringConfigs);

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(config, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(streamer, config, ignored -> {
            TestHelpers.assertAtLeastNCommits(2, tablePath, fs);
            TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
            return true;
        });
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Checks that a second sync with {@code retryLastPendingInlineClusteringJob}
     * enabled completes a clustering instant that was left inflight.
     *
     * <p>Steps: one insert sync, schedule clustering, move the first pending
     * replace instant from requested to inflight, sync again with inline
     * clustering configs, then compare the last completed replace instant's
     * timestamp with the pending one.
     */
    @Test
    public void testDeltaSyncWithPendingClustering() throws Exception {
        final String tablePath = basePath + "/inlineClusteringPending";
        final int recordCount = 2000;

        // First sync: plain insert, no continuous mode.
        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT);
        config.continuousMode = false;
        config.tableType = HoodieTableType.COPY_ON_WRITE.name();
        HoodieDeltaStreamer firstRun = new HoodieDeltaStreamer(config, jsc);
        firstRun.sync();
        TestHelpers.assertAtLeastNCommits(1, tablePath, fs);

        // Schedule clustering and leave its instant in the inflight state.
        HoodieClusteringJob scheduleJob = this.initialHoodieClusteringJob(tablePath, null, false, "schedule");
        scheduleJob.cluster(0);
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
        HoodieInstant pendingClustering = (HoodieInstant) metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().get(0);
        metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(pendingClustering, Option.empty());

        // Second sync: inline clustering on, retrying the pending job.
        config.configs.addAll(this.getAsyncServicesConfigs(recordCount, "false", "true", "2", "", ""));
        config.retryLastPendingInlineClusteringJob = true;
        HoodieDeltaStreamer secondRun = new HoodieDeltaStreamer(config, jsc);
        secondRun.sync();

        HoodieInstant lastCompleted = (HoodieInstant) metaClient.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get();
        String completedTimestamp = lastCompleted.getTimestamp();
        Assertions.assertEquals((Object) pendingClustering.getTimestamp(), (Object) completedTimestamp);
        TestHelpers.assertAtLeastNCommits(2, tablePath, fs);
        TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
    }

    /**
     * Two-phase scenario around replaced (clustered) files.
     *
     * <p>Phase 1 runs a continuous-mode insert streamer with inline clustering
     * until at least two replace commits exist, then records the files under one
     * partition whose paths contain a file id replaced by the second-to-last
     * completed replace instant.
     *
     * <p>Phase 2 runs a single sync with cleaner and archival settings and then
     * asserts that the recorded replace instant is gone from the completed replace
     * timeline and that none of the recorded files exist any more.
     *
     * @param asyncClean whether async clean is configured for phase 2; when true,
     *                   optimistic concurrency control, the LAZY failed-writes
     *                   policy and an in-process lock provider are configured too
     * @param recordType record type whose merger settings are added to the configs
     */
    @ParameterizedTest
    @CsvSource(value={"true, AVRO", "true, SPARK", "false, AVRO", "false, SPARK"})
    public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean, HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tableBasePath = basePath + "/cleanerDeleteReplacedDataWithArchive" + asyncClean;
        int totalRecords = 3000;
        // Phase 1: continuous inserts with inline clustering, auto-clean off.
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
        TestHelpers.addRecordMerger(recordType, cfg.configs);
        cfg.continuousMode = true;
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.configs.addAll(this.getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
        cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
        cfg.configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
        cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
        HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, cfg, r -> {
            TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
            return true;
        });
        TestHelpers.assertAtLeastNCommits(6, tableBasePath, fs);
        TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
        HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
        HoodieTimeline replacedTimeline = meta.reloadActiveTimeline().getCompletedReplaceTimeline();
        // The instant one position before the last completed replace instant.
        Option firstReplaceHoodieInstant = replacedTimeline.nthFromLastInstant(1);
        Assertions.assertTrue((boolean)firstReplaceHoodieInstant.isPresent());
        Option firstReplaceHoodieInstantDetails = replacedTimeline.getInstantDetails((HoodieInstant)firstReplaceHoodieInstant.get());
        HoodieReplaceCommitMetadata firstReplaceMetadata = (HoodieReplaceCommitMetadata)HoodieReplaceCommitMetadata.fromBytes((byte[])((byte[])firstReplaceHoodieInstantDetails.get()), HoodieReplaceCommitMetadata.class);
        Map partitionToReplaceFileIds = firstReplaceMetadata.getPartitionToReplaceFileIds();
        String partitionName = null;
        List replacedFileIDs = null;
        // Keeps whichever entry is iterated last; the asserts below only require
        // that at least one entry existed.
        for (Map.Entry entry : partitionToReplaceFileIds.entrySet()) {
            partitionName = String.valueOf(entry.getKey());
            replacedFileIDs = (List)entry.getValue();
        }
        Assertions.assertNotNull(partitionName);
        Assertions.assertNotNull(replacedFileIDs);
        // Collect every file under that partition whose URI contains a replaced file id.
        ArrayList<String> replacedFilePaths = new ArrayList<String>();
        Path partitionPath = new Path(meta.getBasePath(), partitionName);
        RemoteIterator hoodieFiles = meta.getFs().listFiles(partitionPath, true);
        while (hoodieFiles.hasNext()) {
            LocatedFileStatus f = (LocatedFileStatus)hoodieFiles.next();
            String file = f.getPath().toUri().toString();
            for (Object replacedFileID : replacedFileIDs) {
                if (!file.contains(String.valueOf(replacedFileID))) continue;
                replacedFilePaths.add(file);
            }
        }
        Assertions.assertFalse((boolean)replacedFilePaths.isEmpty());
        // Phase 2: replace the config list wholesale with cleaner and archival
        // settings and run one non-continuous sync.
        List<String> configs = this.getAsyncServicesConfigs(1, "true", "true", "6", "", "");
        configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
        configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
        configs.add(String.format("%s=%s", HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
        configs.add(String.format("%s=%s", HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
        configs.add(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(), asyncClean));
        configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
        configs.add(String.format("%s=%s", HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
        if (asyncClean.booleanValue()) {
            configs.add(String.format("%s=%s", HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
            configs.add(String.format("%s=%s", HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name()));
            configs.add(String.format("%s=%s", HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), InProcessLockProvider.class.getName()));
        }
        TestHelpers.addRecordMerger(recordType, configs);
        cfg.configs = configs;
        cfg.continuousMode = false;
        ds = new HoodieDeltaStreamer(cfg, jsc);
        ds.sync();
        // The recorded replace instant must no longer be in the completed replace timeline.
        long count = meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstantsAsStream().filter(instant -> ((HoodieInstant)firstReplaceHoodieInstant.get()).equals(instant)).count();
        Assertions.assertEquals((long)0L, (long)count);
        // Every replaced file recorded in phase 1 must have been removed.
        for (String replacedFilePath : replacedFilePaths) {
            Assertions.assertFalse((boolean)meta.getFs().exists(new Path(replacedFilePath)));
        }
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Builds the six-argument config list and appends the
     * preserve-commit-metadata clustering setting to it.
     *
     * @return the list produced by the six-argument overload, with one extra entry
     */
    private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit, String preserveCommitMetadata) {
        List<String> result = this.getAsyncServicesConfigs(totalRecords, autoClean, inlineCluster, inlineClusterMaxCommit, asyncCluster, asyncClusterMaxCommit);
        String preserveKey = HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key();
        result.add(preserveKey + "=" + preserveCommitMetadata);
        return result;
    }

    /**
     * Builds {@code key=value} config entries for the test source and the
     * clean/clustering services.
     *
     * <p>The max-unique-records entry is always added; every other entry is
     * added only when its value is neither {@code null} nor empty.
     *
     * @return a new mutable list of config entries
     */
    private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) {
        ArrayList<String> result = new ArrayList<String>();
        result.add(String.format("%s=%d", "hoodie.deltastreamer.source.test.max_unique_records", totalRecords));

        // Optional settings as {key, value} pairs, in the order they are emitted.
        String[][] optionalSettings = {
            {HoodieCleanConfig.AUTO_CLEAN.key(), autoClean},
            {HoodieClusteringConfig.INLINE_CLUSTERING.key(), inlineCluster},
            {HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), inlineClusterMaxCommit},
            {HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), asyncCluster},
            {HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key(), asyncClusterMaxCommit},
        };
        for (String[] setting : optionalSettings) {
            if (StringUtils.isNullOrEmpty(setting[1])) {
                continue;
            }
            result.add(String.format("%s=%s", setting[0], setting[1]));
        }
        return result;
    }

    /**
     * Builds the config entries used for multi-writer runs: in-process lock
     * provider, a 3000 ms lock wait, optimistic concurrency control, and the LAZY
     * failed-writes cleaning policy.
     *
     * @return a new mutable list of config entries
     */
    private List<String> getAllMultiWriterConfigs() {
        String lockProvider = HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() + "=" + InProcessLockProvider.class.getCanonicalName();
        String lockWait = "hoodie.write.lock.wait_time_ms" + "=" + "3000";
        String concurrencyMode = HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() + "=" + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name();
        String failedWritesPolicy = HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() + "=" + HoodieFailedWritesCleaningPolicy.LAZY.name();

        ArrayList<String> result = new ArrayList<String>();
        result.add(lockProvider);
        result.add(lockWait);
        result.add(concurrencyMode);
        result.add(failedWritesPolicy);
        return result;
    }

    /**
     * Convenience overload that passes {@code null} for both the running mode
     * and the retry flag.
     */
    private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, String clusteringInstantTime, Boolean runSchedule) {
        final String noRunningMode = null;
        final Boolean noRetryFlag = null;
        return this.buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule, noRunningMode, noRetryFlag);
    }

    /**
     * Creates a clustering job config for the given table.
     *
     * <p>The properties file is always {@code clusteringjob.properties} under the
     * test base path. The retry flag is only written when it is non-null, so the
     * config keeps its own initial value otherwise.
     *
     * @param basePath                     table base path
     * @param clusteringInstantTime        clustering instant time; may be {@code null}
     * @param runSchedule                  value assigned to the config's schedule flag
     * @param runningMode                  running mode; may be {@code null}
     * @param retryLastFailedClusteringJob optional retry flag; {@code null} leaves it untouched
     * @return the populated config
     */
    private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, String clusteringInstantTime, Boolean runSchedule, String runningMode, Boolean retryLastFailedClusteringJob) {
        HoodieClusteringJob.Config clusteringConfig = new HoodieClusteringJob.Config();
        clusteringConfig.basePath = basePath;
        clusteringConfig.clusteringInstantTime = clusteringInstantTime;
        clusteringConfig.runSchedule = runSchedule;
        clusteringConfig.runningMode = runningMode;
        clusteringConfig.propsFilePath = UtilitiesTestBase.basePath + "/clusteringjob.properties";
        if (retryLastFailedClusteringJob == null) {
            return clusteringConfig;
        }
        clusteringConfig.retryLastFailedClusteringJob = retryLastFailedClusteringJob;
        return clusteringConfig;
    }

    /**
     * Creates an indexer config for the given table; the properties file is
     * always {@code indexer.properties} under the test base path.
     *
     * @param basePath         table base path
     * @param tableName        table name
     * @param indexInstantTime index instant time; callers in this file pass {@code null} when scheduling
     * @param runningMode      running mode string
     * @param indexTypes       index types string
     * @return the populated config
     */
    private HoodieIndexer.Config buildIndexerConfig(String basePath, String tableName, String indexInstantTime, String runningMode, String indexTypes) {
        HoodieIndexer.Config indexerConfig = new HoodieIndexer.Config();
        indexerConfig.propsFilePath = UtilitiesTestBase.basePath + "/indexer.properties";
        indexerConfig.basePath = basePath;
        indexerConfig.tableName = tableName;
        indexerConfig.runningMode = runningMode;
        indexerConfig.indexTypes = indexTypes;
        indexerConfig.indexInstantTime = indexInstantTime;
        return indexerConfig;
    }

    /**
     * While a delta streamer is running, schedules a COLUMN_STATS index and, if
     * an instant time comes back, executes it and checks the index commit moved
     * from pending to completed.
     *
     * <p>A scheduling exception makes the condition return {@code false}. An
     * absent instant time is only logged and the condition still passes.
     *
     * @param recordType record type used when creating the streamer
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testHoodieIndexer(HoodieRecord.HoodieRecordType recordType) throws Exception {
        final String tablePath = basePath + "/asyncindexer";
        HoodieDeltaStreamer streamer = this.initialHoodieDeltaStreamer(tablePath, 1000, "false", recordType);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(streamer, ignored -> {
            TestHelpers.assertAtLeastNCommits(2, tablePath, fs);

            Option scheduledInstant;
            try {
                HoodieIndexer.Config scheduleConfig = this.buildIndexerConfig(tablePath, streamer.getConfig().targetTableName, null, "schedule", "COLUMN_STATS");
                HoodieIndexer scheduleJob = new HoodieIndexer(jsc, scheduleConfig);
                scheduledInstant = scheduleJob.doSchedule();
            }
            catch (Exception e) {
                LOG.info("Schedule indexing failed", e);
                return false;
            }

            if (!scheduledInstant.isPresent()) {
                LOG.warn("Metadata indexing failed");
                return true;
            }

            TestHelpers.assertPendingIndexCommit(tablePath, fs);
            LOG.info("Schedule indexing success, now build index with instant time " + (String) scheduledInstant.get());
            HoodieIndexer.Config executeConfig = this.buildIndexerConfig(tablePath, streamer.getConfig().targetTableName, (String) scheduledInstant.get(), "execute", "COLUMN_STATS");
            HoodieIndexer executeJob = new HoodieIndexer(jsc, executeConfig);
            executeJob.start(0);
            LOG.info("Metadata indexing success");
            TestHelpers.assertCompletedIndexCommit(tablePath, fs);
            return true;
        });
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * After the streamer has produced at least two commits, schedules clustering
     * through a clustering job and then executes it, expecting at least one
     * replace commit.
     *
     * @param shouldPassInClusteringInstantTime whether the scheduled instant time is
     *                                          handed to the executing job or {@code null} is passed
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception {
        final String tablePath = basePath + "/asyncClusteringJob";
        HoodieDeltaStreamer streamer = this.initialHoodieDeltaStreamer(tablePath, 3000, "false", HoodieRecord.HoodieRecordType.AVRO);
        CountDownLatch twoCommitsSeen = new CountDownLatch(1);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(streamer, ignored -> {
            TestHelpers.assertAtLeastNCommits(2, tablePath, fs);
            twoCommitsSeen.countDown();
            return true;
        });

        if (!twoCommitsSeen.await(2L, TimeUnit.MINUTES)) {
            Assertions.fail("Deltastreamer should have completed 2 commits.");
        }

        Option scheduledInstant = Option.empty();
        try {
            HoodieClusteringJob scheduleJob = this.initialHoodieClusteringJob(tablePath, null, true, null);
            scheduledInstant = scheduleJob.doSchedule();
        }
        catch (Exception e) {
            LOG.warn("Schedule clustering failed", e);
            Assertions.fail("Schedule clustering failed", e);
        }

        if (!scheduledInstant.isPresent()) {
            LOG.warn("Clustering execution failed");
            Assertions.fail("Clustering execution failed");
        }

        LOG.info("Schedule clustering success, now cluster with instant time " + (String) scheduledInstant.get());
        String instantForExecution = shouldPassInClusteringInstantTime ? (String) scheduledInstant.get() : null;
        HoodieClusteringJob.Config executeConfig = this.buildHoodieClusteringUtilConfig(tablePath, instantForExecution, false);
        HoodieClusteringJob executeJob = new HoodieClusteringJob(jsc, executeConfig);
        executeJob.cluster(executeConfig.retry);
        TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
        LOG.info("Cluster success");
    }

    /**
     * Continuous-mode inserts into a COPY_ON_WRITE table with async clustering
     * enabled; waits for a replace commit, then checks the commit counts and the
     * distinct record count.
     *
     * @param recordType record type whose merger settings are added to the config
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testAsyncClusteringService(HoodieRecord.HoodieRecordType recordType) throws Exception {
        final String tablePath = basePath + "/asyncClustering";
        final int recordCount = 2000;

        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT);
        TestHelpers.addRecordMerger(recordType, config.configs);
        config.tableType = HoodieTableType.COPY_ON_WRITE.name();
        config.continuousMode = true;
        List<String> asyncClusteringConfigs = this.getAsyncServicesConfigs(recordCount, "false", "", "", "true", "3");
        config.configs.addAll(asyncClusteringConfigs);

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(config, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(streamer, config, ignored -> {
            TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
            return true;
        });

        TestHelpers.assertAtLeastNCommits(4, tablePath, fs);
        TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
        TestHelpers.assertDistinctRecordCount(recordCount, tablePath, sqlContext);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Continuous-mode upserts into a COPY_ON_WRITE table with async clustering
     * enabled; waits for a replace commit, then checks the commit counts and a
     * distinct record count of 1900.
     *
     * @param recordType record type whose merger settings are added to the config
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testAsyncClusteringServiceWithConflicts(HoodieRecord.HoodieRecordType recordType) throws Exception {
        final String tablePath = basePath + "/asyncClusteringWithConflicts";
        final int recordCount = 2000;

        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT);
        TestHelpers.addRecordMerger(recordType, config.configs);
        config.tableType = HoodieTableType.COPY_ON_WRITE.name();
        config.continuousMode = true;
        List<String> asyncClusteringConfigs = this.getAsyncServicesConfigs(recordCount, "false", "", "", "true", "3");
        config.configs.addAll(asyncClusteringConfigs);

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(config, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(streamer, config, ignored -> {
            TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
            return true;
        });

        TestHelpers.assertAtLeastNCommits(4, tablePath, fs);
        TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
        TestHelpers.assertDistinctRecordCount(1900L, tablePath, sqlContext);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Continuous-mode inserts into a MERGE_ON_READ table with async clustering
     * enabled; waits for two compaction commits and one replace commit, then
     * checks the commit counts and the distinct record count.
     *
     * @param recordType record type whose merger settings are added to the config
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testAsyncClusteringServiceWithCompaction(HoodieRecord.HoodieRecordType recordType) throws Exception {
        final String tablePath = basePath + "/asyncClusteringCompaction";
        final int recordCount = 2000;

        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT);
        TestHelpers.addRecordMerger(recordType, config.configs);
        config.tableType = HoodieTableType.MERGE_ON_READ.name();
        config.continuousMode = true;
        List<String> asyncClusteringConfigs = this.getAsyncServicesConfigs(recordCount, "false", "", "", "true", "3");
        config.configs.addAll(asyncClusteringConfigs);

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(config, jsc);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(streamer, config, ignored -> {
            TestHelpers.assertAtleastNCompactionCommits(2, tablePath, fs);
            TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
            return true;
        });

        TestHelpers.assertAtLeastNCommits(4, tablePath, fs);
        TestHelpers.assertAtLeastNReplaceCommits(1, tablePath, fs);
        TestHelpers.assertDistinctRecordCount(recordCount, tablePath, sqlContext);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Leaves a scheduled clustering instant inflight and then runs a
     * {@code scheduleAndExecute} clustering job.
     *
     * <p>With the retry flag set, the last completed replace instant must carry
     * the pending instant's timestamp; without it, the timestamps must differ.
     *
     * @param retryLastFailedClusteringJob retry flag handed to the clustering job
     * @param recordType                   record type whose merger settings are added
     */
    @ParameterizedTest
    @CsvSource(value={"true, AVRO", "true, SPARK", "false, AVRO", "false, SPARK"})
    public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob, HoodieRecord.HoodieRecordType recordType) throws Exception {
        final String tablePath = basePath + "/asyncClustering3";
        final int recordCount = 3000;

        // Single-shot insert with all clustering services off and multi-writer configs on.
        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT);
        TestHelpers.addRecordMerger(recordType, config.configs);
        config.continuousMode = false;
        config.tableType = HoodieTableType.COPY_ON_WRITE.name();
        config.configs.addAll(this.getAsyncServicesConfigs(recordCount, "false", "false", "0", "false", "0"));
        config.configs.addAll(this.getAllMultiWriterConfigs());
        HoodieDeltaStreamer firstRun = new HoodieDeltaStreamer(config, jsc);
        firstRun.sync();
        TestHelpers.assertAtLeastNCommits(1, tablePath, fs);

        // Schedule clustering, then write once more before touching the timeline.
        HoodieClusteringJob scheduleJob = this.initialHoodieClusteringJob(tablePath, null, false, "schedule");
        scheduleJob.cluster(0);
        HoodieDeltaStreamer secondRun = new HoodieDeltaStreamer(config, jsc);
        secondRun.sync();

        // Move the first pending replace instant from requested to inflight.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
        HoodieInstant pendingClustering = (HoodieInstant) metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().get(0);
        metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(pendingClustering, Option.empty());

        HoodieClusteringJob scheduleAndExecuteJob = this.initialHoodieClusteringJob(tablePath, null, false, "scheduleAndExecute", retryLastFailedClusteringJob, recordType);
        scheduleAndExecuteJob.cluster(0);

        HoodieInstant lastCompleted = (HoodieInstant) metaClient.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get();
        String completedTimestamp = lastCompleted.getTimestamp();
        if (!retryLastFailedClusteringJob) {
            Assertions.assertFalse(pendingClustering.getTimestamp().equalsIgnoreCase(completedTimestamp));
        } else {
            Assertions.assertEquals((Object) pendingClustering.getTimestamp(), (Object) completedTimestamp);
        }
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Runs a clustering job in the given running mode while a bulk-insert delta
     * streamer is running, then checks the replace timeline for that mode.
     *
     * <p>Per mode, the condition asserts:
     * <ul>
     *   <li>{@code scheduleAndExecute}: at least two replace commits;</li>
     *   <li>{@code schedule}: at least two replace requests and no replace commits;</li>
     *   <li>{@code execute}: no replace commits.</li>
     * </ul>
     * A non-zero result or an exception from the clustering job makes the
     * condition return {@code false}, except in {@code execute} mode where the
     * assertions still run.
     *
     * <p>The running mode is lower-cased once with {@link Locale#ROOT} so every
     * comparison in the test is locale-independent.
     *
     * @param runningMode clustering running mode from the CSV source
     * @param recordType  record type used for the streamer and the clustering job
     */
    @ParameterizedTest
    @CsvSource(value={"execute, AVRO", "schedule, AVRO", "scheduleAndExecute, AVRO", "execute, SPARK", "schedule, SPARK", "scheduleAndExecute, SPARK"})
    public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode, HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tableBasePath = basePath + "/asyncClustering2";
        HoodieDeltaStreamer ds = this.initialHoodieDeltaStreamer(tableBasePath, 3000, "false", recordType, WriteOperationType.BULK_INSERT);
        HoodieClusteringJob scheduleClusteringJob = this.initialHoodieClusteringJob(tableBasePath, null, true, runningMode, recordType);
        final String normalizedMode = runningMode.toLowerCase(Locale.ROOT);
        final boolean executeOnly = "execute".equals(normalizedMode);
        TestHoodieDeltaStreamer.deltaStreamerTestRunner(ds, r -> {
            TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
            try {
                int result = scheduleClusteringJob.cluster(0);
                if (result == 0) {
                    LOG.info((Object)"Cluster success");
                } else {
                    LOG.warn((Object)"Cluster failed");
                    if (!executeOnly) {
                        return false;
                    }
                }
            }
            catch (Exception e) {
                LOG.warn((Object)"ScheduleAndExecute clustering failed", (Throwable)e);
                // Only execute mode proceeds to the assertions after a failure.
                if (!executeOnly) {
                    return false;
                }
            }
            switch (normalizedMode) {
                case "scheduleandexecute": {
                    TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
                    return true;
                }
                case "schedule": {
                    TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, fs);
                    TestHelpers.assertNoReplaceCommits(tableBasePath, fs);
                    return true;
                }
                case "execute": {
                    TestHelpers.assertNoReplaceCommits(tableBasePath, fs);
                    return true;
                }
            }
            throw new IllegalStateException("Unexpected value: " + runningMode);
        });
        if (normalizedMode.equals("scheduleandexecute")) {
            UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
        }
    }

    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline(HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tableBasePath = basePath + "/test_table2";
        String downstreamTableBasePath = basePath + "/test_downstream_table2";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", true);
        TestHelpers.addRecordMerger(recordType, cfg.configs);
        cfg.configs.add("hoodie.datasource.hive_sync.partition_fields=year,month,day");
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration)hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, tableBasePath, sqlContext);
        TestHelpers.assertDistanceCount(1000L, tableBasePath, sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, tableBasePath, sqlContext);
        String lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
        HoodieDeltaStreamer.Config downstreamCfg = TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT, true, null);
        TestHelpers.addRecordMerger(recordType, downstreamCfg.configs);
        new HoodieDeltaStreamer(downstreamCfg, jsc, fs, (Configuration)hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, downstreamTableBasePath, sqlContext);
        TestHelpers.assertDistanceCount(1000L, downstreamTableBasePath, sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, downstreamTableBasePath, sqlContext);
        TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, fs, 1);
        cfg.sourceLimit = 0L;
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration)hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, tableBasePath, sqlContext);
        TestHelpers.assertDistanceCount(1000L, tableBasePath, sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, tableBasePath, sqlContext);
        TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
        HoodieDeltaStreamer.Config downstreamCfg1 = TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT, true, DummySchemaProvider.class.getName());
        new HoodieDeltaStreamer(downstreamCfg1, jsc).sync();
        TestHelpers.assertRecordCount(1000L, downstreamTableBasePath, sqlContext);
        TestHelpers.assertDistanceCount(1000L, downstreamTableBasePath, sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1000L, downstreamTableBasePath, sqlContext);
        TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, fs, 1);
        cfg.sourceLimit = 2000L;
        cfg.operation = WriteOperationType.UPSERT;
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration)hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1950L, tableBasePath, sqlContext);
        TestHelpers.assertDistanceCount(1950L, tableBasePath, sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(1950L, tableBasePath, sqlContext);
        lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
        List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
        Assertions.assertEquals((long)1950L, (long)counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
        downstreamCfg = TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.UPSERT, false, null);
        TestHelpers.addRecordMerger(recordType, downstreamCfg.configs);
        downstreamCfg.sourceLimit = 2000L;
        new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
        TestHelpers.assertRecordCount(2000L, downstreamTableBasePath, sqlContext);
        TestHelpers.assertDistanceCount(2000L, downstreamTableBasePath, sqlContext);
        TestHelpers.assertDistanceCountWithExactValue(2000L, downstreamTableBasePath, sqlContext);
        String finalInstant = TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, fs, 2);
        counts = TestHelpers.countsPerCommit(downstreamTableBasePath, sqlContext);
        Assertions.assertEquals((long)2000L, (long)counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
        HiveSyncConfig hiveSyncConfig = TestHoodieDeltaStreamer.getHiveSyncConfig(tableBasePath, "hive_trips");
        hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS, "year,month,day");
        hiveSyncConfig.setHadoopConf((Configuration)hiveTestService.getHiveConf());
        HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig);
        String tableName = hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME);
        Assertions.assertTrue((boolean)hiveClient.tableExists(tableName), (String)("Table " + tableName + " should exist"));
        Assertions.assertEquals((int)3, (int)hiveClient.getAllPartitions(tableName).size(), (String)"Table partitions should match the number of partitions we wrote");
        Assertions.assertEquals((Object)lastInstantForUpstreamTable, (Object)hiveClient.getLastCommitTimeSynced(tableName).get(), (String)"The last commit that was synced should be updated in the TBLPROPERTIES");
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, downstreamTableBasePath);
    }

    /**
     * Expects a sync built from this configuration to fail with a {@link HoodieException}
     * whose message asks for a valid schema provider class.
     */
    @Test
    public void testNullSchemaProvider() throws Exception {
        final String targetPath = basePath + "/test_table";
        final List<String> transformers = Collections.singletonList(SqlQueryBasedTransformer.class.getName());
        final HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(targetPath, WriteOperationType.BULK_INSERT, transformers, "test-source.properties", true, false, false, null, null);

        HoodieException thrown = Assertions.assertThrows(
            HoodieException.class,
            () -> {
                HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration)hiveServer.getHiveConf());
                streamer.sync();
            },
            "Should error out when schema provider is not provided");

        LOG.debug((Object)"Expected error during reading data from source ", (Throwable)thrown);
        String message = thrown.getMessage();
        Assertions.assertTrue(message.contains("Please provide a valid schema provider class!"));
    }

    /**
     * Bulk-inserts into a table created with table type "MERGE_ON_READ", then constructs a second
     * delta streamer whose config names {@link DummyAvroPayload}, and verifies that the table's
     * hoodie.properties file records that payload class.
     */
    @Test
    public void testPayloadClassUpdate() throws Exception {
        String dataSetBasePath = basePath + "/test_dataset_mor_payload_class_update";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, false, null, "MERGE_ON_READ");
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration)hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, dataSetBasePath, sqlContext);

        cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
        // The streamer is only constructed here; no sync is run before the properties are read back.
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration)hiveServer.getHiveConf());

        Properties props = new Properties();
        String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
        // Named differently from the shared fs field used above so the two cannot be confused.
        FileSystem tableFs = FSUtils.getFs((String)cfg.targetBasePath, (Configuration)jsc.hadoopConfiguration());
        try (FSDataInputStream inputStream = tableFs.open(new Path(metaPath))) {
            props.load((InputStream)inputStream);
        }
        // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
        Assertions.assertEquals((Object)DummyAvroPayload.class.getName(), (Object)new HoodieConfig(props).getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
    }

    /**
     * Bulk-inserts into a table created with table type "MERGE_ON_READ" and a config naming
     * {@link PartialUpdateAvroPayload}, then verifies that the table's hoodie.properties file
     * records that payload class.
     */
    @Test
    public void testPartialPayloadClass() throws Exception {
        String dataSetBasePath = basePath + "/test_dataset_mor";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ");
        new HoodieDeltaStreamer(cfg, jsc, fs, (Configuration)hiveServer.getHiveConf()).sync();
        TestHelpers.assertRecordCount(1000L, dataSetBasePath, sqlContext);

        Properties props = new Properties();
        String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
        // Named differently from the shared fs field used above so the two cannot be confused.
        FileSystem tableFs = FSUtils.getFs((String)cfg.targetBasePath, (Configuration)jsc.hadoopConfiguration());
        try (FSDataInputStream inputStream = tableFs.open(new Path(metaPath))) {
            props.load((InputStream)inputStream);
        }
        // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
        Assertions.assertEquals((Object)PartialUpdateAvroPayload.class.getName(), (Object)new HoodieConfig(props).getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
    }

    /**
     * Runs a bulk insert with no table type passed to the config, constructs a second delta
     * streamer whose config names {@link DummyAvroPayload}, and asserts that hoodie.properties
     * contains no payload class key.
     */
    @Test
    public void testPayloadClassUpdateWithCOWTable() throws Exception {
        final String tablePath = basePath + "/test_dataset_cow";

        HoodieDeltaStreamer.Config initialCfg = TestHelpers.makeConfig(tablePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, false, null, null);
        HoodieDeltaStreamer initialStreamer = new HoodieDeltaStreamer(initialCfg, jsc, fs, (Configuration)hiveServer.getHiveConf());
        initialStreamer.sync();
        TestHelpers.assertRecordCount(1000L, tablePath, sqlContext);

        HoodieDeltaStreamer.Config payloadCfg = TestHelpers.makeConfig(tablePath, WriteOperationType.BULK_INSERT, Collections.singletonList(SqlQueryBasedTransformer.class.getName()), "test-source.properties", false, true, true, DummyAvroPayload.class.getName(), null);
        // Construction only; the original test never syncs this second streamer.
        new HoodieDeltaStreamer(payloadCfg, jsc, fs, (Configuration)hiveServer.getHiveConf());

        Properties tableProps = new Properties();
        String propertiesLocation = tablePath + "/.hoodie/hoodie.properties";
        FileSystem tableFs = FSUtils.getFs((String)payloadCfg.targetBasePath, (Configuration)jsc.hadoopConfiguration());
        try (FSDataInputStream in = tableFs.open(new Path(propertiesLocation))) {
            tableProps.load((InputStream)in);
        }

        boolean hasPayloadClassKey = tableProps.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key());
        Assertions.assertFalse(hasPayloadClassKey);
    }

    /**
     * Exercises the filterDupes flag across several syncs on one table:
     * <ol>
     *   <li>an initial BULK_INSERT producing 1000 records,</li>
     *   <li>an INSERT with filterDupes enabled, after which both commits report 1000 records,</li>
     *   <li>an UPSERT through the drop-all config, whose commit must carry no write stats,</li>
     *   <li>an UPSERT with filterDupes enabled, which must be rejected with an
     *       {@link IllegalArgumentException}.</li>
     * </ol>
     *
     * @param recordType record type whose merger is added to each config
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testFilterDupes(HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tableBasePath = basePath + "/test_dupes_table";

        // Step 1: initial bulk insert.
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        TestHelpers.addRecordMerger(recordType, cfg.configs);
        new HoodieDeltaStreamer(cfg, jsc).sync();
        TestHelpers.assertRecordCount(1000L, tableBasePath, sqlContext);
        TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);

        // Step 2: insert with duplicate filtering enabled.
        cfg.filterDupes = true;
        cfg.sourceLimit = 2000L;
        cfg.operation = WriteOperationType.INSERT;
        new HoodieDeltaStreamer(cfg, jsc).sync();
        TestHelpers.assertRecordCount(2000L, tableBasePath, sqlContext);
        TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
        List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
        Assertions.assertEquals((long)1000L, (long)counts.get(0).getLong(1));
        Assertions.assertEquals((long)1000L, (long)counts.get(1).getLong(1));

        // Step 3: upsert through the drop-all config; a newer commit must appear with no write stats.
        HoodieTableMetaClient mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
        HoodieInstant lastFinished = (HoodieInstant)mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
        HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
        TestHelpers.addRecordMerger(recordType, cfg2.configs);
        cfg2.filterDupes = false;
        cfg2.sourceLimit = 2000L;
        cfg2.operation = WriteOperationType.UPSERT;
        cfg2.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
        HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
        ds2.sync();
        mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
        HoodieInstant newLastFinished = (HoodieInstant)mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
        Assertions.assertTrue((boolean)HoodieTimeline.compareTimestamps((String)newLastFinished.getTimestamp(), (BiPredicate)HoodieTimeline.GREATER_THAN, (String)lastFinished.getTimestamp()));
        HoodieCommitMetadata commitMetadata = (HoodieCommitMetadata)HoodieCommitMetadata.fromBytes((byte[])((byte[])mClient.getActiveTimeline().getInstantDetails(newLastFinished).get()), HoodieCommitMetadata.class);
        System.out.println("New Commit Metadata=" + commitMetadata);
        Assertions.assertTrue((boolean)commitMetadata.getPartitionToWriteStats().isEmpty());

        // Step 4: UPSERT combined with filterDupes must be rejected. assertThrows makes the test
        // fail when no exception is raised; the previous try/catch passed silently in that case.
        cfg2.filterDupes = true;
        cfg2.operation = WriteOperationType.UPSERT;
        IllegalArgumentException rejected = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> new HoodieDeltaStreamer(cfg2, jsc).sync(),
            "UPSERT with filterDupes enabled should be rejected");
        Assertions.assertTrue((boolean)rejected.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Fetches one batch from a {@link DistributedTestDataSource} configured for 1000 unique
     * records in a single partition and asserts that the cached batch contains 1000 records.
     */
    @Test
    public void testDistributedTestDataSource() {
        TypedProperties sourceProps = new TypedProperties();
        sourceProps.setProperty("hoodie.deltastreamer.source.test.max_unique_records", "1000");
        sourceProps.setProperty("hoodie.deltastreamer.source.test.num_partitions", "1");
        sourceProps.setProperty("hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys", "true");

        DistributedTestDataSource source = new DistributedTestDataSource(sourceProps, jsc, sparkSession, null);
        InputBatch fetched = source.fetchNext(Option.empty(), 10000000L);

        // Cache before counting, as the original does.
        JavaRDD records = (JavaRDD)fetched.getBatch().get();
        records.cache();
        long recordCount = records.count();
        Assertions.assertEquals(1000L, recordCount);
    }

    /**
     * Optionally creates the Kafka topic (with 2 as the second argument to createTopic) and sends
     * {@code numRecords} generated trip records, converted via jsonifyRecords, to it.
     *
     * @param numRecords number of insert records to generate and send
     * @param createTopic whether to attempt creating the topic first
     * @param topicName topic to create and/or send to
     */
    private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) {
        if (createTopic) {
            try {
                testUtils.createTopic(topicName, 2);
            }
            catch (TopicExistsException ignored) {
                // An already-existing topic is fine; messages are sent to it below.
            }
        }
        final String tripUberSchema = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        testUtils.sendMessages(
            topicName,
            UtilitiesTestBase.Helpers.jsonifyRecords(
                generator.generateInsertsAsPerSchema("000", Integer.valueOf(numRecords), tripUberSchema)));
    }

    /**
     * Writes the parquet DFS source properties using the default schema files, properties file
     * name, current {@code PARQUET_SOURCE_ROOT} and "partition_path" partition field, forwarding
     * {@code emptyBatchParam} unchanged.
     */
    private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String emptyBatchParam) throws IOException {
        final String sourceSchemaFile = "source.avsc";
        final String targetSchemaFile = "target.avsc";
        final String propsFileName = "test-parquet-dfs-source.properties";
        final boolean addCommonProps = false;
        final String partitionPath = "partition_path";
        this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, PARQUET_SOURCE_ROOT, addCommonProps, partitionPath, emptyBatchParam);
    }

    /**
     * Writes the parquet DFS source properties with an empty string as the empty-batch parameter.
     */
    private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
        final String noEmptyBatchParam = "";
        this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, noEmptyBatchParam);
    }

    /**
     * Delegates to the nine-argument overload, passing an empty string as the empty-batch
     * parameter and every other argument through unchanged.
     */
    private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, String propsFileName, String parquetSourceRoot, boolean addCommonProps, String partitionPath) throws IOException {
        final String noEmptyBatchParam = "";
        this.prepareParquetDFSSource(
            useSchemaProvider,
            hasTransformer,
            sourceSchemaFile,
            targetSchemaFile,
            propsFileName,
            parquetSourceRoot,
            addCommonProps,
            partitionPath,
            noEmptyBatchParam);
    }

    /**
     * Builds the properties for a parquet DFS source and saves them under
     * {@code basePath + "/" + propsFileName}.
     *
     * @param useSchemaProvider when true, the source schema file property is set
     * @param hasTransformer when true together with {@code useSchemaProvider}, the target schema
     *        file property is set as well
     * @param sourceSchemaFile source schema file name, resolved against {@code basePath}
     * @param targetSchemaFile target schema file name, resolved against {@code basePath}
     * @param propsFileName name of the properties file to write
     * @param parquetSourceRoot value for the DFS source root property
     * @param addCommonProps when true, common properties are populated first
     * @param partitionPath value for the partition path field property
     * @param emptyBatchParam when neither null nor empty, stored under
     *        {@code TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH}
     */
    private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, String propsFileName, String parquetSourceRoot, boolean addCommonProps, String partitionPath, String emptyBatchParam) throws IOException {
        TypedProperties sourceProps = new TypedProperties();
        if (addCommonProps) {
            TestHoodieDeltaStreamer.populateCommonProps(sourceProps, basePath);
        }

        sourceProps.setProperty("hoodie.datasource.write.keygenerator.class", TestGenerator.class.getName());
        sourceProps.setProperty("include", "base.properties");
        sourceProps.setProperty("hoodie.embed.timeline.server", "false");
        sourceProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        sourceProps.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);

        // Schema files only matter when a schema provider is in use; the target schema
        // additionally requires a transformer.
        if (useSchemaProvider) {
            String sourceSchemaPath = basePath + "/" + sourceSchemaFile;
            sourceProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", sourceSchemaPath);
        }
        if (useSchemaProvider && hasTransformer) {
            String targetSchemaPath = basePath + "/" + targetSchemaFile;
            sourceProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", targetSchemaPath);
        }

        sourceProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);

        boolean hasEmptyBatchParam = !StringUtils.isNullOrEmpty((String)emptyBatchParam);
        if (hasEmptyBatchParam) {
            sourceProps.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, emptyBatchParam);
        }

        String propsPath = basePath + "/" + propsFileName;
        UtilitiesTestBase.Helpers.savePropsToDFS(sourceProps, fs, propsPath);
    }

    /**
     * Runs the parquet DFS source scenario with the empty-batch check turned off.
     */
    private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
        final boolean testEmptyBatch = false;
        this.testParquetDFSSource(useSchemaProvider, transformerClassNames, testEmptyBatch);
    }

    /**
     * Ingests parquet files from a DFS source into a fresh table and checks the record counts.
     *
     * <p>Flow: write "1.parquet" with 10 records and sync; when {@code testEmptyBatch} is set,
     * write "2.parquet", sync again with the empty-batch source, and expect the count to stay
     * unchanged while the resolved table schema is not the Avro NULL schema; finally write
     * "3.parquet" with 100 records, sync, and require every completed commit to carry a
     * non-empty "schema" metadata entry.
     *
     * @param useSchemaProvider forwarded to the source-properties setup and the streamer config
     * @param transformerClassNames transformer class names; null or empty means no transformer
     * @param testEmptyBatch whether to use {@code TestParquetDFSSourceEmptyBatch} and run the
     *        extra empty-batch sync
     */
    private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        int parquetRecordsCount = 10;
        boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : "");
        String tableBasePath = basePath + "/test_parquet_table" + testNum;
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName() : ParquetDFSSource.class.getName(), transformerClassNames, "test-parquet-dfs-source.properties", false, useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
        deltaStreamer.sync();
        TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);

        if (testEmptyBatch) {
            TestHoodieDeltaStreamer.prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
            deltaStreamer.sync();
            // The record count is expected to be unchanged after this sync.
            TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
            HoodieTableMetaClient emptyBatchMetaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
            TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(emptyBatchMetaClient);
            // Compare Schema with Schema: the earlier Schema-vs-String comparison could never be
            // equal, so the assertion always passed regardless of the resolved schema.
            Assertions.assertNotEquals((Object)Schema.create((Schema.Type)Schema.Type.NULL), (Object)tableSchemaResolver.getTableAvroSchema());
        }

        TestHoodieDeltaStreamer.prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null, null);
        deltaStreamer.sync();
        TestHelpers.assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext);

        // A separate, single-assignment variable so the lambda below captures an effectively
        // final reference.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
        metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry -> this.assertValidSchemaInCommitMetadata((HoodieInstant)entry, metaClient));
        ++testNum;
    }

    /**
     * Asserts that the commit metadata stored for {@code instant} has a non-empty "schema" entry.
     *
     * @param instant completed instant whose details are read from the active timeline
     * @param metaClient meta client providing the active timeline
     * @throws HoodieException if the commit metadata cannot be parsed; the underlying
     *         {@link IOException} is attached as the cause
     */
    private void assertValidSchemaInCommitMetadata(HoodieInstant instant, HoodieTableMetaClient metaClient) {
        try {
            HoodieCommitMetadata commitMetadata = (HoodieCommitMetadata)HoodieCommitMetadata.fromBytes((byte[])((byte[])metaClient.getActiveTimeline().getInstantDetails(instant).get()), HoodieCommitMetadata.class);
            Assertions.assertFalse((boolean)StringUtils.isNullOrEmpty((String)commitMetadata.getMetadata("schema")));
        }
        catch (IOException ioException) {
            // Keep the original failure as the cause so the parse error is visible in the test report.
            throw new HoodieException("Failed to parse commit metadata for " + instant.toString(), ioException);
        }
    }

    /**
     * Writes the ORC DFS source properties, runs one INSERT sync from {@code ORC_SOURCE_ROOT}
     * into a fresh table and asserts that it holds 5 records.
     *
     * @param useSchemaProvider when true, the source schema file is configured, plus the target
     *        schema file if {@code transformerClassNames} is non-null
     * @param transformerClassNames transformer class names passed to the streamer config
     */
    private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
        final String propsFileName = "test-orc-dfs-source.properties";

        TypedProperties sourceProps = new TypedProperties();
        sourceProps.setProperty("include", "base.properties");
        sourceProps.setProperty("hoodie.embed.timeline.server", "false");
        sourceProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        sourceProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        if (useSchemaProvider) {
            sourceProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source.avsc");
        }
        if (useSchemaProvider && transformerClassNames != null) {
            sourceProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target.avsc");
        }
        sourceProps.setProperty("hoodie.deltastreamer.source.dfs.root", ORC_SOURCE_ROOT);
        UtilitiesTestBase.Helpers.savePropsToDFS(sourceProps, fs, basePath + "/" + propsFileName);

        String tablePath = basePath + "/test_orc_source_table" + testNum;
        HoodieDeltaStreamer.Config streamerCfg = TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(), transformerClassNames, propsFileName, false, useSchemaProvider, 100000, false, null, null, "timestamp", null);
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(streamerCfg, jsc);
        streamer.sync();
        TestHelpers.assertRecordCount(5L, tablePath, sqlContext);
        ++testNum;
    }

    /**
     * Builds the properties for a JSON Kafka source and saves them under
     * {@code basePath + "/" + propsFileName}.
     *
     * @param propsFileName name of the properties file to write
     * @param autoResetValue value stored under "auto.offset.reset"
     * @param topicName Kafka topic the source reads from
     */
    private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
        TypedProperties kafkaProps = new TypedProperties();
        TestHoodieDeltaStreamer.populateAllCommonProps(kafkaProps, basePath, testUtils.brokerAddress());

        // Write-side settings.
        kafkaProps.setProperty("include", "base.properties");
        kafkaProps.setProperty("hoodie.embed.timeline.server", "false");
        kafkaProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        kafkaProps.setProperty("hoodie.datasource.write.partitionpath.field", "driver");

        // Source settings.
        kafkaProps.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
        kafkaProps.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
        kafkaProps.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", this.kafkaCheckpointType);

        // Schema files.
        String sourceSchemaPath = basePath + "/source_uber.avsc";
        String targetSchemaPath = basePath + "/target_uber.avsc";
        kafkaProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", sourceSchemaPath);
        kafkaProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", targetSchemaPath);

        kafkaProps.setProperty("auto.offset.reset", autoResetValue);

        String propsPath = basePath + "/" + propsFileName;
        UtilitiesTestBase.Helpers.savePropsToDFS(kafkaProps, fs, propsPath);
    }

    /**
     * Ingests 10 records from a parquet DFS source, then switches the same table to a JSON Kafka
     * source and checks the counts after each Kafka sync.
     *
     * @param autoResetToLatest when true "auto.offset.reset" is "latest" and the 5 messages sent
     *        before the first Kafka sync are not expected in the table; otherwise "earliest" is
     *        used and they are expected
     */
    private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean autoResetToLatest) throws Exception {
        final String tripUberSchema = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";

        // Phase 1: parquet source.
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfsToKafka" + testNum;
        int parquetRecords = 10;
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, "1.parquet", true, tripUberSchema, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
        this.prepareParquetDFSSource(true, true, "source_uber.avsc", "target_uber.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "driver");
        String tableBasePath = basePath + "/test_dfs_to_kafka" + testNum;
        HoodieDeltaStreamer parquetStreamer = new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), Collections.emptyList(), "test-parquet-dfs-source.properties", false, true, 100000, false, null, null, "timestamp", null), jsc);
        parquetStreamer.sync();
        TestHelpers.assertRecordCount(parquetRecords, tableBasePath, sqlContext);
        parquetStreamer.shutdownGracefully();

        // Phase 2: Kafka source on the same table.
        topicName = "topic" + testNum;
        TestHoodieDeltaStreamer.prepareJsonKafkaDFSFiles(5, true, topicName);
        String autoResetValue;
        if (autoResetToLatest) {
            autoResetValue = "latest";
        } else {
            autoResetValue = "earliest";
        }
        this.prepareJsonKafkaDFSSource("test-json-kafka-dfs-source.properties", autoResetValue, topicName);
        HoodieDeltaStreamer kafkaStreamer = new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), "test-json-kafka-dfs-source.properties", false, true, 100000, false, null, null, "timestamp", null), jsc);
        kafkaStreamer.sync();
        int totalExpectedRecords = parquetRecords;
        if (!autoResetToLatest) {
            totalExpectedRecords += 5;
        }
        TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);

        // Phase 3: 20 more Kafka messages.
        TestHoodieDeltaStreamer.prepareJsonKafkaDFSFiles(20, false, topicName);
        kafkaStreamer.sync();
        totalExpectedRecords += 20;
        TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
        ++testNum;
    }

    /**
     * Sends 5 messages to a new topic, syncs with "earliest" offset reset and expects 5 records;
     * then sends 10 more, syncs again and expects 15.
     */
    @Test
    public void testJsonKafkaDFSSource() throws Exception {
        final String propsFileName = "test-json-kafka-dfs-source.properties";
        topicName = "topic" + testNum;
        TestHoodieDeltaStreamer.prepareJsonKafkaDFSFiles(5, true, topicName);
        this.prepareJsonKafkaDFSSource(propsFileName, "earliest", topicName);

        String tablePath = basePath + "/test_json_kafka_table" + testNum;
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), propsFileName, false, true, 100000, false, null, null, "timestamp", null), jsc);
        streamer.sync();
        TestHelpers.assertRecordCount(5L, tablePath, sqlContext);

        int initialRecords = 5;
        int additionalRecords = 10;
        TestHoodieDeltaStreamer.prepareJsonKafkaDFSFiles(additionalRecords, false, topicName);
        streamer.sync();
        int expectedTotal = initialRecords + additionalRecords;
        TestHelpers.assertRecordCount(expectedTotal, tablePath, sqlContext);
    }

    /**
     * Sets the Kafka checkpoint type to "timestamp" and runs two syncs, each with a freshly
     * built streamer whose last config argument is the current time in milliseconds; expects
     * 5 records after the first sync and 10 after the second.
     */
    @Test
    public void testKafkaTimestampType() throws Exception {
        final String propsFileName = "test-json-kafka-dfs-source.properties";
        topicName = "topic" + testNum;
        this.kafkaCheckpointType = "timestamp";
        TestHoodieDeltaStreamer.prepareJsonKafkaDFSFiles(5, true, topicName);
        this.prepareJsonKafkaDFSSource(propsFileName, "earliest", topicName);
        String tablePath = basePath + "/test_json_kafka_table" + testNum;

        HoodieDeltaStreamer.Config firstCfg = TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), propsFileName, false, true, 100000, false, null, null, "timestamp", String.valueOf(System.currentTimeMillis()));
        HoodieDeltaStreamer firstStreamer = new HoodieDeltaStreamer(firstCfg, jsc);
        firstStreamer.sync();
        TestHelpers.assertRecordCount(5L, tablePath, sqlContext);

        // The second timestamp is taken only after the extra messages have been sent.
        TestHoodieDeltaStreamer.prepareJsonKafkaDFSFiles(5, false, topicName);
        HoodieDeltaStreamer.Config secondCfg = TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), Collections.emptyList(), propsFileName, false, true, 100000, false, null, null, "timestamp", String.valueOf(System.currentTimeMillis()));
        HoodieDeltaStreamer secondStreamer = new HoodieDeltaStreamer(secondCfg, jsc);
        secondStreamer.sync();
        TestHelpers.assertRecordCount(10L, tablePath, sqlContext);
    }

    /**
     * Runs the parquet-to-Kafka transition scenario with auto reset to latest disabled.
     */
    @Test
    public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception {
        final boolean autoResetToLatest = false;
        this.testDeltaStreamerTransitionFromParquetToKafkaSource(autoResetToLatest);
    }

    /**
     * Runs the parquet-to-Kafka transition scenario with auto reset to latest enabled.
     */
    @Test
    public void testParquetSourceToKafkaSourceLatestAutoResetValue() throws Exception {
        final boolean autoResetToLatest = true;
        this.testDeltaStreamerTransitionFromParquetToKafkaSource(autoResetToLatest);
    }

    /**
     * Runs the parquet DFS source scenario without a schema provider and with a null
     * transformer list.
     */
    @Test
    public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
        final boolean useSchemaProvider = false;
        final List<String> transformerClassNames = null;
        this.testParquetDFSSource(useSchemaProvider, transformerClassNames);
    }

    /**
     * Runs the parquet DFS source scenario with the empty-batch check enabled, no schema
     * provider and a null transformer list.
     */
    @Test
    public void testParquetDFSSourceForEmptyBatch() throws Exception {
        final boolean useSchemaProvider = false;
        final List<String> transformerClassNames = null;
        final boolean testEmptyBatch = true;
        this.testParquetDFSSource(useSchemaProvider, transformerClassNames, testEmptyBatch);
    }

    /**
     * Runs the missing-hoodie.properties restart scenario in its init-failure variant.
     */
    @Test
    public void testDeltaStreamerRestartAfterMissingHoodieProps() throws Exception {
        final boolean testInitFailure = true;
        this.testDeltaStreamerRestartAfterMissingHoodieProps(testInitFailure);
    }

    /**
     * Runs the missing-hoodie.properties restart scenario with the init-failure variant
     * turned off.
     */
    @Test
    public void testDeltaStreamerRestartAfterMissingHoodiePropsAfterValidCommit() throws Exception {
        final boolean testInitFailure = false;
        this.testDeltaStreamerRestartAfterMissingHoodieProps(testInitFailure);
    }

    /**
     * Checks how a new delta streamer behaves after hoodie.properties has been deleted.
     *
     * <p>When {@code testInitFailure} is true, the first sync uses
     * {@code TestParquetDFSSourceEmptyBatch}; every entry under ".hoodie/" whose name contains
     * "commit" or "inflight" is then deleted together with hoodie.properties, and a new streamer
     * on {@code ParquetDFSSource} must sync successfully and yield 10 records.
     *
     * <p>When it is false, the first sync uses {@code ParquetDFSSource}; only hoodie.properties
     * is deleted, and constructing a new streamer must throw {@link HoodieIOException}.
     *
     * @param testInitFailure selects which of the two variants above is run
     */
    private void testDeltaStreamerRestartAfterMissingHoodieProps(boolean testInitFailure) throws Exception {
        PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
        int parquetRecordsCount = 10;
        boolean hasTransformer = false;
        boolean useSchemaProvider = false;
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "1.parquet", false, null, null);
        this.prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", "test-parquet-dfs-source.properties", PARQUET_SOURCE_ROOT, false, "partition_path", "0");
        String tableBasePath = basePath + "/test_parquet_table" + testNum;
        HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testInitFailure ? TestParquetDFSSourceEmptyBatch.class.getName() : ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
        deltaStreamer.sync();

        if (testInitFailure) {
            // A plain loop lets an IOException from delete() propagate and fail the test, instead
            // of being printed and ignored inside a stream lambda.
            for (FileStatus status : fs.listStatus(new Path(tableBasePath + "/.hoodie/"))) {
                String fileName = status.getPath().getName();
                if (fileName.contains("commit") || fileName.contains("inflight")) {
                    // Two-argument delete with recursive=true is what the deprecated
                    // single-argument delete(Path) delegates to.
                    fs.delete(status.getPath(), true);
                }
            }
        }
        fs.delete(new Path(tableBasePath + "/.hoodie/hoodie.properties"), true);

        if (testInitFailure) {
            deltaStreamer = new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
            deltaStreamer.sync();
            TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
        } else {
            Assertions.assertThrows(HoodieIOException.class, () -> new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), null, "test-parquet-dfs-source.properties", false, useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc));
        }
        ++testNum;
    }

    /**
     * Runs the parquet DFS source scenario without a schema provider, using
     * {@link TripsWithDistanceTransformer} as the only transformer.
     */
    @Test
    public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
        final boolean useSchemaProvider = false;
        final List<String> transformerClassNames = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        this.testParquetDFSSource(useSchemaProvider, transformerClassNames);
    }

    /**
     * Runs the parquet DFS source scenario with a schema provider and a null transformer list.
     */
    @Test
    public void testParquetDFSSourceWithSourceSchemaFileAndNoTransformer() throws Exception {
        final boolean useSchemaProvider = true;
        final List<String> transformerClassNames = null;
        this.testParquetDFSSource(useSchemaProvider, transformerClassNames);
    }

    /**
     * Runs the parquet DFS source scenario with a schema provider, using
     * {@link TripsWithDistanceTransformer} as the only transformer.
     */
    @Test
    public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
        final boolean useSchemaProvider = true;
        final List<String> transformerClassNames = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        this.testParquetDFSSource(useSchemaProvider, transformerClassNames);
    }

    /**
     * Runs the ORC DFS source scenario without a schema provider or transformers; the body is
     * skipped unless {@code gteqSpark3_0()} reports true.
     */
    @Test
    public void testORCDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
        if (!HoodieSparkUtils$.MODULE$.gteqSpark3_0()) {
            return;
        }
        final boolean useSchemaProvider = false;
        final List<String> transformerClassNames = null;
        this.testORCDFSSource(useSchemaProvider, transformerClassNames);
    }

    /**
     * Runs the ORC DFS source scenario with a schema provider and
     * {@link TripsWithDistanceTransformer}; the body is skipped unless {@code gteqSpark3_0()}
     * reports true.
     */
    @Test
    public void testORCDFSSourceWithSchemaProviderAndWithTransformer() throws Exception {
        if (!HoodieSparkUtils$.MODULE$.gteqSpark3_0()) {
            return;
        }
        final boolean useSchemaProvider = true;
        final List<String> transformerClassNames = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        this.testORCDFSSource(useSchemaProvider, transformerClassNames);
    }

    /**
     * Saves the CSV DFS source properties and writes "1.csv" with 3 generated records under
     * {@code basePath + "/csvFiles"}.
     *
     * @param hasHeader whether the CSV file has a header row; also sets the csv.header property
     * @param sep field separator; a tab is stored as the two characters backslash and 't', and a
     *        comma leaves the separator property unset
     * @param useSchemaProvider when true, the source schema file property is set
     * @param hasTransformer when true together with {@code useSchemaProvider}, the target schema
     *        file property is set as well
     */
    private void prepareCsvDFSSource(boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
        final String sourceRoot = basePath + "/csvFiles";

        // With a header or a schema provider the columns have names; otherwise the positional
        // names "_c1" and "_c2" are used.
        final boolean namedColumns = hasHeader || useSchemaProvider;
        final String recordKeyField;
        final String partitionPath;
        if (namedColumns) {
            recordKeyField = "_row_key";
            partitionPath = "partition_path";
        } else {
            recordKeyField = "_c1";
            partitionPath = "_c2";
        }

        TypedProperties sourceProps = new TypedProperties();
        sourceProps.setProperty("include", "base.properties");
        sourceProps.setProperty("hoodie.datasource.write.recordkey.field", recordKeyField);
        sourceProps.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);
        if (useSchemaProvider) {
            sourceProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source-flattened.avsc");
        }
        if (useSchemaProvider && hasTransformer) {
            sourceProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target-flattened.avsc");
        }
        sourceProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);

        if (sep == '\t') {
            sourceProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
        } else if (sep != ',') {
            sourceProps.setProperty("hoodie.deltastreamer.csv.sep", Character.toString(sep));
        }
        if (hasHeader) {
            sourceProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader));
        }
        UtilitiesTestBase.Helpers.savePropsToDFS(sourceProps, fs, basePath + "/" + "test-csv-dfs-source.properties");

        String csvPath = sourceRoot + "/1.csv";
        HoodieTestDataGenerator generator = new HoodieTestDataGenerator();
        UtilitiesTestBase.Helpers.saveCsvToDFS(
            hasHeader,
            sep,
            UtilitiesTestBase.Helpers.jsonifyRecords(generator.generateInserts("000", Integer.valueOf(3), true)),
            fs,
            csvPath);
    }

    /**
     * Shared CSV scenario: prepares the CSV source, runs one INSERT sync through
     * {@link CsvDFSSource} and asserts that three records were written.
     * {@code testNum} is incremented only after the assertion passes.
     *
     * @param hasHeader             whether the CSV has a header row
     * @param sep                   field separator
     * @param useSchemaProvider     whether a schema provider is configured
     * @param transformerClassNames transformer class names, or {@code null} for none
     */
    private void testCsvDFSSource(boolean hasHeader, char sep, boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
        boolean hasTransformer = transformerClassNames != null;
        this.prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, hasTransformer);

        String targetPath = basePath + "/test_csv_table" + testNum;
        String orderingField;
        if (hasHeader || useSchemaProvider) {
            orderingField = "timestamp";
        } else {
            orderingField = "_c0";
        }
        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(targetPath, WriteOperationType.INSERT, CsvDFSSource.class.getName(), transformerClassNames, "test-csv-dfs-source.properties", false, useSchemaProvider, 1000, false, null, null, orderingField, null);
        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(config, jsc);
        streamer.sync();
        TestHelpers.assertRecordCount(3L, targetPath, sqlContext);
        ++testNum;
    }

    /** CSV scenario: header row, comma separator, no schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
        boolean hasHeader = true;
        boolean useSchemaProvider = false;
        this.testCsvDFSSource(hasHeader, ',', useSchemaProvider, null);
    }

    /** CSV scenario: header row, tab separator, no schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws Exception {
        boolean hasHeader = true;
        boolean useSchemaProvider = false;
        this.testCsvDFSSource(hasHeader, '\t', useSchemaProvider, null);
    }

    /** CSV scenario: header row, tab separator, schema provider enabled, no transformer. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws Exception {
        boolean hasHeader = true;
        boolean useSchemaProvider = true;
        this.testCsvDFSSource(hasHeader, '\t', useSchemaProvider, null);
    }

    /** CSV scenario: header row, tab separator, no schema provider, {@link TripsWithDistanceTransformer} applied. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndWithTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        this.testCsvDFSSource(true, '\t', false, transformers);
    }

    /** CSV scenario: header row, tab separator, schema provider enabled, {@link TripsWithDistanceTransformer} applied. */
    @Test
    public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        this.testCsvDFSSource(true, '\t', true, transformers);
    }

    /** CSV scenario: no header row, tab separator, no schema provider, no transformer. */
    @Test
    public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndNoTransformer() throws Exception {
        boolean hasHeader = false;
        boolean useSchemaProvider = false;
        this.testCsvDFSSource(hasHeader, '\t', useSchemaProvider, null);
    }

    /** CSV scenario: no header row, tab separator, schema provider enabled, no transformer. */
    @Test
    public void testCsvDFSSourceNoHeaderWithSchemaProviderAndNoTransformer() throws Exception {
        boolean hasHeader = false;
        boolean useSchemaProvider = true;
        this.testCsvDFSSource(hasHeader, '\t', useSchemaProvider, null);
    }

    /**
     * CSV scenario: no header row, tab separator, no schema provider, {@link TripsWithDistanceTransformer} applied.
     * Expects an {@link AnalysisException} whose message reports that {@code begin_lat} cannot be resolved.
     */
    @Test
    public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndWithTransformer() throws Exception {
        Exception failure = Assertions.assertThrows(
            AnalysisException.class,
            () -> this.testCsvDFSSource(false, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName())),
            "Should error out when doing the transformation.");
        LOG.debug((Object)"Expected error during transformation", (Throwable)failure);
        String message = failure.getMessage();
        Assertions.assertTrue(message.contains("cannot resolve '`begin_lat`' given input columns:"));
    }

    /** CSV scenario: no header row, tab separator, schema provider enabled, {@link TripsWithDistanceTransformer} applied. */
    @Test
    public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception {
        List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
        this.testCsvDFSSource(false, '\t', true, transformers);
    }

    /**
     * Writes {@code test-sql-source-source.properties} and generates the parquet-backed
     * {@code test_sql_table} temp view (1000 records) that the SQL source query selects from.
     *
     * @throws IOException propagated from the DFS helper calls
     */
    private void prepareSqlSource() throws IOException {
        // Keep the generated data under basePath (note the separator) like every other source root in this class.
        String sourceRoot = basePath + "/sqlSourceFiles";
        TypedProperties sqlSourceProps = new TypedProperties();
        sqlSourceProps.setProperty("include", "base.properties");
        sqlSourceProps.setProperty("hoodie.embed.timeline.server", "false");
        sqlSourceProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
        sqlSourceProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
        sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query", "select * from test_sql_table");
        UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, fs, basePath + "/" + "test-sql-source-source.properties");
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        this.generateSqlSourceTestTable(sourceRoot, "1", "1000", 1000, dataGenerator);
    }

    /**
     * Generates {@code n} insert records for {@code instantTime}, saves them as a parquet file
     * named {@code filename} under {@code dfsRoot}, and (re)registers the parquet contents of
     * {@code dfsRoot} as the temp view {@code test_sql_table}.
     *
     * @throws IOException propagated from the parquet save helper
     */
    private void generateSqlSourceTestTable(String dfsRoot, String filename, String instantTime, int n, HoodieTestDataGenerator dataGenerator) throws IOException {
        Path parquetFile = new Path(dfsRoot, filename);
        UtilitiesTestBase.Helpers.saveParquetToDFS(
            UtilitiesTestBase.Helpers.toGenericRecords(dataGenerator.generateInserts(instantTime, Integer.valueOf(n), false)),
            parquetFile);
        Dataset<Row> sourceRows = sparkSession.read().parquet(dfsRoot);
        sourceRows.createOrReplaceTempView("test_sql_table");
    }

    /**
     * Prepares the SQL source, runs one INSERT sync through {@link SqlSource} and asserts
     * that 1000 records were written to the target table.
     */
    @Test
    public void testSqlSourceSource() throws Exception {
        this.prepareSqlSource();
        String targetPath = basePath + "/test_sql_source_table" + testNum++;
        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(targetPath, WriteOperationType.INSERT, SqlSource.class.getName(), Collections.emptyList(), "test-sql-source-source.properties", false, false, 1000, false, null, null, "timestamp", null, true);
        new HoodieDeltaStreamer(config, jsc).sync();
        TestHelpers.assertRecordCount(1000L, targetPath, sqlContext);
    }

    /**
     * Continuous-mode ingestion from an in-memory H2 table through {@link JdbcSource} with
     * incremental pull enabled. With 1000 source rows and a source limit of 100, the runner
     * condition expects at least ceil(1000/100) completed commits and all 1000 records.
     * Any exception fails the test with the original exception attached as the cause.
     */
    @Disabled
    @Test
    public void testJdbcSourceIncrementalFetchInContinuousMode() {
        try (Connection connection = DriverManager.getConnection("jdbc:h2:mem:test_mem", "test", "jdbc");){
            TypedProperties props = new TypedProperties();
            props.setProperty("hoodie.deltastreamer.jdbc.url", "jdbc:h2:mem:test_mem");
            props.setProperty("hoodie.deltastreamer.jdbc.driver.class", "org.h2.Driver");
            props.setProperty("hoodie.deltastreamer.jdbc.user", "test");
            props.setProperty("hoodie.deltastreamer.jdbc.password", "jdbc");
            props.setProperty("hoodie.deltastreamer.jdbc.table.name", "triprec");
            props.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
            props.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name", "id");
            props.setProperty("hoodie.datasource.write.keygenerator.class", SimpleKeyGenerator.class.getName());
            props.setProperty("hoodie.datasource.write.recordkey.field", "ID");
            props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
            UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/test-jdbc-source.properties");
            int numRecords = 1000;
            int sourceLimit = 100;
            String tableBasePath = basePath + "/triprec";
            HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, JdbcSource.class.getName(), null, "test-jdbc-source.properties", false, false, sourceLimit, false, null, null, "timestamp", null);
            cfg.continuousMode = true;
            JdbcTestUtils.clearAndInsert("000", numRecords, connection, new HoodieTestDataGenerator(), props);
            HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
            TestHoodieDeltaStreamer.deltaStreamerTestRunner(deltaStreamer, cfg, r -> {
                // One commit per batch of sourceLimit rows, rounded up.
                TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit + (numRecords % sourceLimit == 0 ? 0 : 1), tableBasePath, fs);
                TestHelpers.assertRecordCount(numRecords, tableBasePath, sqlContext);
                return true;
            });
        }
        catch (Exception e) {
            // Pass the exception as the cause so the failure report keeps the original stack trace.
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Verifies the incremental source's fallback-to-full-table-scan option.
     * <ol>
     *   <li>One bulk insert upstream, then a downstream sync reading one instant at a time.</li>
     *   <li>Nine more upstream upserts; the next downstream sync is expected to throw
     *       {@link AnalysisException} and leave the downstream table at 1000 records
     *       (presumably because upstream files were cleaned by then; confirm against
     *       the cleaner settings used by {@code insertInTable}).</li>
     *   <li>With the fallback option enabled and {@code num_instants=10}, two downstream syncs
     *       must bring the downstream record count equal to the upstream one.</li>
     * </ol>
     */
    @Test
    public void testHoodieIncrFallback() throws Exception {
        String tableBasePath = basePath + "/incr_test_table";
        String downstreamTableBasePath = basePath + "/incr_test_downstream_table";
        String singleInstantConfig = "hoodie.deltastreamer.source.hoodieincr.num_instants=1";

        this.insertInTable(tableBasePath, 1, WriteOperationType.BULK_INSERT);
        HoodieDeltaStreamer.Config downstreamCfg = TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT, true, null);
        downstreamCfg.configs.add(singleInstantConfig);
        new HoodieDeltaStreamer(downstreamCfg, jsc).sync();

        this.insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
        Assertions.assertThrows(AnalysisException.class, () -> new HoodieDeltaStreamer(downstreamCfg, jsc).sync());
        TestHelpers.assertRecordCount(1000L, downstreamTableBasePath, sqlContext);

        // Replace the single-instant setting by value rather than by list position; configs is
        // known to be non-null here because it was already used above.
        downstreamCfg.configs.remove(singleInstantConfig);
        downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key() + "=true");
        downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10");
        downstreamCfg.operation = WriteOperationType.UPSERT;
        new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
        new HoodieDeltaStreamer(downstreamCfg, jsc).sync();

        long baseTableRecords = sqlContext.read().format("org.apache.hudi").load(tableBasePath).count();
        long downStreamTableRecords = sqlContext.read().format("org.apache.hudi").load(downstreamTableBasePath).count();
        Assertions.assertEquals(baseTableRecords, downStreamTableRecords);
    }

    /**
     * Runs {@code count} consecutive delta streamer syncs against {@code tableBasePath} using the
     * given operation, {@link SqlQueryBasedTransformer} and {@code test-source.properties}, with
     * cleaner/archival settings (retain 3 commits, keep min 4 / max 5) and insert generation enabled.
     */
    private void insertInTable(String tableBasePath, int count, WriteOperationType operationType) throws Exception {
        List<String> transformers = Collections.singletonList(SqlQueryBasedTransformer.class.getName());
        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, operationType, transformers, "test-source.properties", false);
        if (config.configs == null) {
            config.configs = new ArrayList<>();
        }
        config.configs.addAll(Arrays.asList(
            "hoodie.cleaner.commits.retained=3",
            "hoodie.keep.min.commits=4",
            "hoodie.keep.max.commits=5",
            "hoodie.test.source.generate.inserts=true"));
        int remaining = count;
        while (remaining-- > 0) {
            new HoodieDeltaStreamer(config, jsc).sync();
        }
    }

    /**
     * Runs the specified-operation scenario with {@code INSERT_OVERWRITE} against
     * {@code <basePath>/insert_overwrite}, once per parameterized record type.
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testInsertOverwrite(HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tablePath = basePath + "/insert_overwrite";
        this.testDeltaStreamerWithSpecifiedOperation(tablePath, WriteOperationType.INSERT_OVERWRITE, recordType);
    }

    /**
     * Runs the specified-operation scenario with {@code INSERT_OVERWRITE_TABLE} against
     * {@code <basePath>/insert_overwrite_table}, once per parameterized record type.
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testInsertOverwriteTable(HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tablePath = basePath + "/insert_overwrite_table";
        this.testDeltaStreamerWithSpecifiedOperation(tablePath, WriteOperationType.INSERT_OVERWRITE_TABLE, recordType);
    }

    /**
     * Inserts five records from the parquet source, then runs a {@code DELETE_PARTITION} sync
     * whose input is narrowed by {@link TestSpecificPartitionTransformer}, and finally asserts
     * that no rows remain for partition {@code 2016/03/15}.
     */
    @Disabled(value="Local run passing; flaky in CI environment.")
    @Test
    public void testDeletePartitions() throws Exception {
        String propsFile = "test-parquet-dfs-source.properties";
        this.prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", propsFile, PARQUET_SOURCE_ROOT, false, "partition_path");
        String tablePath = basePath + "/test_parquet_table" + testNum;

        // Step 1: plain insert of the prepared parquet data.
        HoodieDeltaStreamer.Config insertConfig = TestHelpers.makeConfig(tablePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), null, propsFile, false, false, 100000, false, null, null, "timestamp", null);
        new HoodieDeltaStreamer(insertConfig, jsc).sync();
        TestHelpers.assertRecordCount(5L, tablePath, sqlContext);
        ++testNum;

        // Step 2: new source files, then a delete-partition run restricted by the transformer.
        TestHoodieDeltaStreamer.prepareParquetDFSFiles(5, PARQUET_SOURCE_ROOT);
        this.prepareParquetDFSSource(false, false);
        List<String> transformers = Collections.singletonList(TestSpecificPartitionTransformer.class.getName());
        HoodieDeltaStreamer.Config deleteConfig = TestHelpers.makeConfig(tablePath, WriteOperationType.DELETE_PARTITION, ParquetDFSSource.class.getName(), transformers, propsFile, false, false, 100000, false, null, null, "timestamp", null);
        new HoodieDeltaStreamer(deleteConfig, jsc).sync();
        TestHelpers.assertNoPartitionMatch(tablePath, sqlContext, "2016/03/15");
    }

    /**
     * Shared scenario for overwrite-style operations.
     * <ol>
     *   <li>Bulk insert: expects 1000 records, checkpoint {@code "00000"}, 1 commit.</li>
     *   <li>Switch to {@code operationType} with source limit 0: nothing is expected to change.</li>
     *   <li>Source limit 1000: expects 1950 records, checkpoint {@code "00001"}, 2 commits.</li>
     * </ol>
     * The table directory is deleted at the end.
     */
    void testDeltaStreamerWithSpecifiedOperation(String tableBasePath, WriteOperationType operationType, HoodieRecord.HoodieRecordType recordType) throws Exception {
        long seededRows = 1000L;
        long rowsAfterOperation = 1950L;

        // Round 1: seed the table with a bulk insert.
        HoodieDeltaStreamer.Config streamerConfig = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
        TestHelpers.addRecordMerger(recordType, streamerConfig.configs);
        new HoodieDeltaStreamer(streamerConfig, jsc).sync();
        TestHelpers.assertRecordCount(seededRows, tableBasePath, sqlContext);
        TestHelpers.assertDistanceCount(seededRows, tableBasePath, sqlContext);
        TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);

        // Round 2: requested operation with a zero source limit; table state must be unchanged.
        streamerConfig.operation = operationType;
        streamerConfig.sourceLimit = 0L;
        new HoodieDeltaStreamer(streamerConfig, jsc).sync();
        TestHelpers.assertRecordCount(seededRows, tableBasePath, sqlContext);
        TestHelpers.assertDistanceCount(seededRows, tableBasePath, sqlContext);
        TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);

        // Round 3: requested operation with real input.
        streamerConfig.sourceLimit = 1000L;
        new HoodieDeltaStreamer(streamerConfig, jsc).sync();
        TestHelpers.assertRecordCount(rowsAfterOperation, tableBasePath, sqlContext);
        TestHelpers.assertDistanceCount(rowsAfterOperation, tableBasePath, sqlContext);
        TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
    }

    /**
     * Checks that {@code getLatestCommitMetadataWithValidCheckpointInfo} returns the most recent
     * commit metadata carrying a checkpoint: first {@code "abc"}, then {@code "def"} after a second
     * commit, and still {@code "def"} after a replace commit added with empty extra metadata.
     *
     * @throws IOException propagated from the delta sync and timeline helpers
     */
    @Test
    public void testFetchingCheckpointFromPreviousCommits() throws IOException {
        String checkpointKey = "deltastreamer.checkpoint.key";
        HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(basePath + "/testFetchPreviousCheckpoint", WriteOperationType.BULK_INSERT);
        TypedProperties properties = new TypedProperties();
        properties.setProperty("hoodie.datasource.write.recordkey.field", "key");
        properties.setProperty("hoodie.datasource.write.partitionpath.field", "pp");
        TestDeltaSync testDeltaSync = new TestDeltaSync(cfg, sparkSession, null, properties, jsc, fs, jsc.hadoopConfiguration(), null);
        properties.put((Object)HoodieTableConfig.NAME.key(), (Object)"sample_tbl");
        HoodieTableMetaClient metaClient = HoodieTestUtils.init((Configuration)jsc.hadoopConfiguration(), (String)basePath, (HoodieTableType)HoodieTableType.COPY_ON_WRITE, (Properties)properties);
        HashMap<String, String> extraMetadata = new HashMap<String, String>();

        // JUnit's assertEquals takes (expected, actual); keep that order so failure messages read correctly.
        extraMetadata.put(checkpointKey, "abc");
        TestHoodieDeltaStreamer.addCommitToTimeline(metaClient, extraMetadata);
        metaClient.reloadActiveTimeline();
        Assertions.assertEquals((Object)"abc", (Object)((HoodieCommitMetadata)testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline().getCommitsTimeline()).get()).getMetadata(checkpointKey));

        extraMetadata.put(checkpointKey, "def");
        TestHoodieDeltaStreamer.addCommitToTimeline(metaClient, extraMetadata);
        metaClient.reloadActiveTimeline();
        Assertions.assertEquals((Object)"def", (Object)((HoodieCommitMetadata)testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline().getCommitsTimeline()).get()).getMetadata(checkpointKey));

        // A replace commit without checkpoint metadata must not hide the earlier checkpoint.
        TestHoodieDeltaStreamer.addReplaceCommitToTimeline(metaClient, Collections.emptyMap());
        metaClient.reloadActiveTimeline();
        Assertions.assertEquals((Object)"def", (Object)((HoodieCommitMetadata)testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline().getCommitsTimeline()).get()).getMetadata(checkpointKey));
    }

    /**
     * Runs one UPSERT sync with {@code DROP_PARTITION_COLUMNS} set to true and asserts that the
     * schema read from the table's data file has no {@code partition_path} field.
     * The table directory is deleted at the end.
     */
    @ParameterizedTest
    @EnumSource(value=HoodieRecord.HoodieRecordType.class, names={"AVRO", "SPARK"})
    public void testDropPartitionColumns(HoodieRecord.HoodieRecordType recordType) throws Exception {
        String tablePath = basePath + "/test_drop_partition_columns" + testNum++;
        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tablePath, WriteOperationType.UPSERT);
        TestHelpers.addRecordMerger(recordType, config.configs);
        config.configs.add(String.format("%s=%s", HoodieTableConfig.DROP_PARTITION_COLUMNS.key(), "true"));
        new HoodieDeltaStreamer(config, jsc).sync();
        TestHelpers.assertAtLeastNCommits(1, tablePath, fs);

        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath).setConf(fs.getConf()).build();
        Schema dataFileSchema = new TableSchemaResolver(metaClient).getTableAvroSchemaFromDataFile();
        Assertions.assertNotNull(dataFileSchema);
        List<String> fieldNames = dataFileSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
        Assertions.assertFalse(fieldNames.contains("partition_path"));

        UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tablePath);
    }

    /**
     * Runs a sync with source limit 0, commit-on-no-checkpoint-change, meta sync and forced empty
     * meta sync all enabled, then asserts the table has zero records and that the synced Hive
     * table exists.
     */
    @Test
    public void testForceEmptyMetaSync() throws Exception {
        String tablePath = basePath + "/test_force_empty_meta_sync";
        HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tablePath, WriteOperationType.BULK_INSERT);
        config.sourceLimit = 0L;
        config.allowCommitOnNoCheckpointChange = true;
        config.enableMetaSync = true;
        config.forceEmptyMetaSync = true;

        HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(config, jsc, fs, (Configuration)hiveServer.getHiveConf());
        streamer.sync();
        TestHelpers.assertRecordCount(0L, tablePath, sqlContext);

        HiveSyncConfig syncConfig = TestHoodieDeltaStreamer.getHiveSyncConfig(tablePath, "hive_trips");
        syncConfig.setHadoopConf((Configuration)hiveServer.getHiveConf());
        HoodieHiveSyncClient syncClient = new HoodieHiveSyncClient(syncConfig);
        String tableName = syncConfig.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME);
        boolean exists = syncClient.tableExists(tableName);
        Assertions.assertTrue(exists, "Table " + tableName + " should exist");
    }

    /**
     * Argument provider yielding two cases: {@code (false, null)} and
     * {@code (true, [TripsWithDistanceTransformer class name])}.
     */
    private static Stream<Arguments> testORCDFSSource() {
        Arguments withoutTransformer = Arguments.arguments(new Object[]{false, null});
        Arguments withTransformer = Arguments.arguments(new Object[]{true, Collections.singletonList(TripsWithDistanceTransformer.class.getName())});
        return Stream.of(withoutTransformer, withTransformer);
    }

    /**
     * File-based schema provider whose target schema is always {@code null}.
     */
    public static class TestFileBasedSchemaProviderNullTargetSchema extends FilebasedSchemaProvider {

        public TestFileBasedSchemaProviderNullTargetSchema(TypedProperties props, JavaSparkContext jssc) {
            super(props, jssc);
        }

        /** Always reports that no target schema is available. */
        public Schema getTargetSchema() {
            Schema noTargetSchema = null;
            return noTargetSchema;
        }
    }

    /**
     * Transformer that adds the column {@code evoluted_optional_union_field}, populated from the
     * existing {@code rider} column.
     */
    public static class TripsWithEvolvedOptionalFieldTransformer implements Transformer {

        public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
            Column riderColumn = functions.col("rider");
            return rowDataset.withColumn("evoluted_optional_union_field", riderColumn);
        }
    }

    /**
     * Transformer that keeps only the rows whose {@code partition_path} equals {@code 2016/03/15}.
     */
    public static class TestSpecificPartitionTransformer implements Transformer {

        public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
            return rowDataset.filter("partition_path == '2016/03/15'");
        }
    }

    /**
     * Transformer that returns its input dataset unchanged.
     */
    public static class TestIdentityTransformer implements Transformer {

        public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
            Dataset<Row> unchanged = rowDataset;
            return unchanged;
        }
    }

    /**
     * Transformer that discards every input row: it prints a marker line to standard output and
     * returns an empty data frame that carries the input dataset's schema.
     */
    public static class DropAllTransformer implements Transformer {

        public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
            System.out.println("DropAllTransformer called !!");
            StructType inputSchema = rowDataset.schema();
            return sparkSession.createDataFrame(jsc.emptyRDD(), inputSchema);
        }
    }

    /**
     * Payload subclass that adds no behavior of its own; it only forwards its constructor
     * arguments to {@link OverwriteWithLatestAvroPayload}.
     */
    public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload {

        /** Delegates directly to the parent constructor. */
        public DummyAvroPayload(GenericRecord gr, Comparable orderingVal) {
            super(gr, orderingVal);
        }
    }

    /**
     * Key generator subclass that adds no behavior of its own; it only forwards the properties
     * to {@link SimpleKeyGenerator}.
     */
    public static class TestTableLevelGenerator extends SimpleKeyGenerator {

        /** Delegates directly to the parent constructor. */
        public TestTableLevelGenerator(TypedProperties props) {
            super(props);
        }
    }

    /**
     * Key generator subclass that adds no behavior of its own; it only forwards the properties
     * to {@link SimpleKeyGenerator}.
     */
    public static class TestGenerator extends SimpleKeyGenerator {

        /** Delegates directly to the parent constructor. */
        public TestGenerator(TypedProperties props) {
            super(props);
        }
    }

    /**
     * Transformer that registers {@link DistanceUDF} as {@code distance_udf} and adds a
     * {@code haversine_distance} column computed from the begin/end latitude and longitude columns.
     */
    public static class TripsWithDistanceTransformer
    implements Transformer {
        public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
            rowDataset.sqlContext().udf().register("distance_udf", (UDF4)new DistanceUDF(), DataTypes.DoubleType);
            // Argument order matches DistanceUDF.call(lat1, lat2, lon1, lon2); the last column must be
            // end_lon (it was previously end_lat a second time).
            return rowDataset.withColumn("haversine_distance", functions.callUDF((String)"distance_udf", (Column[])new Column[]{functions.col((String)"begin_lat"), functions.col((String)"end_lat"), functions.col((String)"begin_lon"), functions.col((String)"end_lon")}));
        }
    }

    /**
     * Four-argument UDF that ignores its inputs and returns the next double from
     * {@code HoodieDeltaStreamerTestBase.RANDOM}.
     */
    public static class DistanceUDF implements UDF4<Double, Double, Double, Double, Double> {

        public Double call(Double lat1, Double lat2, Double lon1, Double lon2) {
            double distance = HoodieDeltaStreamerTestBase.RANDOM.nextDouble();
            return distance;
        }
    }

    /**
     * {@link DeltaSync} subclass that re-declares
     * {@code getLatestCommitMetadataWithValidCheckpointInfo} so the enclosing test class can call it.
     */
    class TestDeltaSync extends DeltaSync {

        public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf, Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
            super(cfg, sparkSession, schemaProvider, props, jssc, fs, conf, onInitializingHoodieWriteClient);
        }

        /** Pure pass-through to the parent implementation. */
        protected Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException {
            Option<HoodieCommitMetadata> latest = super.getLatestCommitMetadataWithValidCheckpointInfo(timeline);
            return latest;
        }
    }

    static class TestHelpers {
        /** Package-private no-argument constructor with an empty body. */
        TestHelpers() {
        }

        /**
         * Builds a config for {@code basePath} and {@code op} whose only transformer is
         * {@link DropAllTransformer}.
         */
        static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, WriteOperationType op) {
            List<String> transformers = Collections.singletonList(DropAllTransformer.class.getName());
            return TestHelpers.makeConfig(basePath, op, transformers);
        }

        /**
         * Builds a config for {@code basePath} and {@code op} whose only transformer is
         * {@link TripsWithDistanceTransformer}.
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op) {
            List<String> transformers = Collections.singletonList(TripsWithDistanceTransformer.class.getName());
            return TestHelpers.makeConfig(basePath, op, transformers);
        }

        /**
         * Builds a config with the given transformers, using {@code test-source.properties}
         * and Hive sync disabled.
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames) {
            String propsFilename = "test-source.properties";
            boolean enableHiveSync = false;
            return TestHelpers.makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync);
        }

        /**
         * Builds a config with the schema provider class enabled, no payload class override and
         * the default table type.
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames, String propsFilename, boolean enableHiveSync) {
            boolean useSchemaProviderClass = true;
            boolean updatePayloadClass = false;
            return TestHelpers.makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, useSchemaProviderClass, updatePayloadClass, null, null);
        }

        /**
         * Builds a config that reads from {@link TestDataSource} with a source limit of 1000,
         * ordering field {@code timestamp} and no explicit checkpoint.
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass, String payloadClassName, String tableType) {
            String sourceClassName = TestDataSource.class.getName();
            int sourceLimit = 1000;
            String sourceOrderingField = "timestamp";
            return TestHelpers.makeConfig(basePath, op, sourceClassName, transformerClassNames, propsFilename, enableHiveSync, useSchemaProviderClass, sourceLimit, updatePayloadClass, payloadClassName, tableType, sourceOrderingField, null);
        }

        /**
         * Builds a config with {@code allowCommitOnNoCheckpointChange} set to {@code false};
         * all other values are passed through unchanged.
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField, String checkpoint) {
            boolean allowCommitOnNoCheckpointChange = false;
            return TestHelpers.makeConfig(basePath, op, sourceClassName, transformerClassNames, propsFilename, enableHiveSync, useSchemaProviderClass, sourceLimit, updatePayloadClass, payloadClassName, tableType, sourceOrderingField, checkpoint, allowCommitOnNoCheckpointChange);
        }

        /**
         * Creates and fully populates a {@link HoodieDeltaStreamer.Config}.
         * <ul>
         *   <li>The target table name is always {@code hoodie_trips}.</li>
         *   <li>A {@code null} {@code tableType} becomes {@code COPY_ON_WRITE}.</li>
         *   <li>The properties file path is {@code basePath + "/" + propsFilename}.</li>
         *   <li>{@code payloadClassName} is applied only when {@code updatePayloadClass} is true.</li>
         *   <li>The default schema provider class is set only when {@code useSchemaProviderClass} is true.</li>
         * </ul>
         */
        static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName, List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField, String checkpoint, boolean allowCommitOnNoCheckpointChange) {
            HoodieDeltaStreamer.Config config = new HoodieDeltaStreamer.Config();

            // Target table.
            config.targetBasePath = basePath;
            config.targetTableName = "hoodie_trips";
            if (tableType == null) {
                config.tableType = "COPY_ON_WRITE";
            } else {
                config.tableType = tableType;
            }

            // Source and write behavior.
            config.sourceClassName = sourceClassName;
            config.transformerClassNames = transformerClassNames;
            config.operation = op;
            config.enableHiveSync = enableHiveSync;
            config.sourceOrderingField = sourceOrderingField;
            config.propsFilePath = basePath + "/" + propsFilename;
            config.sourceLimit = sourceLimit;
            config.checkpoint = checkpoint;

            // Optional overrides.
            if (updatePayloadClass) {
                config.payloadClassName = payloadClassName;
            }
            if (useSchemaProviderClass) {
                config.schemaProviderClassName = HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
            }
            config.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange;
            return config;
        }

        /**
         * Creates a config for a downstream COPY_ON_WRITE table ({@code hoodie_trips_copy}) that reads
         * incrementally from the Hudi table at {@code srcBasePath} through {@link HoodieIncrSource}.
         * The returned config's {@code configs} list is a mutable list holding the three
         * {@code hoodieincr} settings.
         *
         * @param schemaProviderClassName schema provider class to set, or {@code null} to leave it unset
         */
        static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, WriteOperationType op, boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
            HoodieDeltaStreamer.Config config = new HoodieDeltaStreamer.Config();
            config.targetBasePath = basePath;
            config.targetTableName = "hoodie_trips_copy";
            config.tableType = "COPY_ON_WRITE";
            config.sourceClassName = HoodieIncrSource.class.getName();
            config.operation = op;
            config.sourceOrderingField = "timestamp";
            config.propsFilePath = basePath + "/test-downstream-source.properties";
            config.sourceLimit = 1000L;
            if (schemaProviderClassName != null) {
                config.schemaProviderClassName = schemaProviderClassName;
            }
            config.configs = new ArrayList<>(Arrays.asList(
                "hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt,
                "hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath,
                "hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr"));
            return config;
        }

        /**
         * For the {@code SPARK} record type, appends the record-merger implementation and the
         * parquet log-block-format settings to {@code hoodieConfig}; other types leave the list untouched.
         */
        static void addRecordMerger(HoodieRecord.HoodieRecordType type, List<String> hoodieConfig) {
            if (type != HoodieRecord.HoodieRecordType.SPARK) {
                return;
            }
            String mergerSetting = String.format("%s=%s", HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), HoodieSparkRecordMerger.class.getName());
            hoodieConfig.add(mergerSetting);
            String blockFormatSetting = String.format("%s=%s", HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
            hoodieConfig.add(blockFormatSetting);
        }

        /**
         * Clears the SQL context cache, loads the Hudi table at {@code tablePath} and asserts that
         * its row count equals {@code expected}.
         */
        static void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) {
            sqlContext.clearCache();
            Dataset<Row> table = sqlContext.read().format("org.apache.hudi").load(tablePath);
            long actual = table.count();
            Assertions.assertEquals(expected, actual);
        }

        /**
         * Clears the SQL context cache, loads the Hudi table at {@code basePath} and counts its rows
         * grouped by the partition-path metadata field.
         *
         * @return a mutable map from partition path to record count
         */
        static Map<String, Long> getPartitionRecordCount(String basePath, SQLContext sqlContext) {
            sqlContext.clearCache();
            // Typed as List<Row> so the row accessors below resolve without raw-type casts.
            List<Row> rows = sqlContext.read().format("org.apache.hudi").load(basePath).groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD, new String[0]).count().collectAsList();
            Map<String, Long> partitionRecordCount = new HashMap<>();
            for (Row row : rows) {
                // Column 0 is the grouping key, column 1 is the count.
                partitionRecordCount.put(row.getString(0), row.getLong(1));
            }
            return partitionRecordCount;
        }

        /**
         * Clears the SQL context cache and asserts that the Hudi table at {@code basePath} has no row
         * whose partition-path metadata field equals {@code partitionToValidate}.
         */
        static void assertNoPartitionMatch(String basePath, SQLContext sqlContext, String partitionToValidate) {
            sqlContext.clearCache();
            // Quote the value as a SQL string literal; unquoted, a value such as 2016/03/15 is parsed as
            // an arithmetic expression, the filter matches nothing and the assertion passes vacuously.
            String predicate = HoodieRecord.PARTITION_PATH_METADATA_FIELD + " = '" + partitionToValidate + "'";
            long matchingRows = sqlContext.read().format("org.apache.hudi").load(basePath).filter(predicate).count();
            Assertions.assertEquals(0L, matchingRows);
        }

        /**
         * Clears the SQL context cache and asserts that the number of distinct
         * {@code _hoodie_record_key} values in the Hudi table at {@code tablePath} equals {@code expected}.
         */
        static void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) {
            sqlContext.clearCache();
            Dataset<Row> table = sqlContext.read().format("org.apache.hudi").load(tablePath);
            long distinctKeys = table.select("_hoodie_record_key", new String[0]).distinct().count();
            Assertions.assertEquals(expected, distinctKeys);
        }

        /**
         * Clears the SQL context cache and returns one row per {@code _hoodie_commit_time} with its
         * record count, sorted by commit time.
         */
        static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
            sqlContext.clearCache();
            Dataset<Row> table = sqlContext.read().format("org.apache.hudi").load(tablePath);
            return table.groupBy("_hoodie_commit_time", new String[0]).count().sort("_hoodie_commit_time", new String[0]).collectAsList();
        }

        /**
         * Clears the SQL context cache, exposes the Hudi table at {@code tablePath} as the temp view
         * {@code tmp_trips} and asserts that the number of rows with a non-null
         * {@code haversine_distance} equals {@code expected}.
         */
        static void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) {
            sqlContext.clearCache();
            // createOrReplaceTempView replaces the deprecated registerTempTable and matches the rest of this class.
            sqlContext.read().format("org.apache.hudi").load(tablePath).createOrReplaceTempView("tmp_trips");
            long recordCount = sqlContext.sql("select * from tmp_trips where haversine_distance is not NULL").count();
            Assertions.assertEquals(expected, recordCount);
        }

        /**
         * Clears the SQL context cache, exposes the Hudi table at {@code tablePath} as the temp view
         * {@code tmp_trips} and asserts that the number of rows whose {@code haversine_distance}
         * equals {@code 1.0} is {@code expected}.
         */
        static void assertDistanceCountWithExactValue(long expected, String tablePath, SQLContext sqlContext) {
            sqlContext.clearCache();
            // createOrReplaceTempView replaces the deprecated registerTempTable and matches the rest of this class.
            sqlContext.read().format("org.apache.hudi").load(tablePath).createOrReplaceTempView("tmp_trips");
            long recordCount = sqlContext.sql("select * from tmp_trips where haversine_distance = 1.0").count();
            Assertions.assertEquals(expected, recordCount);
        }

        /**
         * Asserts that the commit timeline of the table at {@code tablePath} contains at least
         * {@code minExpected} completed instants. The full active timeline is logged at INFO level.
         */
        static void assertAtleastNCompactionCommits(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
            HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
            LOG.info((Object)("Timeline Instants=" + metaClient.getActiveTimeline().getInstants()));
            int actual = completedCommits.countInstants();
            String failureMessage = "Got=" + actual + ", exp >=" + minExpected;
            Assertions.assertTrue(actual >= minExpected, failureMessage);
        }

        /**
         * Asserts that the delta-commit timeline of the table at {@code tablePath} contains at least
         * {@code minExpected} completed instants. The full active timeline is logged at INFO level.
         */
        static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
            HoodieTimeline completedDeltaCommits = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
            LOG.info((Object)("Timeline Instants=" + metaClient.getActiveTimeline().getInstants()));
            int actual = completedDeltaCommits.countInstants();
            String failureMessage = "Got=" + actual + ", exp >=" + minExpected;
            Assertions.assertTrue(actual >= minExpected, failureMessage);
        }

        /**
         * Asserts that the commit timeline holds at least {@code minExpected} completed
         * instants found after {@code lastSuccessfulCommit}.
         *
         * @param minExpected          lower bound (inclusive) on the number of matching instants
         * @param lastSuccessfulCommit instant time passed to {@code findInstantsAfter}
         * @param tablePath            base path of the Hudi table
         * @param fs                   file system whose configuration is used to build the meta client
         */
        static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fs.getConf())
                .setBasePath(tablePath)
                .build();
            HoodieTimeline laterCommits = metaClient.getActiveTimeline()
                .getCommitTimeline()
                .findInstantsAfter(lastSuccessfulCommit)
                .filterCompletedInstants();
            // Log the full active timeline to help diagnose a failing assertion.
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int found = laterCommits.countInstants();
            boolean enough = found >= minExpected;
            Assertions.assertTrue(enough, "Got=" + found + ", exp >=" + minExpected);
        }

        /**
         * Asserts that the delta-commit timeline holds at least {@code minExpected} completed
         * instants found after {@code lastSuccessfulCommit}. The active timeline is explicitly
         * reloaded before it is inspected.
         *
         * @param minExpected          lower bound (inclusive) on the number of matching instants
         * @param lastSuccessfulCommit instant time passed to {@code findInstantsAfter}
         * @param tablePath            base path of the Hudi table
         * @param fs                   file system whose configuration is used to build the meta client
         */
        static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fs.getConf())
                .setBasePath(tablePath)
                .build();
            HoodieTimeline laterDeltaCommits = metaClient.reloadActiveTimeline()
                .getDeltaCommitTimeline()
                .findInstantsAfter(lastSuccessfulCommit)
                .filterCompletedInstants();
            // Log the full active timeline to help diagnose a failing assertion.
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int found = laterDeltaCommits.countInstants();
            boolean enough = found >= minExpected;
            Assertions.assertTrue(enough, "Got=" + found + ", exp >=" + minExpected);
        }

        /**
         * Checks the number of completed commits and the checkpoint recorded by the latest one.
         *
         * <p>The last completed instant on the commits timeline is read and its commit metadata
         * deserialized. The method then asserts that the timeline holds exactly
         * {@code totalCommits} completed instants and that the metadata value stored under
         * {@code "deltastreamer.checkpoint.key"} equals {@code expected}.
         *
         * @param expected     expected value of the {@code deltastreamer.checkpoint.key} metadata entry
         * @param tablePath    base path of the Hudi table
         * @param fs           file system whose configuration is used to build the meta client
         * @param totalCommits exact number of completed instants expected on the commits timeline
         * @return the timestamp of the last completed instant
         * @throws IOException if the commit metadata cannot be deserialized
         */
        static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits) throws IOException {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fs.getConf())
                .setBasePath(tablePath)
                .build();
            HoodieTimeline completed = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
            HoodieInstant latest = completed.lastInstant().get();
            byte[] rawMetadata = completed.getInstantDetails(latest).get();
            HoodieCommitMetadata parsed = HoodieCommitMetadata.fromBytes(rawMetadata, HoodieCommitMetadata.class);
            Assertions.assertEquals(totalCommits, completed.countInstants());
            Assertions.assertEquals(expected, parsed.getMetadata("deltastreamer.checkpoint.key"));
            return latest.getTimestamp();
        }

        /**
         * Polls {@code condition} on a background thread until it yields {@code true},
         * {@code dsFuture} completes, or the timeout elapses.
         *
         * <p>Each poll sleeps three seconds and then calls {@code condition.apply(true)}. Anything
         * thrown by the condition is logged and treated as "not yet satisfied", so the poll is
         * retried. The final poll result is not inspected: the method returns normally both when
         * the condition became true and when {@code dsFuture} finished first.
         *
         * <p>The polling executor is always shut down before this method returns, so a timed-out
         * wait does not leave a polling thread running in the background.
         *
         * @param condition     predicate to poll; it is always invoked with {@code true}
         * @param dsFuture      future whose completion also ends the polling loop
         * @param timeoutInSecs maximum time to wait for the polling task, in seconds
         * @throws Exception whatever {@link Future#get(long, TimeUnit)} throws, notably a
         *                   {@code TimeoutException} when polling does not finish in time
         */
        static void waitTillCondition(Function<Boolean, Boolean> condition, Future dsFuture, long timeoutInSecs) throws Exception {
            // Fully qualified because ExecutorService may not be among the file's imports.
            java.util.concurrent.ExecutorService poller = Executors.newSingleThreadExecutor();
            try {
                Future<Boolean> res = poller.submit(() -> {
                    boolean ret = false;
                    while (!ret && !dsFuture.isDone() && !Thread.currentThread().isInterrupted()) {
                        try {
                            Thread.sleep(3000L);
                            ret = condition.apply(true);
                        }
                        catch (InterruptedException interrupted) {
                            // shutdownNow() below interrupts this thread; stop polling instead of retrying.
                            Thread.currentThread().interrupt();
                            break;
                        }
                        catch (Throwable error) {
                            // Failed assertions inside the condition land here and mean "try again".
                            LOG.warn("Got error :", error);
                            ret = false;
                        }
                    }
                    return ret;
                });
                res.get(timeoutInSecs, TimeUnit.SECONDS);
            }
            finally {
                // Release the worker thread on success, timeout and failure alike.
                poller.shutdownNow();
            }
        }

        /**
         * Asserts that the active timeline of the table holds at least {@code minExpected}
         * completed instants.
         *
         * @param minExpected lower bound (inclusive) on the number of completed instants
         * @param tablePath   base path of the Hudi table
         * @param fs          file system whose configuration is used to build the meta client
         */
        static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fs.getConf())
                .setBasePath(tablePath)
                .build();
            HoodieTimeline completedInstants = metaClient.getActiveTimeline().filterCompletedInstants();
            // Log the full active timeline to help diagnose a failing assertion.
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int found = completedInstants.countInstants();
            boolean enough = found >= minExpected;
            Assertions.assertTrue(enough, "Got=" + found + ", exp >=" + minExpected);
        }

        /**
         * Asserts that the completed replace timeline of the table holds at least
         * {@code minExpected} instants.
         *
         * @param minExpected lower bound (inclusive) on the number of completed replace instants
         * @param tablePath   base path of the Hudi table
         * @param fs          file system whose configuration is used to build the meta client
         */
        static void assertAtLeastNReplaceCommits(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fs.getConf())
                .setBasePath(tablePath)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTimeline completedReplaces = metaClient.getActiveTimeline().getCompletedReplaceTimeline();
            // Log the full active timeline to help diagnose a failing assertion.
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int found = completedReplaces.countInstants();
            boolean enough = found >= minExpected;
            Assertions.assertTrue(enough, "Got=" + found + ", exp >=" + minExpected);
        }

        /**
         * Asserts that the table's timeline contains exactly one pending index instant.
         *
         * @param tablePath base path of the Hudi table
         * @param fs        file system whose configuration is used to build the meta client
         */
        static void assertPendingIndexCommit(String tablePath, FileSystem fs) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fs.getConf())
                .setBasePath(tablePath)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTimeline pendingIndex = metaClient.getActiveTimeline()
                .getAllCommitsTimeline()
                .filterPendingIndexTimeline();
            // Log the full active timeline to help diagnose a failing assertion.
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int found = pendingIndex.countInstants();
            Assertions.assertEquals(1, found, "Got=" + found + ", exp=1");
        }

        /**
         * Asserts that the table's timeline contains exactly one completed index instant.
         *
         * @param tablePath base path of the Hudi table
         * @param fs        file system whose configuration is used to build the meta client
         */
        static void assertCompletedIndexCommit(String tablePath, FileSystem fs) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fs.getConf())
                .setBasePath(tablePath)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTimeline completedIndex = metaClient.getActiveTimeline()
                .getAllCommitsTimeline()
                .filterCompletedIndexTimeline();
            // Log the full active timeline to help diagnose a failing assertion.
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int found = completedIndex.countInstants();
            Assertions.assertEquals(1, found, "Got=" + found + ", exp=1");
        }

        /**
         * Asserts that the table's completed replace timeline is empty.
         *
         * @param tablePath base path of the Hudi table
         * @param fs        file system whose configuration is used to build the meta client
         */
        static void assertNoReplaceCommits(String tablePath, FileSystem fs) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fs.getConf())
                .setBasePath(tablePath)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTimeline completedReplaces = metaClient.getActiveTimeline().getCompletedReplaceTimeline();
            // Log the full active timeline to help diagnose a failing assertion.
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int found = completedReplaces.countInstants();
            Assertions.assertEquals(0, found, "Got=" + found + ", exp =" + 0);
        }

        /**
         * Asserts that the pending replace timeline of the table holds at least
         * {@code minExpected} instants.
         *
         * @param minExpected lower bound (inclusive) on the number of pending replace instants
         * @param tablePath   base path of the Hudi table
         * @param fs          file system whose configuration is used to build the meta client
         */
        static void assertAtLeastNReplaceRequests(int minExpected, String tablePath, FileSystem fs) {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(fs.getConf())
                .setBasePath(tablePath)
                .setLoadActiveTimelineOnLoad(true)
                .build();
            HoodieTimeline pendingReplaces = metaClient.getActiveTimeline().filterPendingReplaceTimeline();
            // Log the full active timeline to help diagnose a failing assertion.
            LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants());
            int found = pendingReplaces.countInstants();
            boolean enough = found >= minExpected;
            Assertions.assertTrue(enough, "Got=" + found + ", exp >=" + minExpected);
        }
    }
}

