package org.apache.hudi.client.functional;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.util.Time;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.storage.HoodieAvroHFileReader;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("functional")
/* loaded from: input_file:org/apache/hudi/client/functional/TestHoodieBackedMetadata.class */
public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
    // Log4j logger scoped to this test class.
    private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class);

    /**
     * Argument source for parameterized tests that take a table type and one boolean flag.
     *
     * @return four argument sets: COPY_ON_WRITE and MERGE_ON_READ, each paired with
     *         {@code true} first and {@code false} second
     */
    public static List<Arguments> tableTypeAndEnableOperationArgs() {
        return Arrays.asList(
            Arguments.of(HoodieTableType.COPY_ON_WRITE, true),
            Arguments.of(HoodieTableType.COPY_ON_WRITE, false),
            Arguments.of(HoodieTableType.MERGE_ON_READ, true),
            Arguments.of(HoodieTableType.MERGE_ON_READ, false));
    }

    /**
     * Argument source for parameterized tests that take a table type and two boolean flags.
     *
     * @return eight argument sets: for COPY_ON_WRITE and then MERGE_ON_READ, the flag pairs
     *         (true, true), (true, false), (false, true), (false, false) in that order
     */
    public static List<Arguments> tableOperationsTestArgs() {
        return Arrays.asList(
            Arguments.of(HoodieTableType.COPY_ON_WRITE, true, true),
            Arguments.of(HoodieTableType.COPY_ON_WRITE, true, false),
            Arguments.of(HoodieTableType.COPY_ON_WRITE, false, true),
            Arguments.of(HoodieTableType.COPY_ON_WRITE, false, false),
            Arguments.of(HoodieTableType.MERGE_ON_READ, true, true),
            Arguments.of(HoodieTableType.MERGE_ON_READ, true, false),
            Arguments.of(HoodieTableType.MERGE_ON_READ, false, true),
            Arguments.of(HoodieTableType.MERGE_ON_READ, false, false));
    }

    /**
     * Runs pre-bootstrap operations with the table initialised via {@code init(type, false)},
     * then switches to a write config built by {@code getWriteConfig(true, true)}, syncs the
     * table metadata and validates it after each subsequent group of operations.
     *
     * @param hoodieTableType table type under test
     * @param z when {@code true}, an extra write at instant 0000003 is issued and then handed to
     *          {@code doRollbackAndValidate} together with instant 0000004
     */
    @ParameterizedTest
    @MethodSource("tableTypeAndEnableOperationArgs")
    public void testMetadataTableBootstrap(HoodieTableType hoodieTableType, boolean z) throws Exception {
        init(hoodieTableType, false);
        doPreBootstrapOperations(testTable);

        // Switch to the new write config and bring the metadata table in sync.
        writeConfig = getWriteConfig(true, true);
        initWriteConfigAndMetatableWriter(writeConfig, true);
        syncTableMetadata(writeConfig);
        validateMetadata(testTable);

        doWriteInsertAndUpsert(testTable);
        validateMetadata(testTable);

        if (z) {
            final String rolledBackInstant = "0000003";
            doWriteOperationAndValidate(testTable, rolledBackInstant);
            doRollbackAndValidate(testTable, rolledBackInstant, "0000004");
        }

        // Three more writes followed by a clean that targets the last of them.
        for (String instant : new String[] {"0000005", "0000006", "0000007"}) {
            doWriteOperation(testTable, instant);
        }
        doCleanAndValidate(testTable, "0000008", Arrays.asList("0000007"));
        validateMetadata(testTable, true);
    }

    /**
     * Writes through four successive write clients whose metadata configs toggle the
     * column-stats and bloom-filter indexes, and after each client checks which metadata
     * partitions the table config reports:
     * <ol>
     *   <li>metadata enabled, no index flags set: FILES only;</li>
     *   <li>column stats enabled: FILES and COLUMN_STATS;</li>
     *   <li>column stats disabled again: FILES only;</li>
     *   <li>column stats plus bloom filter enabled: FILES, COLUMN_STATS and BLOOM_FILTERS.</li>
     * </ol>
     * Each client lives in its own try-with-resources block, so it is closed exactly once and
     * is also released when a write or an assertion fails.
     */
    @Test
    public void testTurnOffMetadataIndexAfterEnable() throws Exception {
        initPath();
        HoodieWriteConfig build = getConfigBuilder("{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}", HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
            .withParallelism(1, 1)
            .withBulkInsertParallelism(1)
            .withFinalizeWriteParallelism(1)
            .withDeleteParallelism(1)
            .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
            .build();
        init(HoodieTableType.COPY_ON_WRITE);
        HoodieSparkEngineContext hoodieSparkEngineContext = new HoodieSparkEngineContext(this.jsc);

        // Step 1: metadata enabled, no index flags set -> only the FILES partition is expected.
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(hoodieSparkEngineContext, build)) {
            List inserts = this.dataGen.generateInserts("0000001", 20);
            client.startCommitWithTime("0000001");
            Assertions.assertNoWriteErrors(client.insert(this.jsc.parallelize(inserts, 1), "0000001").collect());
            client.startCommitWithTime("0000002");
            Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(this.dataGen.generateUniqueUpdates("0000002", 10), 1), "0000002").collect());
            validateMetadata(client);
        }
        assertMetadataPartitionsInTableConfig(false, false);

        // Step 2: same properties with column stats switched on.
        HoodieWriteConfig build2 = HoodieWriteConfig.newBuilder()
            .withProperties(build.getProps())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder()
                .withProperties(build.getMetadataConfig().getProps())
                .withMetadataIndexColumnStats(true)
                .build())
            .build();
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(hoodieSparkEngineContext, build2)) {
            client.startCommitWithTime("0000003");
            Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(this.dataGen.generateUniqueUpdates("0000003", 10), 1), "0000003").collect());
            validateMetadata(client);
        }
        assertMetadataPartitionsInTableConfig(true, false);

        // Step 3: derived from the original config with column stats explicitly switched off.
        HoodieWriteConfig columnStatsDisabledConfig = HoodieWriteConfig.newBuilder()
            .withProperties(build.getProps())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder()
                .withProperties(build.getMetadataConfig().getProps())
                .withMetadataIndexColumnStats(false)
                .build())
            .build();
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(hoodieSparkEngineContext, columnStatsDisabledConfig)) {
            client.startCommitWithTime("0000004");
            Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(this.dataGen.generateUniqueUpdates("0000004", 10), 1), "0000004").collect());
            validateMetadata(client);
        }
        assertMetadataPartitionsInTableConfig(false, false);

        // Step 4: derived from the column-stats config (build2) with the bloom filter index added.
        HoodieWriteConfig bloomFilterEnabledConfig = HoodieWriteConfig.newBuilder()
            .withProperties(build2.getProps())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder()
                .withProperties(build2.getMetadataConfig().getProps())
                .withMetadataIndexBloomFilter(true)
                .build())
            .build();
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(hoodieSparkEngineContext, bloomFilterEnabledConfig)) {
            client.startCommitWithTime("0000005");
            Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(this.dataGen.generateUniqueUpdates("0000005", 10), 1), "0000005").collect());
            validateMetadata(client);
        }
        assertMetadataPartitionsInTableConfig(true, true);
    }

    /**
     * Asserts which metadata partitions the table config of {@code this.metaClient} lists.
     * The partition list must be non-empty and must always contain FILES.
     *
     * @param expectColumnStats whether the COLUMN_STATS partition must be listed
     * @param expectBloomFilters whether the BLOOM_FILTERS partition must be listed
     */
    private void assertMetadataPartitionsInTableConfig(boolean expectColumnStats, boolean expectBloomFilters) throws Exception {
        // NOTE(review): the value returned by reload(...) is not captured, exactly as in the
        // code this replaces; confirm this.metaClient's table config reflects the latest state.
        HoodieTableMetaClient.reload(this.metaClient);
        HoodieTableConfig tableConfig = this.metaClient.getTableConfig();
        org.junit.jupiter.api.Assertions.assertFalse(tableConfig.getMetadataPartitions().isEmpty());
        org.junit.jupiter.api.Assertions.assertTrue(tableConfig.getMetadataPartitions().contains(MetadataPartitionType.FILES.getPartitionPath()));
        org.junit.jupiter.api.Assertions.assertEquals(expectColumnStats,
            tableConfig.getMetadataPartitions().contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()),
            "Unexpected presence/absence of the COLUMN_STATS metadata partition");
        org.junit.jupiter.api.Assertions.assertEquals(expectBloomFilters,
            tableConfig.getMetadataPartitions().contains(MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()),
            "Unexpected presence/absence of the BLOOM_FILTERS metadata partition");
    }

    /**
     * Writes with metadata enabled, then with a config that disables metadata, then with a
     * config that enables it again. After each phase it checks whether a metadata writer is
     * present and whether the table config lists any metadata partitions; for the disabled
     * phase it also checks that the metadata table base path does not exist.
     */
    @Test
    public void testTurnOffMetadataTableAfterEnable() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE, true);

        // Phase 1: metadata enabled (the config set up by init).
        HoodieCommitMetadata firstCommit = doWriteOperationWithMeta(testTable, "0000001", WriteOperationType.INSERT);
        File p1Marker = new File(this.metaClient.getBasePath() + "/p1", ".hoodie_partition_metadata");
        File p2Marker = new File(this.metaClient.getBasePath() + "/p2", ".hoodie_partition_metadata");
        p1Marker.createNewFile();
        p2Marker.createNewFile();
        this.metaClient.reloadActiveTimeline();
        HoodieTable enabledTable = HoodieSparkTable.create(this.writeConfig, this.context, this.metaClient);
        Option firstWriter = enabledTable.getMetadataWriter("0000001", Option.of(firstCommit));
        validateMetadata(testTable, true);
        org.junit.jupiter.api.Assertions.assertTrue(firstWriter.isPresent());
        HoodieTableConfig configAfterEnable = new HoodieTableConfig(
            this.fs,
            this.metaClient.getMetaPath(),
            this.writeConfig.getPayloadClass(),
            this.writeConfig.getStringOrDefault(HoodieWriteConfig.RECORD_MERGER_IMPLS));
        org.junit.jupiter.api.Assertions.assertFalse(configAfterEnable.getMetadataPartitions().isEmpty());

        // Phase 2: same properties, metadata disabled.
        HoodieMetadataConfig metadataOff = HoodieMetadataConfig.newBuilder().enable(false).build();
        HoodieWriteConfig disabledConfig = HoodieWriteConfig.newBuilder()
            .withProperties(this.writeConfig.getProps())
            .withMetadataConfig(metadataOff)
            .build();
        testTable = HoodieTestTable.of(this.metaClient);
        HoodieCommitMetadata secondCommit = doWriteOperationWithMeta(testTable, "0000002", WriteOperationType.INSERT);
        this.metaClient.reloadActiveTimeline();
        HoodieTable disabledTable = HoodieSparkTable.create(disabledConfig, this.context, this.metaClient);
        org.junit.jupiter.api.Assertions.assertFalse(
            disabledTable.getMetadataWriter("0000002", Option.of(secondCommit)).isPresent());
        HoodieTableConfig configAfterDisable = new HoodieTableConfig(
            this.fs,
            this.metaClient.getMetaPath(),
            disabledConfig.getPayloadClass(),
            this.writeConfig.getStringOrDefault(HoodieWriteConfig.RECORD_MERGER_IMPLS));
        org.junit.jupiter.api.Assertions.assertEquals(Collections.emptySet(), configAfterDisable.getMetadataPartitions());
        Path metadataTablePath = new Path(HoodieTableMetadata.getMetadataTableBasePath(disabledConfig.getBasePath()));
        org.junit.jupiter.api.Assertions.assertFalse(this.metaClient.getFs().exists(metadataTablePath));

        // Phase 3: same properties, metadata enabled once more.
        HoodieMetadataConfig metadataOn = HoodieMetadataConfig.newBuilder().enable(true).build();
        HoodieWriteConfig reEnabledConfig = HoodieWriteConfig.newBuilder()
            .withProperties(this.writeConfig.getProps())
            .withMetadataConfig(metadataOn)
            .build();
        testTable = HoodieTestTable.of(this.metaClient);
        this.metaClient.reloadActiveTimeline();
        HoodieCommitMetadata thirdCommit = doWriteOperationWithMeta(testTable, "0000003", WriteOperationType.INSERT);
        this.metaClient.reloadActiveTimeline();
        HoodieTable reEnabledTable = HoodieSparkTable.create(reEnabledConfig, this.context, this.metaClient);
        Option thirdWriter = reEnabledTable.getMetadataWriter("0000003", Option.of(thirdCommit));
        validateMetadata(testTable, true);
        org.junit.jupiter.api.Assertions.assertTrue(thirdWriter.isPresent());
        HoodieTableConfig configAfterReEnable = new HoodieTableConfig(
            this.fs,
            this.metaClient.getMetaPath(),
            this.writeConfig.getPayloadClass(),
            this.writeConfig.getStringOrDefault(HoodieWriteConfig.RECORD_MERGER_IMPLS));
        org.junit.jupiter.api.Assertions.assertFalse(configAfterReEnable.getMetadataPartitions().isEmpty());
    }

    /**
     * Creates a directory without partition metadata plus several directories matching the
     * configured directory filter regex, then checks that the metadata table lists only
     * partitions p1 and p2 and reports the expected number of files for each.
     *
     * @param hoodieTableType table type under test; expected file counts are 3 and 6 for
     *                        COPY_ON_WRITE, 4 and 7 otherwise
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    public void testOnlyValidPartitionsAdded(HoodieTableType hoodieTableType) throws Exception {
        init(hoodieTableType, false);

        final String prefix = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];

        // A plain directory that never receives a partition metafile.
        final String nonPartitionDir = prefix + "-nonpartition";
        Files.createDirectories(Paths.get(this.basePath, nonPartitionDir));

        // Directories that do get partition metafiles but match the filter regex below.
        final String filteredDir1 = prefix + "-filterDir1";
        final String filteredDir2 = prefix + "-filterDir2";
        final String hiddenDir = ".backups";

        testTable.withPartitionMetaFiles(new String[]{"p1", "p2", filteredDir1, filteredDir2, hiddenDir})
            .addCommit("0000001")
            .withBaseFilesInPartition("p1", new int[]{10})
            .withBaseFilesInPartition("p2", new int[]{10, 10})
            .addCommit("0000002")
            .withBaseFilesInPartition("p1", new int[]{10})
            .withBaseFilesInPartition("p2", new int[]{10, 10, 10});

        HoodieMetadataConfig filteringMetadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withDirectoryFilterRegex(".*-filterDir\\d|\\..*")
            .build();
        this.writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false)
            .withMetadataConfig(filteringMetadataConfig)
            .build();
        testTable.doWriteOperation("0000003", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 1, true);
        syncTableMetadata(this.writeConfig);

        List<?> partitions = metadataWriter(this.writeConfig).getTableMetadata().getAllPartitionPaths();
        org.junit.jupiter.api.Assertions.assertFalse(partitions.contains(nonPartitionDir), "Must not contain the non-partition " + nonPartitionDir);
        org.junit.jupiter.api.Assertions.assertTrue(partitions.contains("p1"), "Must contain partition p1");
        org.junit.jupiter.api.Assertions.assertTrue(partitions.contains("p2"), "Must contain partition p2");
        org.junit.jupiter.api.Assertions.assertFalse(partitions.contains(filteredDir1), "Must not contain the filtered directory " + filteredDir1);
        org.junit.jupiter.api.Assertions.assertFalse(partitions.contains(filteredDir2), "Must not contain the filtered directory " + filteredDir2);
        org.junit.jupiter.api.Assertions.assertFalse(partitions.contains(hiddenDir), "Must not contain the filtered directory .backups");

        final boolean copyOnWrite = hoodieTableType == HoodieTableType.COPY_ON_WRITE;
        final int expectedFilesInP1 = copyOnWrite ? 3 : 4;
        final int expectedFilesInP2 = copyOnWrite ? 6 : 7;

        // Per-partition lookups.
        org.junit.jupiter.api.Assertions.assertEquals(expectedFilesInP1,
            metadata(this.writeConfig, this.context).getAllFilesInPartition(new Path(this.basePath, "p1")).length);
        org.junit.jupiter.api.Assertions.assertEquals(expectedFilesInP2,
            metadata(this.writeConfig, this.context).getAllFilesInPartition(new Path(this.basePath, "p2")).length);

        // Batched lookup keyed by the full partition path.
        Map<?, ?> filesByPartition = metadata(this.writeConfig, this.context)
            .getAllFilesInPartitions(Arrays.asList(this.basePath + "/p1", this.basePath + "/p2"));
        org.junit.jupiter.api.Assertions.assertEquals(2, filesByPartition.size());
        org.junit.jupiter.api.Assertions.assertEquals(expectedFilesInP1, ((FileStatus[]) filesByPartition.get(this.basePath + "/p1")).length);
        org.junit.jupiter.api.Assertions.assertEquals(expectedFilesInP2, ((FileStatus[]) filesByPartition.get(this.basePath + "/p2")).length);
    }

    /**
     * Pre-computes a chain of instant times (a fresh instant followed by eight successors from
     * {@code getNextCommitTime}) and uses them for a sequence of write, compaction (for
     * MERGE_ON_READ only), clean and validation operations.
     *
     * @param hoodieTableType table type under test
     * @param z forwarded as the third argument of {@code init}
     * @param z2 forwarded as the fourth argument of {@code init}
     */
    @ParameterizedTest
    @MethodSource("tableOperationsTestArgs")
    public void testTableOperations(HoodieTableType hoodieTableType, boolean z, boolean z2) throws Exception {
        // Instants are generated before init(...), matching the order of the calls they feed.
        List<String> instants = new ArrayList<>();
        long latest = Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
        instants.add(Long.toString(latest));
        for (int generated = 0; generated < 8; generated++) {
            latest = getNextCommitTime(latest);
            instants.add(Long.toString(latest));
        }

        init(hoodieTableType, true, z, z2, false);

        doWriteInsertAndUpsert(testTable, instants.get(0), instants.get(1), false);
        doWriteOperationAndValidate(testTable, instants.get(2));
        if (HoodieTableType.MERGE_ON_READ.equals(hoodieTableType)) {
            doCompactionAndValidate(testTable, instants.get(3));
        }
        doWriteOperation(testTable, instants.get(4));
        doCleanAndValidate(testTable, instants.get(5), Collections.singletonList(instants.get(0)));
        doWriteOperation(testTable, instants.get(6));
        doWriteOperation(testTable, instants.get(7));
        validateMetadata(testTable, Collections.emptyList(), true);
    }

    /**
     * Checks the number of completed commits on the metadata table's active timeline while
     * data-table writes, a clustering operation and a data-table archival run take place:
     * 3 after the first two inserts, 4 after clustering, and 4 again after two more inserts
     * with an archival of the data table in between.
     */
    @Test
    public void testMetadataTableArchival() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE, false);

        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .enableFullScan(true)
            .enableMetrics(false)
            .withMaxNumDeltaCommitsBeforeCompaction(3)
            .archiveCommitsWith(4, 5)
            .retainCommits(3)
            .build();
        this.writeConfig = getWriteConfigBuilder(true, true, false)
            .withMetadataConfig(metadataConfig)
            .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
            .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
            .build();
        initWriteConfigAndMetatableWriter(this.writeConfig, true);

        // Instant times are "000000" followed by a running counter starting at 1.
        int instantSuffix = 1;
        for (int written = 0; written < 2; written++) {
            doWriteOperation(testTable, "000000" + instantSuffix++, WriteOperationType.INSERT);
        }

        HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder()
            .setConf(this.hadoopConf)
            .setBasePath(this.metadataTableBasePath)
            .build();
        org.junit.jupiter.api.Assertions.assertEquals(3,
            metadataMetaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants());

        doCluster(testTable, "000000" + instantSuffix++);
        org.junit.jupiter.api.Assertions.assertEquals(4,
            metadataMetaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants());

        // The returned client is intentionally unused; only the call itself is kept.
        getHoodieWriteClient(this.writeConfig);
        doWriteOperation(testTable, "000000" + instantSuffix++, WriteOperationType.INSERT);
        HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
            .setConf(this.hadoopConf)
            .setBasePath(this.basePath)
            .build();
        archiveDataTable(this.writeConfig, dataMetaClient);
        doWriteOperation(testTable, "000000" + instantSuffix++, WriteOperationType.INSERT);
        org.junit.jupiter.api.Assertions.assertEquals(4,
            metadataMetaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants());
    }

    /**
     * Runs an insert, a further write and a clean of the first instant, then a compaction for
     * MERGE_ON_READ tables only, then one more write, and finally validates the metadata.
     *
     * @param hoodieTableType table type under test
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    public void testMetadataInsertUpsertClean(HoodieTableType hoodieTableType) throws Exception {
        init(hoodieTableType);

        final String firstInstant = "0000001";
        doWriteOperation(testTable, firstInstant, WriteOperationType.INSERT);
        doWriteOperation(testTable, "0000002");
        doCleanAndValidate(testTable, "0000003", Arrays.asList(firstInstant));

        final boolean mergeOnRead = HoodieTableType.MERGE_ON_READ == hoodieTableType;
        if (mergeOnRead) {
            doCompaction(testTable, "0000004");
        }

        doWriteOperation(testTable, "0000005");
        validateMetadata(testTable, Collections.emptyList(), true);
    }

    /**
     * Writes once with the metadata config's populate-meta-fields flag set to {@code true} and
     * once with it set to {@code false}; after each write it opens the table under
     * {@code <basePath>/.hoodie/metadata} and checks that its table config reports the same
     * value.
     */
    @Test
    public void testUpdationOfPopulateMetaFieldsForMetadataTable() throws Exception {
        this.tableType = HoodieTableType.COPY_ON_WRITE;
        init(this.tableType, false);

        // Round 1: populate meta fields enabled.
        HoodieMetadataConfig populateOn = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withPopulateMetaFields(true)
            .build();
        this.writeConfig = getWriteConfigBuilder(true, true, false).withMetadataConfig(populateOn).build();
        initWriteConfigAndMetatableWriter(this.writeConfig, true);
        doWriteOperation(testTable, "0000001", WriteOperationType.INSERT);
        HoodieTableMetaClient metadataClientRound1 = HoodieTableMetaClient.builder()
            .setBasePath(this.writeConfig.getBasePath() + "/.hoodie/metadata")
            .setConf(this.hadoopConf)
            .build();
        org.junit.jupiter.api.Assertions.assertTrue(metadataClientRound1.getTableConfig().populateMetaFields());

        // Round 2: populate meta fields disabled.
        HoodieMetadataConfig populateOff = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withPopulateMetaFields(false)
            .build();
        this.writeConfig = getWriteConfigBuilder(true, true, false).withMetadataConfig(populateOff).build();
        initWriteConfigAndMetatableWriter(this.writeConfig, true);
        doWriteOperation(testTable, "0000002", WriteOperationType.INSERT);
        HoodieTableMetaClient metadataClientRound2 = HoodieTableMetaClient.builder()
            .setBasePath(this.writeConfig.getBasePath() + "/.hoodie/metadata")
            .setConf(this.hadoopConf)
            .build();
        org.junit.jupiter.api.Assertions.assertFalse(metadataClientRound2.getTableConfig().populateMetaFields());
    }

    /**
     * Insert, upsert and clean against a non-partitioned COPY_ON_WRITE table,
     * followed by a metadata validation.
     */
    @Test
    public void testMetadataInsertUpsertCleanNonPartitioned() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE);

        final String insertInstant = "0000001";
        final String upsertInstant = "0000002";
        final String cleanInstant = "0000003";

        doWriteOperationNonPartitioned(testTable, insertInstant, WriteOperationType.INSERT);
        doWriteOperationNonPartitioned(testTable, upsertInstant, WriteOperationType.UPSERT);
        // The clean is issued directly on the test table and is based on the first commit.
        testTable.doCleanBasedOnCommits(cleanInstant, Arrays.asList(insertInstant));

        validateMetadata(testTable, Collections.emptyList(), true);
    }

    /**
     * Insert, a second write, clustering (plus compaction for MERGE_ON_READ tables)
     * and a clean, followed by a metadata validation.
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    public void testInsertUpsertCluster(HoodieTableType hoodieTableType) throws Exception {
        init(hoodieTableType);

        final String firstInstant = "0000001";
        doWriteOperation(testTable, firstInstant, WriteOperationType.INSERT);
        doWriteOperation(testTable, "0000002");
        doClusterAndValidate(testTable, "0000003");

        // Compaction is only exercised for merge-on-read tables.
        final boolean mergeOnRead = HoodieTableType.MERGE_ON_READ.equals(hoodieTableType);
        if (mergeOnRead) {
            doCompaction(testTable, "0000004");
        }

        doCleanAndValidate(testTable, "0000005", Arrays.asList(firstInstant));
        validateMetadata(testTable, Collections.emptyList(), true);
    }

    /**
     * Verifies when the metadata table gets compacted: with
     * {@code withMaxNumDeltaCommitsBeforeCompaction(3)} no compaction is reported after the
     * insert and clean, while after the following upsert the latest compaction time is
     * expected to be {@code "0000003001"}.
     */
    @Test
    public void testMetadataTableServices() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE, false);
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .enableFullScan(true)
            .enableMetrics(false)
            .withMaxNumDeltaCommitsBeforeCompaction(3)
            .build();
        this.writeConfig = getWriteConfigBuilder(true, true, false).withMetadataConfig(metadataConfig).build();
        initWriteConfigAndMetatableWriter(this.writeConfig, true);

        doWriteOperation(testTable, "0000001", WriteOperationType.INSERT);
        doCleanAndValidate(testTable, "0000003", Arrays.asList("0000001"));

        // No compaction is expected on the metadata table yet.
        org.junit.jupiter.api.Assertions.assertFalse(
            metadata(this.writeConfig, this.context).getLatestCompactionTime().isPresent());

        doWriteOperation(testTable, "0000004", WriteOperationType.UPSERT);

        HoodieTableMetadata metadata = metadata(this.writeConfig, this.context);
        org.junit.jupiter.api.Assertions.assertTrue(metadata.getLatestCompactionTime().isPresent());
        // JUnit's contract is assertEquals(expected, actual).
        org.junit.jupiter.api.Assertions.assertEquals("0000003001", metadata.getLatestCompactionTime().get());
    }

    /**
     * Runs the shared table-operations scenario with the bloom-filter and column-stats
     * metadata indexes enabled.
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    public void testTableOperationsWithMetadataIndex(HoodieTableType hoodieTableType) throws Exception {
        initPath();

        HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
            .bloomIndexBucketizedChecking(false)
            .build();
        // NOTE(review): withMetadataIndexBloomFilterFileGroups is invoked twice (4, then 2).
        // The second call may have been meant for the column-stats index - confirm before changing.
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withMetadataIndexBloomFilter(true)
            .withMetadataIndexBloomFilterFileGroups(4)
            .withMetadataIndexColumnStats(true)
            .withMetadataIndexBloomFilterFileGroups(2)
            .build();
        HoodieWriteConfig indexedWriteConfig = getWriteConfigBuilder(true, true, false)
            .withIndexConfig(indexConfig)
            .withMetadataConfig(metadataConfig)
            .build();

        init(hoodieTableType, indexedWriteConfig);
        testTableOperationsForMetaIndexImpl(indexedWriteConfig);
    }

    /**
     * Delegates to {@code testTableOperationsImpl} with a fresh Spark engine context
     * built from the test's Java Spark context.
     */
    private void testTableOperationsForMetaIndexImpl(HoodieWriteConfig hoodieWriteConfig) throws Exception {
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);
        testTableOperationsImpl(engineContext, hoodieWriteConfig);
    }

    /**
     * Bulk-inserts and upserts records, deletes the COLUMN_STATS partition of the metadata
     * table through the metadata writer and then verifies that:
     * <ul>
     *   <li>the partition is no longer listed among the metadata table partitions,</li>
     *   <li>a completed replace instant {@code "0000003"} exists on the metadata timeline,</li>
     *   <li>every remaining partition has file slices, and no more of them than the file
     *       group count of its partition type.</li>
     * </ul>
     */
    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    public void testMetadataTableDeletePartition(HoodieTableType hoodieTableType) throws IOException {
        initPath();
        String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig build = getConfigBuilder(tripSchema, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
            .withCleanConfig(HoodieCleanConfig.newBuilder().withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
            .withParallelism(1, 1)
            .withBulkInsertParallelism(1)
            .withFinalizeWriteParallelism(1)
            .withDeleteParallelism(1)
            .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
            .build();
        init(hoodieTableType);
        HoodieSparkEngineContext hoodieSparkEngineContext = new HoodieSparkEngineContext(this.jsc);

        // try-with-resources closes the write client on both the success and the failure path.
        try (SparkRDDWriteClient sparkRDDWriteClient = new SparkRDDWriteClient(hoodieSparkEngineContext, build)) {
            // Write 1: bulk insert.
            List generateInserts = this.dataGen.generateInserts("0000001", 20);
            sparkRDDWriteClient.startCommitWithTime("0000001");
            Assertions.assertNoWriteErrors(sparkRDDWriteClient.bulkInsert(this.jsc.parallelize(generateInserts, 1), "0000001").collect());
            validateMetadata(sparkRDDWriteClient);

            // Write 2: upsert of freshly generated records.
            sparkRDDWriteClient.startCommitWithTime("0000002");
            validateMetadata(sparkRDDWriteClient);
            Assertions.assertNoWriteErrors(sparkRDDWriteClient.upsert(this.jsc.parallelize(this.dataGen.generateInserts("0000002", 10), 1), "0000002").collect());

            // Delete the column_stats partition through the metadata writer.
            HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(sparkRDDWriteClient);
            org.junit.jupiter.api.Assertions.assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
            metadataWriter.deletePartitions("0000003", Arrays.asList(MetadataPartitionType.COLUMN_STATS));

            HoodieTableMetaClient build2 = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.metadataTableBasePath).build();
            List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(hoodieSparkEngineContext, build2.getBasePath(), false, false);
            org.junit.jupiter.api.Assertions.assertEquals(metadataWriter.getEnabledPartitionTypes().size(), allPartitionPaths.size());
            org.junit.jupiter.api.Assertions.assertFalse(allPartitionPaths.contains(MetadataPartitionType.COLUMN_STATS.getPartitionPath()));

            Option<HoodieInstant> lastInstant = build2.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant();
            org.junit.jupiter.api.Assertions.assertTrue(lastInstant.isPresent());
            org.junit.jupiter.api.Assertions.assertEquals("0000003", lastInstant.get().getTimestamp());

            // Index the enabled partition types by partition path. This map must be populated,
            // otherwise the per-partition lookups below would dereference null.
            Map<String, MetadataPartitionType> enabledTypesByPath = new HashMap<>();
            metadataWriter.getEnabledPartitionTypes().forEach(
                partitionType -> enabledTypesByPath.put(partitionType.getPartitionPath(), partitionType));

            HoodieTableFileSystemView hoodieTableFileSystemView = new HoodieTableFileSystemView(build2, build2.getActiveTimeline());
            allPartitionPaths.forEach(partitionPath -> {
                List<FileSlice> latestSlices = hoodieTableFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
                if (MetadataPartitionType.COLUMN_STATS.getPartitionPath().equals(partitionPath)) {
                    // The deleted partition must not expose any file slice.
                    org.junit.jupiter.api.Assertions.assertTrue(latestSlices.isEmpty());
                    return;
                }
                org.junit.jupiter.api.Assertions.assertFalse(latestSlices.isEmpty());
                MetadataPartitionType partitionType = enabledTypesByPath.get(partitionPath);
                org.junit.jupiter.api.Assertions.assertNotNull(partitionType, "No enabled partition type for " + partitionPath);
                org.junit.jupiter.api.Assertions.assertTrue(
                    latestSlices.stream().map(FileSlice::getBaseFile).count() <= partitionType.getFileGroupCount(),
                    "Should have a single latest base file per file group");
                org.junit.jupiter.api.Assertions.assertTrue(
                    latestSlices.size() <= partitionType.getFileGroupCount(),
                    "Should have a single latest file slice per file group");
            });
        }
    }

    /**
     * With {@code populateMetaFields} disabled for the metadata table, triggers a metadata
     * table compaction and checks that the records in the resulting base file of the
     * {@code files} partition do not carry the record-key meta field.
     */
    @Test
    public void testVirtualKeysInBaseFiles() throws Exception {
        // Single source of truth for both the config and the per-record assertion below.
        final boolean populateMetaFields = false;
        init(HoodieTableType.MERGE_ON_READ, false);
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .enableFullScan(true)
            .enableMetrics(false)
            .withPopulateMetaFields(populateMetaFields)
            .withMaxNumDeltaCommitsBeforeCompaction(2)
            .build();
        this.writeConfig = getWriteConfigBuilder(true, true, false).withMetadataConfig(metadataConfig).build();
        initWriteConfigAndMetatableWriter(this.writeConfig, true);

        doWriteOperation(testTable, "0000001", WriteOperationType.INSERT);
        doClean(testTable, "0000003", Arrays.asList("0000001"));
        doWriteOperation(testTable, "0000004", WriteOperationType.UPSERT);

        HoodieTableMetadata metadata = metadata(this.writeConfig, this.context);
        org.junit.jupiter.api.Assertions.assertTrue(metadata.getLatestCompactionTime().isPresent());
        // JUnit's contract is assertEquals(expected, actual).
        org.junit.jupiter.api.Assertions.assertEquals("0000003001", metadata.getLatestCompactionTime().get());

        HoodieTableMetaClient build = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.metadataTableBasePath).build();
        HoodieWriteConfig metadataWriteConfig = getMetadataWriteConfig(this.writeConfig);
        build.reloadActiveTimeline();
        HoodieSparkTable create = HoodieSparkTable.create(metadataWriteConfig, this.context, build);
        create.getHoodieView().sync();

        // Read every record of the first latest base file in the "files" partition.
        List<FileSlice> latestSlices = create.getSliceView().getLatestFileSlices("files").collect(Collectors.toList());
        HoodieBaseFile baseFile = latestSlices.get(0).getBaseFile().get();
        HoodieAvroHFileReader baseFileReader = new HoodieAvroHFileReader(
            this.context.getHadoopConf().get(),
            new Path(baseFile.getPath()),
            new CacheConfig(this.context.getHadoopConf().get()));
        HoodieAvroHFileReader.readAllRecords(baseFileReader).forEach(entry -> {
            Object recordKeyField = ((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD);
            if (populateMetaFields) {
                org.junit.jupiter.api.Assertions.assertNotNull(recordKeyField);
            } else {
                org.junit.jupiter.api.Assertions.assertNull(recordKeyField);
            }
        });
    }

    /**
     * Verifies that no metadata table compaction is reported while commit
     * {@code "0000003"} has not been moved to complete, even after further commits, and
     * that once it is completed and another commit lands, the latest compaction time is
     * {@code "0000007001"}.
     */
    @Test
    public void testMetadataTableCompactionWithPendingInstants() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE, false);
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .enableFullScan(true)
            .enableMetrics(false)
            .withMaxNumDeltaCommitsBeforeCompaction(4)
            .build();
        this.writeConfig = getWriteConfigBuilder(true, true, false).withMetadataConfig(metadataConfig).build();
        initWriteConfigAndMetatableWriter(this.writeConfig, true);

        doWriteOperation(testTable, "0000001", WriteOperationType.INSERT);
        doWriteOperation(testTable, "0000002", WriteOperationType.INSERT);

        // Commit 3 is completed later via moveInflightCommitToComplete.
        // NOTE(review): presumably the trailing flags leave it inflight here - confirm against the test table API.
        HoodieCommitMetadata pendingCommitMetadata = testTable.doWriteOperation(
            "0000003", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2, false, true);

        doWriteOperation(testTable, "0000004");
        org.junit.jupiter.api.Assertions.assertFalse(
            metadata(this.writeConfig, this.context).getLatestCompactionTime().isPresent());

        doWriteOperation(testTable, "0000005", WriteOperationType.INSERT);
        doWriteOperation(testTable, "0000006", WriteOperationType.INSERT);
        doWriteOperation(testTable, "0000007", WriteOperationType.INSERT);
        org.junit.jupiter.api.Assertions.assertFalse(
            metadata(this.writeConfig, this.context).getLatestCompactionTime().isPresent());

        // Complete commit 3 and write once more; a compaction is now expected.
        testTable.moveInflightCommitToComplete("0000003", pendingCommitMetadata);
        doWriteOperation(testTable, "0000008", WriteOperationType.INSERT);

        HoodieTableMetadata metadata = metadata(this.writeConfig, this.context);
        org.junit.jupiter.api.Assertions.assertTrue(metadata.getLatestCompactionTime().isPresent());
        // JUnit's contract is assertEquals(expected, actual).
        org.junit.jupiter.api.Assertions.assertEquals("0000007001", metadata.getLatestCompactionTime().get());
        validateMetadata(testTable, true);
    }

    /**
     * Validates the metadata table while the commit file of a metadata table compaction has
     * been renamed to a temp file (presumably simulating a pending compaction - confirm
     * against {@code FileCreateUtils}).
     *
     * @param z when {@code true}, new data commits are written while the compaction commit
     *          file stays renamed and the scenario is repeated for the next compaction;
     *          when {@code false}, the renamed commit file is restored instead
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testMetadataTableWithPendingCompaction(boolean z) throws Exception {
        init(HoodieTableType.COPY_ON_WRITE, false);
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .enableFullScan(true)
            .enableMetrics(false)
            .withMaxNumDeltaCommitsBeforeCompaction(3)
            .build();
        this.writeConfig = getWriteConfigBuilder(true, true, false).withMetadataConfig(metadataConfig).build();
        initWriteConfigAndMetatableWriter(this.writeConfig, true);

        doWriteOperation(testTable, "0000001", WriteOperationType.INSERT);
        doWriteOperation(testTable, "0000002", WriteOperationType.INSERT);
        doWriteOperation(testTable, "0000003", WriteOperationType.INSERT);

        HoodieTableMetadata metadata = metadata(this.writeConfig, this.context);
        String firstCompactionInstant = "0000002001";
        org.junit.jupiter.api.Assertions.assertTrue(metadata.getLatestCompactionTime().isPresent());
        // JUnit's contract is assertEquals(expected, actual).
        org.junit.jupiter.api.Assertions.assertEquals(firstCompactionInstant, metadata.getLatestCompactionTime().get());
        validateMetadata(testTable);

        // Rename the compaction's commit file in the metadata table to a temp file.
        java.nio.file.Path compactionCommitFile =
            Paths.get(this.metadataTableBasePath, ".hoodie").resolve(firstCompactionInstant + ".commit");
        java.nio.file.Path tempCommitFile = FileCreateUtils.renameFileToTemp(compactionCommitFile, firstCompactionInstant);
        this.metaClient.reloadActiveTimeline();
        testTable = HoodieMetadataTestTable.of(this.metaClient, this.metadataWriter);
        validateMetadata(testTable);

        if (z) {
            doWriteOperation(testTable, "0000004", WriteOperationType.INSERT);
        } else {
            // Restore the commit file.
            FileCreateUtils.renameTempToMetaFile(tempCommitFile, compactionCommitFile);
        }
        validateMetadata(testTable);

        doWriteOperation(testTable, "0000005", WriteOperationType.INSERT);
        doWriteOperation(testTable, "0000006", WriteOperationType.UPSERT);
        validateMetadata(testTable);

        if (z) {
            // A second compaction is expected; rename its commit file as well and keep writing.
            String secondCompactionInstant = "0000005001";
            HoodieTableMetadata metadata2 = metadata(this.writeConfig, this.context);
            org.junit.jupiter.api.Assertions.assertTrue(metadata2.getLatestCompactionTime().isPresent());
            org.junit.jupiter.api.Assertions.assertEquals(secondCompactionInstant, metadata2.getLatestCompactionTime().get());
            FileCreateUtils.renameFileToTemp(
                Paths.get(this.metadataTableBasePath, ".hoodie").resolve(secondCompactionInstant + ".commit"),
                secondCompactionInstant);
            validateMetadata(testTable);
            doWriteOperation(testTable, "0000007", WriteOperationType.INSERT);
            validateMetadata(testTable);
            doWriteOperation(testTable, "0000008", WriteOperationType.INSERT);
            doWriteOperation(testTable, "0000009", WriteOperationType.UPSERT);
            validateMetadata(testTable);
        }
    }

    /**
     * Commits three batches, rolls the third one back, deletes the metadata table's
     * {@code 0000002.deltacommit} file, re-commits {@code 0000003} and then asserts that the
     * metadata table's {@code 0000003.deltacommit} file was modified after the first
     * {@code .rollback} file found in its timeline folder.
     */
    @Test
    public void testMetadataRollbackWithCompaction() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE, false);
        this.writeConfig = getWriteConfigBuilder(false, true, false)
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withPopulateMetaFields(true).build())
            .build();

        // try-with-resources closes the write client on both the success and the failure path.
        try (SparkRDDWriteClient sparkRDDWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(this.jsc), this.writeConfig)) {
            // Commit 1: inserts.
            List generateInserts = this.dataGen.generateInserts("0000001", 100);
            sparkRDDWriteClient.startCommitWithTime("0000001");
            sparkRDDWriteClient.commit("0000001", sparkRDDWriteClient.insert(this.jsc.parallelize(generateInserts, 1), "0000001"));

            // Commit 2: updates.
            List generateUniqueUpdates = this.dataGen.generateUniqueUpdates("0000002", 20);
            sparkRDDWriteClient.startCommitWithTime("0000002");
            sparkRDDWriteClient.commit("0000002", sparkRDDWriteClient.upsert(this.jsc.parallelize(generateUniqueUpdates, 1), "0000002"));

            // Commit 3: updates, immediately rolled back.
            final String thirdInstant = "0000003";
            List generateUniqueUpdates2 = this.dataGen.generateUniqueUpdates(thirdInstant, 20);
            sparkRDDWriteClient.startCommitWithTime(thirdInstant);
            sparkRDDWriteClient.commit(thirdInstant, sparkRDDWriteClient.upsert(this.jsc.parallelize(generateUniqueUpdates2, 1), thirdInstant));
            sparkRDDWriteClient.rollback(thirdInstant);

            // Remove the metadata table's delta commit for instant 2.
            this.metaClient.getFs().delete(new Path(this.metaClient.getMetaPath() + "/metadata/.hoodie/0000002.deltacommit"));

            // Re-commit instant 3.
            List generateUniqueUpdates3 = this.dataGen.generateUniqueUpdates(thirdInstant, 20);
            sparkRDDWriteClient.startCommitWithTime(thirdInstant);
            sparkRDDWriteClient.commit(thirdInstant, sparkRDDWriteClient.upsert(this.jsc.parallelize(generateUniqueUpdates3, 1), thirdInstant));

            FileStatus[] listStatus = this.metaClient.getFs().listStatus(new Path(this.metaClient.getMetaPath() + "/metadata/.hoodie"));
            FileStatus deltaCommitFile = Arrays.stream(listStatus)
                .filter(fileStatus -> fileStatus.getPath().getName().equals(thirdInstant + ".deltacommit"))
                .collect(Collectors.toList())
                .get(0);
            FileStatus rollbackFile = Arrays.stream(listStatus)
                .filter(fileStatus -> fileStatus.getPath().getName().endsWith(".rollback"))
                .collect(Collectors.toList())
                .get(0);
            org.junit.jupiter.api.Assertions.assertTrue(
                deltaCommitFile.getModificationTime() > rollbackFile.getModificationTime());
        }
    }

    /**
     * Argument source for {@code testMetadataRecordKeyExcludeFromPayload}: one entry per
     * table type, each paired with {@code false} for the boolean parameter.
     */
    public static List<Arguments> testMetadataRecordKeyExcludeFromPayloadArgs() {
        Arguments copyOnWriteCase = Arguments.of(HoodieTableType.COPY_ON_WRITE, false);
        Arguments mergeOnReadCase = Arguments.of(HoodieTableType.MERGE_ON_READ, false);
        return Arrays.asList(copyOnWriteCase, mergeOnReadCase);
    }

    /**
     * Writes several commits and, at three checkpoints, verifies the metadata table's log
     * files and base files through the {@code verifyMetadataRecordKeyExcludeFromPayload*}
     * helpers. After the first commit the base-file check is expected to fail with an
     * {@link IllegalStateException}.
     *
     * @param hoodieTableType table type under test
     * @param z               value passed to {@code withPopulateMetaFields} and to the verifiers
     */
    @ParameterizedTest
    @MethodSource({"testMetadataRecordKeyExcludeFromPayloadArgs"})
    public void testMetadataRecordKeyExcludeFromPayload(HoodieTableType hoodieTableType, boolean z) throws Exception {
        initPath();
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withPopulateMetaFields(z)
            .withMaxNumDeltaCommitsBeforeCompaction(3)
            .build();
        this.writeConfig = getWriteConfigBuilder(true, true, false).withMetadataConfig(metadataConfig).build();
        init(hoodieTableType, this.writeConfig);

        doWriteOperation(testTable, "0000001", WriteOperationType.INSERT);

        HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder()
            .setConf(this.hadoopConf)
            .setBasePath(this.metadataTableBasePath)
            .build();
        HoodieWriteConfig metadataWriteConfig = getMetadataWriteConfig(this.writeConfig);
        metadataMetaClient.reloadActiveTimeline();
        HoodieSparkTable metadataTable = HoodieSparkTable.create(metadataWriteConfig, this.context, metadataMetaClient);

        // Checkpoint 1: log files only, no base file yet.
        org.junit.jupiter.api.Assertions.assertDoesNotThrow(
            () -> verifyMetadataRecordKeyExcludeFromPayloadLogFiles(metadataTable, metadataMetaClient, "0000001", z),
            "Metadata table should have valid log files!");
        org.junit.jupiter.api.Assertions.assertThrows(
            IllegalStateException.class,
            () -> verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(metadataTable, z),
            "Metadata table should not have a base file yet!");

        // Checkpoint 2: two more commits; a base file is now expected.
        doWriteOperation(testTable, "0000002", WriteOperationType.UPSERT);
        doWriteOperation(testTable, "0000004", WriteOperationType.UPSERT);
        org.junit.jupiter.api.Assertions.assertDoesNotThrow(
            () -> verifyMetadataRecordKeyExcludeFromPayloadLogFiles(metadataTable, metadataMetaClient, "0000002", z),
            "Metadata table should have valid log files!");
        org.junit.jupiter.api.Assertions.assertDoesNotThrow(
            () -> verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(metadataTable, z),
            "Metadata table should have a valid base file!");

        // Checkpoint 3: another write, a clean and a final write.
        doWriteOperation(testTable, "0000005", WriteOperationType.UPSERT);
        doClean(testTable, "0000006", Arrays.asList("0000004"));
        doWriteOperation(testTable, "0000007", WriteOperationType.UPSERT);
        // NOTE(review): the latest instant is passed as "7", not "0000007" - confirm this is intended.
        org.junit.jupiter.api.Assertions.assertDoesNotThrow(
            () -> verifyMetadataRecordKeyExcludeFromPayloadLogFiles(metadataTable, metadataMetaClient, "7", z),
            "Metadata table should have valid log files!");
        org.junit.jupiter.api.Assertions.assertDoesNotThrow(
            () -> verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(metadataTable, z),
            "Metadata table should have a valid base file!");

        validateMetadata(testTable);
    }

    /**
     * Syncs the table's file-system view, collects the log files of the first latest file
     * slice in the FILES partition and runs both the raw-record and the merged-record
     * verification on them.
     *
     * @param hoodieTable           metadata table to inspect
     * @param hoodieTableMetaClient meta client handed to the merged-record verification
     * @param str                   latest instant time handed to the merged-record verification
     * @param z                     whether meta fields are expected in the records
     * @throws IllegalStateException if the FILES partition has no file slices
     */
    private void verifyMetadataRecordKeyExcludeFromPayloadLogFiles(HoodieTable hoodieTable, HoodieTableMetaClient hoodieTableMetaClient, String str, boolean z) throws IOException {
        hoodieTable.getHoodieView().sync();

        String filesPartition = MetadataPartitionType.FILES.getPartitionPath();
        List<FileSlice> latestSlices = hoodieTable.getSliceView()
            .getLatestFileSlices(filesPartition)
            .collect(Collectors.toList());
        if (latestSlices.isEmpty()) {
            throw new IllegalStateException("LogFile slices are not available!");
        }

        List<HoodieLogFile> logFiles = latestSlices.get(0).getLogFiles().collect(Collectors.toList());
        List<String> logFilePaths = logFiles.stream()
            .map(logFile -> logFile.getPath().toString())
            .collect(Collectors.toList());

        verifyMetadataRawRecords(hoodieTable, logFiles, z);
        verifyMetadataMergedRecords(hoodieTableMetaClient, logFilePaths, str, z);
    }

    /**
     * Reads every data block of the given log files and checks each raw record: the
     * record-key and commit-time meta fields must be present when {@code z} is true and
     * absent otherwise, the payload {@code "key"} field must be non-empty, and - with meta
     * fields - it must equal the record-key meta field.
     *
     * <p>Log files for which no schema can be read are skipped.
     *
     * @param hoodieTable table the log files belong to (not used by the checks themselves)
     * @param list        log files to verify
     * @param z           whether meta fields are expected in the records
     * @throws IOException if listing or reading a log file fails
     */
    private void verifyMetadataRawRecords(HoodieTable hoodieTable, List<HoodieLogFile> list, boolean z) throws IOException {
        for (HoodieLogFile hoodieLogFile : list) {
            FileStatus[] listStatus = this.fs.listStatus(hoodieLogFile.getPath());
            MessageType readSchemaFromLogFile = TableSchemaResolver.readSchemaFromLogFile(this.fs, hoodieLogFile.getPath());
            if (readSchemaFromLogFile == null) {
                // No schema could be read from this log file; nothing to verify.
                continue;
            }

            Schema writerSchema = new AvroSchemaConverter().convert(readSchemaFromLogFile);
            // Both the reader and each record iterator are closed by try-with-resources.
            try (HoodieLogFormat.Reader newReader = HoodieLogFormat.newReader(this.fs, new HoodieLogFile(listStatus[0].getPath()), writerSchema)) {
                while (newReader.hasNext()) {
                    HoodieLogBlock logBlock = newReader.next();
                    if (!(logBlock instanceof HoodieDataBlock)) {
                        continue;
                    }
                    try (ClosableIterator<HoodieRecord<GenericRecord>> recordIterator =
                             ((HoodieDataBlock) logBlock).getRecordIterator(HoodieRecord.HoodieRecordType.AVRO)) {
                        recordIterator.forEachRemaining(hoodieRecord -> {
                            GenericRecord genericRecord = hoodieRecord.getData();
                            if (z) {
                                org.junit.jupiter.api.Assertions.assertNotNull(genericRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
                                org.junit.jupiter.api.Assertions.assertNotNull(genericRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
                            } else {
                                org.junit.jupiter.api.Assertions.assertNull(genericRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
                                org.junit.jupiter.api.Assertions.assertNull(genericRecord.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
                            }

                            String keyInPayload = String.valueOf(genericRecord.get("key"));
                            org.junit.jupiter.api.Assertions.assertFalse(keyInPayload.isEmpty());
                            if (z) {
                                org.junit.jupiter.api.Assertions.assertTrue(
                                    keyInPayload.equals(String.valueOf(genericRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD))));
                            }
                        });
                    }
                }
            }
        }
    }

    /**
     * Reads the merged records of the FILES partition from the given log files through a
     * {@code HoodieMetadataLogRecordReader} and checks that every record has a non-empty
     * record key that matches the record key of its {@code getKey()}.
     *
     * @param hoodieTableMetaClient meta client supplying the file system and base path
     * @param list                  log file paths to read
     * @param str                   latest instant time passed to the reader
     * @param z                     when true, the meta fields are added to the reader schema a second time
     */
    private void verifyMetadataMergedRecords(HoodieTableMetaClient hoodieTableMetaClient, List<String> list, String str, boolean z) {
        Schema readerSchema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
        if (z) {
            readerSchema = HoodieAvroUtils.addMetadataFields(readerSchema);
        }

        HoodieMetadataLogRecordReader logRecordReader = HoodieMetadataLogRecordReader.newBuilder()
            .withFileSystem(hoodieTableMetaClient.getFs())
            .withBasePath(hoodieTableMetaClient.getBasePath())
            .withLogFilePaths(list)
            .withLatestInstantTime(str)
            .withPartition(MetadataPartitionType.FILES.getPartitionPath())
            .withReaderSchema(readerSchema)
            .withMaxMemorySizeInBytes(100000L)
            .withBufferSize(4096)
            .withSpillableMapBasePath(this.tempDir.toString())
            .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
            .build();

        for (HoodieRecord<?> mergedRecord : logRecordReader.getRecords()) {
            org.junit.jupiter.api.Assertions.assertFalse(mergedRecord.getRecordKey().isEmpty());
            org.junit.jupiter.api.Assertions.assertEquals(mergedRecord.getKey().getRecordKey(), mergedRecord.getRecordKey());
        }
    }

    /**
     * Syncs the table's file-system view, reads all records of the base file of the first
     * latest file slice in the FILES partition and checks each one: the record-key meta
     * field must be present when {@code z} is true and absent otherwise, and the payload
     * {@code "key"} field must be non-empty.
     *
     * @param hoodieTable metadata table to inspect
     * @param z           whether meta fields are expected in the records
     * @throws IllegalStateException if the FILES partition has no file slices or the first
     *                               slice has no base file
     * @throws IOException           if reading the base file fails
     */
    private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable hoodieTable, boolean z) throws IOException {
        hoodieTable.getHoodieView().sync();
        List<FileSlice> latestSlices = hoodieTable.getSliceView()
            .getLatestFileSlices(MetadataPartitionType.FILES.getPartitionPath())
            .collect(Collectors.toList());
        // Guard the get(0) below so a missing slice surfaces as the IllegalStateException
        // callers assert on, matching the log-file verifier.
        if (latestSlices.isEmpty()) {
            throw new IllegalStateException("File slices are not available!");
        }
        FileSlice firstSlice = latestSlices.get(0);
        if (!firstSlice.getBaseFile().isPresent()) {
            throw new IllegalStateException("Base file not available!");
        }

        HoodieBaseFile baseFile = firstSlice.getBaseFile().get();
        HoodieAvroHFileReader baseFileReader = new HoodieAvroHFileReader(
            this.context.getHadoopConf().get(),
            new Path(baseFile.getPath()),
            new CacheConfig(this.context.getHadoopConf().get()));
        HoodieAvroHFileReader.readAllRecords(baseFileReader).forEach(indexedRecord -> {
            GenericRecord genericRecord = (GenericRecord) indexedRecord;
            if (z) {
                org.junit.jupiter.api.Assertions.assertNotNull(genericRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
            } else {
                org.junit.jupiter.api.Assertions.assertNull(genericRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
            }
            org.junit.jupiter.api.Assertions.assertFalse(((String) genericRecord.get("key")).isEmpty());
        });
    }

    /**
     * Exercises rollbacks of several kinds of commits (regular writes, a compaction for
     * MERGE_ON_READ tables, a delete) and, at the end, rollbacks with marker-based
     * rollback switched off and on in the write config, validating the metadata table
     * along the way.
     */
    @ParameterizedTest
    @EnumSource(HoodieTableType.class)
    public void testRollbackOperations(HoodieTableType hoodieTableType) throws Exception {
        init(hoodieTableType);
        doWriteInsertAndUpsert(testTable);

        // Roll back a freshly written commit.
        doWriteOperation(testTable, "0000003", WriteOperationType.UPSERT);
        doWriteOperation(testTable, "0000004");
        doRollbackAndValidate(testTable, "0000004", "0000005");

        // A few more commits before the next rollback.
        for (String instantTime : Arrays.asList("0000006", "0000007", "0000008", "0000009")) {
            doWriteOperation(testTable, instantTime);
        }
        validateMetadata(testTable);

        doWriteOperation(testTable, "0000010");
        doRollbackAndValidate(testTable, "0000010", "0000011");

        // Roll back a compaction; only exercised for merge-on-read tables.
        if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
            doCompactionAndValidate(testTable, "0000012");
            doRollbackAndValidate(testTable, "0000012", "0000013");
        }

        // Roll back a delete.
        doWriteOperation(testTable, "0000014", WriteOperationType.DELETE);
        doRollbackAndValidate(testTable, "0000014", "0000015");

        // Rollback with marker-based rollback disabled in the write config.
        this.writeConfig = getWriteConfigBuilder(true, true, false)
            .withRollbackUsingMarkers(false)
            .build();
        doWriteOperation(testTable, "0000016");
        testTable.doRollback("0000016", "0000017");
        validateMetadata(testTable);

        // Rollback with marker-based rollback enabled in the write config.
        this.writeConfig = getWriteConfigBuilder(true, true, false)
            .withRollbackUsingMarkers(true)
            .build();
        doWriteOperation(testTable, "0000018");
        testTable.doRollback("0000018", "0000019");
        validateMetadata(testTable, true);
    }

    /**
     * Writes, rolls back and writes again using the non-partitioned helpers on a COPY_ON_WRITE
     * table, validating the metadata table after the rollback and after the follow-up upserts.
     *
     * @throws Exception propagated from any helper operation
     */
    @Test
    public void testRollbackOperationsNonPartitioned() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE);
        doWriteInsertAndUpsertNonPartitioned(testTable);

        doWriteOperationNonPartitioned(testTable, "0000003", WriteOperationType.UPSERT);
        doWriteOperationNonPartitioned(testTable, "0000004", WriteOperationType.UPSERT);
        doRollback(testTable, "0000004", "0000005");
        validateMetadata(testTable);

        for (String instantTime : Arrays.asList("0000006", "0000007", "0000008", "0000009")) {
            doWriteOperationNonPartitioned(testTable, instantTime, WriteOperationType.UPSERT);
        }
        validateMetadata(testTable);
    }

    /**
     * Performs several writes with archival of the data table after each one, then rolls back every
     * instant of the reloaded commits timeline in reverse order. The test requires that no
     * {@code HoodieMetadataException} is raised and that at least
     * {@code max(maxDeltaCommitsBeforeCompaction, minArchiveCommitsMetadata) / 2} rollbacks succeed.
     *
     * @throws Exception propagated from any helper operation
     */
    @Test
    public void testManualRollbacks() throws Exception {
        final int maxDeltaCommitsBeforeCompaction = 4;
        final int minArchiveCommitsMetadata = 2;

        init(HoodieTableType.COPY_ON_WRITE, false);
        this.writeConfig = getWriteConfigBuilder(true, true, false)
                .withMetadataConfig(HoodieMetadataConfig.newBuilder()
                        .enable(true)
                        .archiveCommitsWith(minArchiveCommitsMetadata, minArchiveCommitsMetadata + 1)
                        .retainCommits(1)
                        .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction)
                        .withPopulateMetaFields(false)
                        .build())
                .withCleanConfig(HoodieCleanConfig.newBuilder()
                        .retainCommits(1)
                        .retainFileVersions(1)
                        .withAutoClean(false)
                        .withAsyncClean(true)
                        .build())
                .withArchivalConfig(HoodieArchivalConfig.newBuilder()
                        .archiveCommitsWith(4, 5)
                        .build())
                .build();
        initWriteConfigAndMetatableWriter(this.writeConfig, true);

        doWriteInsertAndUpsert(testTable, "000001", "000002", false);
        for (int commitNumber = 3; commitNumber < 10; commitNumber++) {
            doWriteOperation(testTable, "00000" + commitNumber);
            archiveDataTable(this.writeConfig, this.metaClient);
        }
        validateMetadata(testTable);

        int successfulRollbacks = 0;
        boolean metadataExceptionRaised = false;
        List<HoodieInstant> reverseOrderedCommits = this.metaClient.reloadActiveTimeline()
                .getCommitsTimeline()
                .getReverseOrderedInstants()
                .collect(Collectors.toList());
        for (HoodieInstant instantToRollback : reverseOrderedCommits) {
            try {
                testTable.doRollback(instantToRollback.getTimestamp(), String.valueOf(Time.now()));
                validateMetadata(testTable);
                successfulRollbacks++;
            } catch (HoodieMetadataException e) {
                // Remember the failure and keep going so the remaining instants are still attempted.
                metadataExceptionRaised = true;
            }
        }

        org.junit.jupiter.api.Assertions.assertFalse(metadataExceptionRaised, "Metadata table should not archive instants that are in dataset active timeline");
        org.junit.jupiter.api.Assertions.assertTrue(
                successfulRollbacks >= Math.max(maxDeltaCommitsBeforeCompaction, minArchiveCommitsMetadata) / 2,
                "Rollbacks of non archived instants should work");
    }

    /**
     * Runs operations with one write config, switches to a second config and syncs the metadata
     * table, then runs writes, compaction (MERGE_ON_READ only), clean and clustering. Finally it
     * validates the metadata table while instant 00000007 is passed as an exclusion to
     * {@code validateMetadata}, and again after that instant is moved to complete.
     *
     * @param hoodieTableType table type under test
     * @throws Exception propagated from any helper operation
     */
    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    public void testSync(HoodieTableType hoodieTableType) throws Exception {
        init(hoodieTableType, false);

        // NOTE(review): the second builder flag presumably toggles the metadata table - confirm.
        this.writeConfig = getWriteConfigBuilder(true, false, false).build();
        doPreBootstrapOperations(testTable, "00000001", "00000002");

        this.writeConfig = getWriteConfigBuilder(true, true, false).build();
        initWriteConfigAndMetatableWriter(this.writeConfig, true);
        syncTableMetadata(this.writeConfig);
        validateMetadata(testTable);

        doWriteOperation(testTable, "00000003", WriteOperationType.INSERT);
        doWriteOperation(testTable, "00000004", WriteOperationType.UPSERT);
        doWriteOperation(testTable, "00000005", WriteOperationType.UPSERT);
        if (hoodieTableType == HoodieTableType.MERGE_ON_READ) {
            doCompactionAndValidate(testTable, "00000006");
        }

        doWriteOperation(testTable, "00000008");
        doWriteOperation(testTable, "00000009", WriteOperationType.DELETE);
        doCleanAndValidate(testTable, "00000010", Arrays.asList("00000003", "00000004"));
        doWriteOperation(testTable, "00000011");
        doClusterAndValidate(testTable, "00000012");

        // Instant 00000007 is written out of order; its commit metadata is kept to complete it later.
        HoodieCommitMetadata pendingCommitMetadata = testTable.doWriteOperation(
                "00000007",
                WriteOperationType.UPSERT,
                Collections.emptyList(),
                Arrays.asList("p1", "p2"),
                2,
                false,
                true);
        doWriteOperation(testTable, "00000013");
        validateMetadata(testTable, Collections.singletonList("00000007"));

        testTable.moveInflightCommitToComplete("00000007", pendingCommitMetadata);
        validateMetadata(testTable);

        doWriteOperation(testTable, "00000014");
        validateMetadata(testTable, Collections.emptyList(), true);
    }

    /**
     * Returns the commit time that follows {@code j}.
     *
     * <p>If {@code (j + 1) % 1000000000000L} is 60 or more, a brand-new instant time is obtained
     * from {@code HoodieActiveTimeline.createNewInstantTime()}; otherwise {@code j + 1} is returned.
     *
     * @param j the current commit time as a number
     * @return the next commit time
     */
    private Long getNextCommitTime(long j) {
        long incremented = j + 1;
        if (incremented % 1000000000000L >= 60) {
            return Long.valueOf(Long.parseLong(HoodieActiveTimeline.createNewInstantTime()));
        }
        return Long.valueOf(incremented);
    }

    /**
     * Repeats a fixed cycle of insert, upsert, clean, upsert, compaction (MERGE_ON_READ only),
     * upsert and rollback four times, allocating seven consecutive instant times per cycle, and
     * validates the metadata table once at the end.
     *
     * @param hoodieTableType table type under test
     * @param z               flag forwarded to the write/compaction helpers and to the final
     *                        {@code validateMetadata} call
     * @throws Exception propagated from any helper operation
     */
    @MethodSource({"tableTypeAndEnableOperationArgs"})
    @ParameterizedTest
    public void testMetadataBootstrapLargeCommitList(HoodieTableType hoodieTableType, boolean z) throws Exception {
        init(hoodieTableType, true, true, true, false);
        long lastCommitTime = Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
        for (int round = 1; round < 25; round += 7) {
            // All seven instant times are allocated up front, before any operation runs.
            long insertTime = getNextCommitTime(lastCommitTime);
            long firstUpsertTime = getNextCommitTime(insertTime);
            long cleanTime = getNextCommitTime(firstUpsertTime);
            long secondUpsertTime = getNextCommitTime(cleanTime);
            long compactionTime = getNextCommitTime(secondUpsertTime);
            long rolledBackUpsertTime = getNextCommitTime(compactionTime);
            long rollbackTime = getNextCommitTime(rolledBackUpsertTime);
            lastCommitTime = rollbackTime;

            String insertInstant = Long.toString(insertTime);
            doWriteOperation(testTable, insertInstant, WriteOperationType.INSERT, z);
            doWriteOperation(testTable, Long.toString(firstUpsertTime), WriteOperationType.UPSERT, z);
            doClean(testTable, Long.toString(cleanTime), Arrays.asList(insertInstant));
            doWriteOperation(testTable, Long.toString(secondUpsertTime), WriteOperationType.UPSERT, z);
            if (HoodieTableType.MERGE_ON_READ == hoodieTableType) {
                doCompaction(testTable, Long.toString(compactionTime), z);
            }
            String rolledBackInstant = Long.toString(rolledBackUpsertTime);
            doWriteOperation(testTable, rolledBackInstant, WriteOperationType.UPSERT, z);
            doRollback(testTable, rolledBackInstant, Long.toString(rollbackTime));
        }
        validateMetadata(testTable, Collections.emptyList(), z);
    }

    /**
     * Inserts a first commit, validates the metadata table, rolls that commit back, then upserts a
     * second commit and validates again.
     *
     * <p>The write client is managed with try-with-resources so it is always closed, and a failure
     * in {@code close()} is attached as a suppressed exception rather than replacing the primary
     * failure.
     *
     * @param hoodieTableType table type under test
     * @throws Exception propagated from the write client or the validation helpers
     */
    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    public void testFirstCommitRollback(HoodieTableType hoodieTableType) throws Exception {
        init(hoodieTableType);
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(
                new HoodieSparkEngineContext(this.jsc),
                getWriteConfigBuilder(true, true, false).withRollbackUsingMarkers(false).build())) {
            final String firstCommit = "0000001";
            List<?> firstBatch = this.dataGen.generateInserts(firstCommit, 20);
            client.startCommitWithTime(firstCommit);
            Assertions.assertNoWriteErrors(client.insert(this.jsc.parallelize(firstBatch, 1), firstCommit).collect());
            validateMetadata(client);

            // Roll back the very first commit of the table.
            client.rollback(firstCommit);

            final String secondCommit = "0000002";
            List<?> secondBatch = this.dataGen.generateInserts(secondCommit, 10);
            client.startCommitWithTime(secondCommit);
            Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(secondBatch, 1), secondCommit).collect());
            validateMetadata(client);
        }
    }

    /**
     * Rolls back a commit via {@code doRollbackWithExtraFiles}, passing extra file names for
     * partitions {@code p1} and {@code p2}. Depending on the flag, metadata validation must either
     * succeed or throw {@code HoodieMetadataException}.
     *
     * @param z forwarded as the last argument of {@code init}; when {@code true} validation is
     *          expected to pass, otherwise it is expected to throw
     * @throws Exception propagated from any helper operation
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testMetadataPayloadSpuriousDeletes(boolean z) throws Exception {
        this.tableType = HoodieTableType.COPY_ON_WRITE;
        init(this.tableType, true, true, false, z);
        doWriteInsertAndUpsert(testTable);
        doWriteOperationAndValidate(testTable, "0000003");
        doWriteOperation(testTable, "0000004");

        Map<String, List<String>> extraFilesPerPartition = new HashMap<>();
        extraFilesPerPartition.put("p1", Collections.singletonList("f10"));
        extraFilesPerPartition.put("p2", Collections.singletonList("f12"));
        testTable.doRollbackWithExtraFiles("0000004", "0000005", extraFilesPerPartition);

        if (!z) {
            org.junit.jupiter.api.Assertions.assertThrows(HoodieMetadataException.class, () -> validateMetadata(testTable));
            return;
        }
        validateMetadata(testTable);
    }

    /**
     * Runs {@code testTableOperationsImpl} for the given table type with a write config that has
     * marker-based rollback disabled.
     *
     * @param hoodieTableType table type under test
     * @throws Exception propagated from initialisation or the shared implementation
     */
    @EnumSource(HoodieTableType.class)
    @ParameterizedTest
    public void testTableOperationsWithRestore(HoodieTableType hoodieTableType) throws Exception {
        init(hoodieTableType);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);
        testTableOperationsImpl(
                engineContext,
                getWriteConfigBuilder(true, true, false)
                        .withRollbackUsingMarkers(false)
                        .build());
    }

    /**
     * Writes two insert commits with re-keyed records, builds a
     * commit -&gt; partition -&gt; written-file-paths lookup from the active timeline's commit
     * metadata, and checks key-prefix lookups against the column-stats partition of the metadata
     * table:
     * <ul>
     *   <li>prefix = record-key column: 6 records expected;</li>
     *   <li>prefix = record-key column + partition {@code 2016/03/15}: 2 records expected, each
     *       naming a file recorded for the matching commit;</li>
     *   <li>prefix = commit-time column + the same partition: 2 records expected, each with
     *       min value equal to max value.</li>
     * </ul>
     *
     * <p>The write client is closed via try-with-resources. A failure to read commit metadata is
     * rethrown as {@link java.io.UncheckedIOException} so the test fails at the cause instead of
     * continuing with an incomplete lookup map.
     *
     * @throws IOException declared for file-system and metadata access
     */
    @Test
    public void testColStatsPrefixLookup() throws IOException {
        this.tableType = HoodieTableType.COPY_ON_WRITE;
        initPath();
        initSparkContexts("TestHoodieMetadata");
        initFileSystem();
        this.fs.mkdirs(new Path(this.basePath));
        initTimelineService();
        initMetaClient(this.tableType);
        initTestDataGenerator();
        this.metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(this.basePath);

        final String firstCommit = "0000001";
        final String secondCommit = "0000002";
        final String partition = "2016/03/15";

        try (SparkRDDWriteClient client = new SparkRDDWriteClient(
                new HoodieSparkEngineContext(this.jsc),
                getWriteConfigBuilder(true, true, false)
                        .withRollbackUsingMarkers(false)
                        .withCleanConfig(HoodieCleanConfig.newBuilder()
                                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
                                .withAutoClean(false)
                                .retainCommits(1)
                                .retainFileVersions(1)
                                .build())
                        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                                .compactionSmallFileSize(0L)
                                .withInlineCompaction(false)
                                .withMaxNumDeltaCommitsBeforeCompaction(1)
                                .build())
                        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
                                .enable(true)
                                .withMetadataIndexColumnStats(true)
                                .enableFullScan(false)
                                .build())
                        .build())) {

            // First commit: generated inserts re-keyed as key1_0, key1_1, ...
            List<HoodieRecord> firstBatch = this.dataGen.generateInserts(firstCommit, 20);
            AtomicInteger firstKeyCounter = new AtomicInteger();
            List<HoodieRecord> firstBatchRekeyed = firstBatch.stream()
                    .map(generated -> new HoodieAvroRecord(
                            new HoodieKey("key1_" + firstKeyCounter.getAndIncrement(), generated.getPartitionPath()),
                            (HoodieRecordPayload) generated.getData()))
                    .collect(Collectors.toList());
            client.startCommitWithTime(firstCommit);
            Assertions.assertNoWriteErrors(client.insert(this.jsc.parallelize(firstBatchRekeyed, 1), firstCommit).collect());

            // Second commit: generated inserts re-keyed as key2_0, key2_1, ...
            client.startCommitWithTime(secondCommit);
            List<HoodieRecord> secondBatch = this.dataGen.generateInserts(secondCommit, 20);
            AtomicInteger secondKeyCounter = new AtomicInteger();
            List<HoodieRecord> secondBatchRekeyed = secondBatch.stream()
                    .map(generated -> new HoodieAvroRecord(
                            new HoodieKey("key2_" + secondKeyCounter.getAndIncrement(), generated.getPartitionPath()),
                            (HoodieRecordPayload) generated.getData()))
                    .collect(Collectors.toList());
            Assertions.assertNoWriteErrors(client.insert(this.jsc.parallelize(secondBatchRekeyed, 1), secondCommit).collect());

            // commit time -> partition identifier -> paths of the files written by that commit
            Map<String, Map<String, List<String>>> commitToPartitionsToFiles = new HashMap<>();
            this.metaClient.getActiveTimeline().getInstants().forEach(instant -> {
                try {
                    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                            this.metaClient.getActiveTimeline().getInstantDetails(instant).get(),
                            HoodieCommitMetadata.class);
                    Map<String, List<String>> partitionToFiles =
                            commitToPartitionsToFiles.computeIfAbsent(instant.getTimestamp(), ts -> new HashMap<>());
                    commitMetadata.getPartitionToWriteStats().forEach((partitionPath, writeStats) -> {
                        List<String> files = partitionToFiles.computeIfAbsent(
                                HoodieTableMetadataUtil.getPartitionIdentifier(partitionPath), id -> new ArrayList<>());
                        writeStats.forEach(writeStat -> files.add(writeStat.getPath()));
                    });
                } catch (IOException e) {
                    throw new java.io.UncheckedIOException("Failed to read commit metadata for instant " + instant, e);
                }
            });

            HoodieTableMetadata tableMetadata = metadata(client);
            String colStatsPartition = MetadataPartitionType.COLUMN_STATS.getPartitionPath();

            // Prefix: record-key column only.
            ColumnIndexID recordKeyColumnId = new ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
            List<HoodieRecord<HoodieMetadataPayload>> recordKeyStats = tableMetadata.getRecordsByKeyPrefixes(
                    Collections.singletonList(recordKeyColumnId.asBase64EncodedString()),
                    colStatsPartition,
                    true).collectAsList();
            org.junit.jupiter.api.Assertions.assertEquals(6, recordKeyStats.size());

            // Prefix: record-key column + partition.
            PartitionIndexID partitionId = new PartitionIndexID(partition);
            List<HoodieRecord<HoodieMetadataPayload>> recordKeyStatsForPartition = tableMetadata.getRecordsByKeyPrefixes(
                    Collections.singletonList(recordKeyColumnId.asBase64EncodedString().concat(partitionId.asBase64EncodedString())),
                    colStatsPartition,
                    true).collectAsList();
            org.junit.jupiter.api.Assertions.assertEquals(2, recordKeyStatsForPartition.size());
            recordKeyStatsForPartition.forEach(statsRecord -> {
                HoodieMetadataColumnStats columnStats = statsRecord.getData().getColumnStatMetadata().get();
                String fileName = columnStats.getFileName();
                // The file name embeds the commit that wrote it; anything not from the first commit
                // is checked against the second.
                String owningCommit = fileName.contains(firstCommit) ? firstCommit : secondCommit;
                org.junit.jupiter.api.Assertions.assertTrue(
                        commitToPartitionsToFiles.get(owningCommit).get(partition).contains(partition + "/" + fileName));
            });

            // Prefix: commit-time column + partition.
            ColumnIndexID commitTimeColumnId = new ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
            List<HoodieRecord<HoodieMetadataPayload>> commitTimeStatsForPartition = tableMetadata.getRecordsByKeyPrefixes(
                    Collections.singletonList(commitTimeColumnId.asBase64EncodedString().concat(partitionId.asBase64EncodedString())),
                    colStatsPartition,
                    true).collectAsList();
            org.junit.jupiter.api.Assertions.assertEquals(2, commitTimeStatsForPartition.size());
            commitTimeStatsForPartition.forEach(statsRecord -> {
                HoodieMetadataColumnStats columnStats = statsRecord.getData().getColumnStatMetadata().get();
                org.junit.jupiter.api.Assertions.assertEquals(columnStats.getMinValue(), columnStats.getMaxValue());
                String fileName = columnStats.getFileName();
                String owningCommit = fileName.contains(firstCommit) ? firstCommit : secondCommit;
                org.junit.jupiter.api.Assertions.assertTrue(
                        commitToPartitionsToFiles.get(owningCommit).get(partition).contains(partition + "/" + fileName));
            });
        }
    }

    /**
     * Writes two commits, deletes the second commit's {@code .deltacommit} file from the metadata
     * table's {@code .hoodie} directory, then upserts a third commit with a fresh client. The test
     * asserts that the third commit's deltacommit file in the metadata timeline has a later
     * modification time than the file of the first rollback instant found there.
     *
     * @throws IOException declared for file-system access
     */
    @Test
    public void testEagerRollbackinMDT() throws IOException {
        this.tableType = HoodieTableType.MERGE_ON_READ;
        initPath();
        init(this.tableType);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);
        SparkRDDWriteClient firstClient = new SparkRDDWriteClient(engineContext, this.writeConfig);

        // Commit 1: bulk insert.
        String commit1 = HoodieActiveTimeline.createNewInstantTime();
        List<?> commit1Inserts = this.dataGen.generateInserts(commit1, 20);
        firstClient.startCommitWithTime(commit1);
        Assertions.assertNoWriteErrors(firstClient.bulkInsert(this.jsc.parallelize(commit1Inserts, 1), commit1).collect());

        // Commit 2: insert, after which its deltacommit file is removed from the metadata timeline.
        String commit2 = HoodieActiveTimeline.createNewInstantTime();
        firstClient.startCommitWithTime(commit2);
        Assertions.assertNoWriteErrors(firstClient.insert(this.jsc.parallelize(this.dataGen.generateInserts(commit2, 20), 1), commit2).collect());
        Path commit2DeltaCommitFile = new Path(this.metaClient.getMetaPath() + "/metadata/.hoodie/" + commit2 + ".deltacommit");
        this.metaClient.getFs().delete(commit2DeltaCommitFile);

        // Commit 3: upsert through a new client.
        SparkRDDWriteClient secondClient = new SparkRDDWriteClient(engineContext, this.writeConfig);
        String commit3 = HoodieActiveTimeline.createNewInstantTime();
        secondClient.startCommitWithTime(commit3);
        Assertions.assertNoWriteErrors(secondClient.upsert(this.jsc.parallelize(this.dataGen.generateUniqueUpdates(commit3, 10), 1), commit3).collect());

        HoodieInstant rollbackInstant = HoodieTableMetaClient.builder()
                .setBasePath(this.metaClient.getMetaPath() + "/metadata/")
                .setConf(this.metaClient.getHadoopConf())
                .build()
                .getActiveTimeline()
                .getRollbackTimeline()
                .getInstants()
                .get(0);
        FileStatus[] metadataTimelineFiles = this.metaClient.getFs().listStatus(new Path(this.metaClient.getMetaPath() + "/metadata/.hoodie"));

        String deltaCommitFileName = commit3 + ".deltacommit";
        long deltaCommitModificationTime = Arrays.stream(metadataTimelineFiles)
                .filter(status -> status.getPath().getName().equals(deltaCommitFileName))
                .collect(Collectors.toList())
                .get(0)
                .getModificationTime();

        String rollbackFileName = rollbackInstant.getTimestamp() + ".rollback";
        long rollbackModificationTime = Arrays.stream(metadataTimelineFiles)
                .filter(status -> status.getPath().getName().equals(rollbackFileName))
                .collect(Collectors.toList())
                .get(0)
                .getModificationTime();

        org.junit.jupiter.api.Assertions.assertTrue(deltaCommitModificationTime > rollbackModificationTime);
    }

    /**
     * Shared scenario: bulk insert, insert, two upserts, compactions (MERGE_ON_READ only), a
     * delete followed by clean, and finally a restore to instant {@code 0000006}. The metadata
     * table is validated after most steps.
     *
     * <p>The write client is managed with try-with-resources so it is closed on every path and a
     * failure in {@code close()} cannot mask the primary failure.
     *
     * @param hoodieSparkEngineContext engine context used to create the write client
     * @param hoodieWriteConfig        write config for the client; its
     *                                 {@code isMetadataTableEnabled()} value is passed to the restore
     * @throws IOException declared for the write and validation helpers
     */
    private void testTableOperationsImpl(HoodieSparkEngineContext hoodieSparkEngineContext, HoodieWriteConfig hoodieWriteConfig) throws IOException {
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(hoodieSparkEngineContext, hoodieWriteConfig)) {
            // Bulk insert.
            List<?> bulkInsertRecords = this.dataGen.generateInserts("0000001", 20);
            client.startCommitWithTime("0000001");
            Assertions.assertNoWriteErrors(client.bulkInsert(this.jsc.parallelize(bulkInsertRecords, 1), "0000001").collect());
            validateMetadata(client);

            // Insert; metadata is also validated while the commit has been started but not written.
            client.startCommitWithTime("0000002");
            validateMetadata(client);
            Assertions.assertNoWriteErrors(client.insert(this.jsc.parallelize(this.dataGen.generateInserts("0000002", 20), 1), "0000002").collect());
            validateMetadata(client);

            // Two upserts.
            client.startCommitWithTime("0000003");
            Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(this.dataGen.generateUniqueUpdates("0000003", 10), 1), "0000003").collect());
            client.startCommitWithTime("0000004");
            Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(this.dataGen.generateUpdates("0000004", 10), 1), "0000004").collect());
            validateMetadata(client);

            if (this.metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
                client.scheduleCompactionAtInstant("0000005", Option.empty());
                client.compact("0000005");
                validateMetadata(client);
            }

            client.startCommitWithTime("0000006");
            Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(this.dataGen.generateUpdates("0000006", 5), 1), "0000006").collect());

            if (this.metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
                client.scheduleCompactionAtInstant("0000007", Option.empty());
                client.compact("0000007");
                validateMetadata(client);
            }

            // Delete followed by clean.
            JavaRDD<?> deleteKeys = this.jsc.parallelize(this.dataGen.generateDeletes("0000009", 10), 1)
                    .map(hoodieRecord -> hoodieRecord.getKey());
            client.startCommitWithTime("0000009");
            client.delete(deleteKeys, "0000009");
            client.clean("0000009");
            validateMetadata(client);

            // Restore to an earlier instant.
            client.restoreToInstant("0000006", hoodieWriteConfig.isMetadataTableEnabled());
            validateMetadata(client);
        }
    }

    /**
     * Starts one write client per data-generator partition path, runs one insert commit per client
     * concurrently on a fixed thread pool, and then checks the metadata table timeline: 4 completed
     * delta commits, presence of delta commits 0000001-0000003, and at most one completed commit.
     *
     * <p>The thread pool is shut down in a {@code finally} block so its worker threads do not
     * outlive the test, including when a writer task fails.
     *
     * @throws Exception propagated from the writer tasks (via {@code Future.get()}) or validation
     */
    @Test
    public void testMetadataMultiWriter() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);

        Properties lockProperties = new Properties();
        lockProperties.setProperty("hoodie.write.lock.filesystem.path", this.basePath + "/.hoodie/.locks");
        lockProperties.setProperty("hoodie.write.lock.wait_time_ms", "1000");
        lockProperties.setProperty("hoodie.write.lock.client.num_retries", "20");
        HoodieWriteConfig multiWriterConfig = getWriteConfigBuilder(true, true, false)
                .withCleanConfig(HoodieCleanConfig.newBuilder()
                        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                        .withAutoClean(false)
                        .build())
                .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
                .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
                .withProperties(lockProperties)
                .build();

        final int numWriters = this.dataGen.getPartitionPaths().length;
        SparkRDDWriteClient[] writeClients = new SparkRDDWriteClient[numWriters];
        ExecutorService executors = Executors.newFixedThreadPool(numWriters);
        try {
            // Create all clients before any task is submitted.
            for (int i = 0; i < numWriters; i++) {
                writeClients[i] = new SparkRDDWriteClient(engineContext, multiWriterConfig);
            }

            List<Future<?>> pendingWrites = new ArrayList<>(numWriters);
            for (int i = 0; i < numWriters; i++) {
                final int writerIndex = i;
                final String commitTime = "000000" + (writerIndex + 1);
                pendingWrites.add(executors.submit(() -> {
                    List generatedInserts = this.dataGen.generateInsertsForPartition(commitTime, 100, this.dataGen.getPartitionPaths()[writerIndex]);
                    SparkRDDWriteClient writer = writeClients[writerIndex];
                    writer.startCommitWithTime(commitTime);
                    Assertions.assertNoWriteErrors(writer.insert(this.jsc.parallelize(generatedInserts, 1), commitTime).collect());
                }));
            }

            // Wait for every writer; a failed task surfaces here as an ExecutionException.
            for (Future<?> pendingWrite : pendingWrites) {
                pendingWrite.get();
            }
        } finally {
            executors.shutdown();
        }

        HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder()
                .setConf(this.hadoopConf)
                .setBasePath(this.metadataTableBasePath)
                .build();
        org.junit.jupiter.api.Assertions.assertEquals(4, metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants());
        org.junit.jupiter.api.Assertions.assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, "deltacommit", "0000001")));
        org.junit.jupiter.api.Assertions.assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, "deltacommit", "0000002")));
        org.junit.jupiter.api.Assertions.assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, "deltacommit", "0000003")));
        org.junit.jupiter.api.Assertions.assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1);

        validateMetadata(writeClients[0]);
    }

    /**
     * Performs six insert commits (instants {@code 0000000} to {@code 0000005}) into a single
     * partition with auto-commit disabled, optimistic concurrency control and the in-process lock
     * provider. It then checks the metadata table timeline: 8 completed delta commits and at most
     * one completed commit.
     *
     * <p>{@code assertEquals} is called as (expected, actual) so a failure reports the values the
     * right way round.
     *
     * @throws Exception propagated from the write client or validation
     */
    @Test
    public void testMultiWriterForDoubleLocking() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);

        Properties lockProperties = new Properties();
        lockProperties.setProperty("hoodie.write.lock.filesystem.path", this.basePath + "/.hoodie/.locks");
        lockProperties.setProperty("hoodie.write.lock.wait_time_ms", "3000");

        SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, getWriteConfigBuilder(true, true, false)
                .withCleanConfig(HoodieCleanConfig.newBuilder()
                        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                        .withAutoClean(true)
                        .retainCommits(4)
                        .build())
                .withAutoCommit(false)
                .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
                .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
                .withProperties(lockProperties)
                .build());

        String partitionPath = this.dataGen.getPartitionPaths()[0];
        for (int commitNumber = 0; commitNumber < 6; commitNumber++) {
            String commitTime = "000000" + commitNumber;
            List generatedInserts = this.dataGen.generateInsertsForPartition(commitTime, 100, partitionPath);
            writeClient.startCommitWithTime(commitTime);
            // Auto-commit is off, so the write result is committed explicitly.
            writeClient.commit(commitTime, writeClient.insert(this.jsc.parallelize(generatedInserts, 1), commitTime));
        }

        HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder()
                .setConf(this.hadoopConf)
                .setBasePath(this.metadataTableBasePath)
                .build();
        LOG.warn("total commits in metadata table " + metadataMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());

        org.junit.jupiter.api.Assertions.assertEquals(8, metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants());
        org.junit.jupiter.api.Assertions.assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1);
        validateMetadata(writeClient);
    }

    /**
     * Runs clustering for an instant, removes that instant's completed replace-commit file from the
     * data table, re-runs clustering for the same instant, and checks that both attempts report the
     * same set of replaced file groups.
     *
     * <p>Two regular inserts precede the first clustering attempt and a third insert lands between
     * the two attempts; the metadata table is validated after each of those writes and at the end.
     *
     * @throws Exception if any write, clustering, or validation step fails
     */
    @Test
    public void testReattemptOfFailedClusteringCommit() throws Exception {
        // Avro schema shared by the ingest client and the clustering client.
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        this.tableType = HoodieTableType.COPY_ON_WRITE;
        init(this.tableType);
        this.context = new HoodieSparkEngineContext(this.jsc);
        SparkRDDWriteClient ingestClient = getHoodieWriteClient(getSmallInsertWriteConfig(2000, tripSchema, 10L, false));

        // First insert: the records are generated before the commit is started.
        List<HoodieRecord> firstBatch = this.dataGen.generateInserts("0000001", 20);
        ingestClient.startCommitWithTime("0000001");
        Assertions.assertNoWriteErrors(ingestClient.insert(this.jsc.parallelize(firstBatch, 1), "0000001").collect());
        validateMetadata(ingestClient);

        // Second insert.
        ingestClient.startCommitWithTime("0000002");
        Assertions.assertNoWriteErrors(
            ingestClient.insert(this.jsc.parallelize(this.dataGen.generateInserts("0000002", 20), 1), "0000002").collect());
        validateMetadata(ingestClient);

        // A second client with auto-commit disabled and inline clustering configured.
        HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
            .withClusteringMaxNumGroups(10)
            .withClusteringSortColumns("_row_key")
            .withInlineClustering(true)
            .withClusteringTargetPartitions(0)
            .withInlineClusteringNumCommits(1)
            .build();
        HoodieWriteConfig clusteringWriteConfig =
            getConfigBuilder(tripSchema, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
                .withAutoCommit(false)
                .withClusteringConfig(clusteringConfig)
                .build();
        SparkRDDWriteClient clusteringClient = getHoodieWriteClient(clusteringWriteConfig);

        // First clustering attempt; remember which file groups it replaced.
        String clusteringInstant = clusteringClient.scheduleClustering(Option.empty()).get().toString();
        HoodieWriteMetadata<?> firstAttempt = clusteringClient.cluster(clusteringInstant, true);
        Set<HoodieFileGroupId> replacedByFirstAttempt = new HashSet<>();
        for (Map.Entry<String, List<String>> partitionFiles : firstAttempt.getPartitionToReplaceFileIds().entrySet()) {
            for (String fileId : partitionFiles.getValue()) {
                replacedByFirstAttempt.add(new HoodieFileGroupId(partitionFiles.getKey(), fileId));
            }
        }

        // Another regular write lands before clustering is re-attempted.
        ingestClient.startCommitWithTime("0000003");
        Assertions.assertNoWriteErrors(
            ingestClient.insert(this.jsc.parallelize(this.dataGen.generateInserts("0000003", 20), 1), "0000003").collect());
        validateMetadata(ingestClient);

        // Remove the completed replace-commit from the data table and cluster the same instant again.
        FileCreateUtils.deleteReplaceCommit(this.basePath, clusteringInstant);
        HoodieWriteMetadata<?> secondAttempt = clusteringClient.cluster(clusteringInstant, true);
        this.metaClient.reloadActiveTimeline();
        Set<HoodieFileGroupId> replacedBySecondAttempt = new HashSet<>();
        for (Map.Entry<String, List<String>> partitionFiles : secondAttempt.getPartitionToReplaceFileIds().entrySet()) {
            for (String fileId : partitionFiles.getValue()) {
                replacedBySecondAttempt.add(new HoodieFileGroupId(partitionFiles.getKey(), fileId));
            }
        }

        org.junit.jupiter.api.Assertions.assertEquals(replacedByFirstAttempt, replacedBySecondAttempt);
        validateMetadata(ingestClient);
    }

    /**
     * Bulk-inserts one commit, deletes the completed delta-commit files for that commit and for
     * instant {@code 00000000000000} from the metadata table's timeline, and asserts that listing
     * files through the metadata reader yields no commit times.
     *
     * @throws Exception if the write, the timeline manipulation, or the metadata read fails
     */
    @Test
    public void testMetadataReadWithNoCompletedCommits() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);
        String[] commitTimes = {HoodieActiveTimeline.createNewInstantTime(), HoodieActiveTimeline.createNewInstantTime()};

        // try-with-resources closes the client on success and on failure alike.
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
            List<HoodieRecord> records = this.dataGen.generateInserts(commitTimes[0], 5);
            client.startCommitWithTime(commitTimes[0]);
            Assertions.assertNoWriteErrors(client.bulkInsert(this.jsc.parallelize(records, 1), commitTimes[0]).collect());

            // Both deletions must target the same metadata-table path. The second call previously
            // used " /.hoodie/metadata/" (stray leading space), i.e. a different, unrelated path.
            String metadataTablePath = this.basePath + "/.hoodie/metadata/";
            FileCreateUtils.deleteDeltaCommit(metadataTablePath, commitTimes[0]);
            FileCreateUtils.deleteDeltaCommit(metadataTablePath, "00000000000000");

            Set<String> commitTimesInMetadata = getAllFiles(metadata(client)).stream()
                .map(file -> file.getName())
                .map(fileName -> FSUtils.getCommitTime(fileName))
                .collect(Collectors.toSet());
            org.junit.jupiter.api.Assertions.assertEquals(0, commitTimesInMetadata.size());
        }
    }

    /**
     * Bulk-inserts four commits and then checks which commit times are reported by the metadata
     * reader while completed commit files are removed from (and restored to) the data table:
     * all four initially, three while any single commit file is deleted, two after deleting the
     * first and third, and none after deleting all of them.
     *
     * @throws Exception if a write, a timeline manipulation, or a metadata read fails
     */
    @Test
    public void testReader() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);
        String[] commitTimes = {
            HoodieActiveTimeline.createNewInstantTime(),
            HoodieActiveTimeline.createNewInstantTime(),
            HoodieActiveTimeline.createNewInstantTime(),
            HoodieActiveTimeline.createNewInstantTime()
        };

        // The whole scenario runs inside try-with-resources so the client is closed even when
        // one of the assertions below fails.
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
            for (String commitTime : commitTimes) {
                List<HoodieRecord> records = this.dataGen.generateInserts(commitTime, 5);
                client.startCommitWithTime(commitTime);
                Assertions.assertNoWriteErrors(client.bulkInsert(this.jsc.parallelize(records, 1), commitTime).collect());
            }

            // Every commit is visible through the metadata reader.
            Set<String> visible = collectCommitTimesFromMetadata(client);
            org.junit.jupiter.api.Assertions.assertEquals(commitTimes.length, visible.size());
            for (String commitTime : commitTimes) {
                org.junit.jupiter.api.Assertions.assertTrue(visible.contains(commitTime));
            }

            // Delete one completed commit file at a time; every other commit must stay visible.
            for (int hidden = 0; hidden < commitTimes.length; hidden++) {
                FileCreateUtils.deleteCommit(this.basePath, commitTimes[hidden]);
                visible = collectCommitTimesFromMetadata(client);
                org.junit.jupiter.api.Assertions.assertEquals(commitTimes.length - 1, visible.size());
                for (int i = 0; i < commitTimes.length; i++) {
                    org.junit.jupiter.api.Assertions.assertTrue(i == hidden || visible.contains(commitTimes[i]));
                }
                // Restore the commit file before moving on to the next one.
                FileCreateUtils.createCommit(this.basePath, commitTimes[hidden]);
            }

            // Delete the first and third commit files together.
            FileCreateUtils.deleteCommit(this.basePath, commitTimes[0]);
            FileCreateUtils.deleteCommit(this.basePath, commitTimes[2]);
            visible = collectCommitTimesFromMetadata(client);
            org.junit.jupiter.api.Assertions.assertEquals(commitTimes.length - 2, visible.size());
            for (int i = 0; i < commitTimes.length; i++) {
                org.junit.jupiter.api.Assertions.assertTrue(i == 0 || i == 2 || visible.contains(commitTimes[i]));
            }

            // Delete every commit file (calls for the two already-deleted ones are repeated, as before).
            for (String commitTime : commitTimes) {
                FileCreateUtils.deleteCommit(this.basePath, commitTime);
            }
            org.junit.jupiter.api.Assertions.assertEquals(0, collectCommitTimesFromMetadata(client).size());
        }
    }

    /** Returns the distinct commit times parsed from the names of all files listed by the metadata reader. */
    private Set<String> collectCommitTimesFromMetadata(SparkRDDWriteClient client) throws Exception {
        return getAllFiles(metadata(client)).stream()
            .map(file -> file.getName())
            .map(fileName -> FSUtils.getCommitTime(fileName))
            .collect(Collectors.toSet());
    }

    /**
     * Writes a series of insert commits against a table whose metadata config sets
     * {@code withMaxNumDeltaCommitsBeforeCompaction(3)} and asserts, after each phase, the number
     * of completed commit / delta-commit instants on the metadata table timeline and the number
     * of archived instants on the data table.
     *
     * <p>One of the commits is turned into an inflight commit part-way through and re-completed
     * later; the expected counts around that step are asserted below.
     *
     * <p>NOTE(review): the method carries {@code @Disabled} but no {@code @Test}; left unchanged.
     *
     * @throws Exception if a write, a timeline manipulation, or the final validation fails
     */
    @Disabled
    public void testCleaningArchivingAndCompaction() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE, false);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);
        HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).archiveCommitsWith(40, 60).retainCommits(1).withMaxNumDeltaCommitsBeforeCompaction(3).build())
            .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(false).build())
            .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 4).build())
            .build();

        // try-with-resources keeps the client open for the whole scenario and closes it on any exit.
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
            // Initial insert (the original single-iteration loop, unrolled).
            String firstCommitTime = HoodieActiveTimeline.createNewInstantTime();
            List<HoodieRecord> firstRecords = this.dataGen.generateInserts(firstCommitTime, 5);
            client.startCommitWithTime(firstCommitTime);
            client.insert(this.jsc.parallelize(firstRecords, 1), firstCommitTime).collect();

            HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.metadataTableBasePath).build();
            HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(writeConfig.getBasePath()).build();

            HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
            org.junit.jupiter.api.Assertions.assertEquals(0, metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants());
            org.junit.jupiter.api.Assertions.assertEquals(2, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
            org.junit.jupiter.api.Assertions.assertEquals(0, dataMetaClient.getArchivedTimeline().reload().countInstants());

            // Second insert.
            String secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
            List<HoodieRecord> secondRecords = this.dataGen.generateInserts(secondCommitTime, 5);
            client.startCommitWithTime(secondCommitTime);
            client.insert(this.jsc.parallelize(secondRecords, 1), secondCommitTime).collect();

            metadataTimeline = metadataMetaClient.reloadActiveTimeline();
            org.junit.jupiter.api.Assertions.assertEquals(1, metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants());
            org.junit.jupiter.api.Assertions.assertEquals(4, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
            org.junit.jupiter.api.Assertions.assertEquals(0, dataMetaClient.getArchivedTimeline().reload().countInstants());

            // Four more inserts; the first of them is switched from completed to inflight on the data table.
            String inflightCommitTime = secondCommitTime;
            for (int i = 0; i < 4; i++) {
                String commitTime = HoodieActiveTimeline.createNewInstantTime();
                List<HoodieRecord> records = this.dataGen.generateInserts(commitTime, 5);
                client.startCommitWithTime(commitTime);
                client.insert(this.jsc.parallelize(records, 1), commitTime).collect();
                if (i == 0) {
                    FileCreateUtils.deleteCommit(this.basePath, commitTime);
                    FileCreateUtils.createInflightCommit(this.basePath, commitTime);
                    inflightCommitTime = commitTime;
                }
            }

            metadataTimeline = metadataMetaClient.reloadActiveTimeline();
            org.junit.jupiter.api.Assertions.assertEquals(1, metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants());
            org.junit.jupiter.api.Assertions.assertEquals(10, metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants());

            // Re-create the completed commit file for the inflight instant, then write once more.
            FileCreateUtils.createCommit(this.basePath, inflightCommitTime);
            String finalCommitTime = HoodieActiveTimeline.createNewInstantTime();
            List<HoodieRecord> finalRecords = this.dataGen.generateInserts(finalCommitTime, 5);
            client.startCommitWithTime(finalCommitTime);
            client.insert(this.jsc.parallelize(finalRecords, 1), finalCommitTime).collect();

            metadataTimeline = metadataMetaClient.reloadActiveTimeline();
            org.junit.jupiter.api.Assertions.assertEquals(2, metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants());
            org.junit.jupiter.api.Assertions.assertEquals(12, metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants());
            org.junit.jupiter.api.Assertions.assertTrue(dataMetaClient.getArchivedTimeline().reload().countInstants() > 0);
            validateMetadata(client);
        }
    }

    /**
     * Exercises table-version upgrade and downgrade with the metadata table enabled.
     *
     * <ol>
     *   <li>Writes one commit and asserts the metadata table path exists.</li>
     *   <li>Sets the table version to {@code TWO}, writes another commit, and asserts that the
     *       metadata table path still exists with a newer modification time and that the table
     *       version reads back as {@code FIVE}.</li>
     *   <li>Runs {@code UpgradeDowngrade} to {@code TWO} and asserts the version is {@code TWO}
     *       and the metadata table path no longer exists.</li>
     * </ol>
     *
     * @throws IOException if a file-system call fails
     */
    @Test
    public void testUpgradeDowngrade() throws IOException {
        init(HoodieTableType.COPY_ON_WRITE, false);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);
        String firstCommitTime = HoodieActiveTimeline.createNewInstantTime();
        HoodieWriteConfig writeConfig = getWriteConfig(true, true);

        // Each client lives in its own try-with-resources so it is closed even if the write fails.
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
            List<HoodieRecord> records = this.dataGen.generateInserts(firstCommitTime, 5);
            client.startCommitWithTime(firstCommitTime);
            Assertions.assertNoWriteErrors(client.bulkInsert(this.jsc.parallelize(records, 1), firstCommitTime).collect());
        }

        org.junit.jupiter.api.Assertions.assertTrue(this.fs.exists(new Path(this.metadataTableBasePath)), "Metadata table should exist");
        FileStatus statusAfterFirstCommit = this.fs.getFileStatus(new Path(this.metadataTableBasePath));

        // Force an older table version, then write again.
        changeTableVersion(HoodieTableVersion.TWO);
        String secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
        this.metaClient.reloadActiveTimeline();
        FileStatus statusBeforeSecondCommit = this.fs.getFileStatus(new Path(this.metadataTableBasePath));

        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
            List<HoodieRecord> records = this.dataGen.generateInserts(secondCommitTime, 5);
            client.startCommitWithTime(secondCommitTime);
            Assertions.assertNoWriteErrors(client.bulkInsert(this.jsc.parallelize(records, 1), secondCommitTime).collect());
        }

        org.junit.jupiter.api.Assertions.assertTrue(this.fs.exists(new Path(this.metadataTableBasePath)), "Metadata table should exist");
        org.junit.jupiter.api.Assertions.assertTrue(
            this.fs.getFileStatus(new Path(this.metadataTableBasePath)).getModificationTime() > statusBeforeSecondCommit.getModificationTime());

        initMetaClient();
        org.junit.jupiter.api.Assertions.assertEquals(HoodieTableVersion.FIVE.versionCode(), this.metaClient.getTableConfig().getTableVersion().versionCode());
        org.junit.jupiter.api.Assertions.assertTrue(this.fs.exists(new Path(this.metadataTableBasePath)), "Metadata table should exist");
        org.junit.jupiter.api.Assertions.assertTrue(
            statusAfterFirstCommit.getModificationTime() < this.fs.getFileStatus(new Path(this.metadataTableBasePath)).getModificationTime());

        // Downgrade explicitly and verify the metadata table path is gone.
        new UpgradeDowngrade(this.metaClient, writeConfig, this.context, SparkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.TWO, (String) null);
        org.junit.jupiter.api.Assertions.assertEquals(HoodieTableVersion.TWO.versionCode(), this.metaClient.getTableConfig().getTableVersion().versionCode());
        org.junit.jupiter.api.Assertions.assertFalse(this.fs.exists(new Path(this.metadataTableBasePath)), "Metadata table should not exist");
    }

    /**
     * Writes with optimistic concurrency control and {@code InProcessLockProvider} across a
     * table-version change:
     *
     * <ol>
     *   <li>Inserts and explicitly commits one batch, then asserts the metadata table path exists.</li>
     *   <li>Starts a second commit and inserts without committing it.</li>
     *   <li>Sets the table version to {@code TWO} and writes a third batch with a config that
     *       disables marker-based rollback and the embedded timeline server.</li>
     *   <li>Asserts the table version reads back as {@code FIVE} and that the metadata table path
     *       exists with a modification time newer than after step 1.</li>
     * </ol>
     *
     * <p>NOTE(review): presumably step 3 is expected to roll back the uncommitted write from
     * step 2 while upgrading; the visible assertions only check version and metadata path.
     *
     * @throws IOException if a file-system call fails
     * @throws InterruptedException declared by the original signature
     */
    @Test
    public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, InterruptedException {
        init(HoodieTableType.COPY_ON_WRITE, false);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);
        String firstCommitTime = HoodieActiveTimeline.createNewInstantTime();

        Properties lockProperties = new Properties();
        lockProperties.setProperty("hoodie.write.lock.filesystem.path", this.basePath + "/.hoodie/.locks");
        lockProperties.setProperty("hoodie.write.lock.client.num_retries", "3");
        lockProperties.setProperty("hoodie.write.lock.wait_time_ms", "3000");

        HoodieWriteConfig initialConfig = getWriteConfigBuilder(false, true, false)
            .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
            .withProperties(lockProperties)
            .build();

        // Step 1: insert and commit. Each client is closed exactly once by try-with-resources.
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, initialConfig)) {
            List<HoodieRecord> records = this.dataGen.generateInserts(firstCommitTime, 5);
            client.startCommitWithTime(firstCommitTime);
            client.commit(firstCommitTime, client.insert(this.jsc.parallelize(records, 1), firstCommitTime));
        }

        org.junit.jupiter.api.Assertions.assertTrue(this.fs.exists(new Path(this.metadataTableBasePath)), "Metadata table should exist");
        FileStatus statusAfterFirstCommit = this.fs.getFileStatus(new Path(this.metadataTableBasePath));

        // Step 2: start a commit and insert, but never call commit().
        this.metaClient.reloadActiveTimeline();
        String uncommittedTime = HoodieActiveTimeline.createNewInstantTime();
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, initialConfig)) {
            List<HoodieRecord> records = this.dataGen.generateInserts(uncommittedTime, 5);
            client.startCommitWithTime(uncommittedTime);
            client.insert(this.jsc.parallelize(records, 1), uncommittedTime);
        }

        // Step 3: force an older table version and write with the upgrade-time config.
        changeTableVersion(HoodieTableVersion.TWO);
        HoodieWriteConfig upgradeConfig = getWriteConfigBuilder(true, true, false)
            .withRollbackUsingMarkers(false)
            .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
            .withProperties(lockProperties)
            .withEmbeddedTimelineServerEnabled(false)
            .build();
        this.metaClient.reloadActiveTimeline();
        String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime();
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, upgradeConfig)) {
            List<HoodieRecord> records = this.dataGen.generateInserts(thirdCommitTime, 5);
            client.startCommitWithTime(thirdCommitTime);
            Assertions.assertNoWriteErrors(client.insert(this.jsc.parallelize(records, 1), thirdCommitTime).collect());
        }

        // Step 4: verify version and metadata table state.
        initMetaClient();
        org.junit.jupiter.api.Assertions.assertEquals(HoodieTableVersion.FIVE.versionCode(), this.metaClient.getTableConfig().getTableVersion().versionCode());
        org.junit.jupiter.api.Assertions.assertTrue(this.fs.exists(new Path(this.metadataTableBasePath)), "Metadata table should exist");
        org.junit.jupiter.api.Assertions.assertTrue(
            statusAfterFirstCommit.getModificationTime() < this.fs.getFileStatus(new Path(this.metadataTableBasePath)).getModificationTime());
    }

    /**
     * Upserts records restricted to partitions {@code 2016/03/15} and {@code 2015/03/16}, inserts
     * a second batch of 20 records, deletes the completed commit file of that second commit from
     * {@code .hoodie}, and then upserts again with a fresh client, validating the metadata table
     * after each write.
     *
     * <p>NOTE(review): presumably deleting the commit file simulates a partially failed commit
     * that the second client (EAGER failed-writes cleaning policy) rolls back; confirm against
     * {@code validateMetadata}.
     *
     * @throws Exception if a write, the file deletion, or validation fails
     */
    @Test
    public void testRollbackOfPartiallyFailedCommitWithNewPartitions() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);

        // try-with-resources closes each client whether its body succeeds or throws.
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
                getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(), true)) {
            String firstCommitTime = HoodieActiveTimeline.createNewInstantTime();
            client.startCommitWithTime(firstCommitTime);
            List<HoodieRecord> generated = this.dataGen.generateInserts(firstCommitTime, 10);

            // Keep only the records that fall into the two partitions of interest.
            List<HoodieRecord> upsertRecords = new ArrayList<>();
            for (HoodieRecord record : generated) {
                String partitionPath = record.getPartitionPath();
                if (partitionPath.equals("2016/03/15") || partitionPath.equals("2015/03/16")) {
                    upsertRecords.add(record);
                }
            }
            Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(upsertRecords, 1), firstCommitTime).collect());
            validateMetadata(client);

            String secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
            client.startCommitWithTime(secondCommitTime);
            Assertions.assertNoWriteErrors(
                client.insert(this.jsc.parallelize(this.dataGen.generateInserts(secondCommitTime, 20), 1), secondCommitTime).collect());
            validateMetadata(client);

            // Remove the completed commit file of the second commit from the data table timeline.
            org.junit.jupiter.api.Assertions.assertTrue(
                this.fs.delete(new Path(this.basePath + "/.hoodie", HoodieTimeline.makeCommitFileName(secondCommitTime)), false));
        }

        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
                getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(), true)) {
            String commitTime = client.startCommit();
            Assertions.assertNoWriteErrors(
                client.upsert(this.jsc.parallelize(this.dataGen.generateInserts(commitTime, 20), 1), commitTime).collect());
            validateMetadata(client);
        }
    }

    @Test
    public void testDeletePartitions() throws Exception {
        // Writes records into partitions 2016/03/15 and 2015/03/16, deletes partition 2016/03/15,
        // writes again into 2015/03/16, runs clean, and then asserts that the metadata table
        // reports exactly one partition.
        init(HoodieTableType.COPY_ON_WRITE);

        final String schema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

        HoodieWriteConfig writeConfig = getConfigBuilder(schema, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
            .withCleanConfig(HoodieCleanConfig.newBuilder()
                .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
                .retainCommits(1)
                .build())
            .withParallelism(1, 1)
            .withBulkInsertParallelism(1)
            .withFinalizeWriteParallelism(1)
            .withDeleteParallelism(1)
            .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
            .build();

        // try-with-resources closes the client exactly once and attaches any close() failure
        // to the primary exception as a suppressed exception.
        try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
            // First commit: keep only the generated records that land in the two partitions under test.
            String firstInstant = HoodieActiveTimeline.createNewInstantTime();
            client.startCommitWithTime(firstInstant);
            List<HoodieRecord> firstBatch = new ArrayList<>();
            for (HoodieRecord record : this.dataGen.generateInserts(firstInstant, 10)) {
                String partitionPath = record.getPartitionPath();
                if (partitionPath.equals("2016/03/15") || partitionPath.equals("2015/03/16")) {
                    firstBatch.add(record);
                }
            }
            Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(firstBatch, 1), firstInstant).collect());
            validateMetadata(client);

            // Second commit: drop one of the two partitions.
            String deleteInstant = HoodieActiveTimeline.createNewInstantTime(5000L);
            client.startCommitWithTime(deleteInstant);
            client.deletePartitions(Collections.singletonList("2016/03/15"), deleteInstant);

            // Third commit: write only into the partition that was not deleted.
            String secondInstant = HoodieActiveTimeline.createNewInstantTime(5000L);
            client.startCommitWithTime(secondInstant);
            List<HoodieRecord> secondBatch = new ArrayList<>();
            for (HoodieRecord record : this.dataGen.generateInserts(secondInstant, 10)) {
                if (record.getPartitionPath().equals("2015/03/16")) {
                    secondBatch.add(record);
                }
            }
            Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(secondBatch, 1), secondInstant).collect());

            client.clean(HoodieActiveTimeline.createNewInstantTime(5000L));
            validateMetadata(client);
            org.junit.jupiter.api.Assertions.assertEquals(1, metadata(client).getAllPartitionPaths().size());
        }
    }

    /**
     * Exercises recovery after a completed commit file goes missing from the data table timeline.
     *
     * <p>The first client performs an upsert and a bulk insert, validating metadata after each,
     * and then deletes the commit file of the second instant from {@code .hoodie}
     * (presumably to simulate a commit that did not complete on the data table - confirm).
     * A second client built with the same configuration then performs another upsert, after
     * which metadata must still validate.
     *
     * @throws Exception if any write, validation or file-system operation fails
     */
    @Test
    public void testErrorCases() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);

        // Each client lives in its own try-with-resources so it is closed even when an
        // assertion or write inside the block fails.
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(), true)) {
            String upsertInstant = HoodieActiveTimeline.createNewInstantTime();
            client.startCommitWithTime(upsertInstant);
            List<HoodieRecord> upsertRecords = this.dataGen.generateInserts(upsertInstant, 10);
            Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(upsertRecords, 1), upsertInstant).collect());
            validateMetadata(client);

            String bulkInsertInstant = HoodieActiveTimeline.createNewInstantTime();
            client.startCommitWithTime(bulkInsertInstant);
            List<HoodieRecord> bulkInsertRecords = this.dataGen.generateInserts(bulkInsertInstant, 5);
            Assertions.assertNoWriteErrors(client.bulkInsert(this.jsc.parallelize(bulkInsertRecords, 1), bulkInsertInstant).collect());
            validateMetadata(client);

            // Remove the commit file of the second instant from the timeline folder.
            Path commitFile = new Path(this.basePath + "/.hoodie", HoodieTimeline.makeCommitFileName(bulkInsertInstant));
            org.junit.jupiter.api.Assertions.assertTrue(this.fs.delete(commitFile, false));
        }

        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(), true)) {
            String nextInstant = client.startCommit();
            List<HoodieRecord> nextRecords = this.dataGen.generateInserts(nextInstant, 5);
            Assertions.assertNoWriteErrors(client.upsert(this.jsc.parallelize(nextRecords, 1), nextInstant).collect());
            validateMetadata(client);
        }
    }

    /**
     * Bulk-inserts records generated for a single empty-string partition path and asserts that
     * the metadata table lists {@code ""} among its partition paths.
     *
     * @throws Exception if the write or the metadata validation fails
     */
    @Test
    public void testNonPartitioned() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE, false);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);
        HoodieTestDataGenerator nonPartitionedGenerator = new HoodieTestDataGenerator(new String[]{""});

        // try-with-resources keeps the primary failure and records a close() failure as suppressed,
        // instead of letting an exception from close() replace the original one.
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
            final String instantTime = "0000001";
            List<HoodieRecord> records = nonPartitionedGenerator.generateInserts(instantTime, 10);
            client.startCommitWithTime(instantTime);
            client.bulkInsert(this.jsc.parallelize(records, 1), instantTime).collect();
            validateMetadata(client);

            List<String> metadataPartitions = metadata(client).getAllPartitionPaths();
            org.junit.jupiter.api.Assertions.assertTrue(metadataPartitions.contains(""), "Must contain empty partition");
        }
    }

    /**
     * Performs a single insert and then checks that the {@code "HoodieMetadata"} registry holds
     * the initialization counters and the per-partition file statistics for the
     * {@link MetadataPartitionType#FILES} partition.
     *
     * @throws Exception if the write or the metadata validation fails
     */
    @Test
    public void testMetadataMetrics() throws Exception {
        init(HoodieTableType.COPY_ON_WRITE, false);
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);

        // try-with-resources guarantees a single close() and correct suppressed-exception handling.
        try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfigBuilder(true, true, true).build())) {
            String instantTime = HoodieActiveTimeline.createNewInstantTime();
            List<HoodieRecord> records = this.dataGen.generateInserts(instantTime, 20);
            client.startCommitWithTime(instantTime);
            Assertions.assertNoWriteErrors(client.insert(this.jsc.parallelize(records, 1), instantTime).collect());
            validateMetadata(client);

            Registry registry = Registry.getRegistry("HoodieMetadata");
            org.junit.jupiter.api.Assertions.assertTrue(registry.getAllCounts().containsKey("initialize.count"));
            org.junit.jupiter.api.Assertions.assertTrue(registry.getAllCounts().containsKey("initialize.totalDuration"));
            org.junit.jupiter.api.Assertions.assertTrue(((Long) registry.getAllCounts().get("initialize.count")).longValue() >= 1);

            // The remaining counters are namespaced by the FILES partition path followed by a dot.
            String filesPartitionPrefix = MetadataPartitionType.FILES.getPartitionPath() + ".";
            for (String statName : Arrays.asList("baseFileCount", "logFileCount", "totalBaseFileSizeInBytes", "totalLogFileSizeInBytes")) {
                org.junit.jupiter.api.Assertions.assertTrue(registry.getAllCounts().containsKey(filesPartitionPrefix + statName));
            }
        }
    }

    /**
     * Runs the pre-bootstrap write sequence using the fixed instant times
     * {@code "0000001"} and {@code "0000002"}.
     *
     * @param hoodieTestTable test table the writes are issued against
     * @throws Exception if a write or the subsequent validation fails
     */
    private void doPreBootstrapOperations(HoodieTestTable hoodieTestTable) throws Exception {
        final String firstInstant = "0000001";
        final String secondInstant = "0000002";
        doPreBootstrapOperations(hoodieTestTable, firstInstant, secondInstant);
    }

    /**
     * Issues an INSERT at instant {@code str} followed by an UPSERT at instant {@code str2},
     * both against partitions {@code p1} and {@code p2}, and then validates the metadata
     * of the test table.
     *
     * @param hoodieTestTable test table the writes are issued against
     * @param str             instant time of the INSERT
     * @param str2            instant time of the UPSERT
     * @throws Exception if a write or the validation fails
     */
    private void doPreBootstrapOperations(HoodieTestTable hoodieTestTable, String str, String str2) throws Exception {
        // Three distinct list instances are created, one per argument, exactly as before.
        List<String> insertFirstList = Arrays.asList("p1", "p2");
        List<String> insertSecondList = Arrays.asList("p1", "p2");
        hoodieTestTable.doWriteOperation(str, WriteOperationType.INSERT, insertFirstList, insertSecondList, 2, true);

        List<String> upsertList = Arrays.asList("p1", "p2");
        hoodieTestTable.doWriteOperation(str2, WriteOperationType.UPSERT, upsertList, 2, true);

        validateMetadata(hoodieTestTable);
    }

    /**
     * Delegates to the four-argument {@code doWriteInsertAndUpsert} with instants
     * {@code "0000001"} / {@code "0000002"} and the trailing flag set to {@code true}.
     *
     * @param hoodieTestTable test table the writes are issued against
     * @throws Exception if the delegated call fails
     */
    private void doWriteInsertAndUpsertNonPartitioned(HoodieTestTable hoodieTestTable) throws Exception {
        final String insertInstant = "0000001";
        final String upsertInstant = "0000002";
        doWriteInsertAndUpsert(hoodieTestTable, insertInstant, upsertInstant, true);
    }

    /**
     * Delegates to the four-argument {@code doWriteInsertAndUpsert} with instants
     * {@code "0000001"} / {@code "0000002"} and the trailing flag set to {@code false}.
     *
     * @param hoodieTestTable test table the writes are issued against
     * @throws Exception if the delegated call fails
     */
    private void doWriteInsertAndUpsert(HoodieTestTable hoodieTestTable) throws Exception {
        final String insertInstant = "0000001";
        final String upsertInstant = "0000002";
        doWriteInsertAndUpsert(hoodieTestTable, insertInstant, upsertInstant, false);
    }

    /**
     * Builds a write config on top of {@link #getConfigBuilder} (BLOOM index, EAGER failed-write
     * policy) and then overrides compaction, clean and storage settings.
     *
     * @param i   value passed to {@code insertSplitSize}
     * @param str schema string handed to {@code getConfigBuilder}
     * @param j   value passed to {@code compactionSmallFileSize}
     * @param z   value passed to {@code withMergeAllowDuplicateOnInserts}
     * @return the fully built write config
     */
    private HoodieWriteConfig getSmallInsertWriteConfig(int i, String str, long j, boolean z) {
        HoodieWriteConfig.Builder builder = getConfigBuilder(str, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER);

        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
            .compactionSmallFileSize(j)
            .insertSplitSize(i)
            .build();
        builder = builder.withCompactionConfig(compactionConfig);

        // Applied after getConfigBuilder has already set a clean config with the EAGER policy.
        HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
            .build();
        builder = builder.withCleanConfig(cleanConfig);

        // Both HFile and Parquet limits use the generator's estimate for 200 (records, presumably).
        HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
            .hfileMaxFileSize(this.dataGen.getEstimatedFileSizeInBytes(200))
            .parquetMaxFileSize(this.dataGen.getEstimatedFileSizeInBytes(200))
            .build();
        builder = builder.withStorageConfig(storageConfig);

        return builder.withMergeAllowDuplicateOnInserts(z).build();
    }

    /**
     * Creates the base write-config builder used by the tests in this class: table
     * {@code "test-trip-table"} at {@code basePath}, parallelism 2 everywhere, consistency
     * check enabled, 1048576-byte file size limits, embedded timeline server enabled and an
     * {@code EMBEDDED_KV_STORE} file-system view on {@code timelineServicePort}.
     *
     * @param str                              schema string for the table
     * @param indexType                        index type to configure
     * @param hoodieFailedWritesCleaningPolicy failed-writes cleaning policy for the clean config
     * @return the builder, not yet built, so callers can apply further overrides
     */
    public HoodieWriteConfig.Builder getConfigBuilder(String str, HoodieIndex.IndexType indexType, HoodieFailedWritesCleaningPolicy hoodieFailedWritesCleaningPolicy) {
        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
            .withPath(this.basePath)
            .withSchema(str)
            .withParallelism(2, 2)
            .withBulkInsertParallelism(2)
            .withFinalizeWriteParallelism(2)
            .withDeleteParallelism(2)
            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue())
            .withWriteStatusClass(MetadataMergeWriteStatus.class);

        ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder()
            .withConsistencyCheckEnabled(true)
            .build();
        builder = builder.withConsistencyGuardConfig(consistencyGuardConfig);

        HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
            .withFailedWritesCleaningPolicy(hoodieFailedWritesCleaningPolicy)
            .build();
        builder = builder.withCleanConfig(cleanConfig);

        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
            .compactionSmallFileSize(1048576L)
            .build();
        builder = builder.withCompactionConfig(compactionConfig);

        HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
            .hfileMaxFileSize(1048576L)
            .parquetMaxFileSize(1048576L)
            .orcMaxFileSize(1048576L)
            .build();
        builder = builder.withStorageConfig(storageConfig).forTable("test-trip-table");

        HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
            .withIndexType(indexType)
            .build();
        builder = builder.withIndexConfig(indexConfig).withEmbeddedTimelineServerEnabled(true);

        FileSystemViewStorageConfig viewStorageConfig = FileSystemViewStorageConfig.newBuilder()
            .withEnableBackupForRemoteFileSystemView(false)
            .withRemoteServerPort(Integer.valueOf(timelineServicePort))
            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
            .build();
        return builder.withFileSystemViewConfig(viewStorageConfig);
    }

    /**
     * Cross-checks the metadata table against a direct file-system listing.
     *
     * <p>When the metadata table is enabled this verifies that:
     * <ul>
     *   <li>partition paths from the metadata table equal those from a file-system backed listing;</li>
     *   <li>for every partition, file names, lengths and block sizes agree, and the number of
     *       metadata-listed files equals the number of base files plus log files in the
     *       table's file-system view (including replaced file groups);</li>
     *   <li>the metadata table itself is MERGE_ON_READ with HFILE base files, has no metadata
     *       table of its own, and has one partition per enabled partition type;</li>
     *   <li>the latest file slices per metadata partition stay within the expected file-group
     *       and retained-version limits, and FILES / COLUMN_STATS records validate.</li>
     * </ul>
     *
     * @param sparkRDDWriteClient client whose table is validated; it is closed and replaced by a
     *                            new client when the embedded timeline server is enabled
     * @throws IOException if reading partition paths or files from the metadata table fails
     */
    private void validateMetadata(SparkRDDWriteClient sparkRDDWriteClient) throws IOException {
        HoodieWriteConfig config = sparkRDDWriteClient.getConfig();

        // NOTE(review): the replacement client created here is never closed by this method - confirm
        // whether that is intentional before adding a close().
        final SparkRDDWriteClient client;
        if (config.isEmbeddedTimelineServerEnabled()) {
            sparkRDDWriteClient.close();
            client = new SparkRDDWriteClient(sparkRDDWriteClient.getEngineContext(), sparkRDDWriteClient.getConfig());
        } else {
            client = sparkRDDWriteClient;
        }

        HoodieTableMetadata metadata = metadata(client);
        org.junit.jupiter.api.Assertions.assertNotNull(metadata, "MetadataReader should have been initialized");
        if (!config.isMetadataTableEnabled()) {
            return;
        }

        HoodieTimer timer = HoodieTimer.start();
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);

        // Partition paths must be identical in both listings.
        List<String> fsPartitions = new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(this.hadoopConf), config.getBasePath(), config.shouldAssumeDatePartitioning().booleanValue()).getAllPartitionPaths();
        List<String> metadataPartitions = metadata.getAllPartitionPaths();
        Collections.sort(fsPartitions);
        Collections.sort(metadataPartitions);
        org.junit.jupiter.api.Assertions.assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match");
        org.junit.jupiter.api.Assertions.assertTrue(fsPartitions.equals(metadataPartitions), "Partitions should match");

        this.metaClient = HoodieTableMetaClient.reload(this.metaClient);
        HoodieSparkTable table = HoodieSparkTable.create(config, engineContext);
        SyncableFileSystemView tableView = table.getHoodieView();

        List<String> fullPartitionPaths = fsPartitions.stream()
            .map(partition -> this.basePath + "/" + partition)
            .collect(Collectors.toList());
        Map<String, FileStatus[]> partitionToFilesMap = metadata.getAllFilesInPartitions(fullPartitionPaths);
        org.junit.jupiter.api.Assertions.assertEquals(fsPartitions.size(), partitionToFilesMap.size());

        for (String partition : fsPartitions) {
            try {
                Path partitionPath = partition.isEmpty() ? new Path(this.basePath) : new Path(this.basePath, partition);
                FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(this.fs, partitionPath);
                FileStatus[] metaStatuses = metadata.getAllFilesInPartition(partitionPath);

                List<String> fsFileNames = Arrays.stream(fsStatuses)
                    .map(status -> status.getPath().getName())
                    .sorted()
                    .collect(Collectors.toList());
                List<String> metadataFileNames = Arrays.stream(metaStatuses)
                    .map(status -> status.getPath().getName())
                    .sorted()
                    .collect(Collectors.toList());

                org.junit.jupiter.api.Assertions.assertEquals(fsStatuses.length, partitionToFilesMap.get(partitionPath.toString()).length);
                for (FileStatus status : metaStatuses) {
                    org.junit.jupiter.api.Assertions.assertTrue(status.getLen() > 0);
                }

                // Log the differences in both directions before the assertions below fail.
                if (!fsFileNames.equals(metadataFileNames)) {
                    LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray()));
                    LOG.info("*** Metadata listing = " + Arrays.toString(metadataFileNames.toArray()));
                    for (String fileName : fsFileNames) {
                        if (!metadataFileNames.contains(fileName)) {
                            LOG.error(partition + "FsFilename " + fileName + " not found in Meta data");
                        }
                    }
                    for (String fileName : metadataFileNames) {
                        if (!fsFileNames.contains(fileName)) {
                            LOG.error(partition + "Metadata file " + fileName + " not found in original FS");
                        }
                    }
                }

                for (FileStatus status : metaStatuses) {
                    org.junit.jupiter.api.Assertions.assertTrue(status.getBlockSize() > 0);
                }
                List<Long> fsBlockSizes = Arrays.stream(fsStatuses)
                    .map(FileStatus::getBlockSize)
                    .sorted()
                    .collect(Collectors.toList());
                List<Long> metadataBlockSizes = Arrays.stream(metaStatuses)
                    .map(FileStatus::getBlockSize)
                    .sorted()
                    .collect(Collectors.toList());
                org.junit.jupiter.api.Assertions.assertEquals(fsBlockSizes, metadataBlockSizes);

                org.junit.jupiter.api.Assertions.assertEquals(fsFileNames.size(), metadataFileNames.size(), "Files within partition " + partition + " should match");
                org.junit.jupiter.api.Assertions.assertTrue(fsFileNames.equals(metadataFileNames), "Files within partition " + partition + " should match");

                // Active and replaced file groups are walked in one typed stream; each group, its base
                // files and its file slices are logged, and base files + log files are summed.
                long filesInView = java.util.stream.Stream.concat(tableView.getAllFileGroups(partition), tableView.getAllReplacedFileGroups(partition))
                    .mapToLong(fileGroup -> {
                        LogManager.getLogger(TestHoodieBackedMetadata.class).info(fileGroup);
                        fileGroup.getAllBaseFiles().forEach(baseFile -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(baseFile));
                        fileGroup.getAllFileSlices().forEach(fileSlice -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(fileSlice));
                        return fileGroup.getAllBaseFiles().count()
                            + fileGroup.getAllFileSlices().mapToLong(fileSlice -> fileSlice.getLogFiles().count()).sum();
                    })
                    .sum();
                org.junit.jupiter.api.Assertions.assertEquals(metadataFileNames.size(), filesInView);
            } catch (IOException e) {
                // Fail with the cause attached instead of printing the stack trace separately.
                org.junit.jupiter.api.Assertions.fail("Exception should not be raised: " + e, e);
            }
        }

        HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
        org.junit.jupiter.api.Assertions.assertNotNull(metadataWriter, "MetadataWriter should have been initialized");

        HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
        org.junit.jupiter.api.Assertions.assertFalse(metadataWriteConfig.isMetadataTableEnabled(), "No metadata table for metadata table");

        HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(this.hadoopConf).setBasePath(this.metadataTableBasePath).build();
        org.junit.jupiter.api.Assertions.assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
        org.junit.jupiter.api.Assertions.assertEquals(metadataMetaClient.getTableConfig().getBaseFileFormat(), HoodieFileFormat.HFILE, "Metadata Table base file format should be HFile");

        List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, HoodieTableMetadata.getMetadataTableBasePath(this.basePath), false, false);
        org.junit.jupiter.api.Assertions.assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataTablePartitions.size());

        // Index the enabled partition types by partition path; the lookups below depend on this
        // map being populated.
        Map<String, MetadataPartitionType> partitionTypesByPath = new HashMap<>();
        metadataWriter.getEnabledPartitionTypes().forEach(partitionType -> {
            partitionTypesByPath.put(partitionType.getPartitionPath(), partitionType);
        });

        int maxSlicesPerFileGroup = metadataWriteConfig.getCleanerFileVersionsRetained() + 1;
        HoodieTableFileSystemView metadataFsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
        for (String partition : metadataTablePartitions) {
            MetadataPartitionType partitionType = partitionTypesByPath.get(partition);
            org.junit.jupiter.api.Assertions.assertNotNull(partitionType, "Unexpected metadata table partition: " + partition);
            int fileGroupCount = partitionType.getFileGroupCount();

            List<FileSlice> latestSlices = metadataFsView.getLatestFileSlices(partition).collect(Collectors.toList());
            org.junit.jupiter.api.Assertions.assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count() <= (long) fileGroupCount, "Should have a single latest base file per file group");
            org.junit.jupiter.api.Assertions.assertTrue(latestSlices.size() <= fileGroupCount, "Should have a single latest file slice per file group");
            org.junit.jupiter.api.Assertions.assertTrue(latestSlices.size() <= maxSlicesPerFileGroup * fileGroupCount, "Should limit file slice to " + maxSlicesPerFileGroup + " per file group, but was " + latestSlices.size());

            List<HoodieLogFile> logFiles = latestSlices.get(0).getLogFiles().collect(Collectors.toList());
            try {
                if (MetadataPartitionType.FILES.getPartitionPath().equals(partition)) {
                    verifyMetadataRawRecords(table, logFiles, false);
                }
                if (MetadataPartitionType.COLUMN_STATS.getPartitionPath().equals(partition)) {
                    verifyMetadataColumnStatsRecords(logFiles);
                }
            } catch (IOException e) {
                LOG.error("Metadata record validation failed", e);
                org.junit.jupiter.api.Assertions.fail("Metadata record validation failed", e);
            }
        }
        LOG.info("Validation time=" + timer.endTimer());
    }

    /**
     * Reads every data block of the given log files and asserts that each record carries a
     * non-null {@code ColumnStatsMetadata} field whose {@code columnName} and {@code nullCount}
     * are both non-null.
     *
     * <p>Log files for which no schema can be read are skipped.
     *
     * @param list log files to inspect
     * @throws IOException if listing, opening or reading a log file fails
     */
    private void verifyMetadataColumnStatsRecords(List<HoodieLogFile> list) throws IOException {
        for (HoodieLogFile logFile : list) {
            FileStatus[] statuses = this.fs.listStatus(logFile.getPath());
            MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(this.fs, logFile.getPath());
            if (writerSchemaMsg == null) {
                continue;
            }
            Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);

            // Reader and record iterator are each closed exactly once, including on assertion failure.
            try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(this.fs, new HoodieLogFile(statuses[0].getPath()), writerSchema)) {
                while (reader.hasNext()) {
                    HoodieLogBlock block = reader.next();
                    if (!(block instanceof HoodieDataBlock)) {
                        continue;
                    }
                    try (ClosableIterator<? extends HoodieRecord<?>> records = ((HoodieDataBlock) block).getRecordIterator(HoodieRecord.HoodieRecordType.AVRO)) {
                        records.forEachRemaining(record -> {
                            GenericRecord payload = (GenericRecord) record.getData();
                            GenericRecord columnStats = (GenericRecord) payload.get("ColumnStatsMetadata");
                            org.junit.jupiter.api.Assertions.assertNotNull(columnStats);
                            org.junit.jupiter.api.Assertions.assertNotNull(columnStats.get("columnName"));
                            org.junit.jupiter.api.Assertions.assertNotNull(columnStats.get("nullCount"));
                        });
                    }
                }
            }
        }
    }

    /**
     * Collects the paths of all files the given metadata reports, across every partition.
     *
     * @param hoodieTableMetadata metadata reader to query
     * @return paths of all files, in partition order and then in the order returned per partition
     * @throws Exception if listing partitions or files fails
     */
    private List<Path> getAllFiles(HoodieTableMetadata hoodieTableMetadata) throws Exception {
        List<Path> allFiles = new LinkedList<>();
        for (String partition : hoodieTableMetadata.getAllPartitionPaths()) {
            Path partitionPath = new Path(this.basePath, partition);
            for (FileStatus fileStatus : hoodieTableMetadata.getAllFilesInPartition(partitionPath)) {
                allFiles.add(fileStatus.getPath());
            }
        }
        return allFiles;
    }

    /**
     * Creates a Spark metadata-table writer for the given client's write config, using this
     * test's Hadoop configuration and a new engine context over {@code jsc}.
     *
     * @param sparkRDDWriteClient client whose write config is used
     * @return the created metadata writer
     */
    private HoodieBackedTableMetadataWriter metadataWriter(SparkRDDWriteClient sparkRDDWriteClient) {
        HoodieWriteConfig writeConfig = sparkRDDWriteClient.getConfig();
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(this.jsc);
        return SparkHoodieBackedTableMetadataWriter.create(this.hadoopConf, writeConfig, engineContext);
    }

    /**
     * Creates a metadata reader from the given client's engine context and write config
     * (metadata config, base path and spillable-map base path).
     *
     * @param sparkRDDWriteClient client supplying the engine context and write config
     * @return the created table metadata reader
     */
    private HoodieTableMetadata metadata(SparkRDDWriteClient sparkRDDWriteClient) {
        HoodieWriteConfig writeConfig = sparkRDDWriteClient.getConfig();
        HoodieMetadataConfig metadataConfig = writeConfig.getMetadataConfig();
        String tableBasePath = writeConfig.getBasePath();
        String spillableMapPath = writeConfig.getSpillableMapBasePath();
        return HoodieTableMetadata.create(sparkRDDWriteClient.getEngineContext(), metadataConfig, tableBasePath, spillableMapPath);
    }

    /**
     * Sets the table version on the in-memory table config and writes the resulting properties
     * to {@code hoodie.properties} under the meta path.
     *
     * @param hoodieTableVersion version to record
     * @throws IOException if the properties file cannot be created or written
     */
    private void changeTableVersion(HoodieTableVersion hoodieTableVersion) throws IOException {
        this.metaClient.getTableConfig().setTableVersion(hoodieTableVersion);
        Path propertiesPath = new Path(this.metaClient.getMetaPath() + "/hoodie.properties");
        // try-with-resources closes the stream once and keeps a close() failure as suppressed.
        try (FSDataOutputStream out = this.metaClient.getFs().create(propertiesPath)) {
            this.metaClient.getTableConfig().getProps().store(out, "");
        }
    }

    /**
     * Returns the table type currently held in the {@code tableType} field.
     *
     * @return the table type field value
     */
    protected HoodieTableType getTableType() {
        return tableType;
    }

    /**
     * Lambda deserialization hook, marked synthetic; this looks like decompiler output of a
     * compiler-generated method rather than hand-written code.
     *
     * <p>It recognises a single implementation method,
     * {@code lambda$testTableOperationsImpl$ee26d5e2$1}, checks that the serialized form
     * describes a Spark {@code Function.call(Object)} implemented in this class with signature
     * {@code (HoodieRecord) -> HoodieKey}, and returns a lambda that yields the record's key.
     *
     * <p>NOTE(review): {@code boolean z = -1} and {@code switch (z)} over a boolean appear to be
     * decompiler artifacts and would not compile as written - confirm against the original source
     * before relying on this method.
     *
     * @param serializedLambda serialized form of the lambda being deserialized
     * @return the matching lambda
     * @throws IllegalArgumentException if the serialized form matches no known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: map the implementation method name (by hash, then equals) to a selector.
        switch (implMethodName.hashCode()) {
            case 819302376:
                if (implMethodName.equals("lambda$testTableOperationsImpl$ee26d5e2$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: verify the full serialized signature before recreating the lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/client/functional/TestHoodieBackedMetadata") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/common/model/HoodieRecord;)Lorg/apache/hudi/common/model/HoodieKey;")) {
                    return hoodieRecord -> {
                        return hoodieRecord.getKey();
                    };
                }
                break;
        }
        // Nothing matched: the serialized lambda does not belong to this class.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
