package org.apache.flink.runtime.io.network.partition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.class */
/**
 * {@link TaskExecutorPartitionTracker} implementation that keeps two kinds of bookkeeping:
 *
 * <ul>
 *   <li>job partitions, tracked per {@link JobID} through the inherited {@link
 *       AbstractPartitionTracker} machinery, and
 *   <li>cluster partitions, tracked in this class per {@link IntermediateDataSetID}.
 * </ul>
 *
 * <p>Whenever partitions are released, the release is delegated to {@link
 * ShuffleEnvironment#releasePartitionsLocally(Collection)}.
 *
 * <p>NOTE(review): the cluster-partition map is a plain {@link HashMap} and this class performs no
 * synchronization of its own; callers are presumably expected to confine access to one thread.
 */
public class TaskExecutorPartitionTrackerImpl extends AbstractPartitionTracker<JobID, TaskExecutorPartitionInfo> implements TaskExecutorPartitionTracker {

    /** Promoted (cluster) partitions, grouped by the data set they belong to. */
    private final Map<IntermediateDataSetID, DataSetEntry> clusterPartitions = new HashMap<>();

    /** Shuffle environment that performs the actual local release of partitions. */
    private final ShuffleEnvironment<?, ?> shuffleEnvironment;

    /**
     * Bookkeeping for a single data set: the shuffle descriptors of the partitions promoted so far
     * and the total number of partitions the data set consists of.
     */
    public static class DataSetEntry {

        /** Shuffle descriptors of the partitions added so far, keyed by their partition ID. */
        private final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors = new HashMap<>();

        private final int totalNumberOfPartitions;

        private DataSetEntry(int totalNumberOfPartitions) {
            this.totalNumberOfPartitions = totalNumberOfPartitions;
        }

        /**
         * Registers a partition with this data set. A descriptor that was previously added under
         * the same {@link ResultPartitionID} is replaced.
         *
         * @param shuffleDescriptor descriptor of the partition to add
         */
        void addPartition(ShuffleDescriptor shuffleDescriptor) {
            shuffleDescriptors.put(shuffleDescriptor.getResultPartitionID(), shuffleDescriptor);
        }

        /**
         * Returns the IDs of all partitions added so far.
         *
         * @return key-set view of the internal descriptor map (not a copy)
         */
        public Set<ResultPartitionID> getPartitionIds() {
            return shuffleDescriptors.keySet();
        }

        /**
         * Returns the total number of partitions this entry was created with.
         *
         * @return the total number of partitions
         */
        public int getTotalNumberOfPartitions() {
            return totalNumberOfPartitions;
        }

        /**
         * Returns the shuffle descriptors added so far.
         *
         * @return the internal descriptor map itself (not a copy)
         */
        public Map<ResultPartitionID, ShuffleDescriptor> getShuffleDescriptors() {
            return shuffleDescriptors;
        }
    }

    /**
     * Creates a tracker that releases partitions through the given shuffle environment.
     *
     * @param shuffleEnvironment environment used to release partitions locally
     */
    public TaskExecutorPartitionTrackerImpl(ShuffleEnvironment<?, ?> shuffleEnvironment) {
        this.shuffleEnvironment = shuffleEnvironment;
    }

    /**
     * Starts tracking the given partition for the given job, keyed by the partition's own result
     * partition ID.
     *
     * @param jobID job the partition is tracked for; must not be {@code null}
     * @param taskExecutorPartitionInfo meta information of the partition; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public void startTrackingPartition(JobID jobID, TaskExecutorPartitionInfo taskExecutorPartitionInfo) {
        Preconditions.checkNotNull(jobID);
        Preconditions.checkNotNull(taskExecutorPartitionInfo);
        startTrackingPartition(jobID, taskExecutorPartitionInfo.getResultPartitionId(), taskExecutorPartitionInfo);
    }

    /**
     * Stops tracking the given job partitions and releases them locally. Does nothing for an empty
     * collection.
     *
     * @param collection IDs of the partitions to stop tracking and release
     */
    @Override
    public void stopTrackingAndReleaseJobPartitions(Collection<ResultPartitionID> collection) {
        if (collection.isEmpty()) {
            return;
        }
        stopTrackingPartitions(collection);
        shuffleEnvironment.releasePartitionsLocally(collection);
    }

