/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.TestJsonKafkaSource;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
import org.apache.hudi.utilities.sources.processor.maxwell.MaxwellJsonKafkaSourcePostProcessor;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Functional tests for {@link JsonKafkaSourcePostProcessor} implementations configured on a
 * {@link JsonKafkaSource}: no processor, a sampling processor, an unresolvable processor class,
 * a chain of processors, and {@link MaxwellJsonKafkaSourcePostProcessor}.
 */
public class TestJsonKafkaSourcePostProcessor extends SparkClientFunctionalTestHarness {

    /** Number of generated insert records published to each test topic. */
    private static final int NUM_RECORDS = 1000;

    /** Source limit passed to every fetch; intentionally smaller than {@link #NUM_RECORDS}. */
    private static final long SOURCE_LIMIT = 900L;

    private static KafkaTestUtils testUtils;
    private final HoodieDeltaStreamerMetrics metrics = Mockito.mock(HoodieDeltaStreamerMetrics.class);
    private SchemaProvider schemaProvider;

    @BeforeAll
    public static void initClass() {
        testUtils = new KafkaTestUtils();
        testUtils.setup();
    }

    @AfterAll
    public static void cleanupClass() {
        testUtils.teardown();
    }

    /** Builds a file-based schema provider from the schema file shared with {@link TestJsonKafkaSource}. */
    @BeforeEach
    public void init() throws Exception {
        String schemaFilePath = Objects.requireNonNull(TestJsonKafkaSource.SCHEMA_FILE_URL).toURI().getPath();
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
        this.schemaProvider = new FilebasedSchemaProvider(props, jsc());
    }

