/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import java.net.URL;
import java.util.Objects;
import java.util.UUID;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.BaseTestKafkaSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link JsonKafkaSource}, driven through a {@link SourceFormatAdapter}.
 *
 * <p>Every test creates its own topic via {@code testUtils} (not declared in this class) and
 * reads it back using properties from {@link #createPropsForJsonKafkaSource}, which assign a
 * random consumer group id per call.
 */
public class TestJsonKafkaSource extends BaseTestKafkaSource {
    /** Classpath location of the Avro schema handed to the schema provider; {@code null} if the resource is absent. */
    static final URL SCHEMA_FILE_URL = TestJsonKafkaSource.class.getClassLoader().getResource("delta-streamer-config/source.avsc");

    /**
     * Builds a {@link FilebasedSchemaProvider} pointing at {@link #SCHEMA_FILE_URL} before each test.
     *
     * @throws Exception if the schema URL cannot be converted to a URI
     * @throws NullPointerException if the schema resource is not on the classpath
     */
    @BeforeEach
    public void init() throws Exception {
        URL schemaUrl = Objects.requireNonNull(SCHEMA_FILE_URL,
            "Test resource delta-streamer-config/source.avsc was not found on the classpath");
        TypedProperties schemaProps = new TypedProperties();
        schemaProps.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaUrl.toURI().getPath());
        this.schemaProvider = new FilebasedSchemaProvider(schemaProps, jsc());
    }

    /**
     * Creates JSON Kafka source properties bound to the broker exposed by {@code testUtils}.
     *
     * @param topic topic to read from
     * @param maxEventsToReadFromKafkaSource cap on events per fetch, or {@code null} for the configured default
     * @param resetStrategy value for {@code auto.offset.reset}
     * @return the populated properties
     */
    @Override
    TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
        return createPropsForJsonKafkaSource(testUtils.brokerAddress(), topic, maxEventsToReadFromKafkaSource, resetStrategy);
    }

    /**
     * Creates the properties needed to read JSON messages from a Kafka topic.
     *
     * @param brokerAddress value for {@code bootstrap.servers}
     * @param topic topic to read from
     * @param maxEventsToReadFromKafkaSource cap on events per fetch; when {@code null} the default of
     *     {@code KafkaOffsetGen.Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP} is used
     * @param resetStrategy value for {@code auto.offset.reset}
     * @return properties with auto-commit disabled and a freshly generated random {@code group.id}
     */
    static TypedProperties createPropsForJsonKafkaSource(String brokerAddress, String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
        String maxEvents = maxEventsToReadFromKafkaSource != null
            ? String.valueOf(maxEventsToReadFromKafkaSource)
            : String.valueOf(KafkaOffsetGen.Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue());

        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
        props.setProperty("bootstrap.servers", brokerAddress);
        props.setProperty("auto.offset.reset", resetStrategy);
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEvents);
        props.setProperty("group.id", UUID.randomUUID().toString());
        return props;
    }

    /**
     * Wraps a new {@link JsonKafkaSource}, configured with {@code props}, in a {@link SourceFormatAdapter}.
     *
     * @param props Kafka source properties
     * @return the adapter around the new source
     */
    @Override
    SourceFormatAdapter createSource(TypedProperties props) {
        return new SourceFormatAdapter(new JsonKafkaSource(props, jsc(), spark(), this.schemaProvider, this.metrics));
    }

    /**
     * Publishes 1000 JSON records followed by 100 {@code null} messages and expects the Avro fetch
     * to return exactly the 1000 records.
     */
    @Test
    public void testJsonKafkaSourceFilterNullMsg() {
        String topic = "hoodie_test_testJsonKafkaSourceFilterNullMsg";
        testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter kafkaSource = createSource(createPropsForKafkaSource(topic, null, "earliest"));

        // Nothing has been published yet, so the first fetch must produce no batch.
        Assertions.assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

        publishInserts(dataGenerator, topic, "000", 1000);
        // A freshly allocated String[100] holds 100 null entries, i.e. 100 null messages.
        testUtils.sendMessages(topic, new String[100]);

        InputBatch<?> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(1000L, countAvroRecords(fetch1));
    }

    /**
     * With {@code maxEvents} configured to {@link Long#MAX_VALUE}, expects a fetch with an unbounded
     * source limit to return all 1000 records, and a follow-up fetch with a source limit of 1500 to
     * return the 1000 records published after the first checkpoint.
     */
    @Test
    public void testJsonKafkaSourceWithDefaultUpperCap() {
        String topic = "hoodie_test_testJsonKafkaSourceWithDefaultUpperCap";
        testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter kafkaSource = createSource(createPropsForKafkaSource(topic, Long.MAX_VALUE, "earliest"));

        publishInserts(dataGenerator, topic, "000", 1000);
        InputBatch<?> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
        Assertions.assertEquals(1000L, countAvroRecords(fetch1));

        publishInserts(dataGenerator, topic, "001", 1000);
        InputBatch<?> fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500L);
        Assertions.assertEquals(1000L, countRows(fetch2));
    }

    /**
     * With {@code maxEvents} configured to 500, checks the record counts expected for a sequence of
     * fetches using explicit source limits (900, 400, 600) and {@link Long#MAX_VALUE}, for which the
     * test expects the configured 500 to apply.
     */
    @Test
    public void testJsonKafkaSourceWithConfigurableUpperCap() {
        String topic = "hoodie_test_testJsonKafkaSourceWithConfigurableUpperCap";
        testUtils.createTopic(topic, 2);
        HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
        SourceFormatAdapter kafkaSource = createSource(createPropsForKafkaSource(topic, 500L, "earliest"));

        // An explicit source limit of 900 is expected to be honoured on the first fetch.
        publishInserts(dataGenerator, topic, "000", 1000);
        InputBatch<?> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals(900L, countAvroRecords(fetch1));

        // With Long.MAX_VALUE as source limit the test expects the configured 500 events.
        publishInserts(dataGenerator, topic, "001", 1000);
        InputBatch<?> fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(500L, countRows(fetch2));

        InputBatch<?> fetch3 = kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), 400L);
        Assertions.assertEquals(400L, countAvroRecords(fetch3));

        InputBatch<?> fetch4 = kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), 600L);
        Assertions.assertEquals(600L, countAvroRecords(fetch4));

        // Re-reading from the first checkpoint must match the earlier row-format fetch,
        // both in record count and in the resulting checkpoint.
        InputBatch<?> fetch5 = kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(countRows(fetch2), countAvroRecords(fetch5));
        Assertions.assertEquals(fetch2.getCheckpointForNextBatch(), fetch5.getCheckpointForNextBatch());

        // Reading past fetch4's checkpoint is expected to yield no batch.
        InputBatch<?> fetch6 = kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch4.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals(Option.empty(), fetch6.getBatch());
    }

    /**
     * Publishes {@code count} generated insert records (instant {@code "000"}) to {@code topic} as JSON.
     *
     * @param topic destination topic
     * @param count number of records to generate and send
     * @param numPartitions not used by this implementation
     */
    @Override
    void sendMessagesToKafka(String topic, int count, int numPartitions) {
        publishInserts(new HoodieTestDataGenerator(), topic, "000", count);
    }

    /** Generates {@code count} inserts for {@code instantTime}, converts them to JSON and sends them to {@code topic}. */
    private void publishInserts(HoodieTestDataGenerator dataGenerator, String topic, String instantTime, int count) {
        testUtils.sendMessages(topic, UtilitiesTestBase.Helpers.jsonifyRecords(dataGenerator.generateInserts(instantTime, count)));
    }

    /** Returns the number of records in a batch produced by an Avro-format fetch; the batch must be present. */
    private static long countAvroRecords(InputBatch<?> batch) {
        return ((JavaRDD<?>) batch.getBatch().get()).count();
    }

    /** Returns the number of rows in a batch produced by a row-format fetch; the batch must be present. */
    private static long countRows(InputBatch<?> batch) {
        return ((Dataset<?>) batch.getBatch().get()).count();
    }
}

