/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.DoubleValue;
import com.google.protobuf.FloatValue;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
import com.google.protobuf.StringValue;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
import com.google.protobuf.util.Timestamps;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.BaseTestKafkaSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.ProtoKafkaSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.test.proto.Nested;
import org.apache.hudi.utilities.test.proto.Sample;
import org.apache.hudi.utilities.test.proto.SampleEnum;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestProtoKafkaSource
extends BaseTestKafkaSource {
    private static final Random RANDOM = new Random();

    @Override
    protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
        TypedProperties props = new TypedProperties();
        props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
        props.setProperty("bootstrap.servers", testUtils.brokerAddress());
        props.setProperty("auto.offset.reset", resetStrategy);
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(KafkaOffsetGen.Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()));
        props.setProperty("group.id", UUID.randomUUID().toString());
        props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
        return props;
    }

    @Override
    SourceFormatAdapter createSource(TypedProperties props) {
        this.schemaProvider = new ProtoClassBasedSchemaProvider(props, this.jsc());
        ProtoKafkaSource protoKafkaSource = new ProtoKafkaSource(props, this.jsc(), this.spark(), this.schemaProvider, this.metrics);
        return new SourceFormatAdapter((Source)protoKafkaSource);
    }

    @Test
    public void testProtoKafkaSourceWithFlattenWrappedPrimitives() {
        String topic = "hoodie_test_testProtoKafkaSourceFlatten";
        testUtils.createTopic("hoodie_test_testProtoKafkaSourceFlatten", 2);
        TypedProperties props = this.createPropsForKafkaSource("hoodie_test_testProtoKafkaSourceFlatten", null, "earliest");
        props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
        ProtoClassBasedSchemaProvider schemaProvider = new ProtoClassBasedSchemaProvider(props, this.jsc());
        ProtoKafkaSource protoKafkaSource = new ProtoKafkaSource(props, this.jsc(), this.spark(), (SchemaProvider)schemaProvider, this.metrics);
        SourceFormatAdapter kafkaSource = new SourceFormatAdapter((Source)protoKafkaSource);
        Assertions.assertEquals((Object)Option.empty(), (Object)kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
        this.sendMessagesToKafka("hoodie_test_testProtoKafkaSourceFlatten", 1000, 2);
        InputBatch fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900L);
        Assertions.assertEquals((long)900L, (long)((JavaRDD)fetch1.getBatch().get()).count());
        Dataset fetch1AsRows = AvroConversionUtils.createDataFrame((RDD)JavaRDD.toRDD((JavaRDD)((JavaRDD)fetch1.getBatch().get())), (String)schemaProvider.getSourceSchema().toString(), (SparkSession)protoKafkaSource.getSparkSession());
        Assertions.assertEquals((long)900L, (long)fetch1AsRows.count());
        this.sendMessagesToKafka("hoodie_test_testProtoKafkaSourceFlatten", 1000, 2);
        InputBatch fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of((Object)fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals((long)1100L, (long)((Dataset)fetch2.getBatch().get()).count());
        InputBatch fetch3 = kafkaSource.fetchNewDataInAvroFormat(Option.of((Object)fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals((long)((Dataset)fetch2.getBatch().get()).count(), (long)((JavaRDD)fetch3.getBatch().get()).count());
        Assertions.assertEquals((Object)fetch2.getCheckpointForNextBatch(), (Object)fetch3.getCheckpointForNextBatch());
        InputBatch fetch3AsRows = kafkaSource.fetchNewDataInRowFormat(Option.of((Object)fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals((long)((Dataset)fetch2.getBatch().get()).count(), (long)((Dataset)fetch3AsRows.getBatch().get()).count());
        Assertions.assertEquals((Object)fetch2.getCheckpointForNextBatch(), (Object)fetch3AsRows.getCheckpointForNextBatch());
        InputBatch fetch4 = kafkaSource.fetchNewDataInAvroFormat(Option.of((Object)fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals((Object)Option.empty(), (Object)fetch4.getBatch());
        InputBatch fetch4AsRows = kafkaSource.fetchNewDataInRowFormat(Option.of((Object)fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
        Assertions.assertEquals((Object)Option.empty(), (Object)fetch4AsRows.getBatch());
    }

    private static List<Sample> createSampleMessages(int count) {
        return IntStream.range(0, count).mapToObj(unused -> {
            Sample.Builder builder = Sample.newBuilder().setPrimitiveDouble(RANDOM.nextDouble()).setPrimitiveFloat(RANDOM.nextFloat()).setPrimitiveInt(RANDOM.nextInt()).setPrimitiveLong(RANDOM.nextLong()).setPrimitiveUnsignedInt(RANDOM.nextInt()).setPrimitiveUnsignedLong(RANDOM.nextLong()).setPrimitiveSignedInt(RANDOM.nextInt()).setPrimitiveSignedLong(RANDOM.nextLong()).setPrimitiveFixedInt(RANDOM.nextInt()).setPrimitiveFixedLong(RANDOM.nextLong()).setPrimitiveFixedSignedInt(RANDOM.nextInt()).setPrimitiveFixedSignedLong(RANDOM.nextLong()).setPrimitiveBoolean(RANDOM.nextBoolean()).setPrimitiveString(UUID.randomUUID().toString()).setPrimitiveBytes(ByteString.copyFrom((byte[])UUID.randomUUID().toString().getBytes()));
            if (RANDOM.nextBoolean()) {
                HashMap<String, Integer> primitiveMap = new HashMap<String, Integer>();
                primitiveMap.put(UUID.randomUUID().toString(), RANDOM.nextInt());
                HashMap<String, Nested> messageMap = new HashMap<String, Nested>();
                messageMap.put(UUID.randomUUID().toString(), TestProtoKafkaSource.generateRandomNestedMessage());
                builder.addAllRepeatedPrimitive(Arrays.asList(RANDOM.nextInt(), RANDOM.nextInt())).putAllMapPrimitive(primitiveMap).setNestedMessage(TestProtoKafkaSource.generateRandomNestedMessage()).addAllRepeatedMessage(Arrays.asList(TestProtoKafkaSource.generateRandomNestedMessage(), TestProtoKafkaSource.generateRandomNestedMessage())).putAllMapMessage(messageMap).setWrappedString(StringValue.of((String)UUID.randomUUID().toString())).setWrappedInt(Int32Value.of((int)RANDOM.nextInt())).setWrappedLong(Int64Value.of((long)RANDOM.nextLong())).setWrappedUnsignedInt(UInt32Value.of((int)RANDOM.nextInt())).setWrappedUnsignedLong(UInt64Value.of((long)RANDOM.nextLong())).setWrappedDouble(DoubleValue.of((double)RANDOM.nextDouble())).setWrappedFloat(FloatValue.of((float)RANDOM.nextFloat())).setWrappedBoolean(BoolValue.of((boolean)RANDOM.nextBoolean())).setWrappedBytes(BytesValue.of((ByteString)ByteString.copyFrom((byte[])UUID.randomUUID().toString().getBytes()))).setEnum(SampleEnum.SECOND).setTimestamp(Timestamps.fromMillis((long)System.currentTimeMillis()));
            }
            return builder.build();
        }).collect(Collectors.toList());
    }

    private static Nested generateRandomNestedMessage() {
        return Nested.newBuilder().setNestedInt(RANDOM.nextInt()).build();
    }

    @Override
    void sendMessagesToKafka(String topic, int count, int numPartitions) {
        List<Sample> messages = TestProtoKafkaSource.createSampleMessages(count);
        try (KafkaProducer producer = new KafkaProducer(this.getProducerProperties());){
            for (int i = 0; i < messages.size(); ++i) {
                producer.send(new ProducerRecord(topic, (Object)Integer.toString(i % numPartitions), (Object)messages.get(i).toByteArray()));
            }
        }
    }

    private Properties getProducerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", testUtils.brokerAddress());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        return props;
    }
}