    /**
     * Stops tracking every partition tracked for the given job and releases those partitions
     * locally.
     *
     * @param jobID job whose partitions are released
     */
    @Override
    public void stopTrackingAndReleaseJobPartitionsFor(JobID jobID) {
        Collection<ResultPartitionID> partitionsOfJob =
                CollectionUtil.project(
                        stopTrackingPartitionsFor(jobID),
                        PartitionTrackerEntry::getResultPartitionId);
        shuffleEnvironment.releasePartitionsLocally(partitionsOfJob);
    }

    /**
     * Promotes the given job partitions to cluster partitions: they are no longer tracked as job
     * partitions and are instead recorded under their intermediate data set. Nothing is released.
     * Does nothing for an empty collection.
     *
     * @param collection IDs of the job partitions to promote
     */
    @Override
    public void promoteJobPartitions(Collection<ResultPartitionID> collection) {
        if (collection.isEmpty()) {
            return;
        }
        for (PartitionTrackerEntry<JobID, TaskExecutorPartitionInfo> promoted : stopTrackingPartitions(collection)) {
            TaskExecutorPartitionInfo partitionInfo = promoted.getMetaInfo();
            // The first partition promoted for a data set creates its entry; the total partition
            // count is taken from that partition's meta information.
            DataSetEntry dataSetEntry =
                    clusterPartitions.computeIfAbsent(
                            partitionInfo.getIntermediateDataSetId(),
                            ignored -> new DataSetEntry(partitionInfo.getNumberOfPartitions()));
            dataSetEntry.addPartition(partitionInfo.getShuffleDescriptor());
        }
    }

    /**
     * Stops tracking the given data sets and releases their cluster partitions locally. Data set
     * IDs that are not tracked are skipped.
     *
     * @param collection IDs of the data sets whose cluster partitions are released
     */
    @Override
    public void stopTrackingAndReleaseClusterPartitions(Collection<IntermediateDataSetID> collection) {
        for (IntermediateDataSetID dataSetId : collection) {
            DataSetEntry removedEntry = clusterPartitions.remove(dataSetId);
            if (removedEntry == null) {
                // Nothing is tracked for this data set, so there is nothing to release. Skipping
                // it avoids a NullPointerException that would abort the remaining releases.
                continue;
            }
            shuffleEnvironment.releasePartitionsLocally(removedEntry.getPartitionIds());
        }
    }

    /** Releases the cluster partitions of every tracked data set locally and forgets all of them. */
    @Override
    public void stopTrackingAndReleaseAllClusterPartitions() {
        for (DataSetEntry dataSetEntry : clusterPartitions.values()) {
            shuffleEnvironment.releasePartitionsLocally(dataSetEntry.getPartitionIds());
        }
        clusterPartitions.clear();
    }

    /**
     * Creates a report with one entry per tracked data set, carrying the data set ID, its total
     * number of partitions and the shuffle descriptors recorded for it.
     *
     * @return report describing the currently tracked cluster partitions
     */
    @Override
    public ClusterPartitionReport createClusterPartitionReport() {
        List<ClusterPartitionReport.ClusterPartitionReportEntry> reportEntries =
                clusterPartitions.entrySet().stream()
                        .map(
                                entry ->
                                        new ClusterPartitionReport.ClusterPartitionReportEntry(
                                                entry.getKey(),
                                                entry.getValue().getTotalNumberOfPartitions(),
                                                entry.getValue().getShuffleDescriptors()))
                        .collect(Collectors.toList());
        return new ClusterPartitionReport(reportEntries);
    }
}
