package org.apache.flink.runtime.io.network.partition;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.AbstractPartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.class */
public class JobMasterPartitionTrackerImpl extends AbstractPartitionTracker<ResourceID, ResultPartitionDeploymentDescriptor> implements JobMasterPartitionTracker {
    private final JobID jobId;
    private final ShuffleMaster<?> shuffleMaster;
    private final PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup;

    public JobMasterPartitionTrackerImpl(JobID jobID, ShuffleMaster<?> shuffleMaster, PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup) {
        this.jobId = (JobID) Preconditions.checkNotNull(jobID);
        this.shuffleMaster = (ShuffleMaster) Preconditions.checkNotNull(shuffleMaster);
        this.taskExecutorGatewayLookup = taskExecutorGatewayLookup;
    }

    @Override // org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker
    public void startTrackingPartition(ResourceID resourceID, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
        Preconditions.checkNotNull(resourceID);
        Preconditions.checkNotNull(resultPartitionDeploymentDescriptor);
        if (resultPartitionDeploymentDescriptor.getPartitionType().isReleaseByScheduler()) {
            startTrackingPartition(resourceID, resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID(), resultPartitionDeploymentDescriptor);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.AbstractPartitionTracker
    public void startTrackingPartition(ResourceID resourceID, ResultPartitionID resultPartitionID, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
        if (resultPartitionDeploymentDescriptor.getShuffleDescriptor().storesLocalResourcesOn().isPresent()) {
            this.partitionTable.startTrackingPartitions(resourceID, Collections.singletonList(resultPartitionID));
        }
        this.partitionInfos.put(resultPartitionID, new AbstractPartitionTracker.PartitionInfo(resourceID, resultPartitionDeploymentDescriptor));
    }

    @Override // org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker
    public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> collection, boolean z) {
        stopTrackingAndHandlePartitions(collection, (resourceID, collection2) -> {
            internalReleasePartitions(resourceID, collection2, z);
        });
    }

    @Override // org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker
    public void stopTrackingAndReleaseOrPromotePartitions(Collection<ResultPartitionID> collection) {
        stopTrackingAndHandlePartitions(collection, (resourceID, collection2) -> {
            internalReleaseOrPromotePartitions(resourceID, collection2);
        });
    }

    @Override // org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker
    public Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions() {
        return (Collection) this.partitionInfos.values().stream().map((v0) -> {
            return v0.getMetaInfo();
        }).collect(Collectors.toList());
    }

    private void stopTrackingAndHandlePartitions(Collection<ResultPartitionID> collection, BiConsumer<ResourceID, Collection<ResultPartitionDeploymentDescriptor>> biConsumer) {
        Preconditions.checkNotNull(collection);
        ((Map) stopTrackingPartitions(collection).stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getKey();
        }, Collectors.mapping((v0) -> {
            return v0.getMetaInfo();
        }, Collectors.toList())))).forEach(biConsumer);
    }

    private void internalReleasePartitions(ResourceID resourceID, Collection<ResultPartitionDeploymentDescriptor> collection, boolean z) {
        internalReleasePartitionsOnTaskExecutor(resourceID, collection);
        if (z) {
            internalReleasePartitionsOnShuffleMaster(collection.stream());
        }
    }

    private void internalReleaseOrPromotePartitions(ResourceID resourceID, Collection<ResultPartitionDeploymentDescriptor> collection) {
        internalReleaseOrPromotePartitionsOnTaskExecutor(resourceID, collection);
        internalReleasePartitionsOnShuffleMaster(excludePersistentPartitions(collection));
    }

    private void internalReleasePartitionsOnTaskExecutor(ResourceID resourceID, Collection<ResultPartitionDeploymentDescriptor> collection) {
        internalReleaseOrPromotePartitionsOnTaskExecutor(resourceID, (Set) collection.stream().filter(JobMasterPartitionTrackerImpl::isPartitionWithLocalResources).map(JobMasterPartitionTrackerImpl::getResultPartitionId).collect(Collectors.toSet()), Collections.emptySet());
    }

    private void internalReleaseOrPromotePartitionsOnTaskExecutor(ResourceID resourceID, Collection<ResultPartitionDeploymentDescriptor> collection) {
        Map map = (Map) collection.stream().filter(JobMasterPartitionTrackerImpl::isPartitionWithLocalResources).collect(Collectors.partitioningBy(resultPartitionDeploymentDescriptor -> {
            return resultPartitionDeploymentDescriptor.getPartitionType().isPersistent();
        }, Collectors.mapping(JobMasterPartitionTrackerImpl::getResultPartitionId, Collectors.toSet())));
        internalReleaseOrPromotePartitionsOnTaskExecutor(resourceID, (Set) map.get(false), (Set) map.get(true));
    }

    private void internalReleaseOrPromotePartitionsOnTaskExecutor(ResourceID resourceID, Set<ResultPartitionID> set, Set<ResultPartitionID> set2) {
        if (set.isEmpty() && set2.isEmpty()) {
            return;
        }
        this.taskExecutorGatewayLookup.lookup(resourceID).ifPresent(taskExecutorGateway -> {
            taskExecutorGateway.releaseOrPromotePartitions(this.jobId, set, set2);
        });
    }

    private void internalReleasePartitionsOnShuffleMaster(Stream<ResultPartitionDeploymentDescriptor> stream) {
        Stream<R> map = stream.map((v0) -> {
            return v0.getShuffleDescriptor();
        });
        ShuffleMaster<?> shuffleMaster = this.shuffleMaster;
        shuffleMaster.getClass();
        map.forEach(shuffleMaster::releasePartitionExternally);
    }

    private static boolean isPartitionWithLocalResources(ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
        return resultPartitionDeploymentDescriptor.getShuffleDescriptor().storesLocalResourcesOn().isPresent();
    }

    private static Stream<ResultPartitionDeploymentDescriptor> excludePersistentPartitions(Collection<ResultPartitionDeploymentDescriptor> collection) {
        return collection.stream().filter(resultPartitionDeploymentDescriptor -> {
            return !resultPartitionDeploymentDescriptor.getPartitionType().isPersistent();
        });
    }

    private static ResultPartitionID getResultPartitionId(ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
        return resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
    }
}
