/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.tools;

import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import kafka.log.Log;
import kafka.server.LogDirFailureChannel;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.serdes.TierKafkaKey;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.TierPartitionState;
import kafka.tier.tools.DumpTierPartitionState;
import kafka.tier.tools.TierTopicMaterializationToolConfig;
import kafka.tier.topic.TierMessageFormatter;
import kafka.tier.topic.TierTopicManager;
import kafka.utils.Scheduler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;

public class TierTopicMaterializationUtils {
    private final String topic = "_confluent-tier-state";
    public KafkaConsumer<byte[], byte[]> consumer;
    final TierTopicMaterializationToolConfig config;
    final Properties consumerProps;
    private final Map<TopicIdPartition, Long> offsetMap;
    final Map<TopicIdPartition, FileTierPartitionState> stateMap = new HashMap<TopicIdPartition, FileTierPartitionState>();
    private UserTierPartition targetTierPartition;
    private Scheduler scheduler;
    private static final int SAMPLING_INTERVAL = 1000;

    public TierTopicMaterializationUtils(TierTopicMaterializationToolConfig config, Properties additionalConsumerProps, Map<TopicIdPartition, Long> offsetMap, Scheduler scheduler) {
        this.offsetMap = offsetMap;
        this.config = config;
        this.consumerProps = TierTopicMaterializationUtils.getConsumerProps(config, additionalConsumerProps);
        this.targetTierPartition = new UserTierPartition(null, config.userTopicId, config.userPartition);
        this.consumer = new KafkaConsumer(this.consumerProps, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
        this.scheduler = scheduler;
    }

    public TierTopicMaterializationUtils(TierTopicMaterializationToolConfig config, Properties additionalConsumerProps, Scheduler scheduler) {
        this(config, additionalConsumerProps, new HashMap<TopicIdPartition, Long>(), scheduler);
    }

    public void setupConsumer(TierTopicMaterializationToolConfig config) {
        if (config.partition != -1) {
            this.seek(this.topic, config.partition, config.startOffset);
        } else {
            List partitions = this.consumer.partitionsFor(this.topic);
            Set<TopicPartition> tierTopicPartitions = TierTopicManager.partitions(this.topic, partitions.size());
            System.out.println("Listening on all " + partitions.size() + " partitions of tier topic");
            this.consumer.assign(tierTopicPartitions);
            this.consumer.seekToBeginning(tierTopicPartitions);
        }
    }

    private ConsumerRecords<byte[], byte[]> fetchRecords() {
        ConsumerRecords records = this.consumer.poll(Duration.ofSeconds(30L));
        if (records.isEmpty()) {
            throw new TimeoutException();
        }
        return records;
    }

    private File getStateFolder(TopicIdPartition id) {
        return new File(this.config.materializationPath + "/" + id.topic() + "-" + id.partition());
    }

    public void run() throws IOException {
        boolean materialize = this.config.getBoolean("materialize");
        int recordCount = 0;
        int currentOffset = this.config.startOffset;
        TierMessageFormatter writer = new TierMessageFormatter();
        this.setupConsumer(this.config);
        System.out.println("Event processing from " + this.config.startOffset + " until " + this.config.endOffset);
        try {
            while (this.config.endOffset == -1 || currentOffset <= this.config.endOffset) {
                ConsumerRecords<byte[], byte[]> records = this.fetchRecords();
                for (ConsumerRecord record : records) {
                    currentOffset = (int)record.offset();
                    TopicIdPartition id = AbstractTierMetadata.deserializeKey((byte[])record.key());
                    if (!this.doMaterialize(id, record.offset())) continue;
                    if (this.config.dumpEvents.booleanValue() || ++recordCount % 1000 == 0) {
                        writer.writeTo((ConsumerRecord<byte[], byte[]>)record, System.out);
                    }
                    if (!materialize) continue;
                    this.materializeRecord((ConsumerRecord<byte[], byte[]>)record, id);
                }
                if (!this.exitLoop()) continue;
                System.out.println("Done reading events for all configured source topic partitions.");
                break;
            }
        }
        catch (TimeoutException | WakeupException we) {
            System.out.println("Timeout after processing " + recordCount + " messages.");
        }
        this.saveMaterializedStates();
        this.dumpMaterializedState();
        System.out.println("Done event processing.");
    }

    private static Properties getConsumerProps(TierTopicMaterializationToolConfig config, Properties consumerProps) {
        Properties props = new Properties();
        props.putAll((Map<?, ?>)consumerProps);
        props.put("bootstrap.servers", config.server);
        props.put("session.timeout.ms", "30000");
        props.put("group.id", "tier-topic-materialization-tool");
        return props;
    }

    private boolean exitLoop() {
        return this.offsetMap != null && this.offsetMap.isEmpty();
    }

    private void saveMaterializedStates() throws IOException {
        System.out.println("Saving Materialized states for " + this.stateMap.keySet());
        for (TopicIdPartition id : this.stateMap.keySet()) {
            System.out.println("Closing state file " + id);
            this.stateMap.get(id).close();
        }
        System.out.println("Done saving states.");
    }

    private boolean doMaterialize(TopicIdPartition tpid, Long offset) {
        if (this.offsetMap != null) {
            if (!this.offsetMap.containsKey(tpid)) {
                return false;
            }
            if (this.offsetMap.get(tpid) >= offset) {
                return true;
            }
            this.offsetMap.remove(tpid);
            return false;
        }
        return this.targetTierPartition.filter(tpid.topic(), tpid.partition(), tpid.topicId());
    }

    private void materializeRecord(ConsumerRecord<byte[], byte[]> record, TopicIdPartition id) throws IOException {
        FileTierPartitionState state = this.getStateIfRequired(record, id);
        Optional<AbstractTierMetadata> entryOpt = AbstractTierMetadata.deserialize((byte[])record.key(), (byte[])record.value());
        if (entryOpt.isPresent()) {
            OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(record.offset(), record.leaderEpoch());
            AbstractTierMetadata entry = entryOpt.get();
            TierPartitionState.AppendResult result = state.append(entry, offsetAndEpoch);
            if (result != TierPartitionState.AppendResult.ACCEPTED) {
                System.out.println((Object)((Object)result) + " offset " + record.offset());
            }
        }
    }

    private FileTierPartitionState getStateIfRequired(ConsumerRecord<byte[], byte[]> record, TopicIdPartition id) throws IOException {
        if (!this.stateMap.containsKey(id)) {
            TierKafkaKey tierKey = TierKafkaKey.getRootAsTierKafkaKey(ByteBuffer.wrap((byte[])record.key()));
            File path = this.getStateFolder(id);
            if (!path.exists()) {
                path.mkdirs();
            }
            FileTierPartitionState state = new FileTierPartitionState(path, new LogDirFailureChannel(1), new TopicPartition(tierKey.topicName(), tierKey.partition()), true, this.scheduler);
            state.setTopicId(new UUID(tierKey.topicId().mostSignificantBits(), tierKey.topicId().leastSignificantBits()));
            this.stateMap.put(id, state);
            this.stateMap.get(id).onCatchUpComplete();
        }
        return this.stateMap.get(id);
    }

    private void dumpMaterializedState() {
        if (!this.config.dumpRecords.booleanValue() && !this.config.dumpHeader.booleanValue()) {
            return;
        }
        System.out.println("Dumping materialized records");
        for (TopicIdPartition id : this.stateMap.keySet()) {
            File basePath = this.getStateFolder(id);
            for (File file : basePath.listFiles()) {
                if (!file.isFile() || !Log.isTierStateFile(file)) continue;
                System.out.println("Dumping for " + basePath);
                if (this.config.dumpHeader.booleanValue()) {
                    DumpTierPartitionState.dumpTierState(id.topicPartition(), file, true);
                    continue;
                }
                DumpTierPartitionState.dumpTierState(id.topicPartition(), file, false);
            }
        }
    }

    public Path getTierStateFile(TopicIdPartition id) {
        File basePath = this.getStateFolder(id);
        for (File file : basePath.listFiles()) {
            if (!file.isFile() || !Log.isTierStateFile(file)) continue;
            return file.toPath();
        }
        return null;
    }

    public long getStartOffset(TopicPartition tp) {
        try {
            Map ret = this.consumer.beginningOffsets((Collection)ImmutableList.of((Object)tp));
            return (Long)ret.get(tp);
        }
        catch (Exception ex) {
            System.out.println("Not able to fetch startOffset for " + tp + " due to : " + ex);
            return -2L;
        }
    }

    private void seek(String topic, Integer partition, Integer offset) {
        TopicPartition topicPartition = new TopicPartition(topic, partition.intValue());
        this.consumer.assign(Collections.singletonList(topicPartition));
        this.consumer.seek(topicPartition, (long)offset.intValue());
    }

    private class UserTierPartition {
        final String name;
        final UUID id;
        final Integer partitionId;

        UserTierPartition(String name, UUID id, Integer partitionId) {
            this.name = name;
            this.id = id;
            this.partitionId = partitionId;
        }

        public boolean filter(String name, Integer partitonId, UUID id) {
            return !(this.name != null && !this.name.equals(name) || !this.id.equals(TierTopicMaterializationToolConfig.EMPTY_UUID) && !this.id.equals(id) || !this.partitionId.equals(-1) && !this.partitionId.equals(partitonId));
        }
    }
}

