/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.replicator.offsets;

import io.confluent.connect.replicator.Translator;
import io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslatorConfig;
import io.confluent.connect.replicator.offsets.ConsumerTimestampsWriterDefaults;
import io.confluent.connect.replicator.offsets.GroupTopicPartition;
import io.confluent.connect.replicator.offsets.GroupTopicPartitionDeserializer;
import io.confluent.connect.replicator.offsets.TimestampAndDelta;
import io.confluent.connect.replicator.offsets.TimestampAndDeltaDeserializer;
import io.confluent.connect.replicator.util.Utils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerOffsetsTranslator
implements Translator {
    private static final Logger log = LoggerFactory.getLogger(ConsumerOffsetsTranslator.class);
    public static final String TOPIC_NAME = "__consumer_timestamps";
    private static final GroupTopicPartitionDeserializer GROUP_TOPIC_PARTITION_DESERIALIZER = new GroupTopicPartitionDeserializer();
    private static final TimestampAndDeltaDeserializer TIMESTAMP_DESERIALIZER = new TimestampAndDeltaDeserializer();
    private final Time time;
    private final String taskId;
    private final ConsumerOffsetsTranslatorConfig config;
    private final Set<String> whitelistTopics;
    private final Pattern topicPattern;
    private final Set<String> blacklistTopics;
    private Map<String, Map<TopicPartition, TimestampForRetry>> timestampsForRetry = new HashMap<String, Map<TopicPartition, TimestampForRetry>>();
    private Map<String, Map<TopicPartition, TimestampForRetry>> timestampsForNextRetry;
    private Map<TopicPartition, Map<TimestampAndDelta, OffsetAndMetadata>> cachedOffsets;
    private final List<ConsumerRecord<byte[], byte[]>> collectedRecords = new ArrayList<ConsumerRecord<byte[], byte[]>>();
    private final long batchPeriodMs;
    private final long batchSize;
    private long lastTranslation = 0L;
    private long nextDeadline = Long.MAX_VALUE;

    public ConsumerOffsetsTranslator(ConsumerOffsetsTranslatorConfig config, String taskId, Time time, int batchPeriodMs, int batchSize) {
        this.config = config;
        this.taskId = taskId;
        this.time = time;
        this.batchPeriodMs = batchPeriodMs;
        this.batchSize = batchSize;
        this.whitelistTopics = config != null ? config.getTopics() : Collections.emptySet();
        this.topicPattern = config != null ? config.getTopicPattern() : null;
        this.blacklistTopics = config != null ? config.getBlacklistTopics() : Collections.emptySet();
    }

    public ConsumerOffsetsTranslator(Map<String, String> config, String taskId, Time time, int batchPeriodMs, int batchSize) {
        this(config != null ? new ConsumerOffsetsTranslatorConfig(config) : null, taskId, time, batchPeriodMs, batchSize);
    }

    private boolean hasWork() {
        return this.collectedRecords.size() > 0 || this.timestampsForRetry.size() > 0;
    }

    private boolean checkBatchSize() {
        boolean doTranslate;
        int size = this.collectedRecords.size();
        boolean bl = doTranslate = (long)size >= this.batchSize;
        if (doTranslate) {
            log.debug("Translating after reaching batch size {}", (Object)size);
        }
        return doTranslate;
    }

    private boolean checkBatchPeriodMs(long now) {
        boolean doTranslate;
        long elapsedMs = now - this.lastTranslation;
        boolean bl = doTranslate = this.lastTranslation == 0L || elapsedMs >= this.batchPeriodMs;
        if (doTranslate) {
            if (this.lastTranslation == 0L) {
                log.debug("Translating after initial start");
            } else {
                log.debug("Translating after elapsed period {} ms", (Object)elapsedMs);
            }
        }
        return doTranslate;
    }

    @Override
    public String topic() {
        return TOPIC_NAME;
    }

    @Override
    public long nextDeadline() {
        return this.nextDeadline;
    }

    @Override
    public void collect(List<ConsumerRecord<byte[], byte[]>> record) {
        this.collectedRecords.addAll(record);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<ConsumerRecord<byte[], byte[]>> translateCollectedRecords() {
        log.debug("Translating consumer offsets...");
        long now = this.time.milliseconds();
        try {
            if (this.hasWork() && (this.checkBatchSize() || this.checkBatchPeriodMs(now))) {
                ArrayList<ConsumerRecord<byte[], byte[]>> translatedRecords = new ArrayList<ConsumerRecord<byte[], byte[]>>(this.collectedRecords);
                this.translateOffsets(translatedRecords);
                this.collectedRecords.clear();
                this.lastTranslation = now;
                ArrayList<ConsumerRecord<byte[], byte[]>> arrayList = translatedRecords;
                return arrayList;
            }
            List<ConsumerRecord<byte[], byte[]>> list = Collections.emptyList();
            return list;
        }
        finally {
            this.nextDeadline = now + this.batchPeriodMs;
        }
    }

    protected Map<String, Map<TopicPartition, TimestampForRetry>> timestampsForRetry() {
        return this.timestampsForRetry;
    }

    public void translateOffsets(List<ConsumerRecord<byte[], byte[]>> records) {
        log.debug("Translating {} offset records", (Object)records.size());
        this.timestampsForNextRetry = new HashMap<String, Map<TopicPartition, TimestampForRetry>>();
        this.cachedOffsets = new HashMap<TopicPartition, Map<TimestampAndDelta, OffsetAndMetadata>>();
        Map<String, Map<TopicPartition, TimestampAndDelta>> timestamps = this.getTimestampsByGroup(records);
        HashSet<String> groupIds = new HashSet<String>(timestamps.keySet());
        groupIds.addAll(this.timestampsForRetry.keySet());
        log.debug("Translating {} groups", (Object)groupIds.size());
        int count = 0;
        long start = this.time.milliseconds();
        for (String groupId : groupIds) {
            Map<TopicPartition, TimestampAndDelta> timestampsPerGroup = this.getTimestampsPerGroup(groupId, timestamps, false);
            count += this.commitOffsets(groupId, timestampsPerGroup);
        }
        log.debug("Committed {} total offsets in {} ms", (Object)count, (Object)(this.time.milliseconds() - start));
        this.timestampsForRetry = this.timestampsForNextRetry;
    }

    private Map<TopicPartition, TimestampAndDelta> getTimestampsPerGroup(String groupId, Map<String, Map<TopicPartition, TimestampAndDelta>> timestamps, boolean createMutableMap) {
        HashMap timestampsPerGroup = timestamps.get(groupId);
        if (timestampsPerGroup == null) {
            timestampsPerGroup = createMutableMap ? new HashMap() : Collections.emptyMap();
            timestamps.put(groupId, timestampsPerGroup);
        }
        return timestampsPerGroup;
    }

    private int commitOffsets(String groupId, Map<TopicPartition, TimestampAndDelta> timestampsPerGroup) {
        int count = 0;
        try (Consumer<byte[], byte[]> destConsumer = this.buildDestConsumer(this.config, groupId);){
            long start = this.time.milliseconds();
            Map<TopicPartition, OffsetAndMetadata> offsetsForCommit = this.getOffsetsForCommit(groupId, timestampsPerGroup, destConsumer);
            log.debug("Fetched {} offsets for group {} in {} ms", new Object[]{offsetsForCommit.size(), groupId, this.time.milliseconds() - start});
            if (offsetsForCommit.size() > 0) {
                try {
                    start = this.time.milliseconds();
                    destConsumer.commitSync(offsetsForCommit);
                    count += offsetsForCommit.size();
                    log.debug("Committed {} offsets for group {} in {} ms", new Object[]{offsetsForCommit.size(), groupId, this.time.milliseconds() - start});
                }
                catch (CommitFailedException e) {
                    log.warn("Could not translate offsets for group ID {}.  There may be an active consumer group for this ID in the destination cluster. Stop the consumer group in order for offset translation to continue.", (Object)groupId);
                    this.timestampsForNextRetry.clear();
                }
                catch (KafkaException e) {
                    log.error("Could not translate offsets for group ID {}.", (Object)groupId, (Object)e);
                }
            }
        }
        return count;
    }

    protected Consumer<byte[], byte[]> buildDestConsumer(ConsumerOffsetsTranslatorConfig config, String groupId) {
        HashMap consumerConfig = new HashMap(config.getDestConsumerConfigs());
        consumerConfig.put("group.id", groupId);
        if (!consumerConfig.containsKey("client.id")) {
            consumerConfig.put("client.id", this.taskId);
        }
        consumerConfig.put("enable.auto.commit", false);
        consumerConfig.put("auto.offset.reset", "none");
        log.debug("Initializing Consumer Offsets Translator Consumer");
        return new KafkaConsumer(consumerConfig, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
    }

    private TopicPartition toDestPartition(TopicPartition sourcePartition) {
        return new TopicPartition(this.toDestTopic(sourcePartition.topic()), sourcePartition.partition());
    }

    private String toDestTopic(String sourceTopic) {
        return Utils.renameTopic((String)this.config.getTopicRenameFormat(), (String)sourceTopic);
    }

    private Map<String, Map<TopicPartition, TimestampAndDelta>> getTimestampsByGroup(List<ConsumerRecord<byte[], byte[]>> records) {
        int filterCount = 0;
        HashMap<String, Map<TopicPartition, TimestampAndDelta>> timestamps = new HashMap<String, Map<TopicPartition, TimestampAndDelta>>(records.size());
        for (ConsumerRecord<byte[], byte[]> record : records) {
            GroupTopicPartition groupTopicPartition = GROUP_TOPIC_PARTITION_DESERIALIZER.deserialize(record.topic(), (byte[])record.key());
            TimestampAndDelta timestampAndDelta = TIMESTAMP_DESERIALIZER.deserialize(record.topic(), (byte[])record.value());
            String groupId = groupTopicPartition.groupId();
            TopicPartition srcTopicPartition = groupTopicPartition.topicPartition();
            if (!this.isMatchingTopic(srcTopicPartition.topic())) {
                ++filterCount;
                continue;
            }
            TopicPartition destTopicPartition = this.toDestPartition(srcTopicPartition);
            Map<TopicPartition, TimestampAndDelta> timestampsPerGroup = this.getTimestampsPerGroup(groupId, timestamps, true);
            timestampsPerGroup.put(destTopicPartition, timestampAndDelta);
        }
        if (filterCount > 0) {
            log.debug("Filtered {} of {} timestamp records", (Object)filterCount, (Object)records.size());
        }
        return timestamps;
    }

    private boolean isMatchingTopic(String topic) {
        if (this.blacklistTopics.contains(topic)) {
            return false;
        }
        return this.whitelistTopics.contains(topic) || !ConsumerTimestampsWriterDefaults.isInternal((String)topic) && this.matchesTopicPattern(topic);
    }

    private boolean matchesTopicPattern(String topic) {
        return this.topicPattern != null && this.topicPattern.matcher(topic).matches();
    }

    private Map<TopicPartition, OffsetAndMetadata> getOffsetsForCommit(String groupId, Map<TopicPartition, TimestampAndDelta> timestampsPerGroup, Consumer<byte[], byte[]> destConsumer) {
        HashMap<TopicPartition, OffsetAndMetadata> offsetsForCommit = new HashMap<TopicPartition, OffsetAndMetadata>(timestampsPerGroup.size());
        HashMap<TopicPartition, TimestampAndDelta> timestampsToSearch = new HashMap<TopicPartition, TimestampAndDelta>();
        this.getNewOffsetsForCommit(groupId, timestampsPerGroup, timestampsToSearch, offsetsForCommit);
        this.getRetryOffsetsForCommit(groupId, timestampsToSearch, offsetsForCommit);
        long attemptedAt = this.time.milliseconds();
        this.removeMissingTopicPartitions(destConsumer, groupId, timestampsToSearch, attemptedAt);
        if (timestampsToSearch.size() > 0) {
            Map<TopicPartition, Long> timestamps = timestampsToSearch.entrySet().stream().collect(Collectors.toMap(e -> (TopicPartition)e.getKey(), e -> ((TimestampAndDelta)e.getValue()).timestamp()));
            Map newOffsetAndTimestamps = null;
            try {
                newOffsetAndTimestamps = destConsumer.offsetsForTimes(timestamps);
            }
            catch (KafkaException e2) {
                log.warn("Could not fetch offset times for group {}", (Object)groupId);
            }
            for (Map.Entry timestampsToSearchEntry : timestampsToSearch.entrySet()) {
                OffsetAndTimestamp offsetAndTimestamp;
                TopicPartition topicPartition = (TopicPartition)timestampsToSearchEntry.getKey();
                TimestampAndDelta timestampAndDelta = (TimestampAndDelta)timestampsToSearchEntry.getValue();
                OffsetAndTimestamp offsetAndTimestamp2 = offsetAndTimestamp = newOffsetAndTimestamps != null ? (OffsetAndTimestamp)newOffsetAndTimestamps.get(topicPartition) : null;
                if (offsetAndTimestamp == null) {
                    log.warn("Could not find offset for group {}, topic {}, partition {}, timestamp {}, waiting for replication to catch up.  Please check replication lag.", new Object[]{groupId, topicPartition.topic(), topicPartition.partition(), timestampAndDelta});
                    this.addTimestampForNextRetry(groupId, topicPartition, timestampAndDelta, attemptedAt);
                    continue;
                }
                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offsetAndTimestamp.offset() + (long)timestampAndDelta.delta() + 1L);
                log.trace("Translating timestamp {} to offset {} for group {} (lookup)", new Object[]{timestampAndDelta, offsetAndMetadata.offset(), groupId});
                offsetsForCommit.put(topicPartition, offsetAndMetadata);
                this.addOffsetToCache(topicPartition, timestampAndDelta, offsetAndMetadata);
            }
        }
        return offsetsForCommit;
    }

    private void removeMissingTopicPartitions(Consumer<byte[], byte[]> destConsumer, String groupId, Map<TopicPartition, TimestampAndDelta> timestampsToSearch, long attemptedAt) {
        if (timestampsToSearch.isEmpty()) {
            return;
        }
        Map topics = destConsumer.listTopics();
        Iterator<Map.Entry<TopicPartition, TimestampAndDelta>> iter = timestampsToSearch.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<TopicPartition, TimestampAndDelta> entry = iter.next();
            TopicPartition tp = entry.getKey();
            if (this.topicPartitionExists(topics, tp)) continue;
            iter.remove();
            TimestampAndDelta timestampAndDelta = entry.getValue();
            this.addTimestampForNextRetry(groupId, tp, timestampAndDelta, attemptedAt);
        }
    }

    private boolean topicPartitionExists(Map<String, List<PartitionInfo>> topics, TopicPartition tp) {
        List<PartitionInfo> partitionInfos = topics.get(tp.topic());
        return partitionInfos != null && partitionInfos.stream().anyMatch(p -> p.partition() == tp.partition());
    }

    private void getNewOffsetsForCommit(String groupId, Map<TopicPartition, TimestampAndDelta> timestampsPerGroup, Map<TopicPartition, TimestampAndDelta> timestampsToSearch, Map<TopicPartition, OffsetAndMetadata> offsetsForCommit) {
        for (Map.Entry<TopicPartition, TimestampAndDelta> timestampEntry : timestampsPerGroup.entrySet()) {
            TimestampAndDelta timestampAndDelta;
            TopicPartition topicPartition = timestampEntry.getKey();
            if (this.getOffsetFromCache(groupId, topicPartition, timestampAndDelta = timestampEntry.getValue(), offsetsForCommit)) continue;
            timestampsToSearch.put(topicPartition, timestampAndDelta);
        }
        if (timestampsToSearch.size() > 0) {
            log.debug("Found {} new timestamps to search for group {}", (Object)timestampsToSearch.size(), (Object)groupId);
        }
    }

    private void getRetryOffsetsForCommit(String groupId, Map<TopicPartition, TimestampAndDelta> timestampsToSearch, Map<TopicPartition, OffsetAndMetadata> offsetsForCommit) {
        int oldSize = timestampsToSearch.size();
        Map<Object, TimestampForRetry> timestampsForRetryPerGroup = this.timestampsForRetry.get(groupId);
        if (timestampsForRetryPerGroup == null) {
            timestampsForRetryPerGroup = Collections.emptyMap();
        }
        int expiredCount = 0;
        int cachedCount = 0;
        int backoffCount = 0;
        int prioritizedCount = 0;
        for (Map.Entry<Object, TimestampForRetry> timestampsForRetryEntry : timestampsForRetryPerGroup.entrySet()) {
            TopicPartition topicPartition = (TopicPartition)timestampsForRetryEntry.getKey();
            TimestampForRetry timestampForRetry = timestampsForRetryEntry.getValue();
            long now = this.time.milliseconds();
            long createdAt = timestampForRetry.createdAt();
            if (createdAt + this.config.getFetchOffsetExpiryMs() <= now) {
                ++expiredCount;
                continue;
            }
            TimestampAndDelta timestampAndDelta = timestampForRetry.timestampAndDelta();
            if (this.getOffsetFromCache(groupId, topicPartition, timestampAndDelta, offsetsForCommit)) {
                ++cachedCount;
                continue;
            }
            long attemptedAt = timestampForRetry.attemptedAt();
            if (attemptedAt + this.config.getFetchOffsetRetryBackoffMs() > now) {
                this.addTimestampForNextRetry(groupId, topicPartition, timestampForRetry);
                ++backoffCount;
                continue;
            }
            TimestampAndDelta previous = timestampsToSearch.put(topicPartition, timestampAndDelta);
            if (previous == null) continue;
            this.addTimestampForNextRetry(groupId, topicPartition, previous, now);
            ++prioritizedCount;
        }
        int newSize = timestampsToSearch.size();
        if (newSize - oldSize > 0) {
            log.debug("Found {} old timestamps to search for group {}", (Object)(newSize - oldSize), (Object)groupId);
        }
        log.debug("For old timestamps for group {}, expired {}, in cache {}, backoff {}, prioritized {}", new Object[]{groupId, expiredCount, cachedCount, backoffCount, prioritizedCount});
    }

    private TimestampForRetry getTimestampForNextRetry(String groupId, TopicPartition topicPartition, TimestampAndDelta timestampAndDelta, long attemptedAt) {
        Map<TopicPartition, TimestampForRetry> timestampsForRetryPerGroup = this.timestampsForRetry.get(groupId);
        if (timestampsForRetryPerGroup == null) {
            return new TimestampForRetry(timestampAndDelta, attemptedAt);
        }
        TimestampForRetry timestampForRetry = timestampsForRetryPerGroup.get(topicPartition);
        return timestampForRetry != null && timestampForRetry.timestampAndDelta() == timestampAndDelta ? timestampForRetry.next(attemptedAt) : new TimestampForRetry(timestampAndDelta, attemptedAt);
    }

    private void addTimestampForNextRetry(String groupId, TopicPartition topicPartition, TimestampAndDelta timestampAndDelta, long attemptedAt) {
        this.addTimestampForNextRetry(groupId, topicPartition, this.getTimestampForNextRetry(groupId, topicPartition, timestampAndDelta, attemptedAt));
    }

    private void addTimestampForNextRetry(String groupId, TopicPartition topicPartition, TimestampForRetry timestampForNextRetry) {
        Map<TopicPartition, TimestampForRetry> timestampsForNextRetryPerGroup = this.timestampsForNextRetry.get(groupId);
        if (timestampsForNextRetryPerGroup == null) {
            timestampsForNextRetryPerGroup = new HashMap<TopicPartition, TimestampForRetry>();
            this.timestampsForNextRetry.put(groupId, timestampsForNextRetryPerGroup);
        }
        timestampsForNextRetryPerGroup.put(topicPartition, timestampForNextRetry);
    }

    private boolean getOffsetFromCache(String groupId, TopicPartition topicPartition, TimestampAndDelta timestampAndDelta, Map<TopicPartition, OffsetAndMetadata> offsetsForCommit) {
        OffsetAndMetadata offsetAndMetadata;
        Map<TimestampAndDelta, OffsetAndMetadata> offsets = this.cachedOffsets.get(topicPartition);
        if (offsets != null && (offsetAndMetadata = offsets.get(timestampAndDelta)) != null) {
            log.trace("Translating timestamp {} to offset {} for group {} (cached)", new Object[]{timestampAndDelta, offsetAndMetadata.offset(), groupId});
            offsetsForCommit.put(topicPartition, offsetAndMetadata);
            return true;
        }
        return false;
    }

    private void addOffsetToCache(TopicPartition topicPartition, TimestampAndDelta timestampAndDelta, OffsetAndMetadata offsetAndMetadata) {
        Map<TimestampAndDelta, OffsetAndMetadata> offsets = this.cachedOffsets.get(topicPartition);
        if (offsets == null) {
            offsets = new HashMap<TimestampAndDelta, OffsetAndMetadata>();
            this.cachedOffsets.put(topicPartition, offsets);
        }
        offsets.put(timestampAndDelta, offsetAndMetadata);
    }

    private static class TimestampForRetry {
        private final TimestampAndDelta timestampAndDelta;
        private final long createdAt;
        private final long attemptedAt;

        public TimestampForRetry(TimestampAndDelta timestampAndDelta, long attemptedAt) {
            this.timestampAndDelta = timestampAndDelta;
            this.createdAt = attemptedAt;
            this.attemptedAt = attemptedAt;
        }

        public TimestampForRetry(TimestampAndDelta timestampAndDelta, long createdAt, long attemptedAt) {
            this.timestampAndDelta = timestampAndDelta;
            this.createdAt = createdAt;
            this.attemptedAt = attemptedAt;
        }

        public TimestampAndDelta timestampAndDelta() {
            return this.timestampAndDelta;
        }

        public long createdAt() {
            return this.createdAt;
        }

        public long attemptedAt() {
            return this.attemptedAt;
        }

        public TimestampForRetry next(long attemptedAt) {
            return new TimestampForRetry(this.timestampAndDelta, this.createdAt, attemptedAt);
        }
    }
}

