/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.NewTopic;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TopicExistsException;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.kafka.KafkaLogSerializationSchema;
import org.apache.paimon.flink.kafka.KafkaSinkFunction;
import org.apache.paimon.flink.log.LogSinkProvider;
import org.apache.paimon.flink.sink.LogSinkFunction;

public class KafkaLogSinkProvider
implements LogSinkProvider {
    private static final long serialVersionUID = 1L;
    private final String topic;
    private final Properties properties;
    @Nullable
    private final SerializationSchema<RowData> primaryKeySerializer;
    private final SerializationSchema<RowData> valueSerializer;
    private final CoreOptions.LogConsistency consistency;
    private final CoreOptions.LogChangelogMode changelogMode;
    private final Integer numBuckets;
    public static final int DEFAULT_REPLICATION_FACTOR = 2;

    public KafkaLogSinkProvider(String topic, Properties properties, @Nullable SerializationSchema<RowData> primaryKeySerializer, SerializationSchema<RowData> valueSerializer, CoreOptions.LogConsistency consistency, CoreOptions.LogChangelogMode changelogMode, Integer numBuckets) {
        this.topic = topic;
        this.properties = properties;
        this.primaryKeySerializer = primaryKeySerializer;
        this.valueSerializer = valueSerializer;
        this.consistency = consistency;
        this.changelogMode = changelogMode;
        this.numBuckets = numBuckets;
    }

    @Override
    public LogSinkFunction createSink() {
        FlinkKafkaProducer.Semantic semantic;
        switch (this.consistency) {
            case TRANSACTIONAL: {
                semantic = FlinkKafkaProducer.Semantic.EXACTLY_ONCE;
                break;
            }
            case EVENTUAL: {
                semantic = FlinkKafkaProducer.Semantic.AT_LEAST_ONCE;
                break;
            }
            default: {
                throw new IllegalArgumentException("Unsupported: " + this.consistency);
            }
        }
        this.createTopicIfNotExists();
        return new KafkaSinkFunction(this.topic, this.createSerializationSchema(), this.properties, semantic);
    }

    @VisibleForTesting
    KafkaLogSerializationSchema createSerializationSchema() {
        return new KafkaLogSerializationSchema(this.topic, this.primaryKeySerializer, this.valueSerializer, this.changelogMode);
    }

    private void createTopicIfNotExists() {
        try (AdminClient adminClient = AdminClient.create((Properties)this.properties);){
            if (!((Set)adminClient.listTopics().names().get()).contains(this.topic)) {
                int numBrokers = ((Collection)adminClient.describeCluster().nodes().get()).size();
                int replicationFactor = 2 > numBrokers ? numBrokers : 2;
                NewTopic newTopic = new NewTopic(this.topic, this.numBuckets.intValue(), (short)replicationFactor);
                adminClient.createTopics(Collections.singleton(newTopic)).all().get();
            }
        }
        catch (Exception e) {
            if (e.getCause() instanceof TopicExistsException) {
                throw new TableException(String.format("Failed to create kafka topic. Reason: topic %s exists. ", this.topic));
            }
            throw new TableException("Error in createTopicIfNotExists", (Throwable)e);
        }
    }
}

