/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.util;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicAdmin
implements AutoCloseable {
    private static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
    private static final String CLEANUP_POLICY_COMPACT = "compact";
    private static final String MIN_INSYNC_REPLICAS_CONFIG = "min.insync.replicas";
    private static final String UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG = "unclean.leader.election.enable";
    private static final Logger log = LoggerFactory.getLogger(TopicAdmin.class);
    private final Map<String, Object> adminConfig;
    private final Admin admin;

    public static NewTopicBuilder defineTopic(String topicName) {
        return new NewTopicBuilder(topicName);
    }

    public TopicAdmin(Map<String, Object> adminConfig) {
        this(adminConfig, Admin.create(adminConfig));
    }

    TopicAdmin(Map<String, Object> adminConfig, Admin adminClient) {
        this.admin = adminClient;
        this.adminConfig = adminConfig != null ? adminConfig : Collections.emptyMap();
    }

    public Admin admin() {
        return this.admin;
    }

    public boolean createTopic(NewTopic topic) {
        if (topic == null) {
            return false;
        }
        Set<String> newTopicNames = this.createTopics(topic);
        return newTopicNames.contains(topic.name());
    }

    public Set<String> createTopics(NewTopic ... topics) {
        HashMap<String, NewTopic> topicsByName = new HashMap<String, NewTopic>();
        if (topics != null) {
            for (NewTopic topic : topics) {
                if (topic == null) continue;
                topicsByName.put(topic.name(), topic);
            }
        }
        if (topicsByName.isEmpty()) {
            return Collections.emptySet();
        }
        String bootstrapServers = this.bootstrapServers();
        String topicNameList = Utils.join(topicsByName.keySet(), (String)"', '");
        CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false);
        Map newResults = this.admin.createTopics(topicsByName.values(), args).values();
        HashSet<String> newlyCreatedTopicNames = new HashSet<String>();
        for (Map.Entry entry : newResults.entrySet()) {
            String topic = (String)entry.getKey();
            try {
                ((KafkaFuture)entry.getValue()).get();
                log.info("Created topic {} on brokers at {}", topicsByName.get(topic), (Object)bootstrapServers);
                newlyCreatedTopicNames.add(topic);
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof TopicExistsException) {
                    log.debug("Found existing topic '{}' on the brokers at {}", (Object)topic, (Object)bootstrapServers);
                    continue;
                }
                if (cause instanceof UnsupportedVersionException) {
                    log.debug("Unable to create topic(s) '{}' since the brokers at {} do not support the CreateTopics API. Falling back to assume topic(s) exist or will be auto-created by the broker.", (Object)topicNameList, (Object)bootstrapServers);
                    return Collections.emptySet();
                }
                if (cause instanceof ClusterAuthorizationException) {
                    log.debug("Not authorized to create topic(s) '{}' upon the brokers {}. Falling back to assume topic(s) exist or will be auto-created by the broker.", (Object)topicNameList, (Object)bootstrapServers);
                    return Collections.emptySet();
                }
                if (cause instanceof TopicAuthorizationException) {
                    log.debug("Not authorized to create topic(s) '{}' upon the brokers {}. Falling back to assume topic(s) exist or will be auto-created by the broker.", (Object)topicNameList, (Object)bootstrapServers);
                    return Collections.emptySet();
                }
                if (cause instanceof TimeoutException) {
                    throw new ConnectException("Timed out while checking for or creating topic(s) '" + topicNameList + "'. This could indicate a connectivity issue, unavailable topic partitions, or if this is your first use of the topic it may have taken too long to create.", cause);
                }
                throw new ConnectException("Error while attempting to create/find topic(s) '" + topicNameList + "'", (Throwable)e);
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                throw new ConnectException("Interrupted while attempting to create/find topic(s) '" + topicNameList + "'", (Throwable)e);
            }
        }
        return newlyCreatedTopicNames;
    }

    public Map<TopicPartition, Long> endOffsets(Set<TopicPartition> partitions) {
        if (partitions == null || partitions.isEmpty()) {
            return Collections.emptyMap();
        }
        Map offsetSpecMap = partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
        ListOffsetsResult resultFuture = this.admin.listOffsets(offsetSpecMap);
        HashMap<TopicPartition, Long> result = new HashMap<TopicPartition, Long>();
        for (TopicPartition partition : partitions) {
            try {
                ListOffsetsResult.ListOffsetsResultInfo info = (ListOffsetsResult.ListOffsetsResultInfo)resultFuture.partitionResult(partition).get();
                result.put(partition, info.offset());
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                String topic = partition.topic();
                if (cause instanceof AuthorizationException) {
                    String msg = String.format("Not authorized to get the end offsets for topic '%s' on brokers at %s", topic, this.bootstrapServers());
                    throw new ConnectException(msg, (Throwable)e);
                }
                if (cause instanceof UnsupportedVersionException) {
                    String msg = String.format("API to get the get the end offsets for topic '%s' is unsupported on brokers at %s", topic, this.bootstrapServers());
                    throw new UnsupportedVersionException(msg, (Throwable)e);
                }
                if (cause instanceof TimeoutException) {
                    String msg = String.format("Timed out while waiting to get end offsets for topic '%s' on brokers at %s", topic, this.bootstrapServers());
                    throw new TimeoutException(msg, (Throwable)e);
                }
                if (cause instanceof LeaderNotAvailableException) {
                    String msg = String.format("Unable to get end offsets during leader election for topic '%s' on brokers at %s", topic, this.bootstrapServers());
                    throw new LeaderNotAvailableException(msg, (Throwable)e);
                }
                if (cause instanceof org.apache.kafka.common.errors.RetriableException) {
                    throw (org.apache.kafka.common.errors.RetriableException)cause;
                }
                String msg = String.format("Error while getting end offsets for topic '%s' on brokers at %s", topic, this.bootstrapServers());
                throw new ConnectException(msg, (Throwable)e);
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                String msg = String.format("Interrupted while attempting to read end offsets for topic '%s' on brokers at %s", partition.topic(), this.bootstrapServers());
                throw new RetriableException(msg, (Throwable)e);
            }
        }
        return result;
    }

    @Override
    public void close() {
        this.admin.close();
    }

    private String bootstrapServers() {
        Object servers = this.adminConfig.get("bootstrap.servers");
        return servers != null ? servers.toString() : "<unknown>";
    }

    public static class NewTopicBuilder {
        private String name;
        private int numPartitions;
        private short replicationFactor;
        private Map<String, String> configs = new HashMap<String, String>();

        NewTopicBuilder(String name) {
            this.name = name;
        }

        public NewTopicBuilder partitions(int numPartitions) {
            this.numPartitions = numPartitions;
            return this;
        }

        public NewTopicBuilder replicationFactor(short replicationFactor) {
            this.replicationFactor = replicationFactor;
            return this;
        }

        public NewTopicBuilder compacted() {
            this.configs.put(TopicAdmin.CLEANUP_POLICY_CONFIG, TopicAdmin.CLEANUP_POLICY_COMPACT);
            return this;
        }

        public NewTopicBuilder minInSyncReplicas(short minInSyncReplicas) {
            this.configs.put(TopicAdmin.MIN_INSYNC_REPLICAS_CONFIG, Short.toString(minInSyncReplicas));
            return this;
        }

        public NewTopicBuilder uncleanLeaderElection(boolean allow) {
            this.configs.put(TopicAdmin.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, Boolean.toString(allow));
            return this;
        }

        public NewTopicBuilder config(Map<String, Object> configs) {
            if (configs != null) {
                for (Map.Entry<String, Object> entry : configs.entrySet()) {
                    Object value = entry.getValue();
                    this.configs.put(entry.getKey(), value != null ? value.toString() : null);
                }
            } else {
                this.configs.clear();
            }
            return this;
        }

        public NewTopic build() {
            return new NewTopic(this.name, this.numPartitions, this.replicationFactor).configs(this.configs);
        }
    }
}

