package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.util.Preconditions;

/**
 * Collects {@link CommittableMessage}s and groups them by checkpoint id.
 *
 * <p>Each checkpoint id maps to one {@link CheckpointCommittableManagerImpl}; the entries are kept
 * in a sorted map so that all checkpoints up to a given id can be retrieved. The key
 * {@link Long#MAX_VALUE} is used for the end-of-input entry.
 *
 * @param <CommT> type of the committables carried by the messages
 */
@Internal
public class CommittableCollector<CommT> {
    /** Map key under which the end-of-input committables are looked up. */
    private static final long EOI = Long.MAX_VALUE;

    /** Checkpoint id (or {@link #EOI}) to the manager holding that checkpoint's committables. */
    private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>> checkpointCommittables;

    private final SinkCommitterMetricGroup metricGroup;

    /**
     * Creates an empty collector.
     *
     * @param sinkCommitterMetricGroup metric group that receives the pending-committables gauge
     */
    public CommittableCollector(SinkCommitterMetricGroup sinkCommitterMetricGroup) {
        this(new TreeMap<>(), sinkCommitterMetricGroup);
    }

    /**
     * Creates a collector pre-populated with the given per-checkpoint managers.
     *
     * <p>The entries of {@code map} are copied into a new sorted map; the managers themselves are
     * not copied. As a side effect, a supplier for this instance's pending count is passed to
     * {@code setCurrentPendingCommittablesGauge} on the metric group.
     *
     * <p>Decompiler note: the access modifier of this constructor was widened from
     * package-private.
     *
     * @param map checkpoint id to manager; must not be {@code null}
     * @param sinkCommitterMetricGroup metric group that receives the pending-committables gauge
     */
    public CommittableCollector(Map<Long, CheckpointCommittableManagerImpl<CommT>> map, SinkCommitterMetricGroup sinkCommitterMetricGroup) {
        this.checkpointCommittables = new TreeMap<>(Preconditions.checkNotNull(map));
        this.metricGroup = sinkCommitterMetricGroup;
        this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending);
    }

    /**
     * Sums the number of pending requests over all tracked checkpoints.
     *
     * <p>NOTE(review): each per-checkpoint count is a {@code long} that is narrowed to
     * {@code int} before summing, so very large counts would be truncated.
     */
    private int getNumPending() {
        return this.checkpointCommittables.values().stream()
                .mapToInt(manager -> (int) manager.getPendingRequests().count())
                .sum();
    }

    /**
     * Creates an empty collector.
     *
     * @param sinkCommitterMetricGroup metric group that receives the pending-committables gauge
     * @param <CommT> type of the committables
     * @return a new, empty collector
     */
    public static <CommT> CommittableCollector<CommT> of(SinkCommitterMetricGroup sinkCommitterMetricGroup) {
        return new CommittableCollector<>(sinkCommitterMetricGroup);
    }

    /**
     * Creates a collector from a plain list of committables.
     *
     * <p>All committables are registered under checkpoint id {@code 1} for subtask {@code 0} of
     * {@code 1}. A summary announcing {@code list.size()} committables is added first because
     * {@link #addCommittable} requires the checkpoint to be known already.
     *
     * <p>Decompiler note: the access modifier of this method was widened from package-private.
     *
     * @param list committables to register
     * @param sinkCommitterMetricGroup metric group that receives the pending-committables gauge
     * @param <CommT> type of the committables
     * @return a collector containing all given committables
     */
    public static <CommT> CommittableCollector<CommT> ofLegacy(List<CommT> list, SinkCommitterMetricGroup sinkCommitterMetricGroup) {
        CommittableCollector<CommT> committableCollector = new CommittableCollector<>(sinkCommitterMetricGroup);
        committableCollector.addSummary(new CommittableSummary<>(0, 1, 1L, list.size(), list.size(), 0));
        for (CommT committable : list) {
            committableCollector.addCommittable(new CommittableWithLineage<>(committable, 1L, 0));
        }
        return committableCollector;
    }

    /**
     * Adds a message to the manager of the checkpoint it belongs to.
     *
     * <p>Messages that are neither a {@link CommittableSummary} nor a
     * {@link CommittableWithLineage} are ignored.
     *
     * @param committableMessage message to add
     */
    public void addMessage(CommittableMessage<CommT> committableMessage) {
        if (committableMessage instanceof CommittableSummary) {
            addSummary((CommittableSummary<CommT>) committableMessage);
        } else if (committableMessage instanceof CommittableWithLineage) {
            addCommittable((CommittableWithLineage<CommT>) committableMessage);
        }
    }

    /**
     * Returns the managers of all checkpoints whose id is less than or equal to {@code j}.
     *
     * <p>The result is a new list, so later changes to this collector are not reflected in it.
     *
     * @param j highest checkpoint id to include (inclusive)
     * @return managers in ascending checkpoint-id order
     */
    public Collection<? extends CheckpointCommittableManager<CommT>> getCheckpointCommittablesUpTo(long j) {
        Collection<CheckpointCommittableManagerImpl<CommT>> upToCheckpoint =
                this.checkpointCommittables.headMap(j, true).values();
        return new ArrayList<>(upToCheckpoint);
    }

    /**
     * Returns the manager stored under the end-of-input key, if there is one.
     *
     * @return the end-of-input manager, or an empty {@link Optional}
     */
    public Optional<CheckpointCommittableManager<CommT>> getEndOfInputCommittable() {
        return Optional.ofNullable(this.checkpointCommittables.get(EOI));
    }

    /**
     * Reports whether every tracked checkpoint is finished.
     *
     * @return {@code true} if all managers report finished; also {@code true} when no checkpoint
     *     is tracked
     */
    public boolean isFinished() {
        return this.checkpointCommittables.values().stream()
                .allMatch(CheckpointCommittableManagerImpl::isFinished);
    }

    /**
     * Merges the checkpoints of another collector into this one.
     *
     * <p>Checkpoints unknown to this collector are adopted as-is (the manager instance is shared
     * with {@code committableCollector}); for checkpoints present in both, the existing manager's
     * {@code merge} is invoked with the incoming one.
     *
     * @param committableCollector collector whose checkpoints are merged into this one
     */
    public void merge(CommittableCollector<CommT> committableCollector) {
        for (Map.Entry<Long, CheckpointCommittableManagerImpl<CommT>> entry : committableCollector.checkpointCommittables.entrySet()) {
            this.checkpointCommittables.merge(
                    entry.getKey(), entry.getValue(), (existing, incoming) -> existing.merge(incoming));
        }
    }

    /**
     * Creates a new collector whose managers are copies of this collector's managers.
     *
     * <p>NOTE(review): the copy is constructed with the same metric group, so its constructor
     * passes the copy's pending-count supplier to that group as well - confirm this is intended.
     *
     * @return a collector holding a copy of every manager
     */
    public CommittableCollector<CommT> copy() {
        Map<Long, CheckpointCommittableManagerImpl<CommT>> copies =
                this.checkpointCommittables.entrySet().stream()
                        .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().copy()));
        return new CommittableCollector<>(copies, this.metricGroup);
    }

    /**
     * Returns a live view of all tracked managers in ascending checkpoint-id order.
     *
     * <p>Decompiler note: the access modifier of this method was widened from package-private.
     */
    public Collection<CheckpointCommittableManagerImpl<CommT>> getCheckpointCommittables() {
        return this.checkpointCommittables.values();
    }

    /** Adds the summary to its checkpoint's manager, creating the manager if it is missing. */
    private void addSummary(CommittableSummary<CommT> committableSummary) {
        this.checkpointCommittables
                .computeIfAbsent(
                        committableSummary.getCheckpointIdOrEOI(),
                        key -> CheckpointCommittableManagerImpl.forSummary(committableSummary, this.metricGroup))
                .addSummary(committableSummary);
    }

    /**
     * Adds a committable to the manager of its checkpoint.
     *
     * <p>Decompiler note: the access modifier of this method was widened from private.
     *
     * @param committableWithLineage committable to add
     * @throws NullPointerException if no manager exists for the committable's checkpoint
     */
    public void addCommittable(CommittableWithLineage<CommT> committableWithLineage) {
        getCheckpointCommittables(committableWithLineage).addCommittable(committableWithLineage);
    }

    /** Looks up the manager for the message's checkpoint and fails if there is none. */
    private CheckpointCommittableManagerImpl<CommT> getCheckpointCommittables(CommittableMessage<CommT> committableMessage) {
        CheckpointCommittableManagerImpl<CommT> manager =
                this.checkpointCommittables.get(committableMessage.getCheckpointIdOrEOI());
        return Preconditions.checkNotNull(manager, "Unknown checkpoint for %s", committableMessage);
    }

    /**
     * Stops tracking the checkpoint of the given manager.
     *
     * <p>NOTE(review): entries are inserted under {@code getCheckpointIdOrEOI()} but removed
     * under {@code getCheckpointId()}; presumably both agree for end-of-input managers - confirm.
     *
     * @param checkpointCommittableManager manager whose checkpoint entry is removed
     */
    public void remove(CheckpointCommittableManager<CommT> checkpointCommittableManager) {
        this.checkpointCommittables.remove(checkpointCommittableManager.getCheckpointId());
    }

    /** Two collectors are equal when their checkpoint maps are equal; the metric group is ignored. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        CommittableCollector<?> other = (CommittableCollector<?>) obj;
        return Objects.equals(this.checkpointCommittables, other.checkpointCommittables);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(this.checkpointCommittables);
    }

    @Override
    public String toString() {
        return "CommittableCollector{checkpointCommittables=" + this.checkpointCommittables + "}";
    }
}
