/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.PartitionInfo;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.types.DataType;

public class KafkaSchema {
    private static final int MAX_RETRY = 5;
    private static final int POLL_TIMEOUT_MILLIS = 1000;
    private final String databaseName;
    private final String tableName;
    private final Map<String, DataType> fields;
    private final List<String> primaryKeys;

    public KafkaSchema(String databaseName, String tableName, Map<String, DataType> fields, List<String> primaryKeys) {
        this.databaseName = databaseName;
        this.tableName = tableName;
        this.fields = fields;
        this.primaryKeys = primaryKeys;
    }

    public String tableName() {
        return this.tableName;
    }

    public String databaseName() {
        return this.databaseName;
    }

    public Map<String, DataType> fields() {
        return this.fields;
    }

    public List<String> primaryKeys() {
        return this.primaryKeys;
    }

    private static KafkaConsumer<String, String> getKafkaEarliestConsumer(Configuration kafkaConfig, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaConfig.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS));
        props.put("group.id", KafkaActionUtils.kafkaPropertiesGroupId(kafkaConfig));
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        KafkaConsumer consumer = new KafkaConsumer(props);
        List partitionInfos = consumer.partitionsFor(topic);
        if (partitionInfos.isEmpty()) {
            throw new IllegalArgumentException("Failed to find partition information for topic " + topic);
        }
        int firstPartition = partitionInfos.stream().map(PartitionInfo::partition).sorted().findFirst().get();
        List<TopicPartition> topicPartitions = Collections.singletonList(new TopicPartition(topic, firstPartition));
        consumer.assign(topicPartitions);
        consumer.seekToBeginning(topicPartitions);
        return consumer;
    }

    public static KafkaSchema getKafkaSchema(Configuration kafkaConfig, String topic) throws KafkaSchemaRetrievalException {
        KafkaConsumer<String, String> consumer = KafkaSchema.getKafkaEarliestConsumer(kafkaConfig, topic);
        int retry = 0;
        int retryInterval = 1000;
        DataFormat format = DataFormat.getDataFormat(kafkaConfig);
        RecordParser recordParser = format.createParser(true, new TableNameConverter(true), Collections.emptyList());
        ConsumerRecords consumerRecords;
        Iterable records;
        Stream<ConsumerRecord> recordStream;
        Optional<KafkaSchema> kafkaSchema;
        while (!(kafkaSchema = (recordStream = StreamSupport.stream((records = (consumerRecords = consumer.poll(Duration.ofMillis(1000L))).records(topic)).spliterator(), false)).map(record -> recordParser.getKafkaSchema((String)record.value())).filter(Objects::nonNull).findFirst()).isPresent()) {
            if (retry >= 5) {
                throw new KafkaSchemaRetrievalException(String.format("Could not get metadata from server, topic: %s", topic));
            }
            KafkaSchema.sleepSafely(retryInterval);
            retryInterval *= 2;
            ++retry;
        }
        return kafkaSchema.get();
    }

    private static void sleepSafely(int duration) {
        try {
            Thread.sleep(duration);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof KafkaSchema)) {
            return false;
        }
        KafkaSchema that = (KafkaSchema)o;
        return this.databaseName.equals(that.databaseName) && this.tableName.equals(that.tableName) && this.fields.equals(that.fields) && this.primaryKeys.equals(that.primaryKeys);
    }

    public int hashCode() {
        return Objects.hash(this.databaseName, this.tableName, this.fields, this.primaryKeys);
    }

    public static class KafkaSchemaRetrievalException
    extends Exception {
        public KafkaSchemaRetrievalException(String message) {
            super(message);
        }
    }
}

