/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.replicator.util;

import io.confluent.connect.replicator.util.ReplicatorAdminClient;
import io.confluent.connect.replicator.util.TopicMetadata;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NewReplicatorAdminClient
implements ReplicatorAdminClient {
    private static final Logger log = LoggerFactory.getLogger(ReplicatorAdminClient.class);
    private static final long RETRIABLE_FUTURE_TIMEOUT = 30000L;
    private final Time time;
    private final long maxAgeMs;
    private final AdminClient admin;
    private final ScheduledExecutorService metadataFetcher;
    private final AtomicReference<Map<String, TopicMetadata>> topicMetadata;
    private Collection<Node> brokers;
    private long nextBrokerMetadataRefresh;
    private volatile boolean forceTopicMetadataRefresh;
    private Set<String> interestedTopics;
    private ReplicatorAdminClient.TopicMetadataListener metadataListener;
    private ScheduledFuture<?> metadataFuture;

    public NewReplicatorAdminClient(Map<String, Object> adminConfig, Time time, long maxAgeMs, String taskId) {
        this(AdminClient.create(adminConfig), time, maxAgeMs, taskId);
    }

    public NewReplicatorAdminClient(AdminClient adminClient, Time time, long maxAgeMs, final String taskId) {
        this.time = time;
        this.maxAgeMs = maxAgeMs > 0L ? maxAgeMs : Long.MAX_VALUE;
        this.admin = adminClient;
        ThreadFactory threadFactory = new ThreadFactory(){

            @Override
            public Thread newThread(Runnable metadataTask) {
                return new Thread(metadataTask, "MetadataFetcher-" + taskId);
            }
        };
        if (maxAgeMs > 0L) {
            this.metadataFetcher = Executors.newSingleThreadScheduledExecutor(threadFactory);
        } else {
            ScheduledThreadPoolExecutor noOpExecutor = new ScheduledThreadPoolExecutor(0, threadFactory);
            noOpExecutor.setKeepAliveTime(1L, TimeUnit.SECONDS);
            noOpExecutor.allowCoreThreadTimeOut(true);
            this.metadataFetcher = Executors.unconfigurableScheduledExecutorService(noOpExecutor);
        }
        this.nextBrokerMetadataRefresh = time.milliseconds();
        this.forceTopicMetadataRefresh = true;
        this.topicMetadata = new AtomicReference();
        this.topicMetadata.set(new HashMap());
        this.interestedTopics = Collections.emptySet();
    }

    protected ScheduledExecutorService metadataFetcher() {
        return this.metadataFetcher;
    }

    @Deprecated
    protected NewReplicatorAdminClient() {
        this.time = null;
        this.maxAgeMs = 0L;
        this.admin = null;
        this.metadataFetcher = null;
        this.topicMetadata = null;
    }

    @Override
    public String clusterId() throws InterruptedException, ExecutionException {
        long timeout = this.time.milliseconds() + 30000L;
        Callable<String> clusterIdCallable = () -> (String)this.admin.describeCluster().clusterId().get();
        Future<String> result = this.metadataFetcher.submit(clusterIdCallable);
        while (this.time.milliseconds() < timeout) {
            try {
                String clusterId = result.get();
                return clusterId;
            }
            catch (Exception e) {
                if (e.getCause() instanceof ExecutionException && e.getCause().getCause() instanceof RetriableException) {
                    log.warn("Could not fetch Cluster Id, will retry", (Throwable)e);
                    result = this.metadataFetcher.schedule(clusterIdCallable, 2L, TimeUnit.SECONDS);
                    continue;
                }
                throw e;
            }
        }
        throw new TimeoutException("Failed to retrieve cluster id within timeout of 30000ms.");
    }

    @Override
    public Properties topicConfig(String topic) throws InterruptedException, ExecutionException {
        ConfigResource topicConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Config config = (Config)((Map)this.admin.describeConfigs(Collections.singleton(topicConfigResource)).all().get()).get(topicConfigResource);
        Properties topicConfig = new Properties();
        for (ConfigEntry configEntry : config.entries()) {
            if (configEntry.isDefault()) continue;
            topicConfig.put(configEntry.name(), configEntry.value());
        }
        return topicConfig;
    }

    @Override
    public void changeTopicConfig(String topic, Properties configProps) throws InterruptedException, ExecutionException {
        ConfigResource topicConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Config config = (Config)((Map)this.admin.describeConfigs(Collections.singleton(topicConfigResource)).all().get()).get(topicConfigResource);
        HashMap<String, String> configCopy = new HashMap<String, String>();
        for (String name : configProps.stringPropertyNames()) {
            configCopy.put(name, configProps.getProperty(name));
        }
        ArrayList<AlterConfigOp> alterConfigOps = new ArrayList<AlterConfigOp>();
        for (ConfigEntry configEntry : config.entries()) {
            String entryToUpdate = (String)configCopy.get(configEntry.name());
            if (entryToUpdate == null) continue;
            alterConfigOps.add(new AlterConfigOp(new ConfigEntry(configEntry.name(), entryToUpdate, configEntry.source(), configEntry.isSensitive(), configEntry.isReadOnly(), configEntry.synonyms(), configEntry.type(), configEntry.documentation()), AlterConfigOp.OpType.SET));
            configCopy.remove(configEntry.name());
        }
        for (Map.Entry entry : configCopy.entrySet()) {
            ConfigEntry configEntry = new ConfigEntry((String)entry.getKey(), (String)entry.getValue());
            alterConfigOps.add(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET));
        }
        this.admin.incrementalAlterConfigs(Collections.singletonMap(topicConfigResource, alterConfigOps)).all().get();
    }

    @Override
    public synchronized int aliveBrokers() throws InterruptedException, ExecutionException {
        long now = this.time.milliseconds();
        if (this.brokers == null || now >= this.nextBrokerMetadataRefresh) {
            this.brokers = (Collection)this.admin.describeCluster().nodes().get();
            this.nextBrokerMetadataRefresh = this.time.milliseconds() + this.maxAgeMs;
        }
        return this.brokers.size();
    }

    @Override
    public synchronized void setInterestedTopics(Set<String> topics, ReplicatorAdminClient.TopicMetadataListener topicMetadataListener) {
        this.interestedTopics = topics;
        this.metadataListener = topicMetadataListener;
        Map<String, TopicMetadata> currentTopicMetadata = this.topicMetadata.get();
        currentTopicMetadata.keySet().retainAll(topics);
        int missingTopicsNum = topics.size() - currentTopicMetadata.size();
        if (missingTopicsNum > 0) {
            log.info("Requesting metadata refresh after {} new topics were added", (Object)missingTopicsNum);
            this.forceTopicMetadataRefresh = true;
        }
    }

    private Future<?> scheduleSingleRefreshMetadata() {
        return this.metadataFetcher.submit(new TopicMetadataTask(this.interestedTopics, this.metadataListener));
    }

    private ScheduledFuture<?> schedulePeriodicRefreshMetadata() {
        this.metadataFuture = this.metadataFetcher.scheduleWithFixedDelay(new TopicMetadataTask(this.interestedTopics, this.metadataListener), 0L, this.maxAgeMs, TimeUnit.MILLISECONDS);
        return this.metadataFuture;
    }

    private synchronized Map<String, TopicMetadata> topicMetadata() {
        while (this.forceTopicMetadataRefresh) {
            if (this.metadataFuture != null) {
                this.metadataFuture.cancel(true);
            }
            try {
                this.forceTopicMetadataRefresh = false;
                this.scheduleSingleRefreshMetadata().get();
                this.schedulePeriodicRefreshMetadata();
            }
            catch (InterruptedException | RuntimeException | ExecutionException e) {
                log.warn("Topic metadata update did not complete.", (Throwable)e);
                break;
            }
        }
        return this.topicMetadata.get();
    }

    @Override
    public synchronized TopicMetadata topicMetadata(String topic) {
        return this.topicMetadata().get(topic);
    }

    @Override
    public synchronized boolean partitionExists(TopicPartition partition) {
        TopicMetadata metadata = this.topicMetadata().get(partition.topic());
        if (metadata == null) {
            return false;
        }
        return partition.partition() < metadata.numPartitions();
    }

    @Override
    public synchronized boolean topicExists(String topic) {
        return this.topicMetadata().get(topic) != null;
    }

    @Override
    public boolean createTopic(String topic, int numPartitions, short replicationFactor, Properties topicConfig) throws InterruptedException, ExecutionException {
        return this.createTopic(topic, Optional.of(numPartitions), Optional.of(replicationFactor), topicConfig);
    }

    @Override
    public boolean createTopic(String topic, Optional<Integer> numPartitions, Optional<Short> replicationFactor, Properties topicConfig) throws InterruptedException, ExecutionException {
        return this.createTopic(topic, numPartitions, replicationFactor, topicConfig, true);
    }

    private boolean createTopic(String topic, Optional<Integer> numPartitions, Optional<Short> replicationFactor, Properties topicConfig, boolean withRetry) throws InterruptedException, ExecutionException {
        log.info("Creating topic {} with {} partitions, replication factor {}, and config {}", new Object[]{topic, numPartitions, replicationFactor, topicConfig});
        HashMap<String, String> configCopy = new HashMap<String, String>();
        for (String name : topicConfig.stringPropertyNames()) {
            configCopy.put(name, topicConfig.getProperty(name));
        }
        NewTopic topicRequest = new NewTopic(topic, numPartitions, replicationFactor);
        topicRequest.configs(Collections.unmodifiableMap(configCopy));
        try {
            this.admin.createTopics(Collections.singleton(topicRequest)).all().get();
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            String unsupportedConfig = "message.downconversion.enable";
            if (cause instanceof TopicExistsException) {
                log.debug("Failed to create topic {} because it already exists. Ignoring exception", (Object)topic, (Object)e);
                return false;
            }
            if (cause instanceof InvalidConfigurationException && cause.getMessage().contains(unsupportedConfig) && withRetry) {
                log.debug("Target Kafka cluster doesn't recognize topic config '{}'. Removing this config and retrying to create the topic one more time", (Object)unsupportedConfig, (Object)e);
                topicConfig.remove(unsupportedConfig);
                return this.createTopic(topic, numPartitions, replicationFactor, topicConfig, false);
            }
            throw e;
        }
        this.forceTopicMetadataRefresh = true;
        return true;
    }

    @Override
    public void addPartitions(String topic, int numPartitions) throws InterruptedException, ExecutionException {
        log.info("Increasing number of partitions to {} for topic {} and requesting metadata refresh", (Object)numPartitions, (Object)topic);
        this.admin.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo((int)numPartitions))).all().get();
        this.forceTopicMetadataRefresh = true;
    }

    @Override
    public void close() {
        try {
            this.admin.close();
        }
        finally {
            this.metadataFetcher.shutdownNow();
        }
    }

    protected class TopicMetadataTask
    implements Runnable {
        private final Set<String> taskInterestedTopics;
        private final ReplicatorAdminClient.TopicMetadataListener taskMetadataListener;

        public TopicMetadataTask(Set<String> interestedTopics, ReplicatorAdminClient.TopicMetadataListener metadataListener) {
            this.taskInterestedTopics = interestedTopics;
            this.taskMetadataListener = metadataListener;
        }

        @Override
        public void run() {
            try {
                this.refreshTopicMetadata();
                log.debug("Refreshed topic metadata for interested topics: {}", NewReplicatorAdminClient.this.interestedTopics);
            }
            catch (InterruptedException | RuntimeException | ExecutionException e) {
                log.warn(String.format("Failed to refresh topic metadata. Will try again in %dms.", NewReplicatorAdminClient.this.maxAgeMs), (Throwable)e);
            }
        }

        private void refreshTopicMetadata() throws InterruptedException, ExecutionException {
            HashMap<String, TopicMetadata> updatedTopicMetadata = new HashMap<String, TopicMetadata>();
            for (Map.Entry topicDescriptionEntry : NewReplicatorAdminClient.this.admin.describeTopics(this.taskInterestedTopics).topicNameValues().entrySet()) {
                try {
                    TopicDescription desc = (TopicDescription)((KafkaFuture)topicDescriptionEntry.getValue()).get();
                    int numPartitions = desc.partitions().size();
                    if (numPartitions <= 0) continue;
                    updatedTopicMetadata.put(desc.name(), new TopicMetadata(desc.name(), numPartitions));
                    log.trace("Updating metadata for topic '{}' with {} partitions", (Object)desc.name(), (Object)numPartitions);
                }
                catch (ExecutionException e) {
                    if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                        log.warn("Unable to describe topic {}, this topic will be ignored until the next refresh. This is regular behavior upon start, but could signify an unreachable topic otherwise", topicDescriptionEntry.getKey());
                        throw e;
                    }
                    log.trace("Received UnknownTopicOrPartitionException when attempting to describe topic {}. This is expected to be transient and will be resolved in subsequent metadata updates.", topicDescriptionEntry.getKey());
                }
            }
            NewReplicatorAdminClient.this.topicMetadata.set(updatedTopicMetadata);
            if (this.taskMetadataListener != null) {
                this.taskMetadataListener.onTopicMetadataRefresh();
            }
        }
    }
}

