/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.replicator.offsets;

import io.confluent.connect.replicator.offsets.ConsumerTimestampsWriterConfig;
import io.confluent.connect.replicator.offsets.ConsumerTimestampsWriterDefaults;
import io.confluent.connect.replicator.offsets.GroupTopicPartition;
import io.confluent.connect.replicator.offsets.TimestampAndDelta;
import io.confluent.connect.replicator.util.NewReplicatorAdminClient;
import io.confluent.connect.replicator.util.ReplicatorAdminClient;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerTimestampsWriter
implements Configurable,
AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerTimestampsWriter.class);
    private Producer<GroupTopicPartition, TimestampAndDelta> producer;
    private ReplicatorAdminClient adminClient;
    private boolean closeAdminClient = false;
    private Set<String> whitelistTopics;
    private Pattern topicPattern;
    private Set<String> blacklistTopics;

    public ConsumerTimestampsWriter() {
    }

    public ConsumerTimestampsWriter(ReplicatorAdminClient adminClient) {
        this.adminClient = adminClient;
    }

    public void configure(Map<String, ?> configs) {
        if (this.adminClient == null) {
            this.adminClient = new NewReplicatorAdminClient(configs, Time.SYSTEM, 0L, null);
            this.closeAdminClient = true;
        }
        this.adminClient.setInterestedTopics(Collections.singleton("__consumer_timestamps"), null);
        ConsumerTimestampsWriterConfig writerConfig = ConsumerTimestampsWriterConfig.getConfig(configs);
        Integer partitions = writerConfig.getInt("timestamps.topic.num.partitions");
        Short replicationFactor = writerConfig.getShort("timestamps.topic.replication.factor");
        this.whitelistTopics = writerConfig.getTopics();
        this.topicPattern = writerConfig.getTopicPattern();
        this.blacklistTopics = writerConfig.getBlacklistTopics();
        if (!this.topicExists()) {
            try {
                this.createTopic(Optional.of(partitions), Optional.of(replicationFactor));
            }
            catch (Exception e) {
                Throwable t = e;
                while (t.getCause() != null) {
                    t = t.getCause();
                }
                if (t instanceof InvalidReplicationFactorException) {
                    log.warn("Failed to create topic __consumer_timestamps with replication factor {}. Attempting again with the cluster's default replication factor ", (Object)replicationFactor);
                    try {
                        this.createTopic(Optional.of(partitions), Optional.empty());
                    }
                    catch (Exception ex) {
                        log.error("Failed to create topic __consumer_timestamps in the source clusterusing default replication factor", (Throwable)ex);
                        throw ex;
                    }
                }
                log.error("Failed to create topic __consumer_timestamps in the source cluster ", (Throwable)e);
                throw e;
            }
        }
        log.debug("Creating timestamps producer...");
        this.producer = this.createProducer(configs);
    }

    protected boolean topicExists() {
        return this.adminClient.topicExists("__consumer_timestamps");
    }

    protected boolean createTopic(Optional<Integer> partitions, Optional<Short> replicationFactor) {
        try {
            Properties properties = new Properties();
            properties.put("cleanup.policy", "compact");
            properties.put("segment.bytes", String.valueOf(0x6400000L));
            return this.adminClient.createTopic("__consumer_timestamps", partitions, replicationFactor, properties);
        }
        catch (Exception e) {
            throw this.toRuntimeException(e);
        }
    }

    protected Set<String> whitelistTopics() {
        return this.whitelistTopics;
    }

    protected Pattern topicPattern() {
        return this.topicPattern;
    }

    protected Set<String> blacklistTopics() {
        return this.blacklistTopics;
    }

    private RuntimeException toRuntimeException(Exception e) {
        return e instanceof RuntimeException ? (RuntimeException)e : new RuntimeException(e);
    }

    private Producer<GroupTopicPartition, TimestampAndDelta> createProducer(Map<String, ?> configs) {
        HashMap producerProps = new HashMap(configs);
        producerProps.putAll(ConsumerTimestampsWriterDefaults.PRODUCER_CONFIG_DEFAULTS);
        producerProps.putAll(ConsumerTimestampsWriter.withPrefix(configs, "timestamps.producer.", Collections.emptySet()));
        try {
            return new KafkaProducer(producerProps);
        }
        catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }

    private static Map<String, Object> withPrefix(Map<String, ?> input, String prefix, Set<String> exclude) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        for (Map.Entry<String, ?> entry : input.entrySet()) {
            String substring;
            if (!entry.getKey().startsWith(prefix) || exclude.contains(substring = entry.getKey().substring(prefix.length()))) continue;
            props.put(substring, entry.getValue());
        }
        return props;
    }

    public Future<RecordMetadata> send(GroupTopicPartition key, TimestampAndDelta value) {
        return this.send(key, value, null);
    }

    public Future<RecordMetadata> send(GroupTopicPartition key, TimestampAndDelta value, Callback callback) {
        return this.producer.send(new ProducerRecord("__consumer_timestamps", (Object)key, (Object)value), callback);
    }

    @Override
    public void close() {
        try {
            this.producer.close();
        }
        catch (KafkaException e) {
            log.error("Failed to stop ConsumerTimestampsWriter producer", (Throwable)e);
        }
        try {
            if (this.closeAdminClient) {
                this.adminClient.close();
            }
        }
        catch (KafkaException e) {
            log.error("Failed to stop ConsumerTimestampsWriter admin client", (Throwable)e);
        }
    }
}

