package io.confluent.parallelconsumer.state;

import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.metrics.PCMetrics;
import io.confluent.parallelconsumer.metrics.PCMetricsDef;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tag;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/state/PartitionStateManager.class */
/**
 * Tracks per-partition processing state and reacts to consumer-group rebalance callbacks.
 *
 * <p>For every {@link TopicPartition} this manager keeps a {@link PartitionState}, an assignment
 * epoch that is bumped on every assignment and on every removal, and a "slow records" counter
 * metric. Removed partitions are not deleted from the state map; their entry is replaced by the
 * {@link RemovedPartitionState} singleton.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class PartitionStateManager<K, V> implements ConsumerRebalanceListener {
    /** Default value of the used-payload threshold multiplier. */
    public static final double USED_PAYLOAD_THRESHOLD_MULTIPLIER_DEFAULT = 0.75d;

    private static final Logger log = LoggerFactory.getLogger(PartitionStateManager.class);

    /** Mutable, class-wide multiplier; exposed through the static accessors at the bottom of the class. */
    private static double USED_PAYLOAD_THRESHOLD_MULTIPLIER = USED_PAYLOAD_THRESHOLD_MULTIPLIER_DEFAULT;

    private final ShardManager<K, V> sm;
    private final PCModule<K, V> module;
    private final PCMetrics pcMetrics;

    /** State per partition; entries of removed partitions hold the {@link RemovedPartitionState} singleton. */
    private final Map<TopicPartition, PartitionState<K, V>> partitionStates = new ConcurrentHashMap<>();

    /** Assignment epoch per partition; starts at 0 and grows by one on each assignment and each removal. */
    private final Map<TopicPartition, Long> partitionsAssignmentEpochs = new ConcurrentHashMap<>();

    /**
     * Slow-record counters per partition.
     * NOTE(review): unlike the two maps above this is a plain HashMap - confirm it is only touched
     * from a single thread, or switch it to a concurrent map.
     */
    private final Map<TopicPartition, Counter> slowWorkCounters = new HashMap<>();

    private Gauge numberOfPartitionsGauge;
    private Gauge totalIncompletesGauge;

    /**
     * Creates the manager and registers its gauges.
     *
     * @param pCModule     module used to obtain the metrics facade and to load partition state on assignment
     * @param shardManager shard manager notified about stale containers and removed partitions
     */
    public PartitionStateManager(PCModule<K, V> pCModule, ShardManager<K, V> shardManager) {
        this.sm = shardManager;
        this.module = pCModule;
        this.pcMetrics = pCModule.pcMetrics();
        initMetrics();
    }

    /**
     * Looks up the state stored for a partition.
     *
     * @param topicPartition partition to look up
     * @return the stored state, or {@code null} if this manager has never stored one for the partition
     */
    public PartitionState<K, V> getPartitionState(TopicPartition topicPartition) {
        return this.partitionStates.get(topicPartition);
    }

    /** Looks up the state of the partition a polled batch belongs to. */
    private PartitionState<K, V> getPartitionState(EpochAndRecordsMap<K, V>.RecordsAndEpoch recordsAndEpoch) {
        return getPartitionState(recordsAndEpoch.getTopicPartition());
    }

    /**
     * Looks up the state of the partition a work container belongs to.
     * (Decompiler note: this method was declared {@code protected} in the compiled class.)
     *
     * @param workContainer work whose partition is looked up
     * @return the stored state, or {@code null} if none is stored for that partition
     */
    public PartitionState<K, V> getPartitionState(WorkContainer<K, V> workContainer) {
        return getPartitionState(workContainer.getTopicPartition());
    }

    /**
     * Handles newly assigned partitions: logs suspicious re-assignments, bumps the assignment epochs,
     * loads the partition state for the assignment, registers the per-partition counters and asks the
     * shard manager to drop stale containers.
     *
     * @param collection the partitions that were assigned
     */
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        log.debug("Partitions assigned: {}", collection);
        for (TopicPartition assigned : collection) {
            // single lookup instead of containsKey + get
            PartitionState<K, V> existing = this.partitionStates.get(assigned);
            if (existing == null) {
                continue;
            }
            if (existing.isRemoved()) {
                log.trace("Reassignment of previously revoked partition {} - state: {}", assigned, existing);
            } else {
                log.warn("New assignment of partition which already exists and isn't recorded as removed in partition state. Could be a state bug - was the partition revocation somehow missed, or is this a race? Please file a GH issue. Partition: {}, state: {}", assigned, existing);
            }
        }
        incrementPartitionAssignmentEpoch(collection);
        try {
            // NOTE(review): raw OffsetMapCodecManager kept as-is; its generic signature is not visible from this file
            this.partitionStates.putAll(new OffsetMapCodecManager(this.module).loadPartitionStateForAssignment(collection));
            initPartitionCounters(collection);
            this.sm.removeStaleContainers();
        } catch (Exception e) {
            log.error("Error in onPartitionsAssigned", e);
            throw e;
        }
    }

    /** Registers a slow-records counter for every partition that does not have one yet. */
    private void initPartitionCounters(Collection<TopicPartition> collection) {
        for (TopicPartition tp : collection) {
            this.slowWorkCounters.computeIfAbsent(tp, key -> this.pcMetrics.getCounterFromMetricDef(
                    PCMetricsDef.SLOW_RECORDS,
                    Tag.of("topic", key.topic()),
                    Tag.of("partition", String.valueOf(key.partition()))));
        }
    }

    /** Drops the slow-records counter of each given partition and removes its meter from the metrics facade. */
    private void deregisterPartitionCounters(Collection<TopicPartition> collection) {
        for (TopicPartition tp : collection) {
            Meter removed = this.slowWorkCounters.remove(tp);
            if (removed != null) {
                this.pcMetrics.removeMeter(removed);
            }
        }
    }

    /**
     * Increments the slow-records counter of a partition; does nothing if no counter is registered for it.
     *
     * @param topicPartition partition whose counter is incremented
     */
    public void incrementSlowWorkCounter(TopicPartition topicPartition) {
        Optional.ofNullable(this.slowWorkCounters.get(topicPartition)).ifPresent(Counter::increment);
    }

    /**
     * Handles revoked partitions by running the common removal routine.
     *
     * @param collection the partitions that were revoked
     */
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        log.info("Partitions revoked: {}", collection);
        try {
            onPartitionsRemoved(collection);
        } catch (Exception e) {
            log.error("Error in onPartitionsRevoked", e);
            throw e;
        }
    }

    /**
     * Common removal routine for revoked and lost partitions: bumps the epochs, swaps the partition
     * states for the removed-state singleton, deregisters the counters and asks the shard manager to
     * drop stale containers.
     */
    void onPartitionsRemoved(Collection<TopicPartition> collection) {
        incrementPartitionAssignmentEpoch(collection);
        resetOffsetMapAndRemoveWork(collection);
        deregisterPartitionCounters(collection);
        this.sm.removeStaleContainers();
    }

    /**
     * Handles lost partitions by running the common removal routine.
     *
     * @param collection the partitions that were lost
     */
    public void onPartitionsLost(Collection<TopicPartition> collection) {
        try {
            log.info("Lost partitions: {}", collection);
            onPartitionsRemoved(collection);
        } catch (Exception e) {
            log.error("Error in onPartitionsLost", e);
            throw e;
        }
    }

    /**
     * Forwards each successfully committed offset to the state of its partition.
     *
     * @param map committed offsets keyed by partition
     */
    public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> map) {
        map.forEach((topicPartition, offsetAndMetadata) ->
                getPartitionState(topicPartition).onOffsetCommitSuccess(offsetAndMetadata));
    }

    /**
     * Replaces the state of every given partition with the removed-state singleton and lets the
     * previous state clean up against the shard manager.
     */
    private void resetOffsetMapAndRemoveWork(Collection<TopicPartition> collection) {
        for (TopicPartition removed : collection) {
            PartitionState<K, V> previous = this.partitionStates.get(removed);
            // replace rather than delete, so later lookups for this partition resolve to the removed-state singleton
            this.partitionStates.put(removed, RemovedPartitionState.getSingleton());
            if (previous == null) {
                // no state was ever stored (e.g. loading it on assignment failed); skip the clean-up
                // instead of failing with an NPE and aborting the rest of the removal routine
                log.warn("No partition state found for removed partition {} - nothing to clean up", removed);
                continue;
            }
            previous.onPartitionsRemoved(this.sm);
        }
    }

    /**
     * Returns the current assignment epoch of a partition.
     *
     * @param topicPartition partition to look up
     * @return the epoch, or {@code null} if the partition has never been assigned or removed here
     */
    public Long getEpochOfPartition(TopicPartition topicPartition) {
        return this.partitionsAssignmentEpochs.get(topicPartition);
    }

    /** Bumps the epoch of every given partition; a partition seen for the first time gets epoch 0. */
    private void incrementPartitionAssignmentEpoch(Collection<TopicPartition> collection) {
        for (TopicPartition tp : collection) {
            // merge performs the read-modify-write as one atomic map operation
            this.partitionsAssignmentEpochs.merge(tp, 0L, (current, ignored) -> current + 1);
        }
    }

    /**
     * Asks the partition's state whether more records may be accepted.
     *
     * @param topicPartition partition to check
     * @return the answer of the partition's state
     */
    public boolean isAllowedMoreRecords(TopicPartition topicPartition) {
        return getPartitionState(topicPartition).isAllowedMoreRecords();
    }

    /**
     * Same as {@link #isAllowedMoreRecords(TopicPartition)}, for the partition of the given work.
     *
     * @param workContainer work whose partition is checked
     * @return the answer of the partition's state
     */
    public boolean isAllowedMoreRecords(WorkContainer<?, ?> workContainer) {
        return isAllowedMoreRecords(workContainer.getTopicPartition());
    }

    /**
     * @return {@code true} if any partition that is not marked removed reports incomplete offsets
     */
    public boolean hasIncompleteOffsets() {
        return getAssignedPartitions().values().stream().anyMatch(PartitionState::hasIncompleteOffsets);
    }

    /**
     * @return the sum of incomplete offsets over all partitions that are not marked removed
     */
    public long getNumberOfIncompleteOffsets() {
        return getAssignedPartitions().values().stream()
                .mapToLong(PartitionState::getNumberOfIncompleteOffsets)
                .sum();
    }

    /**
     * @param topicPartition partition to look up
     * @return the highest seen offset as reported by the partition's state
     */
    public long getHighestSeenOffset(TopicPartition topicPartition) {
        return getPartitionState(topicPartition).getOffsetHighestSeen();
    }

    /**
     * Reports the offset of successfully processed work to the state of its partition.
     *
     * @param workContainer the work that succeeded
     */
    public void onSuccess(WorkContainer<K, V> workContainer) {
        getPartitionState(workContainer).onSuccess(workContainer.offset());
    }

    /**
     * Reports failed work to the state of its partition.
     *
     * @param workContainer the work that failed
     */
    public void onFailure(WorkContainer<K, V> workContainer) {
        getPartitionState(workContainer).onFailure(workContainer);
    }

    /**
     * Hands every polled batch to the state of its partition, which decides what to register as work.
     * (Decompiler note: this method was declared package-private in the compiled class.)
     *
     * @param epochAndRecordsMap the polled records, grouped per partition together with their epoch
     */
    public void maybeRegisterNewRecordAsWork(EpochAndRecordsMap<K, V> epochAndRecordsMap) {
        log.debug("Incoming {} new records...", epochAndRecordsMap.count());
        for (EpochAndRecordsMap<K, V>.RecordsAndEpoch batch : epochAndRecordsMap.getRecordMap().values()) {
            getPartitionState(batch).maybeRegisterNewPollBatchAsWork(batch);
        }
    }

    /**
     * Collects the commit data of every non-removed partition that reports dirty commit data.
     *
     * @return a new map from partition to the offset data to commit; empty if nothing is dirty
     */
    public Map<TopicPartition, OffsetAndMetadata> collectDirtyCommitData() {
        Map<TopicPartition, OffsetAndMetadata> dirtyCommitData = new HashMap<>();
        for (PartitionState<K, V> state : getAssignedPartitions().values()) {
            state.getCommitDataIfDirty().ifPresent(commitData -> dirtyCommitData.put(state.getTp(), commitData));
        }
        return dirtyCommitData;
    }

    /** Returns an unmodifiable snapshot of all partition states that are not marked removed. */
    private Map<TopicPartition, PartitionState<K, V>> getAssignedPartitions() {
        Map<TopicPartition, PartitionState<K, V>> assigned = this.partitionStates.entrySet().stream()
                .filter(entry -> !entry.getValue().isRemoved())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return Collections.unmodifiableMap(assigned);
    }

    /**
     * Asks the state of the work's partition whether the work could be taken for processing.
     *
     * @param workContainer the candidate work
     * @return the answer of the partition's state
     */
    public boolean couldBeTakenAsWork(WorkContainer<K, V> workContainer) {
        return getPartitionState(workContainer).couldBeTakenAsWork(workContainer);
    }

    /**
     * @return {@code true} if any stored partition state (removed entries included) reports itself dirty
     */
    public boolean isDirty() {
        return this.partitionStates.values().stream().anyMatch(PartitionState::isDirty);
    }

    /** Registers the gauges for the number of assigned partitions and the total of incomplete offsets. */
    private void initMetrics() {
        // both gauge functions read through their argument (the registered state object) instead of capturing `this`
        this.numberOfPartitionsGauge = this.pcMetrics.gaugeFromMetricDef(
                PCMetricsDef.NUMBER_OF_PARTITIONS,
                this,
                manager -> manager.getAssignedPartitions().size(),
                new Tag[0]);
        this.totalIncompletesGauge = this.pcMetrics.gaugeFromMetricDef(
                PCMetricsDef.INCOMPLETE_OFFSETS_TOTAL,
                this,
                manager -> manager.getAssignedPartitions().values().stream()
                        .mapToInt(PartitionState::getNumberOfIncompleteOffsets)
                        .sum(),
                new Tag[0]);
    }

    /** @return the current class-wide used-payload threshold multiplier */
    public static double getUSED_PAYLOAD_THRESHOLD_MULTIPLIER() {
        return USED_PAYLOAD_THRESHOLD_MULTIPLIER;
    }

    /**
     * Overrides the class-wide used-payload threshold multiplier.
     *
     * @param d the new multiplier
     */
    public static void setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(double d) {
        USED_PAYLOAD_THRESHOLD_MULTIPLIER = d;
    }
}
