/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.replicator.offsets;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects the latest source offset seen per {@link TopicPartition} and commits
 * those offsets through the supplied {@link Consumer}.
 *
 * <p>The committer runs in one of two modes, selected at construction time:
 * <ul>
 *   <li><b>On demand</b> (uncommitted-record tracking disabled): {@link #checkCommit()}
 *       commits only after {@link #commit()} has been called.</li>
 *   <li><b>Periodic</b> (uncommitted-record tracking enabled): {@link #checkCommit()}
 *       commits once at least {@code commitOffsetPeriodMs} milliseconds have elapsed
 *       since the previous commit; a per-partition counter of uncommitted records is
 *       maintained alongside the offsets.</li>
 * </ul>
 */
public class ConsumerOffsetsTopicCommitter {
    private static final Logger log = LoggerFactory.getLogger(ConsumerOffsetsTopicCommitter.class);

    private static final String NEGATIVE_UNCOMMITTED_MSG =
            "Number of uncommitted records is {} for topic partition {} and offset {}";

    private final Consumer<byte[], byte[]> consumer;
    /** Last record offset (not yet incremented) recorded for each partition. */
    private final ConcurrentMap<TopicPartition, Long> offsets = new ConcurrentHashMap<>();
    private final AtomicBoolean doCommit = new AtomicBoolean(false);
    /** Per-partition count of uncommitted records; {@code null} when tracking is disabled. */
    private final ConcurrentMap<TopicPartition, Long> numUncommittedRecords;
    private final Time time;
    private final long commitOffsetPeriodMs;
    private long lastCommitOffsetMs = 0L;

    /**
     * Creates a committer with uncommitted-record tracking disabled, using
     * {@link Time#SYSTEM} and a commit period of {@code -1}.
     *
     * @param consumer consumer used to commit offsets
     */
    public ConsumerOffsetsTopicCommitter(Consumer<byte[], byte[]> consumer) {
        this(consumer, false, Time.SYSTEM, -1);
    }

    /**
     * Creates a committer.
     *
     * @param consumer                 consumer used to commit offsets
     * @param useNumUncommittedRecords whether to track per-partition uncommitted record
     *                                 counts (and therefore commit periodically)
     * @param time                     clock used by {@link #checkCommit()}
     * @param commitOffsetPeriodMs     minimum interval between periodic commits, in milliseconds
     */
    public ConsumerOffsetsTopicCommitter(Consumer<byte[], byte[]> consumer, boolean useNumUncommittedRecords, Time time, int commitOffsetPeriodMs) {
        this.consumer = consumer;
        this.numUncommittedRecords = useNumUncommittedRecords ? new ConcurrentHashMap<>() : null;
        this.time = time;
        this.commitOffsetPeriodMs = commitOffsetPeriodMs;
    }

    /**
     * Records the offset of every given consumer record, in list order.
     *
     * @param records records whose topic, partition and offset should be recorded
     */
    public void commitRecords(List<ConsumerRecord<byte[], byte[]>> records) {
        log.debug("Committing consumer offsets...");
        for (ConsumerRecord<byte[], byte[]> record : records) {
            this.commitRecord(record.topic(), record.partition(), record.offset());
        }
    }

    /**
     * Records the source offset carried by a {@link SourceRecord}.
     *
     * <p>The source topic, partition and offset are read from the record's source
     * partition map (keys {@code "topic"} and {@code "partition"}) and source offset
     * map (key {@code "offset"}). If any of them is missing, a warning is logged and
     * the record is ignored.
     *
     * <p>When {@code metadata} is {@code null} and uncommitted-record tracking is
     * enabled, the record goes through the filtered-record path, which only records
     * the offset once the partition's uncommitted count drops to zero.
     *
     * @param record   source record that was handed to Connect
     * @param metadata producer metadata for the record; may be {@code null}
     */
    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        Map<String, ?> sourcePartition = record.sourcePartition();
        if (sourcePartition == null) {
            log.warn("Could not get source partition map from source record with destination topic {}", record.topic());
            return;
        }
        String topic = (String) sourcePartition.get("topic");
        if (topic == null) {
            log.warn("Could not get source topic from source record with destination topic {}", record.topic());
            return;
        }
        Integer partition = (Integer) sourcePartition.get("partition");
        if (partition == null) {
            log.warn("Could not get source partition from source record with destination topic {}", record.topic());
            return;
        }
        Map<String, ?> sourceOffset = record.sourceOffset();
        if (sourceOffset == null) {
            log.warn("Could not get source offset map from source record with destination topic {}", record.topic());
            return;
        }
        Long offset = (Long) sourceOffset.get("offset");
        if (offset == null) {
            log.warn("Could not get source offset from source record with destination topic {}", record.topic());
            return;
        }
        if (metadata != null || this.numUncommittedRecords == null) {
            this.commitRecord(topic, partition, offset);
        } else {
            this.maybeCommitFilteredRecordByConnect(topic, partition, offset);
        }
    }

    /**
     * Records {@code offset} as the latest offset for the given topic partition.
     *
     * <p>When tracking is enabled and the partition has an uncommitted count, the
     * count is decremented first; a negative result is logged as an error but the
     * offset is still recorded.
     *
     * @param topic     source topic
     * @param partition source partition
     * @param offset    offset of the record (the committed value will be {@code offset + 1})
     */
    public void commitRecord(String topic, int partition, long offset) {
        TopicPartition tp = new TopicPartition(topic, partition);
        if (this.numUncommittedRecords != null) {
            Long remaining = this.decrementUncommitted(tp);
            if (remaining != null && remaining < 0L) {
                log.error(NEGATIVE_UNCOMMITTED_MSG, remaining, tp, offset);
            }
        }
        this.offsets.put(tp, offset);
    }

    /**
     * Handles a record for which no producer metadata was supplied while tracking is
     * enabled: decrements the partition's uncommitted count and records the offset
     * only when that count reaches exactly zero.
     *
     * <p>Callers must ensure tracking is enabled before invoking this method.
     */
    private void maybeCommitFilteredRecordByConnect(String topic, int partition, long offset) {
        TopicPartition tp = new TopicPartition(topic, partition);
        Long remaining = this.decrementUncommitted(tp);
        if (remaining == null) {
            log.error("Topic partition {} not found for offset {}", tp, offset);
            return;
        }
        if (remaining < 0L) {
            log.error(NEGATIVE_UNCOMMITTED_MSG, remaining, tp, offset);
            return;
        }
        if (remaining == 0L) {
            this.offsets.put(tp, offset);
        }
    }

    /**
     * Records {@code offset} for the partition if it currently has no outstanding
     * uncommitted records (count absent or zero, or tracking disabled). A positive
     * count leaves the stored offset untouched; a negative count is logged as an error.
     *
     * @param topic     source topic
     * @param partition source partition
     * @param offset    offset of the filtered record
     */
    public void maybeCommitFilteredRecordByReplicator(String topic, int partition, long offset) {
        TopicPartition tp = new TopicPartition(topic, partition);
        Long numUncommitted = this.numUncommittedRecords != null ? this.numUncommittedRecords.get(tp) : null;
        if (numUncommitted == null || numUncommitted == 0L) {
            this.offsets.put(tp, offset);
        } else if (numUncommitted < 0L) {
            log.error(NEGATIVE_UNCOMMITTED_MSG, numUncommitted, tp, offset);
        }
    }

    /**
     * Adds {@code numUncommitted} to the uncommitted count of {@code tp}, creating the
     * entry if it does not exist. Does nothing when tracking is disabled or when
     * {@code numUncommitted} is zero.
     *
     * @param tp             topic partition whose count is adjusted
     * @param numUncommitted number of records to add to the count
     */
    public void updateNumUncommittedRecords(TopicPartition tp, long numUncommitted) {
        if (this.numUncommittedRecords == null || numUncommitted == 0L) {
            return;
        }
        // merge() performs the insert-or-add atomically; a separate
        // computeIfPresent + put could lose a concurrent update.
        this.numUncommittedRecords.merge(tp, numUncommitted, Long::sum);
    }

    /** Requests that the next {@link #checkCommit()} in on-demand mode performs a commit. */
    public void commit() {
        this.doCommit.set(true);
    }

    /**
     * Commits the recorded offsets if a commit is due.
     *
     * <p>With tracking enabled, a commit is due when at least
     * {@code commitOffsetPeriodMs} milliseconds have passed since the last one; if the
     * clock reports a time earlier than the last commit, the reference time is reset
     * and no commit happens on this call. With tracking disabled, a commit is due only
     * if {@link #commit()} was called since the last check.
     */
    public void checkCommit() {
        if (this.numUncommittedRecords == null) {
            if (this.doCommit.getAndSet(false)) {
                this.commitConsumerOffset();
            }
            return;
        }
        long nowMs = this.time.milliseconds();
        if (nowMs < this.lastCommitOffsetMs) {
            // Clock went backwards: restart the period from the new "now".
            this.lastCommitOffsetMs = nowMs;
            return;
        }
        if (nowMs - this.lastCommitOffsetMs < this.commitOffsetPeriodMs) {
            return;
        }
        this.lastCommitOffsetMs = nowMs;
        this.commitConsumerOffset();
    }

    /**
     * Decrements the uncommitted count of {@code tp} if one exists.
     *
     * @return the decremented count, or {@code null} if the partition has no count
     */
    private Long decrementUncommitted(TopicPartition tp) {
        return this.numUncommittedRecords.computeIfPresent(tp, (key, count) -> count - 1L);
    }

    /**
     * Synchronously commits {@code offset + 1} for every recorded partition and, on
     * success, drops the entries that were committed. On a {@link KafkaException} the
     * entries are kept and a commit is re-requested via the {@code doCommit} flag.
     */
    private void commitConsumerOffset() {
        log.debug("Writing {} internal offset records", this.offsets.size());
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : this.offsets.entrySet()) {
            offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1L, ""));
        }
        try {
            this.consumer.commitSync(offsetsToCommit);
            // Conditional remove against the stored record offset (committed - 1): an
            // entry that was advanced while the commit was in flight is kept for the
            // next commit. Passing the OffsetAndMetadata itself would never match the
            // stored Long and would leave every entry in place.
            offsetsToCommit.forEach((tp, committed) -> this.offsets.remove(tp, committed.offset() - 1L));
        } catch (KafkaException e) {
            if (e instanceof WakeupException || e.getCause() instanceof WakeupException) {
                log.debug("Commit of offsets threw WakeupException. This is expected on the consumer thread and may happen from time to time: {}", offsetsToCommit);
            } else {
                log.error("Commit of offsets threw an unexpected exception: {}", offsetsToCommit, e);
            }
            this.doCommit.set(true);
        }
    }
}

