package org.apache.spark.sql.hudi;

import java.io.File;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Assertions;
import org.scalactic.source.Position;
import org.scalatest.Tag;
import scala.Array$;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.math.Ordering$Int$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: TestCDCForSparkSQL.scala */
@ScalaSignature(bytes = "\u0006\u0001u3A!\u0002\u0004\u0001#!)a\u0003\u0001C\u0001/!)\u0011\u0004\u0001C\u00015!9A\tAI\u0001\n\u0003)\u0005\"\u0002)\u0001\t\u0003\t&A\u0005+fgR\u001cEi\u0011$peN\u0003\u0018M]6T#2S!a\u0002\u0005\u0002\t!,H-\u001b\u0006\u0003\u0013)\t1a]9m\u0015\tYA\"A\u0003ta\u0006\u00148N\u0003\u0002\u000e\u001d\u00051\u0011\r]1dQ\u0016T\u0011aD\u0001\u0004_J<7\u0001A\n\u0003\u0001I\u0001\"a\u0005\u000b\u000e\u0003\u0019I!!\u0006\u0004\u0003-!{w\u000eZ5f'B\f'o[*rYR+7\u000f\u001e\"bg\u0016\fa\u0001P5oSRtD#\u0001\r\u0011\u0005M\u0001\u0011\u0001D2eG\u0012\u000bG/\u0019$sC6,G\u0003B\u000e.s}\u0002\"\u0001\b\u0016\u000f\u0005uAcB\u0001\u0010(\u001d\tybE\u0004\u0002!K9\u0011\u0011\u0005J\u0007\u0002E)\u00111\u0005E\u0001\u0007yI|w\u000e\u001e \n\u0003=I!!\u0004\b\n\u0005-a\u0011BA\u0005\u000b\u0013\tI\u0003\"A\u0004qC\u000e\\\u0017mZ3\n\u0005-b#!\u0003#bi\u00064%/Y7f\u0015\tI\u0003\u0002C\u0003/\u0005\u0001\u0007q&\u0001\u0005cCN,\u0007+\u0019;i!\t\u0001dG\u0004\u00022iA\u0011\u0011E\r\u0006\u0002g\u0005)1oY1mC&\u0011QGM\u0001\u0007!J,G-\u001a4\n\u0005]B$AB*ue&twM\u0003\u00026e!)!H\u0001a\u0001w\u0005Q1\u000f^1si&tw\rV:\u0011\u0005qjT\"\u0001\u001a\n\u0005y\u0012$\u0001\u0002'p]\u001eDq\u0001\u0011\u0002\u0011\u0002\u0003\u0007\u0011)\u0001\u0005f]\u0012Lgn\u001a+t!\ra$iO\u0005\u0003\u0007J\u0012aa\u00149uS>t\u0017AF2eG\u0012\u000bG/\u0019$sC6,G\u0005Z3gCVdG\u000fJ\u001a\u0016\u0003\u0019S#!Q$,\u0003!\u0003\"!\u0013(\u000e\u0003)S!a\u0013'\u0002\u0013Ut7\r[3dW\u0016$'BA'3\u0003)\tgN\\8uCRLwN\\\u0005\u0003\u001f*\u0013\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u00039\t7o]3si\u000e#5i\u00149D]R$RAU+X3n\u0003\"\u0001P*\n\u0005Q\u0013$\u0001B+oSRDQA\u0016\u0003A\u0002m\tqa\u00193d\t\u0006$\u0018\rC\u0003Y\t\u0001\u00071(A\tfqB,7\r^3e\u0013:\u001cXM\u001d;D]RDQA\u0017\u0003A\u0002m\n\u0011#\u001a=qK\u000e$X\rZ+qI\u0006$Xm\u00118u\u0011\u0015aF\u00011\u0001<\u0003I)\u0007\u0010]3di\u0016$G)\u001a7fi\u0016$7I\u001c;")
/* loaded from: input_file:org/apache/spark/sql/hudi/TestCDCForSparkSQL.class */
public class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
    /**
     * Loads the table at {@code str} through the "hudi" data source as an incremental
     * query in CDC format.
     *
     * @param str    path handed to {@code DataFrameReader.load}
     * @param j      value written (as a decimal string) to the BEGIN_INSTANTTIME option
     * @param option when defined, its value is written to the END_INSTANTTIME option;
     *               when empty, no end instant option is set
     * @return the data frame produced by the configured reader
     */
    public Dataset<Row> cdcDataFrame(String str, long j, Option<Object> option) {
        DataFrameReader hudiReader = spark().read().format("hudi");
        DataFrameReader incrementalReader = hudiReader.option(
                DataSourceReadOptions$.MODULE$.QUERY_TYPE().key(),
                DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL());
        DataFrameReader cdcFormatReader = incrementalReader.option(
                DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT().key(),
                DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT_CDC_VAL());
        DataFrameReader boundedReader = cdcFormatReader.option(
                DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key(),
                Long.toString(j));
        // The end bound is optional and only applied when the caller supplied one.
        option.foreach(endTs -> $anonfun$cdcDataFrame$1(boundedReader, BoxesRunTime.unboxToLong(endTs)));
        return boundedReader.load(str);
    }

    /**
     * Default value for the third argument of {@code cdcDataFrame}.
     *
     * @return the Scala {@code None} singleton, i.e. no value for that argument
     */
    public Option<Object> cdcDataFrame$default$3() {
        return None$.MODULE$;
    }

    /**
     * Asserts how many rows of {@code dataset} carry each value of the {@code op} column.
     * The checks run in order (insert, update, delete); each count is computed just before
     * it is asserted.
     *
     * @param dataset data frame exposing an {@code op} column
     * @param j       expected number of rows where {@code op = 'i'}
     * @param j2      expected number of rows where {@code op = 'u'}
     * @param j3      expected number of rows where {@code op = 'd'}
     */
    public void assertCDCOpCnt(Dataset<Row> dataset, long j, long j2, long j3) {
        long insertCount = dataset.where("op = 'i'").count();
        Assertions.assertEquals(j, insertCount);

        long updateCount = dataset.where("op = 'u'").count();
        Assertions.assertEquals(j2, updateCount);

        long deleteCount = dataset.where("op = 'd'").count();
        Assertions.assertEquals(j3, deleteCount);
    }

    /**
     * Lambda body used by {@code cdcDataFrame}: writes {@code j} (as a decimal string) to the
     * END_INSTANTTIME option of the given reader.
     *
     * @param dataFrameReader reader to configure
     * @param j               end instant value
     * @return the result of {@code DataFrameReader.option}
     */
    public static final /* synthetic */ DataFrameReader $anonfun$cdcDataFrame$1(DataFrameReader dataFrameReader, long j) {
        String endInstantKey = DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key();
        String endInstantValue = Long.toString(j);
        return dataFrameReader.option(endInstantKey, endInstantValue);
    }

    /**
     * Body of the non-partitioned table scenario for one (logging mode, table type) pair.
     *
     * Creates a CDC-enabled Hudi table under {@code file}, then runs insert, insert-as-update,
     * update, delete and merge statements. After each statement it reads the CDC data starting
     * at (latest instant timestamp - 1) and asserts the insert/update/delete row counts.
     *
     * @param testCDCForSparkSQL              suite instance providing spark(), cdcDataFrame, checkAnswer, etc.
     * @param hoodieCDCSupplementalLoggingMode its name() is written to 'hoodie.table.cdc.supplemental.logging.mode'
     * @param str                             value written to the table's {@code type} property
     * @param file                            directory under which the table location is created
     */
    public static final /* synthetic */ void $anonfun$new$4(TestCDCForSparkSQL testCDCForSparkSQL, HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode, String str, File file) {
        String generateTableName = testCDCForSparkSQL.generateTableName();
        // Table location: <canonical path of file>/<table name>.
        String sb = new StringBuilder(1).append(file.getCanonicalPath()).append("/").append(generateTableName).toString();
        testCDCForSparkSQL.spark().sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(508).append("\n               | create table ").append(generateTableName).append(" (\n               |  id int,\n               |  name string,\n               |  price double,\n               |  ts long\n               | ) using hudi\n               | tblproperties (\n               |   'primaryKey' = 'id',\n               |   'preCombineField' = 'ts',\n               |   'hoodie.table.cdc.enabled' = 'true',\n               |   'hoodie.table.cdc.supplemental.logging.mode' = '").append(hoodieCDCSupplementalLoggingMode.name()).append("',\n               |   type = '").append(str).append("'\n               | )\n               | location '").append(sb).append("'\n        ").toString())).stripMargin());
        HoodieTableMetaClient build = HoodieTableMetaClient.builder().setBasePath(sb).setConf(testCDCForSparkSQL.spark().sessionState().newHadoopConf()).build();
        // Step 1: initial insert of three rows -> expects 3 inserts.
        testCDCForSparkSQL.spark().sql(new StringBuilder(81).append("insert into ").append(generateTableName).append(" values (1, 'a1', 11, 1000), (2, 'a2', 12, 1000), (3, 'a3', 13, 1000)").toString());
        // Timestamp of the first instant; reused at the end for the whole-history check.
        String timestamp = ((HoodieInstant) build.reloadActiveTimeline().lastInstant().get()).getTimestamp();
        // Begin instant is (timestamp - 1); presumably so the range covers that commit itself -- TODO confirm.
        Dataset<Row> cdcDataFrame = testCDCForSparkSQL.cdcDataFrame(sb, new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1, testCDCForSparkSQL.cdcDataFrame$default$3());
        cdcDataFrame.show(false);
        testCDCForSparkSQL.assertCDCOpCnt(cdcDataFrame, 3L, 0L, 0L);
        // Step 2: insert into an existing key (id = 1) -> expects 1 update.
        testCDCForSparkSQL.spark().sql(new StringBuilder(42).append("insert into ").append(generateTableName).append(" values (1, 'a1_v2', 11, 1100)").toString());
        Dataset<Row> cdcDataFrame2 = testCDCForSparkSQL.cdcDataFrame(sb, new StringOps(Predef$.MODULE$.augmentString(((HoodieInstant) build.reloadActiveTimeline().lastInstant().get()).getTimestamp())).toLong() - 1, testCDCForSparkSQL.cdcDataFrame$default$3());
        cdcDataFrame2.show(false);
        testCDCForSparkSQL.assertCDCOpCnt(cdcDataFrame2, 0L, 1L, 0L);
        // Table schema, used to parse the JSON "before"/"after" CDC columns.
        StructType schema = testCDCForSparkSQL.spark().read().format("hudi").load(sb).schema();
        // Expected row: op 'u', id 1, before (a1, 11), after (a1_v2, 11).
        testCDCForSparkSQL.checkAnswer((Row[]) cdcDataFrame2.select(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("op"), functions$.MODULE$.from_json(functions$.MODULE$.col("before"), schema).as("before"), functions$.MODULE$.from_json(functions$.MODULE$.col("after"), schema).as("after")})).select(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("op"), functions$.MODULE$.col("after.id"), functions$.MODULE$.col("before.name"), functions$.MODULE$.col("before.price"), functions$.MODULE$.col("after.name"), functions$.MODULE$.col("after.price")})).collect(), (Seq<Seq<Object>>) Predef$.MODULE$.wrapRefArray(new Seq[]{Predef$.MODULE$.genericWrapArray(Array$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"u", BoxesRunTime.boxToInteger(1), "a1", BoxesRunTime.boxToInteger(11), "a1_v2", BoxesRunTime.boxToInteger(11)}), ClassTag$.MODULE$.Any()))}));
        // Step 3: update id = 2 -> expects 1 update.
        testCDCForSparkSQL.spark().sql(new StringBuilder(50).append("update ").append(generateTableName).append(" set name = 'a2_v2', ts = 1200 where id = 2").toString());
        Dataset<Row> cdcDataFrame3 = testCDCForSparkSQL.cdcDataFrame(sb, new StringOps(Predef$.MODULE$.augmentString(((HoodieInstant) build.reloadActiveTimeline().lastInstant().get()).getTimestamp())).toLong() - 1, testCDCForSparkSQL.cdcDataFrame$default$3());
        cdcDataFrame3.show(false);
        testCDCForSparkSQL.assertCDCOpCnt(cdcDataFrame3, 0L, 1L, 0L);
        // Step 4: delete id = 3 -> expects 1 delete.
        testCDCForSparkSQL.spark().sql(new StringBuilder(25).append("delete from ").append(generateTableName).append(" where id = 3").toString());
        Dataset<Row> cdcDataFrame4 = testCDCForSparkSQL.cdcDataFrame(sb, new StringOps(Predef$.MODULE$.augmentString(((HoodieInstant) build.reloadActiveTimeline().lastInstant().get()).getTimestamp())).toLong() - 1, testCDCForSparkSQL.cdcDataFrame$default$3());
        cdcDataFrame4.show(false);
        testCDCForSparkSQL.assertCDCOpCnt(cdcDataFrame4, 0L, 0L, 1L);
        // Step 5: merge touching id = 1 (matched) and id = 4 (not matched) -> expects 1 insert and 1 update.
        testCDCForSparkSQL.spark().sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(557).append("\n               | merge into ").append(generateTableName).append("\n               | using (\n               |  select * from (\n               |  select 1 as id, 'a1_v3' as name, cast(11 as double) as price, cast(1300 as long) as ts\n               |  union all\n               |  select 4 as id, 'a4' as name, cast(14 as double) as price, cast(1300 as long) as ts\n               |  )\n               | ) s0\n               | on s0.id = ").append(generateTableName).append(".id\n               | when matched then update set id = s0.id, name = s0.name, price = s0.price, ts = s0.ts\n               | when not matched then insert *\n        ").toString())).stripMargin());
        Dataset<Row> cdcDataFrame5 = testCDCForSparkSQL.cdcDataFrame(sb, new StringOps(Predef$.MODULE$.augmentString(((HoodieInstant) build.reloadActiveTimeline().lastInstant().get()).getTimestamp())).toLong() - 1, testCDCForSparkSQL.cdcDataFrame$default$3());
        cdcDataFrame5.show(false);
        testCDCForSparkSQL.assertCDCOpCnt(cdcDataFrame5, 1L, 1L, 0L);
        // Rows are sorted by column 1 (after.id) so the comparison order is deterministic;
        // the inserted row (id 4) has null "before" fields.
        testCDCForSparkSQL.checkAnswer((Row[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Row[]) cdcDataFrame5.select(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("op"), functions$.MODULE$.from_json(functions$.MODULE$.col("before"), schema).as("before"), functions$.MODULE$.from_json(functions$.MODULE$.col("after"), schema).as("after")})).select(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("op"), functions$.MODULE$.col("after.id"), functions$.MODULE$.col("before.name"), functions$.MODULE$.col("before.price"), functions$.MODULE$.col("after.name"), functions$.MODULE$.col("after.price")})).collect())).sortBy(row -> {
            return BoxesRunTime.boxToInteger(row.getInt(1));
        }, Ordering$Int$.MODULE$), (Seq<Seq<Object>>) Predef$.MODULE$.wrapRefArray(new Seq[]{Predef$.MODULE$.genericWrapArray(Array$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"u", BoxesRunTime.boxToInteger(1), "a1_v2", BoxesRunTime.boxToInteger(11), "a1_v3", BoxesRunTime.boxToInteger(11)}), ClassTag$.MODULE$.Any())), Predef$.MODULE$.genericWrapArray(Array$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{"i", BoxesRunTime.boxToInteger(4), null, null, "a4", BoxesRunTime.boxToInteger(14)}), ClassTag$.MODULE$.Any()))}));
        // Whole history, starting from the first instant: 4 inserts, 3 updates, 1 delete.
        testCDCForSparkSQL.assertCDCOpCnt(testCDCForSparkSQL.cdcDataFrame(sb, new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1, testCDCForSparkSQL.cdcDataFrame$default$3()), 4L, 3L, 1L);
    }

    /**
     * Runs the non-partitioned scenario ({@code $anonfun$new$4}) inside the directory that
     * {@code withTempDir} passes to the callback.
     *
     * @param testCDCForSparkSQL              suite instance
     * @param str                             table type forwarded to the scenario
     * @param hoodieCDCSupplementalLoggingMode logging mode forwarded to the scenario
     */
    public static final /* synthetic */ void $anonfun$new$3(TestCDCForSparkSQL testCDCForSparkSQL, String str, HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode) {
        testCDCForSparkSQL.withTempDir(tempDir -> {
            $anonfun$new$4(testCDCForSparkSQL, hoodieCDCSupplementalLoggingMode, str, tempDir);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Runs the non-partitioned scenario for table type {@code str} once per supplemental
     * logging mode, in the order {@code op_key_only}, {@code data_before}.
     *
     * @param testCDCForSparkSQL suite instance
     * @param str                table type forwarded to each run
     */
    public static final /* synthetic */ void $anonfun$new$2(TestCDCForSparkSQL testCDCForSparkSQL, String str) {
        HoodieCDCSupplementalLoggingMode[] loggingModes = {
            HoodieCDCSupplementalLoggingMode.op_key_only,
            HoodieCDCSupplementalLoggingMode.data_before
        };
        Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(loggingModes)).foreach(loggingMode -> {
            $anonfun$new$3(testCDCForSparkSQL, str, loggingMode);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Body of the partitioned table scenario for one (logging mode, table type) pair.
     *
     * Creates a CDC-enabled Hudi table partitioned by {@code pt} under {@code file}, then runs
     * insert, insert-overwrite of one partition, update and merge statements. After each
     * statement it reads the CDC data starting at (latest instant timestamp - 1) and asserts
     * the insert/update/delete row counts.
     *
     * @param testCDCForSparkSQL              suite instance providing spark(), cdcDataFrame, etc.
     * @param hoodieCDCSupplementalLoggingMode its name() is written to 'hoodie.table.cdc.supplemental.logging.mode'
     * @param str                             value written to the table's {@code 'type'} property
     * @param file                            directory under which the table location is created
     */
    public static final /* synthetic */ void $anonfun$new$9(TestCDCForSparkSQL testCDCForSparkSQL, HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode, String str, File file) {
        String generateTableName = testCDCForSparkSQL.generateTableName();
        // Table location: <canonical path of file>/<table name>.
        String sb = new StringBuilder(1).append(file.getCanonicalPath()).append("/").append(generateTableName).toString();
        testCDCForSparkSQL.spark().sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(576).append("\n               | create table ").append(generateTableName).append(" (\n               |  id int,\n               |  name string,\n               |  price double,\n               |  ts long,\n               |  pt string\n               | ) using hudi\n               | partitioned by (pt)\n               | tblproperties (\n               |   'primaryKey' = 'id',\n               |   'preCombineField' = 'ts',\n               |   'hoodie.table.cdc.enabled' = 'true',\n               |   'hoodie.table.cdc.supplemental.logging.mode' = '").append(hoodieCDCSupplementalLoggingMode.name()).append("',\n               |   'type' = '").append(str).append("'\n               | )\n               | location '").append(sb).append("'\n        ").toString())).stripMargin());
        HoodieTableMetaClient build = HoodieTableMetaClient.builder().setBasePath(sb).setConf(testCDCForSparkSQL.spark().sessionState().newHadoopConf()).build();
        // Step 1: initial insert of three rows (one in pt '2021', two in pt '2022') -> expects 3 inserts.
        testCDCForSparkSQL.spark().sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(183).append("\n               | insert into ").append(generateTableName).append(" values\n               | (1, 'a1', 11, 1000, '2021'),\n               | (2, 'a2', 12, 1000, '2022'),\n               | (3, 'a3', 13, 1000, '2022')\n        ").toString())).stripMargin());
        // Timestamp of the first instant; reused at the end for the whole-history check.
        String timestamp = ((HoodieInstant) build.reloadActiveTimeline().lastInstant().get()).getTimestamp();
        // Begin instant is (timestamp - 1); presumably so the range covers that commit itself -- TODO confirm.
        Dataset<Row> cdcDataFrame = testCDCForSparkSQL.cdcDataFrame(sb, new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1, testCDCForSparkSQL.cdcDataFrame$default$3());
        cdcDataFrame.show(false);
        testCDCForSparkSQL.assertCDCOpCnt(cdcDataFrame, 3L, 0L, 0L);
        // Step 2: overwrite partition pt = '2021' -> expects 1 insert and 1 delete (no update).
        testCDCForSparkSQL.spark().sql(new StringBuilder(77).append("insert overwrite table ").append(generateTableName).append(" partition (pt = '2021') values (1, 'a1_v2', 11, 1100)").toString());
        Dataset<Row> cdcDataFrame2 = testCDCForSparkSQL.cdcDataFrame(sb, new StringOps(Predef$.MODULE$.augmentString(((HoodieInstant) build.reloadActiveTimeline().lastInstant().get()).getTimestamp())).toLong() - 1, testCDCForSparkSQL.cdcDataFrame$default$3());
        cdcDataFrame2.show(false);
        testCDCForSparkSQL.assertCDCOpCnt(cdcDataFrame2, 1L, 0L, 1L);
        // Step 3: update id = 2 -> expects 1 update.
        testCDCForSparkSQL.spark().sql(new StringBuilder(50).append("update ").append(generateTableName).append(" set name = 'a2_v2', ts = 1200 where id = 2").toString());
        Dataset<Row> cdcDataFrame3 = testCDCForSparkSQL.cdcDataFrame(sb, new StringOps(Predef$.MODULE$.augmentString(((HoodieInstant) build.reloadActiveTimeline().lastInstant().get()).getTimestamp())).toLong() - 1, testCDCForSparkSQL.cdcDataFrame$default$3());
        cdcDataFrame3.show(false);
        testCDCForSparkSQL.assertCDCOpCnt(cdcDataFrame3, 0L, 1L, 0L);
        // Step 4: merge touching id = 1 (matched) and id = 4 (not matched) -> expects 1 insert and 1 update.
        testCDCForSparkSQL.spark().sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(597).append("\n               | merge into ").append(generateTableName).append("\n               | using (\n               |  select * from (\n               |  select 1 as id, 'a1_v3' as name, cast(11 as double) as price, cast(1300 as long) as ts, \"2021\" as pt\n               |  union all\n               |  select 4 as id, 'a4' as name, cast(14 as double) as price, cast(1300 as long) as ts, \"2022\" as pt\n               |  )\n               | ) s0\n               | on s0.id = ").append(generateTableName).append(".id\n               | when matched then update set id = s0.id, name = s0.name, price = s0.price, ts = s0.ts, pt = s0.pt\n               | when not matched then insert *\n        ").toString())).stripMargin());
        Dataset<Row> cdcDataFrame4 = testCDCForSparkSQL.cdcDataFrame(sb, new StringOps(Predef$.MODULE$.augmentString(((HoodieInstant) build.reloadActiveTimeline().lastInstant().get()).getTimestamp())).toLong() - 1, testCDCForSparkSQL.cdcDataFrame$default$3());
        cdcDataFrame4.show(false);
        testCDCForSparkSQL.assertCDCOpCnt(cdcDataFrame4, 1L, 1L, 0L);
        // Whole history, starting from the first instant: 5 inserts, 2 updates, 1 delete.
        testCDCForSparkSQL.assertCDCOpCnt(testCDCForSparkSQL.cdcDataFrame(sb, new StringOps(Predef$.MODULE$.augmentString(timestamp)).toLong() - 1, testCDCForSparkSQL.cdcDataFrame$default$3()), 5L, 2L, 1L);
    }

    /**
     * Runs the partitioned scenario ({@code $anonfun$new$9}) inside the directory that
     * {@code withTempDir} passes to the callback.
     *
     * @param testCDCForSparkSQL              suite instance
     * @param str                             table type forwarded to the scenario
     * @param hoodieCDCSupplementalLoggingMode logging mode forwarded to the scenario
     */
    public static final /* synthetic */ void $anonfun$new$8(TestCDCForSparkSQL testCDCForSparkSQL, String str, HoodieCDCSupplementalLoggingMode hoodieCDCSupplementalLoggingMode) {
        testCDCForSparkSQL.withTempDir(tempDir -> {
            $anonfun$new$9(testCDCForSparkSQL, hoodieCDCSupplementalLoggingMode, str, tempDir);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Runs the partitioned scenario for table type {@code str} once per supplemental
     * logging mode, in the order {@code op_key_only}, {@code data_before}.
     *
     * @param testCDCForSparkSQL suite instance
     * @param str                table type forwarded to each run
     */
    public static final /* synthetic */ void $anonfun$new$7(TestCDCForSparkSQL testCDCForSparkSQL, String str) {
        HoodieCDCSupplementalLoggingMode[] loggingModes = {
            HoodieCDCSupplementalLoggingMode.op_key_only,
            HoodieCDCSupplementalLoggingMode.data_before
        };
        Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(loggingModes)).foreach(loggingMode -> {
            $anonfun$new$8(testCDCForSparkSQL, str, loggingMode);
            return BoxedUnit.UNIT;
        });
    }

    public TestCDCForSparkSQL() {
        test("Test Non-Partitioned Hoodie Table", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.spark().sql(new StringBuilder(30).append("create database if not exists ").append("hudi_database").toString());
            this.spark().sql(new StringBuilder(4).append("use ").append("hudi_database").toString());
            Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"cow", "mor"})).foreach(str -> {
                $anonfun$new$2(this, str);
                return BoxedUnit.UNIT;
            });
        }, new Position("TestCDCForSparkSQL.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 50));
        test("Test Partitioned Hoodie Table", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            this.spark().sql(new StringBuilder(30).append("create database if not exists ").append("hudi_database").toString());
            this.spark().sql(new StringBuilder(4).append("use ").append("hudi_database").toString());
            Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"cow", "mor"})).foreach(str -> {
                $anonfun$new$7(this, str);
                return BoxedUnit.UNIT;
            });
        }, new Position("TestCDCForSparkSQL.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 170));
    }
}
