/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.license;

import io.confluent.command.record.Command;
import io.confluent.serializers.ProtoSerde;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stores and retrieves a license string (a JWT) in a Kafka topic through a
 * {@link KafkaBasedLog}.
 *
 * <p>Every license is written under a single fixed key ({@link #KEY}). Records consumed
 * from the topic are handed to a {@link ConsumeCallback}, which keeps the most recently
 * seen license in an {@link AtomicReference}; {@link #licenseScan()} reads the log to its
 * end and returns that value.
 */
public class LicenseStore {
    private static final Logger log = LoggerFactory.getLogger(LicenseStore.class);
    private static final String KEY_PREFIX = "CONFLUENT_LICENSE";
    /** The single key under which every license record is produced. */
    private static final Command.CommandKey KEY = Command.CommandKey.newBuilder().setConfigType(Command.CommandConfigType.LICENSE_INFO).setGuid(KEY_PREFIX).build();
    /** Replication factor used for the license topic when the topic config does not specify one. */
    private static final short DEFAULT_REPLICATION_FACTOR = 3;
    private final String topic;
    /** Key looked up in the topic config map to obtain the license topic's replication factor. */
    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
    /** Maximum time, in milliseconds, that {@link #licenseScan()} waits for the log to be read to its end. */
    public static final long READ_TO_END_TIMEOUT_MS = 120000L;
    private final KafkaBasedLog<Command.CommandKey, Command.CommandMessage> licenseLog;
    /** Guards {@link #start()} / {@link #stop()} so each transition happens at most once. */
    private final AtomicBoolean running = new AtomicBoolean();
    /** Most recent license seen by the consume callback; {@code null} until one is consumed. */
    private final AtomicReference<String> latestLicense;
    private final Time time;

    /**
     * Creates a store backed by the given topic, using the system clock.
     *
     * @param topic          name of the license topic
     * @param producerConfig base producer properties (serializers and retries are overridden)
     * @param consumerConfig base consumer properties (deserializers are overridden)
     * @param topicConfig    properties for the topic admin client; may carry
     *                       {@link #REPLICATION_FACTOR_CONFIG}
     */
    public LicenseStore(String topic, Map<String, Object> producerConfig, Map<String, Object> consumerConfig, Map<String, Object> topicConfig) {
        this(topic, producerConfig, consumerConfig, topicConfig, Time.SYSTEM);
    }

    /**
     * Creates a store backed by the given topic with an explicit {@link Time} source.
     *
     * <p>Note: this constructor calls the overridable
     * {@link #setupAndCreateKafkaBasedLog}; an override must not rely on subclass state.
     *
     * @param topic          name of the license topic
     * @param producerConfig base producer properties
     * @param consumerConfig base consumer properties
     * @param topicConfig    properties for the topic admin client
     * @param time           time source handed to the underlying log
     */
    protected LicenseStore(String topic, Map<String, Object> producerConfig, Map<String, Object> consumerConfig, Map<String, Object> topicConfig, Time time) {
        this.topic = topic;
        this.latestLicense = new AtomicReference<>();
        this.time = time;
        this.licenseLog = this.setupAndCreateKafkaBasedLog(this.topic, producerConfig, consumerConfig, topicConfig, this.latestLicense, this.time);
    }

    /**
     * Creates a store around an already constructed log and license reference.
     *
     * @param topic         name of the license topic
     * @param latestLicense reference returned from by {@link #licenseScan()}
     * @param licenseLog    log used for reading and writing license records
     * @param time          time source
     */
    public LicenseStore(String topic, AtomicReference<String> latestLicense, KafkaBasedLog<Command.CommandKey, Command.CommandMessage> licenseLog, Time time) {
        this.topic = topic;
        this.latestLicense = latestLicense;
        this.licenseLog = licenseLog;
        this.time = time;
    }

    /**
     * Builds the producer/consumer properties and the topic description, then creates the
     * {@link KafkaBasedLog} for the license topic.
     *
     * <p>The caller's maps are copied, not modified. The serializers/deserializers are forced
     * to the license serdes and producer retries to {@link Integer#MAX_VALUE}. The topic is
     * described as compacted with a single partition; its replication factor comes from
     * {@link #REPLICATION_FACTOR_CONFIG} in {@code topicConfig}, defaulting to 3.
     *
     * @param topic          name of the license topic
     * @param producerConfig base producer properties
     * @param consumerConfig base consumer properties
     * @param topicConfig    topic admin properties, optionally holding the replication factor
     *                       as a string or an integral number
     * @param latestLicense  reference the consume callback updates
     * @param time           time source handed to the log
     * @return the configured (not yet started) log
     * @throws NumberFormatException if the configured replication factor is not a valid short
     */
    KafkaBasedLog<Command.CommandKey, Command.CommandMessage> setupAndCreateKafkaBasedLog(String topic, Map<String, Object> producerConfig, Map<String, Object> consumerConfig, Map<String, Object> topicConfig, AtomicReference<String> latestLicense, Time time) {
        Map<String, Object> producerProps = new HashMap<>(producerConfig);
        producerProps.put("key.serializer", LicenseKeySerde.class.getName());
        producerProps.put("value.serializer", LicenseMessageSerde.class.getName());
        producerProps.put("retries", Integer.MAX_VALUE);

        Map<String, Object> consumerProps = new HashMap<>(consumerConfig);
        consumerProps.put("key.deserializer", LicenseKeySerde.class.getName());
        consumerProps.put("value.deserializer", LicenseMessageSerde.class.getName());

        // The config map is Map<String, Object>, so accept the value whether it was supplied
        // as a String ("3") or as an integral number (3) instead of blindly casting to String.
        Object configuredReplication = topicConfig.get(REPLICATION_FACTOR_CONFIG);
        short replicationFactor = configuredReplication == null
                ? DEFAULT_REPLICATION_FACTOR
                : Short.parseShort(String.valueOf(configuredReplication));

        NewTopic topicDescription = TopicAdmin.defineTopic(topic).compacted().partitions(1).replicationFactor(replicationFactor).build();
        return this.createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(latestLicense), topicDescription, topicConfig, time);
    }

    /**
     * Creates the {@link KafkaBasedLog}, passing it a task that creates the license topic
     * through a short-lived {@link TopicAdmin}.
     *
     * @param topic            name of the license topic
     * @param producerProps    final producer properties
     * @param consumerProps    final consumer properties
     * @param consumedCallback callback invoked for consumed records
     * @param topicDescription description of the topic to create
     * @param topicConfig      properties for the topic admin client
     * @param time             time source handed to the log
     * @return the new log
     */
    private KafkaBasedLog<Command.CommandKey, Command.CommandMessage> createKafkaBasedLog(String topic, Map<String, Object> producerProps, Map<String, Object> consumerProps, Callback<ConsumerRecord<Command.CommandKey, Command.CommandMessage>> consumedCallback, final NewTopic topicDescription, final Map<String, Object> topicConfig, Time time) {
        // Handed to KafkaBasedLog as its initializer; presumably run when the log starts
        // (confirm against the KafkaBasedLog version in use).
        Runnable createTopics = new Runnable() {
            @Override
            public void run() {
                // The admin client is only needed for topic creation, so close it right away.
                try (TopicAdmin admin = new TopicAdmin(topicConfig)) {
                    admin.createTopics(topicDescription);
                }
            }
        };
        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, time, createTopics);
    }

    /** Starts the underlying log. Has no effect if the store is already running. */
    public void start() {
        if (this.running.compareAndSet(false, true)) {
            log.info("Starting License Store");
            this.licenseLog.start();
            log.info("Started License Store");
        }
    }

    /** Stops the underlying log. Has no effect if the store is not running. */
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            log.info("Closing License Store");
            this.licenseLog.stop();
            log.info("Closed License Store");
        }
    }

    /**
     * Reads the license log to its end and returns the latest license seen.
     *
     * @return the most recently consumed license, or {@code null} if none has been consumed
     * @throws IllegalStateException if the read is interrupted, fails, or does not finish
     *                               within {@link #READ_TO_END_TIMEOUT_MS} milliseconds; the
     *                               original exception is the cause
     */
    public String licenseScan() {
        try {
            this.licenseLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            return this.latestLicense.get();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            log.error("Failed to read license from Kafka: ", e);
            throw new IllegalStateException(e);
        }
        catch (ExecutionException | TimeoutException e) {
            log.error("Failed to read license from Kafka: ", e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Writes a license to the log without a completion callback.
     *
     * @param license the license JWT to store
     */
    public void registerLicense(String license) {
        this.registerLicense(license, null);
    }

    /**
     * Writes a license to the log under the fixed license key.
     *
     * @param license  the license JWT to store
     * @param callback producer callback passed through to the log's send; may be {@code null}
     */
    public void registerLicense(String license, org.apache.kafka.clients.producer.Callback callback) {
        Command.CommandMessage licenseMsg = Command.CommandMessage.newBuilder().setLicenseInfo(Command.LicenseInfo.newBuilder().setJwt(license).build()).build();
        this.licenseLog.send(KEY, licenseMsg, callback);
    }

    /**
     * Consumer callback that records the JWT of every {@code LICENSE_INFO} record into a
     * shared reference, so the reference always holds the last license consumed.
     */
    public static class ConsumeCallback
    implements Callback<ConsumerRecord<Command.CommandKey, Command.CommandMessage>> {
        private final AtomicReference<String> latestLicenseRef;

        /**
         * @param latestLicenseRef reference to update with each consumed license
         */
        ConsumeCallback(AtomicReference<String> latestLicenseRef) {
            this.latestLicenseRef = latestLicenseRef;
        }

        /**
         * Handles one consumed record (or a consumer error).
         *
         * <p>Errors are logged and otherwise ignored. Records that are {@code null}, have a
         * {@code null} key, or whose key is not of type {@code LICENSE_INFO} are skipped.
         * A license record with a {@code null} value is logged and skipped, leaving the
         * previously seen license in place.
         *
         * @param error  error reported by the consumer, or {@code null} on success
         * @param record the consumed record
         */
        @Override
        public void onCompletion(Throwable error, ConsumerRecord<Command.CommandKey, Command.CommandMessage> record) {
            if (error != null) {
                log.error("Unexpected error in consumer callback for LicenseStore: ", error);
                return;
            }
            if (record == null) {
                return;
            }
            Command.CommandKey key = record.key();
            if (key == null || key.getConfigType() != Command.CommandConfigType.LICENSE_INFO) {
                return;
            }
            Command.CommandMessage value = record.value();
            if (value == null) {
                // Guard against null-valued records (e.g. tombstones on the compacted topic)
                // rather than throwing a NullPointerException out of the callback.
                log.warn("Ignoring license record with null value at offset {}", record.offset());
                return;
            }
            this.latestLicenseRef.set(value.getLicenseInfo().getJwt());
        }
    }

    /** Serde for license record values ({@code Command.CommandMessage}). */
    public static class LicenseMessageSerde
    extends ProtoSerde<Command.CommandMessage> {
        /** Public no-arg constructor; the class is configured by name in the client properties. */
        public LicenseMessageSerde() {
            super(Command.CommandMessage.getDefaultInstance());
        }
    }

    /** Serde for license record keys ({@code Command.CommandKey}). */
    public static class LicenseKeySerde
    extends ProtoSerde<Command.CommandKey> {
        /** Public no-arg constructor; the class is configured by name in the client properties. */
        public LicenseKeySerde() {
            super(Command.CommandKey.getDefaultInstance());
        }
    }
}