    @Test
    public void testNoPostProcessor() {
        SourceFormatAdapter kafkaSource = createSourceAndPublish("hoodie_test_testNoPostProcessor", null);
        InputBatch<?> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), SOURCE_LIMIT);
        Assertions.assertEquals(SOURCE_LIMIT, countRecords(fetch1));
    }

    @Test
    public void testSampleJsonKafkaSourcePostProcessor() {
        SourceFormatAdapter kafkaSource = createSourceAndPublish(
                "hoodie_test_testSampleJsonKafkaSourcePostProcessor",
                SampleJsonKafkaSourcePostProcessor.class.getName());
        InputBatch<?> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), SOURCE_LIMIT);
        // The sampling processor keeps roughly half of the batch, so the full limit should not come back.
        Assertions.assertNotEquals(SOURCE_LIMIT, countRecords(fetch1));
    }

    @Test
    public void testInvalidJsonKafkaSourcePostProcessor() {
        SourceFormatAdapter kafkaSource = createSourceAndPublish(
                "hoodie_test_testInvalidJsonKafkaSourcePostProcessor",
                "InvalidJsonKafkaSourcePostProcessor");
        // The bogus class name is only expected to surface as an error when data is fetched.
        Assertions.assertThrows(HoodieSourcePostProcessException.class,
                () -> kafkaSource.fetchNewDataInAvroFormat(Option.empty(), SOURCE_LIMIT));
    }

    @Test
    public void testChainedJsonKafkaSourcePostProcessor() {
        SourceFormatAdapter kafkaSource = createSourceAndPublish(
                "hoodie_test_testChainedJsonKafkaSourcePostProcessor",
                SampleJsonKafkaSourcePostProcessor.class.getName() + "," + DummyJsonKafkaSourcePostProcessor.class.getName());
        InputBatch<?> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), SOURCE_LIMIT);
        // The dummy processor runs after sampling and filters out every record.
        Assertions.assertEquals(0L, countRecords(fetch1));
    }

    @Test
    public void testMaxwellJsonKafkaSourcePostProcessor() throws IOException {
        String hudiMaxwell01Insert = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"insert\",\"ts\":1647074402,\"xid\":6233,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\",\"name\":\"mathieu\",\"age\":18,\"insert_time\":\"2022-03-12 08:40:02\",\"update_time\":\"2022-03-12 08:40:02\"}}";
        String hudiMaxwell01Update = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"update\",\"ts\":1647074482,\"xid\":6440,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\",\"name\":\"mathieu\",\"age\":20,\"insert_time\":\"2022-03-12 04:40:02\",\"update_time\":\"2022-03-12 04:42:25\"},\"old\":{\"age\":18,\"insert_time\":\"2022-03-12 08:40:02\",\"update_time\":\"2022-03-12 08:40:02\"}}";
        String hudiMaxwell01Delete = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"delete\",\"ts\":1647074555,\"xid\":6631,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\",\"name\":\"mathieu\",\"age\":20,\"insert_time\":\"2022-03-12 04:40:02\",\"update_time\":\"2022-03-12 04:42:25\"}}";
        String hudiMaxwell01Ddl = "{\"type\":\"table-alter\",\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"old\":{\"database\":\"hudi\",\"charset\":\"utf8\",\"table\":\"hudi_maxwell_01\",\"primary-key\":[\"id\"],\"columns\":[{\"type\":\"varchar\",\"name\":\"id\",\"charset\":\"utf8\"},{\"type\":\"varchar\",\"name\":\"name\",\"charset\":\"utf8\"},{\"type\":\"int\",\"name\":\"age\",\"signed\":true},{\"type\":\"timestamp\",\"name\":\"insert_time\",\"column-length\":0},{\"type\":\"timestamp\",\"name\":\"update_time\",\"column-length\":0}]},\"def\":{\"database\":\"hudi\",\"charset\":\"utf8\",\"table\":\"hudi_maxwell_01\",\"primary-key\":[\"id\"],\"columns\":[{\"type\":\"varchar\",\"name\":\"id\",\"charset\":\"utf8\"},{\"type\":\"varchar\",\"name\":\"name\",\"charset\":\"utf8\"},{\"type\":\"int\",\"name\":\"age\",\"signed\":true},{\"type\":\"timestamp\",\"name\":\"insert_time\",\"column-length\":0},{\"type\":\"timestamp\",\"name\":\"update_time\",\"column-length\":0}]},\"ts\":1647072305000,\"sql\":\"/* ApplicationName=DBeaver 21.0.4 - Main */ ALTER TABLE hudi.hudi_maxwell_01 MODIFY COLUMN age int(3) NULL\"}";
        String hudiMaxwell010Insert = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_010\",\"type\":\"insert\",\"ts\":1647073982,\"xid\":5164,\"commit\":true,\"data\":{\"id\":\"f3eaf4cdf7534e47a88cdf93d19b2ee6\",\"name\":\"wangxianghu\",\"age\":18,\"insert_time\":\"2022-03-12 08:33:02\",\"update_time\":\"2022-03-12 08:33:02\"}}";
        String hudi02Maxwell02Insert = "{\"database\":\"hudi_02\",\"table\":\"hudi_maxwell_02\",\"type\":\"insert\",\"ts\":1647073916,\"xid\":4990,\"commit\":true,\"data\":{\"id\":\"9bb17f316ee8488cb107621ddf0f3cb0\",\"name\":\"andy\",\"age\":17,\"insert_time\":\"2022-03-12 08:31:56\",\"update_time\":\"2022-03-12 08:31:56\"}}";
        String hudi02Maxwell01Insert = "{\"database\":\"hudi_02\",\"table\":\"hudi_maxwell_01\",\"type\":\"insert\",\"ts\":1647073916,\"xid\":4990,\"commit\":true,\"data\":{\"id\":\"9bb17f316ee8488cb107621ddf0f3cb0\",\"name\":\"andy\",\"age\":17,\"insert_time\":\"2022-03-12 08:31:56\",\"update_time\":\"2022-03-12 08:31:56\"}}";
        ObjectMapper mapper = new ObjectMapper();
        TypedProperties props = new TypedProperties();
        props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.DATABASE_NAME_REGEX_PROP.key(), "hudi(_)?[0-9]{0,2}");
        props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.TABLE_NAME_REGEX_PROP.key(), "hudi_maxwell(_)?[0-9]{0,2}");

        // Inserts and updates: output records have no top-level "database" field and are not flagged as deletes.
        JavaRDD<String> inputInsertAndUpdate = jsc().parallelize(Arrays.asList(hudiMaxwell01Insert, hudiMaxwell01Update));
        MaxwellJsonKafkaSourcePostProcessor processor = new MaxwellJsonKafkaSourcePostProcessor(props);
        processor.process(inputInsertAndUpdate).map(json -> mapper.readTree(json)).foreach(record -> {
            JsonNode database = record.get("database");
            boolean isDelete = record.get("_hoodie_is_deleted").booleanValue();
            Assertions.assertFalse(isDelete);
            Assertions.assertNull(database);
        });

        // Delete with a DATE_STRING precombine field: update_time is expected to equal the Maxwell "ts"
        // rendered with the configured format, and the record is flagged as a delete.
        props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_TYPE_PROP.key(), "DATE_STRING");
        props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_FORMAT_PROP.key(), "yyyy-MM-dd HH:mm:ss");
        props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "update_time");
        JavaRDD<String> inputDelete = jsc().parallelize(Collections.singletonList(hudiMaxwell01Delete));
        long ts = mapper.readTree(hudiMaxwell01Delete).get("ts").longValue();
        String formatTs = DateTimeUtils.formatUnixTimestamp(ts, "yyyy-MM-dd HH:mm:ss");
        new MaxwellJsonKafkaSourcePostProcessor(props).process(inputDelete).map(json -> mapper.readTree(json)).foreach(record -> {
            boolean isDelete = record.get("_hoodie_is_deleted").booleanValue();
            String updateTime = record.get("update_time").textValue();
            Assertions.assertEquals(formatTs, updateTime);
            Assertions.assertTrue(isDelete);
        });

        // Delete with a NON_TIMESTAMP precombine field: update_time is expected to keep the value carried in the data.
        props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_TYPE_PROP.key(), "NON_TIMESTAMP");
        props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "id");
        JavaRDD<String> inputDelete2 = jsc().parallelize(Collections.singletonList(hudiMaxwell01Delete));
        String updateTimeInUpdate = mapper.readTree(hudiMaxwell01Update).get("data").get("update_time").textValue();
        new MaxwellJsonKafkaSourcePostProcessor(props).process(inputDelete2).map(json -> mapper.readTree(json)).foreach(record -> {
            String updateTimeInDelete = record.get("update_time").textValue();
            Assertions.assertEquals(updateTimeInUpdate, updateTimeInDelete);
        });

        // Database/table regex filtering: only 2 of these 3 records are expected to survive.
        JavaRDD<String> dirtyData = jsc().parallelize(Arrays.asList(hudiMaxwell01Insert, hudiMaxwell010Insert, hudi02Maxwell02Insert));
        long validDataNum = processor.process(dirtyData).count();
        Assertions.assertEquals(2L, validDataNum);

        // DDL events produce no output records.
        JavaRDD<String> ddlData = jsc().parallelize(Collections.singletonList(hudiMaxwell01Ddl));
        long ddlDataNum = processor.process(ddlData).count();
        Assertions.assertEquals(0L, ddlDataNum);

        // Without a database regex, records from any database whose table matches are kept.
        // A fresh processor is required: the earlier instance was built while the database regex was still set.
        props.remove(MaxwellJsonKafkaSourcePostProcessor.Config.DATABASE_NAME_REGEX_PROP.key());
        props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.TABLE_NAME_REGEX_PROP.key(), "hudi_maxwell(_)?[0-9]{0,2}");
        JavaRDD<String> dataWithoutDatabaseRegex = jsc().parallelize(Arrays.asList(hudiMaxwell01Insert, hudi02Maxwell01Insert));
        long countWithoutDatabaseRegex = new MaxwellJsonKafkaSourcePostProcessor(props).process(dataWithoutDatabaseRegex).count();
        Assertions.assertEquals(2L, countWithoutDatabaseRegex);
    }

    /**
     * Creates {@code topic}, builds a {@link JsonKafkaSource} reading it from the earliest offset,
     * and then publishes {@link #NUM_RECORDS} generated insert records to it.
     *
     * @param topic            Kafka topic to create and consume
     * @param processorClasses value for the JSON Kafka post processor class option, or {@code null}
     *                         to leave the option unset
     * @return an adapter over the configured source
     */
    private SourceFormatAdapter createSourceAndPublish(String topic, String processorClasses) {
        testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        TypedProperties props = TestJsonKafkaSource.createPropsForJsonKafkaSource(testUtils.brokerAddress(), topic, null, "earliest");
        if (processorClasses != null) {
            props.setProperty(KafkaOffsetGen.Config.JSON_KAFKA_PROCESSOR_CLASS_OPT.key(), processorClasses);
        }
        // The source is built before any message is sent, matching the original test sequence.
        JsonKafkaSource jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts("000", NUM_RECORDS)));
        return kafkaSource;
    }

    /** Returns the number of records in the RDD carried by {@code batch}; assumes the batch is present. */
    private static long countRecords(InputBatch<?> batch) {
        return ((JavaRDD<?>) batch.getBatch().get()).count();
    }

    /** Post processor that blanks every record and then filters the blanks out, producing an empty RDD. */
    public static class DummyJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor {
        public DummyJsonKafkaSourcePostProcessor(TypedProperties props) {
            super(props);
        }

        @Override
        public JavaRDD<String> process(JavaRDD<String> inputJsonRecords) {
            return inputJsonRecords.map(x -> "").filter(x -> !Objects.equals(x, ""));
        }
    }

    /** Post processor that keeps a random ~50% sample (without replacement) of the input records. */
    public static class SampleJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor {
        public SampleJsonKafkaSourcePostProcessor(TypedProperties props) {
            super(props);
        }

        @Override
        public JavaRDD<String> process(JavaRDD<String> inputJsonRecords) {
            return inputJsonRecords.sample(false, 0.5);
        }
    }
}

