package io.trino.tests.product.deltalake;

import com.amazonaws.services.s3.AmazonS3;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.trino.tempto.BeforeMethodWithContext;
import io.trino.tempto.assertions.QueryAssert;
import io.trino.tempto.query.QueryExecutor;
import io.trino.testing.TestingNames;
import io.trino.testng.services.Flaky;
import io.trino.tests.product.TestGroups;
import io.trino.tests.product.deltalake.util.DeltaLakeTestUtils;
import io.trino.tests.product.utils.QueryExecutors;
import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;

/* loaded from: input_file:io/trino/tests/product/deltalake/TestDeltaLakeChangeDataFeedCompatibility.class */
public class TestDeltaLakeChangeDataFeedCompatibility extends BaseTestDeltaLakeS3Storage {

    // Injected by name "s3.server_type"; passed to S3ClientFactory in setup().
    @Named("s3.server_type")
    @Inject
    private String s3ServerType;
    // S3 client assigned in setup() before each test method.
    private AmazonS3 s3Client;

    /**
     * Runs the parent class's {@code setUp()} and then creates the S3 client
     * for the injected {@code s3.server_type} value.
     */
    @BeforeMethodWithContext
    public void setup() {
        super.setUp();
        S3ClientFactory clientFactory = new S3ClientFactory();
        s3Client = clientFactory.createS3Client(s3ServerType);
    }

