/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.AbstractPartitionTracker;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerEntry;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;

/**
 * {@link JobMasterPartitionTracker} implementation that records, per producing
 * {@link ResourceID}, the result partitions of a single job and issues release / promote
 * calls for them.
 *
 * <p>Release and promote RPCs are only sent for partitions whose shuffle descriptor reports
 * local resources ({@code storesLocalResourcesOn().isPresent()}), and only if the
 * {@link PartitionTrackerFactory.TaskExecutorGatewayLookup} can resolve a gateway for the
 * producing resource.
 *
 * <p>Shuffle descriptors of cluster partitions are fetched from the ResourceManager on first
 * request and cached per {@link IntermediateDataSetID} afterwards.
 */
public class JobMasterPartitionTrackerImpl
extends AbstractPartitionTracker<ResourceID, ResultPartitionDeploymentDescriptor>
implements JobMasterPartitionTracker {
    /** Job whose partitions are tracked; passed along with every task executor RPC. */
    private final JobID jobId;
    /** Used to release partitions externally when requested by the caller. */
    private final ShuffleMaster<?> shuffleMaster;
    /** Resolves the gateway of the task executor identified by a {@link ResourceID}. */
    private final PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup;
    /** Set via {@link #connectToResourceManager}; {@code null} until then. */
    private ResourceManagerGateway resourceManagerGateway;
    /** Cache of cluster partition shuffle descriptors, keyed by data set. */
    private final Map<IntermediateDataSetID, List<ShuffleDescriptor>> clusterPartitionShuffleDescriptors;

    /**
     * Creates a tracker for the given job.
     *
     * @param jobId id of the job, must not be {@code null}
     * @param shuffleMaster shuffle master used for external releases, must not be {@code null}
     * @param taskExecutorGatewayLookup lookup used to resolve task executor gateways
     * @throws NullPointerException if {@code jobId} or {@code shuffleMaster} is {@code null}
     */
    public JobMasterPartitionTrackerImpl(JobID jobId, ShuffleMaster<?> shuffleMaster, PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup) {
        this.jobId = Preconditions.checkNotNull(jobId);
        this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
        this.taskExecutorGatewayLookup = taskExecutorGatewayLookup;
        this.clusterPartitionShuffleDescriptors = new HashMap<>();
    }

    /**
     * Starts tracking the described partition for the given producer.
     *
     * <p>Partitions whose type does not report {@code isReleaseByScheduler()} are ignored.
     *
     * @param producingTaskExecutorId resource that produces the partition, must not be {@code null}
     * @param resultPartitionDeploymentDescriptor descriptor of the partition, must not be {@code null}
     */
    @Override
    public void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
        Preconditions.checkNotNull(producingTaskExecutorId);
        Preconditions.checkNotNull(resultPartitionDeploymentDescriptor);
        if (!resultPartitionDeploymentDescriptor.getPartitionType().isReleaseByScheduler()) {
            return;
        }
        this.startTrackingPartition(producingTaskExecutorId, getResultPartitionId(resultPartitionDeploymentDescriptor), resultPartitionDeploymentDescriptor);
    }

    /**
     * Records the partition in {@code partitionInfos}; it is additionally registered in
     * {@code partitionTable} only when its shuffle descriptor reports local resources.
     */
    @Override
    void startTrackingPartition(ResourceID key, ResultPartitionID resultPartitionId, ResultPartitionDeploymentDescriptor metaInfo) {
        if (isPartitionWithLocalResources(metaInfo)) {
            this.partitionTable.startTrackingPartitions(key, Collections.singletonList(resultPartitionId));
        }
        this.partitionInfos.put(resultPartitionId, new AbstractPartitionTracker.PartitionInfo<ResourceID, ResultPartitionDeploymentDescriptor>(key, metaInfo));
    }

    /**
     * Stops tracking the given partitions and releases them on their task executors and,
     * if {@code releaseOnShuffleMaster} is set, also on the shuffle master.
     *
     * @param resultPartitionIds partitions to stop tracking, must not be {@code null}
     * @param releaseOnShuffleMaster whether to additionally release on the shuffle master
     */
    @Override
    public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds, boolean releaseOnShuffleMaster) {
        this.stopTrackingAndHandlePartitions(resultPartitionIds, (tmID, partitionDescs) -> this.internalReleasePartitions(tmID, partitionDescs, releaseOnShuffleMaster));
    }

    /**
     * Stops tracking the given partitions and asks the task executors to promote them.
     *
     * @param resultPartitionIds partitions to stop tracking, must not be {@code null}
     * @return future combining all promote calls via {@code FutureUtils.completeAll}
     */
    @Override
    public CompletableFuture<Void> stopTrackingAndPromotePartitions(Collection<ResultPartitionID> resultPartitionIds) {
        List<CompletableFuture<Acknowledge>> promoteFutures = new ArrayList<>();
        this.stopTrackingAndHandlePartitions(resultPartitionIds, (tmID, partitionDescs) -> promoteFutures.add(this.internalPromotePartitionsOnTaskExecutor(tmID, partitionDescs)));
        return FutureUtils.completeAll(promoteFutures);
    }

    /**
     * Returns a new list holding the deployment descriptors of all currently tracked partitions.
     */
    @Override
    public Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions() {
        return this.partitionInfos.values().stream().map(AbstractPartitionTracker.PartitionInfo::getMetaInfo).collect(Collectors.toList());
    }

    /** Stores the gateway used to query cluster partition shuffle descriptors. */
    @Override
    public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
        this.resourceManagerGateway = resourceManagerGateway;
    }

    /**
     * Returns the shuffle descriptors of the given data set, requesting them from the
     * ResourceManager if no entry is cached yet.
     *
     * @param intermediateDataSetID data set to look up
     * @return cached or freshly requested shuffle descriptors
     * @throws NullPointerException if no ResourceManager gateway has been set and a request is needed
     * @throws RuntimeException if the request to the ResourceManager fails
     */
    @Override
    public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(IntermediateDataSetID intermediateDataSetID) {
        return this.clusterPartitionShuffleDescriptors.computeIfAbsent(intermediateDataSetID, this::requestShuffleDescriptorsFromResourceManager);
    }

    /**
     * Synchronously requests the shuffle descriptors of a data set from the ResourceManager.
     *
     * <p>This call blocks on the returned future. Any failure is rethrown wrapped in a
     * {@link RuntimeException}.
     */
    private List<ShuffleDescriptor> requestShuffleDescriptorsFromResourceManager(IntermediateDataSetID intermediateDataSetID) {
        Preconditions.checkNotNull(this.resourceManagerGateway, "JobMaster is not connected to ResourceManager");
        try {
            return this.resourceManagerGateway.getClusterPartitionsShuffleDescriptors(intermediateDataSetID).get();
        }
        catch (Throwable e) {
            if (e instanceof InterruptedException) {
                // Wrapping the exception would otherwise hide the interruption from callers
                // further up the stack, so restore the interrupt flag before rethrowing.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(String.format("Failed to get shuffle descriptors of intermediate dataset %s from ResourceManager", intermediateDataSetID), e);
        }
    }

    /**
     * Stops tracking the given partitions, groups their descriptors by tracker key and hands
     * each group to {@code partitionHandler}.
     */
    private void stopTrackingAndHandlePartitions(Collection<ResultPartitionID> resultPartitionIds, BiConsumer<ResourceID, Collection<ResultPartitionDeploymentDescriptor>> partitionHandler) {
        Preconditions.checkNotNull(resultPartitionIds);
        Map<ResourceID, List<ResultPartitionDeploymentDescriptor>> partitionsByResourceId = this.stopTrackingPartitions(resultPartitionIds).stream().collect(Collectors.groupingBy(PartitionTrackerEntry::getKey, Collectors.mapping(PartitionTrackerEntry::getMetaInfo, Collectors.toList())));
        partitionsByResourceId.forEach(partitionHandler);
    }

    /** Releases on the task executor first, then optionally on the shuffle master. */
    private void internalReleasePartitions(ResourceID potentialPartitionLocation, Collection<ResultPartitionDeploymentDescriptor> partitionDeploymentDescriptors, boolean releaseOnShuffleMaster) {
        this.internalReleasePartitionsOnTaskExecutor(potentialPartitionLocation, partitionDeploymentDescriptors);
        if (releaseOnShuffleMaster) {
            this.internalReleasePartitionsOnShuffleMaster(partitionDeploymentDescriptors.stream());
        }
    }

    /**
     * Sends a promote call for those partitions that report local resources.
     *
     * @return the future of the RPC, or an already completed future (with {@code null}) when
     *     there is nothing to promote or no gateway could be looked up
     */
    private CompletableFuture<Acknowledge> internalPromotePartitionsOnTaskExecutor(ResourceID potentialPartitionLocation, Collection<ResultPartitionDeploymentDescriptor> clusterPartitionDeploymentDescriptors) {
        Set<ResultPartitionID> partitionsRequiringRpcPromoteCalls = clusterPartitionDeploymentDescriptors.stream().filter(JobMasterPartitionTrackerImpl::isPartitionWithLocalResources).map(JobMasterPartitionTrackerImpl::getResultPartitionId).collect(Collectors.toSet());
        if (!partitionsRequiringRpcPromoteCalls.isEmpty()) {
            // The lookup is only performed when there is something to promote.
            TaskExecutorGateway taskExecutorGateway = this.taskExecutorGatewayLookup.lookup(potentialPartitionLocation).orElse(null);
            if (taskExecutorGateway != null) {
                return taskExecutorGateway.promotePartitions(this.jobId, partitionsRequiringRpcPromoteCalls);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Sends a release call for those partitions that report local resources; does nothing if
     * there are none or no gateway could be looked up.
     */
    private void internalReleasePartitionsOnTaskExecutor(ResourceID potentialPartitionLocation, Collection<ResultPartitionDeploymentDescriptor> partitionDeploymentDescriptors) {
        Set<ResultPartitionID> partitionsRequiringRpcReleaseCalls = partitionDeploymentDescriptors.stream().filter(JobMasterPartitionTrackerImpl::isPartitionWithLocalResources).map(JobMasterPartitionTrackerImpl::getResultPartitionId).collect(Collectors.toSet());
        if (!partitionsRequiringRpcReleaseCalls.isEmpty()) {
            this.taskExecutorGatewayLookup.lookup(potentialPartitionLocation).ifPresent(taskExecutorGateway -> taskExecutorGateway.releasePartitions(this.jobId, partitionsRequiringRpcReleaseCalls));
        }
    }

    /** Calls {@code releasePartitionExternally} on the shuffle master for every descriptor. */
    private void internalReleasePartitionsOnShuffleMaster(Stream<ResultPartitionDeploymentDescriptor> partitionDeploymentDescriptors) {
        partitionDeploymentDescriptors.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor).forEach(this.shuffleMaster::releasePartitionExternally);
    }

    /** Whether the partition's shuffle descriptor reports a location of local resources. */
    private static boolean isPartitionWithLocalResources(ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
        return resultPartitionDeploymentDescriptor.getShuffleDescriptor().storesLocalResourcesOn().isPresent();
    }

    /** Extracts the partition id from the descriptor's shuffle descriptor. */
    private static ResultPartitionID getResultPartitionId(ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
        return resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
    }
}