    /**
     * Creates a table with {@code change_data_feed_enabled = true} through Trino, inserts three rows
     * through the Delta executor, applies two Trino UPDATEs to the same row, and verifies the rows
     * returned by {@code table_changes} on the Delta side (inserts at versions 1-3, update
     * pre/post images at versions 4 and 5).
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testUpdateTableWithCdf() {
        String tableName = "test_updates_to_table_with_cdf_" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String trinoTable = "delta." + deltaTable;
        try {
            QueryExecutors.onTrino().executeQuery(
                    "CREATE TABLE " + trinoTable + " (col1 VARCHAR, updated_column INT) " +
                            "WITH (location = 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "', change_data_feed_enabled = true)");

            // The table property must be visible in the DDL Trino reports back.
            Assertions.assertThat(QueryExecutors.onTrino().executeQuery("SHOW CREATE TABLE " + tableName).getOnlyValue().toString())
                    .contains("change_data_feed_enabled = true");

            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue1', 1)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue2', 2)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue3', 3)");

            QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTable + " SET updated_column = 5 WHERE col1 = 'testValue3'");
            QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTable + " SET updated_column = 4, col1 = 'testValue4' WHERE col1 = 'testValue3'");

            Assertions.assertThat(QueryExecutors.onDelta().executeQuery(
                            "SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('" + deltaTable + "', 0)"))
                    .containsOnly(
                            QueryAssert.Row.row("testValue1", 1, "insert", 1L),
                            QueryAssert.Row.row("testValue2", 2, "insert", 2L),
                            QueryAssert.Row.row("testValue3", 3, "insert", 3L),
                            QueryAssert.Row.row("testValue3", 3, "update_preimage", 4L),
                            QueryAssert.Row.row("testValue3", 5, "update_postimage", 4L),
                            QueryAssert.Row.row("testValue3", 5, "update_preimage", 5L),
                            QueryAssert.Row.row("testValue4", 4, "update_postimage", 5L));
        } finally {
            // Single cleanup path: runs exactly once whether the body succeeds or fails,
            // instead of duplicating the DROP in the try body and in a catch-all handler.
            QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTable);
        }
    }

    /**
     * Creates a partitioned, CDF-enabled table through the Delta executor, inserts three rows with
     * separate INSERT statements, updates one row through Trino, and checks the change rows that
     * {@code table_changes} returns on the Delta side.
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testUpdatePartitionedTableWithCdf() {
        String tableName = "test_updates_to_partitioned_table_with_cdf_" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String trinoTable = "delta." + deltaTable;
        String location = "s3://" + bucketName + "/databricks-compatibility-test-" + tableName;
        try {
            // Note: the statement has no whitespace between the closing quote of the location and TBLPROPERTIES.
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) " +
                            "USING DELTA " +
                            "PARTITIONED BY (partitioning_column_1, partitioning_column_2) " +
                            "LOCATION '" + location + "'" +
                            "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue1', 1, 'partition1')");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue2', 2, 'partition2')");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue3', 3, 'partition3')");

            QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTable + " SET updated_column = 'testValue5' WHERE partitioning_column_1 = 3");

            String changesQuery = "SELECT updated_column, partitioning_column_1, partitioning_column_2, _change_type, _commit_version " +
                    "FROM table_changes('" + deltaTable + "', 0)";
            Assertions.assertThat(QueryExecutors.onDelta().executeQuery(changesQuery))
                    .containsOnly(
                            QueryAssert.Row.row("testValue1", 1, "partition1", "insert", 1L),
                            QueryAssert.Row.row("testValue2", 2, "partition2", "insert", 2L),
                            QueryAssert.Row.row("testValue3", 3, "partition3", "insert", 3L),
                            QueryAssert.Row.row("testValue3", 3, "partition3", "update_preimage", 4L),
                            QueryAssert.Row.row("testValue5", 3, "partition3", "update_postimage", 4L));
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

    /**
     * Creates a CDF-enabled table through the Delta executor, inserts three rows in a single INSERT,
     * updates one of them through Trino, and checks that {@code table_changes} reports all inserts
     * at version 1 and the update pre/post images at version 2.
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testUpdateTableWithManyRowsInsertedInTheSameQueryAndCdfEnabled() {
        String tableName = "test_updates_to_table_with_many_rows_inserted_in_one_query_cdf_" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String trinoTable = "delta." + deltaTable;
        String location = "s3://" + bucketName + "/databricks-compatibility-test-" + tableName;
        try {
            // Note: the statement has no whitespace between the closing quote of the location and TBLPROPERTIES.
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (col1 STRING, updated_column INT) " +
                            "USING DELTA " +
                            "LOCATION '" + location + "'" +
                            "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue1', 1), ('testValue2', 2), ('testValue3', 3)");
            QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTable + " SET updated_column = 5 WHERE col1 = 'testValue3'");

            String changesQuery = "SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('" + deltaTable + "', 0)";
            Assertions.assertThat(QueryExecutors.onDelta().executeQuery(changesQuery))
                    .containsOnly(
                            QueryAssert.Row.row("testValue1", 1, "insert", 1L),
                            QueryAssert.Row.row("testValue2", 2, "insert", 1L),
                            QueryAssert.Row.row("testValue3", 3, "insert", 1L),
                            QueryAssert.Row.row("testValue3", 3, "update_preimage", 2L),
                            QueryAssert.Row.row("testValue3", 5, "update_postimage", 2L));
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

    /**
     * Partitioned variant of the multi-row insert scenario: three rows are inserted with a single
     * INSERT through the Delta executor, one row is updated through Trino, and {@code table_changes}
     * is expected to report the inserts at version 1 and the update images at version 2.
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testUpdatePartitionedTableWithManyRowsInsertedInTheSameRequestAndCdfEnabled() {
        String tableName = "test_updates_to_partitioned_table_with_many_rows_inserted_in_one_query_cdf_" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String trinoTable = "delta." + deltaTable;
        String location = "s3://" + bucketName + "/databricks-compatibility-test-" + tableName;
        try {
            // Note: the statement has no whitespace between the closing quote of the location and TBLPROPERTIES.
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) " +
                            "USING DELTA " +
                            "PARTITIONED BY (partitioning_column_1, partitioning_column_2) " +
                            "LOCATION '" + location + "'" +
                            "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery(
                    "INSERT INTO " + deltaTable + " VALUES ('testValue1', 1, 'partition1'), ('testValue2', 2, 'partition2'), ('testValue3', 3, 'partition3')");
            QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTable + " SET updated_column = 'testValue5' WHERE partitioning_column_1 = 3");

            String changesQuery = "SELECT updated_column, partitioning_column_1, partitioning_column_2, _change_type, _commit_version " +
                    "FROM table_changes('" + deltaTable + "', 0)";
            Assertions.assertThat(QueryExecutors.onDelta().executeQuery(changesQuery))
                    .containsOnly(
                            QueryAssert.Row.row("testValue1", 1, "partition1", "insert", 1L),
                            QueryAssert.Row.row("testValue2", 2, "partition2", "insert", 1L),
                            QueryAssert.Row.row("testValue3", 3, "partition3", "insert", 1L),
                            QueryAssert.Row.row("testValue3", 3, "partition3", "update_preimage", 2L),
                            QueryAssert.Row.row("testValue5", 3, "partition3", "update_postimage", 2L));
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

    /**
     * Updates a partitioning column through Trino on a CDF-enabled partitioned table (once alone,
     * once together with a regular column) and checks the resulting {@code table_changes} rows on
     * the Delta side: inserts at version 1, update images at versions 2 and 3.
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testUpdatePartitionedTableCdfEnabledAndPartitioningColumnUpdated() {
        String tableName = "test_updates_partitioning_column_in_table_with_cdf_" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String trinoTable = "delta." + deltaTable;
        String location = "s3://" + bucketName + "/databricks-compatibility-test-" + tableName;
        try {
            // Note: the statement has no whitespace between the closing quote of the location and TBLPROPERTIES.
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) " +
                            "USING DELTA " +
                            "PARTITIONED BY (partitioning_column_1, partitioning_column_2) " +
                            "LOCATION '" + location + "'" +
                            "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery(
                    "INSERT INTO " + deltaTable + " VALUES ('testValue1', 1, 'partition1'), ('testValue2', 2, 'partition2'), ('testValue3', 3, 'partition3')");

            QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTable + " SET partitioning_column_1 = 5 WHERE partitioning_column_2 = 'partition1'");
            QueryExecutors.onTrino().executeQuery(
                    "UPDATE " + trinoTable + " SET partitioning_column_1 = 4, updated_column = 'testValue4' WHERE partitioning_column_2 = 'partition2'");

            String changesQuery = "SELECT updated_column, partitioning_column_1, partitioning_column_2, _change_type, _commit_version " +
                    "FROM table_changes('" + deltaTable + "', 0)";
            Assertions.assertThat(QueryExecutors.onDelta().executeQuery(changesQuery))
                    .containsOnly(
                            QueryAssert.Row.row("testValue1", 1, "partition1", "insert", 1L),
                            QueryAssert.Row.row("testValue2", 2, "partition2", "insert", 1L),
                            QueryAssert.Row.row("testValue3", 3, "partition3", "insert", 1L),
                            QueryAssert.Row.row("testValue1", 1, "partition1", "update_preimage", 2L),
                            QueryAssert.Row.row("testValue1", 5, "partition1", "update_postimage", 2L),
                            QueryAssert.Row.row("testValue2", 2, "partition2", "update_preimage", 3L),
                            QueryAssert.Row.row("testValue4", 4, "partition2", "update_postimage", 3L));
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

    /**
     * Exercises a table whose change data feed is switched on after creation and later switched off.
     *
     * <p>The table is created and populated through the Delta executor without CDF, updated once
     * through Trino, and only then altered to {@code delta.enableChangeDataFeed = true}. The test
     * then checks that:
     * <ul>
     * <li>reading {@code table_changes} from version 0 fails with the "change data was not recorded
     * for version [0]" message;</li>
     * <li>reading from the value captured via {@code DESCRIBE HISTORY} right after enabling CDF
     * returns the later Trino UPDATE and INSERT;</li>
     * <li>after CDF is disabled again and another row is inserted, reading the bounded range between
     * the two captured values still returns the same three change rows.</li>
     * </ul>
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testUpdateTableWithCdfEnabledAfterTableIsAlreadyCreated() {
        String tableName = "test_updates_to_table_with_cdf_enabled_later_" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String trinoTable = "delta." + deltaTable;
        try {
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (col1 STRING, updated_column INT) " +
                            "USING DELTA " +
                            "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'");

            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue1', 1)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue2', 2)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES ('testValue3', 3)");
            QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTable + " SET updated_column = 5 WHERE col1 = 'testValue3'");

            QueryExecutors.onDelta().executeQuery("ALTER TABLE " + deltaTable + " SET TBLPROPERTIES (delta.enableChangeDataFeed = true)");
            // First column of the newest history entry; presumably the commit version at which CDF was enabled.
            long versionWhenCdfEnabled = (Long) QueryExecutors.onDelta().executeQuery("DESCRIBE HISTORY " + deltaTable + " LIMIT 1").row(0).get(0);

            QueryExecutors.onTrino().executeQuery("UPDATE " + trinoTable + " SET updated_column = 4 WHERE col1 = 'testValue3'");

            // Reading from version 0 must fail: the expected message says change data was not recorded for that version.
            QueryAssert.assertQueryFailure(() -> QueryExecutors.onDelta().executeQuery(
                            "SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('" + deltaTable + "', 0)"))
                    .hasMessageMatching("(?s)(.*Error getting change data for range \\[0 , 6] as change data was not\nrecorded for version \\[0].*)");

            QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoTable + " VALUES ('testValue6', 6)");
            Assertions.assertThat(QueryExecutors.onDelta().executeQuery(String.format(
                            "SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('default.%s', %d)",
                            tableName,
                            versionWhenCdfEnabled)))
                    .containsOnly(
                            QueryAssert.Row.row("testValue3", 5, "update_preimage", 6L),
                            QueryAssert.Row.row("testValue3", 4, "update_postimage", 6L),
                            QueryAssert.Row.row("testValue6", 6, "insert", 7L));

            // Captured before CDF is turned off; used as the upper bound of the range read below.
            long lastVersionWithCdf = (Long) QueryExecutors.onDelta().executeQuery("DESCRIBE HISTORY " + deltaTable + " LIMIT 1").row(0).get(0);
            QueryExecutors.onDelta().executeQuery("ALTER TABLE " + deltaTable + " SET TBLPROPERTIES (delta.enableChangeDataFeed = false)");
            QueryExecutors.onTrino().executeQuery("INSERT INTO " + trinoTable + " VALUES ('testValue7', 7)");

            Assertions.assertThat(QueryExecutors.onDelta().executeQuery(
                            "SELECT col1, updated_column, _change_type, _commit_version " +
                                    String.format("FROM table_changes('default.%s', %d, %d)", tableName, versionWhenCdfEnabled, lastVersionWithCdf)))
                    .containsOnly(
                            QueryAssert.Row.row("testValue3", 5, "update_preimage", 6L),
                            QueryAssert.Row.row("testValue3", 4, "update_postimage", 6L),
                            QueryAssert.Row.row("testValue6", 6, "insert", 7L));
        } finally {
            // Single cleanup path: runs exactly once on success or failure, instead of
            // duplicating the drop in the try body and in a catch-all handler.
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

    /**
     * Creates a CDF-enabled table through the Delta executor, inserts three rows one at a time,
     * deletes one row through Trino, and checks that {@code table_changes} reports the three inserts
     * (versions 1-3) followed by a {@code delete} row at version 4.
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testDeleteFromTableWithCdf() {
        String tableName = "test_deletes_from_table_with_cdf_" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String trinoTable = "delta." + deltaTable;
        String location = "s3://" + bucketName + "/databricks-compatibility-test-" + tableName;
        try {
            // Note: the statement has no whitespace between the closing quote of the location and TBLPROPERTIES.
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (col1 STRING, updated_column INT) " +
                            "USING DELTA " +
                            "LOCATION '" + location + "'" +
                            "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES('testValue1', 1)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES('testValue2', 2)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTable + " VALUES('testValue3', 3)");

            QueryExecutors.onTrino().executeQuery("DELETE FROM " + trinoTable + " WHERE col1 = 'testValue3'");

            String changesQuery = "SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('" + deltaTable + "', 0)";
            Assertions.assertThat(QueryExecutors.onDelta().executeQuery(changesQuery))
                    .containsOnly(
                            QueryAssert.Row.row("testValue1", 1, "insert", 1L),
                            QueryAssert.Row.row("testValue2", 2, "insert", 2L),
                            QueryAssert.Row.row("testValue3", 3, "insert", 3L),
                            QueryAssert.Row.row("testValue3", 3, "delete", 4L));
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

    /**
     * Runs a Trino MERGE (update on match, insert otherwise) from a source table into a CDF-enabled
     * target table, both created and populated through the Delta executor. Verifies the final
     * target contents and the change rows returned by {@code table_changes} on the Delta side.
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testMergeUpdateIntoTableWithCdfEnabled() {
        String targetName = "test_merge_update_into_table_with_cdf_" + TestingNames.randomNameSuffix();
        String sourceName = "test_merge_update_into_table_with_cdf_data_table_" + TestingNames.randomNameSuffix();
        String deltaTarget = "default." + targetName;
        String deltaSource = "default." + sourceName;
        String locationPrefix = "s3://" + bucketName + "/databricks-compatibility-test-";
        try {
            // Note: both statements have no whitespace between the closing quote of the location and TBLPROPERTIES.
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTarget + " (nationkey INT, name STRING, regionkey INT) " +
                            "USING DELTA " +
                            "LOCATION '" + locationPrefix + targetName + "'" +
                            "TBLPROPERTIES (delta.enableChangeDataFeed = true)");
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaSource + " (nationkey INT, name STRING, regionkey INT) " +
                            "USING DELTA " +
                            "LOCATION '" + locationPrefix + sourceName + "'" +
                            "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            // Target rows.
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTarget + " VALUES (1, 'nation1', 100)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTarget + " VALUES (2, 'nation2', 200)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTarget + " VALUES (3, 'nation3', 300)");

            // Source rows; only nationkey 2 matches a target row.
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaSource + " VALUES (1000, 'nation1000', 1000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaSource + " VALUES (2, 'nation2', 20000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaSource + " VALUES (3000, 'nation3000', 3000)");

            QueryExecutors.onTrino().executeQuery(
                    "MERGE INTO delta." + deltaTarget + " cdf USING delta." + deltaSource + " n ON (cdf.nationkey = n.nationkey) " +
                            "WHEN MATCHED THEN UPDATE SET nationkey = (cdf.nationkey + n.nationkey + n.regionkey) " +
                            "WHEN NOT MATCHED THEN INSERT (nationkey, name, regionkey) VALUES (n.nationkey, n.name, n.regionkey)");

            Assertions.assertThat(QueryExecutors.onDelta().executeQuery("SELECT * FROM " + targetName))
                    .containsOnly(
                            QueryAssert.Row.row(1000, "nation1000", 1000),
                            QueryAssert.Row.row(3000, "nation3000", 3000),
                            QueryAssert.Row.row(1, "nation1", 100),
                            QueryAssert.Row.row(3, "nation3", 300),
                            QueryAssert.Row.row(20004, "nation2", 200));

            String changesQuery = "SELECT nationkey, name, regionkey, _change_type, _commit_version FROM table_changes('" + deltaTarget + "', 0)";
            Assertions.assertThat(QueryExecutors.onDelta().executeQuery(changesQuery))
                    .containsOnly(
                            QueryAssert.Row.row(1, "nation1", 100, "insert", 1),
                            QueryAssert.Row.row(2, "nation2", 200, "insert", 2),
                            QueryAssert.Row.row(3, "nation3", 300, "insert", 3),
                            QueryAssert.Row.row(1000, "nation1000", 1000, "insert", 4),
                            QueryAssert.Row.row(3000, "nation3000", 3000, "insert", 4),
                            QueryAssert.Row.row(2, "nation2", 200, "update_preimage", 4),
                            QueryAssert.Row.row(20004, "nation2", 200, "update_postimage", 4));
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTarget);
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaSource);
        }
    }

    /**
     * Runs a Trino MERGE (delete on match, insert otherwise) from a source table into a CDF-enabled
     * target table, both created and populated through the Delta executor. Verifies the final
     * target contents and the change rows returned by {@code table_changes} on the Delta side.
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testMergeDeleteIntoTableWithCdfEnabled() {
        String targetName = "test_merge_delete_into_table_with_cdf_" + TestingNames.randomNameSuffix();
        String sourceName = "test_merge_delete_into_table_with_cdf_data_table_" + TestingNames.randomNameSuffix();
        String deltaTarget = "default." + targetName;
        String deltaSource = "default." + sourceName;
        String locationPrefix = "s3://" + bucketName + "/databricks-compatibility-test-";
        try {
            // Note: both statements have no whitespace between the closing quote of the location and TBLPROPERTIES.
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTarget + " (nationkey INT, name STRING, regionkey INT) " +
                            "USING DELTA " +
                            "LOCATION '" + locationPrefix + targetName + "'" +
                            "TBLPROPERTIES (delta.enableChangeDataFeed = true)");
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaSource + " (nationkey INT, name STRING, regionkey INT) " +
                            "USING DELTA " +
                            "LOCATION '" + locationPrefix + sourceName + "'" +
                            "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            // Target rows.
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTarget + " VALUES (1, 'nation1', 100)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTarget + " VALUES (2, 'nation2', 200)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTarget + " VALUES (3, 'nation3', 300)");

            // Source rows; only nationkey 2 matches a target row.
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaSource + " VALUES (1000, 'nation1000', 1000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaSource + " VALUES (2, 'nation2', 20000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaSource + " VALUES (3000, 'nation3000', 3000)");

            QueryExecutors.onTrino().executeQuery(
                    "MERGE INTO delta." + deltaTarget + " cdf USING delta." + deltaSource + " n ON (cdf.nationkey = n.nationkey) " +
                            "WHEN MATCHED THEN DELETE " +
                            "WHEN NOT MATCHED THEN INSERT (nationkey, name, regionkey) VALUES (n.nationkey, n.name, n.regionkey)");

            Assertions.assertThat(QueryExecutors.onDelta().executeQuery("SELECT * FROM " + targetName))
                    .containsOnly(
                            QueryAssert.Row.row(1000, "nation1000", 1000),
                            QueryAssert.Row.row(3000, "nation3000", 3000),
                            QueryAssert.Row.row(1, "nation1", 100),
                            QueryAssert.Row.row(3, "nation3", 300));

            String changesQuery = "SELECT nationkey, name, regionkey, _change_type, _commit_version FROM table_changes('" + deltaTarget + "', 0)";
            Assertions.assertThat(QueryExecutors.onDelta().executeQuery(changesQuery))
                    .containsOnly(
                            QueryAssert.Row.row(1, "nation1", 100, "insert", 1),
                            QueryAssert.Row.row(2, "nation2", 200, "insert", 2),
                            QueryAssert.Row.row(3, "nation3", 300, "insert", 3),
                            QueryAssert.Row.row(1000, "nation1000", 1000, "insert", 4),
                            QueryAssert.Row.row(3000, "nation3000", 3000, "insert", 4),
                            QueryAssert.Row.row(2, "nation2", 200, "delete", 4));
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTarget);
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaSource);
        }
    }

    /**
     * Runs a Trino MERGE that combines a conditional DELETE, a conditional UPDATE and an INSERT
     * against a CDF-enabled target table. The source table is created without the change data feed
     * property. Verifies the final target contents and the {@code table_changes} rows on the Delta side.
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testMergeMixedDeleteAndUpdateIntoTableWithCdfEnabled() {
        String targetName = "test_merge_mixed_delete_and_update_into_table_with_cdf_" + TestingNames.randomNameSuffix();
        String sourceName = "test_merge_mixed_delete_and_update_into_table_with_cdf_data_table_" + TestingNames.randomNameSuffix();
        String deltaTarget = "default." + targetName;
        String deltaSource = "default." + sourceName;
        String locationPrefix = "s3://" + bucketName + "/databricks-compatibility-test-";
        try {
            // Note: the target DDL has no whitespace between the closing quote of the location and TBLPROPERTIES.
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTarget + " (page_id INT, page_url STRING, views INT) " +
                            "USING DELTA " +
                            "LOCATION '" + locationPrefix + targetName + "'" +
                            "TBLPROPERTIES (delta.enableChangeDataFeed = true)");
            // The source table is created without the change data feed property.
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaSource + " (page_id INT, page_url STRING, views INT) " +
                            "USING DELTA " +
                            "LOCATION '" + locationPrefix + sourceName + "'");

            // Target rows.
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTarget + " VALUES (1, 'pageUrl1', 100)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTarget + " VALUES (2, 'pageUrl2', 200)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTarget + " VALUES (3, 'pageUrl3', 300)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaTarget + " VALUES (4, 'pageUrl4', 400)");

            // Source rows; page_id 2 and 4 match target rows.
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaSource + " VALUES (1000, 'pageUrl1000', 1000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaSource + " VALUES (2, 'pageUrl2', 20000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaSource + " VALUES (3000, 'pageUrl3000', 3000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO " + deltaSource + " VALUES (4, 'pageUrl4000', 4000)");

            QueryExecutors.onTrino().executeQuery(
                    "MERGE INTO delta." + deltaTarget + " targetTable USING delta." + deltaSource + " sourceTable ON (targetTable.page_id = sourceTable.page_id) " +
                            "WHEN MATCHED AND targetTable.page_id = 2 THEN DELETE " +
                            "WHEN MATCHED AND targetTable.page_id > 2 THEN UPDATE SET views = (targetTable.views + sourceTable.views) " +
                            "WHEN NOT MATCHED THEN INSERT (page_id, page_url, views) VALUES (sourceTable.page_id, sourceTable.page_url, sourceTable.views)");

            Assertions.assertThat(QueryExecutors.onDelta().executeQuery("SELECT * FROM " + targetName))
                    .containsOnly(
                            QueryAssert.Row.row(1000, "pageUrl1000", 1000),
                            QueryAssert.Row.row(3000, "pageUrl3000", 3000),
                            QueryAssert.Row.row(4, "pageUrl4", 4400),
                            QueryAssert.Row.row(1, "pageUrl1", 100),
                            QueryAssert.Row.row(3, "pageUrl3", 300));

            String changesQuery = "SELECT page_id, page_url, views, _change_type, _commit_version FROM table_changes('" + deltaTarget + "', 0)";
            Assertions.assertThat(QueryExecutors.onDelta().executeQuery(changesQuery))
                    .containsOnly(
                            QueryAssert.Row.row(1, "pageUrl1", 100, "insert", 1),
                            QueryAssert.Row.row(2, "pageUrl2", 200, "insert", 2),
                            QueryAssert.Row.row(3, "pageUrl3", 300, "insert", 3),
                            QueryAssert.Row.row(4, "pageUrl4", 400, "insert", 4),
                            QueryAssert.Row.row(1000, "pageUrl1000", 1000, "insert", 5),
                            QueryAssert.Row.row(3000, "pageUrl3000", 3000, "insert", 5),
                            QueryAssert.Row.row(2, "pageUrl2", 200, "delete", 5),
                            QueryAssert.Row.row(4, "pageUrl4", 4400, "update_postimage", 5),
                            QueryAssert.Row.row(4, "pageUrl4", 400, "update_preimage", 5));
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTarget);
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaSource);
        }
    }

    /**
     * Deletes, through Trino, the row whose partitioning column is NULL from a CDF-enabled
     * partitioned table. Verifies the remaining rows as read through Trino and the change rows
     * returned by {@code table_changes} on the Delta side, including the NULL partition value.
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testDeleteFromNullPartitionWithCdfEnabled() {
        String tableName = "test_delete_from_null_partition_with_cdf_enabled" + TestingNames.randomNameSuffix();
        String deltaTable = "default." + tableName;
        String trinoTable = "delta." + deltaTable;
        String location = "s3://" + bucketName + "/databricks-compatibility-test-" + tableName;
        try {
            // Note: the statement has no whitespace between the closing quote of the location and TBLPROPERTIES.
            QueryExecutors.onDelta().executeQuery(
                    "CREATE TABLE " + deltaTable + " (updated_column STRING, partitioning_column_1 INT, partitioning_column_2 STRING) " +
                            "USING DELTA " +
                            "PARTITIONED BY (partitioning_column_1, partitioning_column_2) " +
                            "LOCATION '" + location + "'" +
                            "TBLPROPERTIES (delta.enableChangeDataFeed = true)");

            QueryExecutors.onDelta().executeQuery(
                    "INSERT INTO " + deltaTable + " VALUES ('testValue1', 1, 'partition1'), ('testValue2', 2, 'partition2'), ('testValue3', 3, NULL)");
            QueryExecutors.onTrino().executeQuery("DELETE FROM " + trinoTable + " WHERE partitioning_column_2 IS NULL");

            Assertions.assertThat(QueryExecutors.onTrino().executeQuery("SELECT * FROM " + trinoTable))
                    .containsOnly(
                            QueryAssert.Row.row("testValue1", 1, "partition1"),
                            QueryAssert.Row.row("testValue2", 2, "partition2"));

            String changesQuery = "SELECT updated_column, partitioning_column_1, partitioning_column_2, _change_type, _commit_version " +
                    "FROM table_changes('" + deltaTable + "', 0)";
            Assertions.assertThat(QueryExecutors.onDelta().executeQuery(changesQuery))
                    .containsOnly(
                            QueryAssert.Row.row("testValue1", 1, "partition1", "insert", 1L),
                            QueryAssert.Row.row("testValue2", 2, "partition2", "insert", 1L),
                            QueryAssert.Row.row("testValue3", 3, null, "insert", 1L),
                            QueryAssert.Row.row("testValue3", 3, null, "delete", 2L));
        } finally {
            DeltaLakeTestUtils.dropDeltaTableWithRetry(deltaTable);
        }
    }

    /**
     * Toggles {@code change_data_feed_enabled} on a table from Trino and checks, through Delta's
     * {@code table_changes}, what is readable after each toggle:
     * <ol>
     *   <li>table created by Trino with the property set to {@code true}: the insert/update pair
     *       written by Delta is readable for versions 0..2;</li>
     *   <li>property set to {@code false}: reading versions 4..5 is expected to fail with the
     *       "change data was not recorded" message, while versions 0..2 remain readable;</li>
     *   <li>property set back to {@code true}: the next insert/update pair is readable for
     *       versions 7..8.</li>
     * </ol>
     * Finally the table content itself is verified. The table is always dropped on exit.
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testTurningOnAndOffCdfFromTrino() {
        String tableName = "test_turning_cdf_on_and_off_from_trino" + TestingNames.randomNameSuffix();
        try {
            QueryExecutors.onTrino().executeQuery("CREATE TABLE delta.default." + tableName + " (col1 VARCHAR, updated_column INT) WITH (location = 's3://" + this.bucketName + "/databricks-compatibility-test-" + tableName + "', change_data_feed_enabled = true)");
            Assertions.assertThat(QueryExecutors.onTrino().executeQuery("SHOW CREATE TABLE " + tableName).getOnlyValue().toString())
                    .contains("change_data_feed_enabled = true");

            // Phase 1: CDF enabled.
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue1', 1)");
            QueryExecutors.onDelta().executeQuery("UPDATE default." + tableName + " SET updated_column = 10 WHERE col1 = 'testValue1'");

            // The same query and expectation are checked twice: right away, and again after CDF was disabled.
            String firstWindowSql = "SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('default." + tableName + "', 0, 2)";
            QueryAssert.Row[] firstWindowChanges = {
                    QueryAssert.Row.row("testValue1", 1, "insert", 1L),
                    QueryAssert.Row.row("testValue1", 1, "update_preimage", 2L),
                    QueryAssert.Row.row("testValue1", 10, "update_postimage", 2L)};
            ((QueryAssert) Assertions.assertThat(QueryExecutors.onDelta().executeQuery(firstWindowSql))).containsOnly(firstWindowChanges);

            // Phase 2: CDF disabled from Trino.
            QueryExecutors.onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " SET PROPERTIES change_data_feed_enabled = false");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue2', 2)");
            QueryExecutors.onDelta().executeQuery("UPDATE default." + tableName + " SET updated_column = 20 WHERE col1 = 'testValue2'");
            QueryAssert.assertQueryFailure(() -> QueryExecutors.onDelta().executeQuery("SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('default." + tableName + "', 4, 5)"))
                    .hasMessageMatching("(?s)(.*Error getting change data for range \\[4 , 5] as change data was not\nrecorded for version \\[4].*)");
            ((QueryAssert) Assertions.assertThat(QueryExecutors.onDelta().executeQuery(firstWindowSql))).containsOnly(firstWindowChanges);

            // Phase 3: CDF re-enabled from Trino.
            QueryExecutors.onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " SET PROPERTIES change_data_feed_enabled = true");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES ('testValue3', 3)");
            QueryExecutors.onDelta().executeQuery("UPDATE default." + tableName + " SET updated_column = 30 WHERE col1 = 'testValue3'");
            ((QueryAssert) Assertions.assertThat(QueryExecutors.onDelta().executeQuery("SELECT col1, updated_column, _change_type, _commit_version FROM table_changes('default." + tableName + "', 7, 8)")))
                    .containsOnly(
                            QueryAssert.Row.row("testValue3", 3, "insert", 7L),
                            QueryAssert.Row.row("testValue3", 3, "update_preimage", 8L),
                            QueryAssert.Row.row("testValue3", 30, "update_postimage", 8L));

            ((QueryAssert) Assertions.assertThat(QueryExecutors.onDelta().executeQuery("SELECT * FROM " + tableName)))
                    .containsOnly(
                            QueryAssert.Row.row("testValue1", 10),
                            QueryAssert.Row.row("testValue2", 20),
                            QueryAssert.Row.row("testValue3", 30));
        } finally {
            // Single cleanup path: runs exactly once whether the body succeeds or throws,
            // instead of duplicating the DROP in the try body and in a catch (Throwable) handler.
            QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS delta.default." + tableName);
        }
    }

    /**
     * Runs {@code assertThereIsNoCdfFileGenerated} against two freshly named tables: one created
     * with no extra table property and one created with {@code change_data_feed_enabled = false}.
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testThatCdfDoesntWorkWhenPropertyIsNotSet() {
        String tableWithoutProperty = "test_cdf_doesnt_work_when_property_is_not_set_1_" + TestingNames.randomNameSuffix();
        String tableWithCdfDisabled = "test_cdf_doesnt_work_when_property_is_not_set_2_" + TestingNames.randomNameSuffix();

        // Property absent altogether.
        assertThereIsNoCdfFileGenerated(tableWithoutProperty, "");
        // Property present but explicitly switched off.
        assertThereIsNoCdfFileGenerated(tableWithCdfDisabled, "change_data_feed_enabled = false");
    }

    /**
     * Has Delta produce change data feed entries (plain inserts, a MERGE that deletes, updates
     * and inserts, then a standalone UPDATE and DELETE) on a CDF-enabled target table, and checks
     * that Trino's {@code delta.system.table_changes} table function returns the same change rows
     * as Delta's own {@code table_changes}. Both tables are always dropped on exit.
     */
    @Flaky(issue = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DeltaLakeTestUtils.DATABRICKS_COMMUNICATION_FAILURE_MATCH)
    @Test(groups = {TestGroups.DELTA_LAKE_DATABRICKS, TestGroups.DELTA_LAKE_OSS, TestGroups.DELTA_LAKE_EXCLUDE_73, TestGroups.PROFILE_SPECIFIC_TESTS})
    public void testTrinoCanReadCdfEntriesGeneratedByDelta() {
        String targetTable = "test_trino_can_read_cdf_entries_generated_by_delta_target_" + TestingNames.randomNameSuffix();
        String sourceTable = "test_trino_can_read_cdf_entries_generated_by_delta_source_" + TestingNames.randomNameSuffix();
        try {
            // Only the target table has the change data feed enabled; the source merely feeds the MERGE.
            QueryExecutors.onDelta().executeQuery("CREATE TABLE default." + targetTable + " (page_id INT, page_url STRING, views INT) USING DELTA LOCATION 's3://" + this.bucketName + "/databricks-compatibility-test-" + targetTable + "'TBLPROPERTIES (delta.enableChangeDataFeed = true)");
            QueryExecutors.onDelta().executeQuery("CREATE TABLE default." + sourceTable + " (page_id INT, page_url STRING, views INT) USING DELTA LOCATION 's3://" + this.bucketName + "/databricks-compatibility-test-" + sourceTable + "'");

            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + targetTable + " VALUES (1, 'pageUrl1', 100), (2, 'pageUrl2', 200), (3, 'pageUrl3', 300)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + targetTable + " VALUES (4, 'pageUrl4', 400)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + sourceTable + " VALUES (1000, 'pageUrl1000', 1000), (2, 'pageUrl2', 20000)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + sourceTable + " VALUES (3000, 'pageUrl3000', 3000), (4, 'pageUrl4000', 4000)");

            QueryExecutors.onDelta().executeQuery(
                    "MERGE INTO default." + targetTable + " targetTable USING default." + sourceTable + " sourceTable"
                            + " ON (targetTable.page_id = sourceTable.page_id)"
                            + " WHEN MATCHED AND targetTable.page_id = 2 THEN DELETE"
                            + " WHEN MATCHED AND targetTable.page_id > 2 THEN UPDATE SET views = (targetTable.views + sourceTable.views)"
                            + " WHEN NOT MATCHED THEN INSERT (page_id, page_url, views) VALUES (sourceTable.page_id, sourceTable.page_url, sourceTable.views)");
            QueryExecutors.onDelta().executeQuery("UPDATE default." + targetTable + " SET page_url = 'pageUrl30' WHERE page_id = 3");
            QueryExecutors.onDelta().executeQuery("DELETE FROM default." + targetTable + " WHERE page_url = 'pageUrl1'");

            ((QueryAssert) Assertions.assertThat(QueryExecutors.onDelta().executeQuery("SELECT * FROM " + targetTable)))
                    .containsOnly(
                            QueryAssert.Row.row(1000, "pageUrl1000", 1000),
                            QueryAssert.Row.row(3000, "pageUrl3000", 3000),
                            QueryAssert.Row.row(4, "pageUrl4", 4400),
                            QueryAssert.Row.row(3, "pageUrl30", 300));

            // Expected change rows, shared by the Trino and the Delta reads below.
            QueryAssert.Row[] expectedChanges = {
                    QueryAssert.Row.row(1, "pageUrl1", 100, "insert", 1),
                    QueryAssert.Row.row(2, "pageUrl2", 200, "insert", 1),
                    QueryAssert.Row.row(3, "pageUrl3", 300, "insert", 1),
                    QueryAssert.Row.row(4, "pageUrl4", 400, "insert", 2),
                    QueryAssert.Row.row(1000, "pageUrl1000", 1000, "insert", 3),
                    QueryAssert.Row.row(3000, "pageUrl3000", 3000, "insert", 3),
                    QueryAssert.Row.row(2, "pageUrl2", 200, "delete", 3),
                    QueryAssert.Row.row(4, "pageUrl4", 4400, "update_postimage", 3),
                    QueryAssert.Row.row(4, "pageUrl4", 400, "update_preimage", 3),
                    QueryAssert.Row.row(3, "pageUrl3", 300, "update_preimage", 4),
                    QueryAssert.Row.row(3, "pageUrl30", 300, "update_postimage", 4),
                    QueryAssert.Row.row(1, "pageUrl1", 100, "delete", 5)};
            ((QueryAssert) Assertions.assertThat(QueryExecutors.onTrino().executeQuery("SELECT page_id, page_url, views, _change_type, _commit_version FROM TABLE(delta.system.table_changes('default', '" + targetTable + "'))")))
                    .containsOnly(expectedChanges);
            ((QueryAssert) Assertions.assertThat(QueryExecutors.onDelta().executeQuery("SELECT page_id, page_url, views, _change_type, _commit_version FROM table_changes('default." + targetTable + "', 0)")))
                    .containsOnly(expectedChanges);
        } finally {
            // Single cleanup path: runs exactly once whether the body succeeds or throws,
            // instead of duplicating the drops in the try body and in a catch (Throwable) handler.
            DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + targetTable);
            DeltaLakeTestUtils.dropDeltaTableWithRetry("default." + sourceTable);
        }
    }

    /**
     * Creates a table through Trino, writes to it from both Delta and Trino, and asserts that no
     * object exists under the table's {@code _change_data/} prefix afterwards. The table is always
     * dropped on exit.
     *
     * @param str name of the table to create (also used to derive its S3 location)
     * @param str2 extra table property clause appended to the {@code WITH (...)} list, or an empty
     *        string for none; when non-empty it must also appear verbatim in SHOW CREATE TABLE
     */
    private void assertThereIsNoCdfFileGenerated(String str, String str2) {
        try {
            String extraProperty = str2.isEmpty() ? "" : ", " + str2;
            QueryExecutors.onTrino().executeQuery("CREATE TABLE delta.default." + str + " (col1 VARCHAR, updated_column INT) WITH (location = 's3://" + this.bucketName + "/databricks-compatibility-test-" + str + "'" + extraProperty + ")");

            // One SHOW CREATE TABLE round trip serves both variants of the check.
            String showCreateTable = QueryExecutors.onTrino().executeQuery("SHOW CREATE TABLE " + str).getOnlyValue().toString();
            if (str2.isEmpty()) {
                Assertions.assertThat(showCreateTable).doesNotContain("change_data_feed_enabled");
            } else {
                Assertions.assertThat(showCreateTable).contains(str2);
            }

            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + str + " VALUES ('testValue1', 1)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + str + " VALUES ('testValue2', 2)");
            QueryExecutors.onDelta().executeQuery("INSERT INTO default." + str + " VALUES ('testValue3', 3)");
            // Update once from each engine so that neither of them writes change data files.
            QueryExecutors.onTrino().executeQuery("UPDATE delta.default." + str + " SET updated_column = 5 WHERE col1 = 'testValue3'");
            QueryExecutors.onDelta().executeQuery("UPDATE default." + str + " SET updated_column = 4 WHERE col1 = 'testValue2'");

            ((QueryAssert) Assertions.assertThat(QueryExecutors.onDelta().executeQuery("SELECT * FROM default." + str)))
                    .containsOnly(
                            QueryAssert.Row.row("testValue1", 1),
                            QueryAssert.Row.row("testValue2", 4),
                            QueryAssert.Row.row("testValue3", 5));
            assertThatThereIsNoChangeDataFiles(str);
        } finally {
            // Single cleanup path: runs exactly once whether the body succeeds or throws,
            // instead of duplicating the DROP in the try body and in a catch (Throwable) handler.
            QueryExecutors.onTrino().executeQuery("DROP TABLE IF EXISTS delta.default." + str);
        }
    }

    /**
     * Asserts that listing the test bucket under the table's {@code _change_data/} prefix
     * returns no object summaries.
     *
     * @param str table name; the listed prefix is
     *        {@code "databricks-compatibility-test-" + str + "/_change_data/"}
     */
    private void assertThatThereIsNoChangeDataFiles(String str) {
        String changeDataPrefix = "databricks-compatibility-test-" + str + "/_change_data/";
        Assertions.assertThat(this.s3Client.listObjectsV2(this.bucketName, changeDataPrefix).getObjectSummaries())
                .isEmpty();
    }
}
