package org.apache.flink.runtime.deployment;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertexInputInfo;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.types.Either;
import org.apache.flink.util.CompressedSerializedValue;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

/* loaded from: input_file:org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.class */
public class TaskDeploymentDescriptorFactory {
    private final ExecutionAttemptID executionId;
    private final TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation;
    private final TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> taskInfo;
    private final JobID jobID;
    private final PartitionLocationConstraint partitionDeploymentConstraint;
    private final List<ConsumedPartitionGroup> consumedPartitionGroups;
    private final Function<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitionRetriever;
    private final BlobWriter blobWriter;
    private final Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> consumedClusterPartitionShuffleDescriptors;
    private final Function<IntermediateDataSetID, ExecutionVertexInputInfo> executionVertexInputInfoRetriever;
    private final boolean nonFinishedHybridPartitionShouldBeUnknown;

    /* loaded from: input_file:org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory$PartitionLocationConstraint.class */
    public enum PartitionLocationConstraint {
        MUST_BE_KNOWN,
        CAN_BE_UNKNOWN;

        public static PartitionLocationConstraint fromJobType(JobType jobType) {
            switch (jobType) {
                case BATCH:
                    return CAN_BE_UNKNOWN;
                case STREAMING:
                    return MUST_BE_KNOWN;
                default:
                    throw new IllegalArgumentException(String.format("Unknown JobType %s. Cannot derive partition location constraint for it.", jobType));
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory$ShuffleDescriptorAndIndex.class */
    public static class ShuffleDescriptorAndIndex implements Serializable {
        private static final long serialVersionUID = 1;
        private final ShuffleDescriptor shuffleDescriptor;
        private final int index;

        public ShuffleDescriptorAndIndex(ShuffleDescriptor shuffleDescriptor, int i) {
            this.shuffleDescriptor = shuffleDescriptor;
            this.index = i;
        }

        public ShuffleDescriptor getShuffleDescriptor() {
            return this.shuffleDescriptor;
        }

        public int getIndex() {
            return this.index;
        }
    }

    private TaskDeploymentDescriptorFactory(ExecutionAttemptID executionAttemptID, TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> maybeOffloaded, TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> maybeOffloaded2, JobID jobID, PartitionLocationConstraint partitionLocationConstraint, List<ConsumedPartitionGroup> list, Function<IntermediateResultPartitionID, IntermediateResultPartition> function, BlobWriter blobWriter, boolean z, Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> map, Function<IntermediateDataSetID, ExecutionVertexInputInfo> function2) {
        this.executionId = executionAttemptID;
        this.serializedJobInformation = maybeOffloaded;
        this.taskInfo = maybeOffloaded2;
        this.jobID = jobID;
        this.partitionDeploymentConstraint = partitionLocationConstraint;
        this.consumedPartitionGroups = list;
        this.resultPartitionRetriever = function;
        this.blobWriter = blobWriter;
        this.nonFinishedHybridPartitionShouldBeUnknown = z;
        this.consumedClusterPartitionShuffleDescriptors = map;
        this.executionVertexInputInfoRetriever = (Function) Preconditions.checkNotNull(function2);
    }

    public TaskDeploymentDescriptor createDeploymentDescriptor(AllocationID allocationID, @Nullable JobManagerTaskRestore jobManagerTaskRestore, Collection<ResultPartitionDeploymentDescriptor> collection) throws IOException {
        return new TaskDeploymentDescriptor(this.jobID, this.serializedJobInformation, this.taskInfo, this.executionId, allocationID, jobManagerTaskRestore, new ArrayList(collection), createInputGateDeploymentDescriptors());
    }

    private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors() throws IOException {
        ArrayList arrayList = new ArrayList(this.consumedPartitionGroups.size());
        for (ConsumedPartitionGroup consumedPartitionGroup : this.consumedPartitionGroups) {
            IntermediateResult intermediateResult = this.resultPartitionRetriever.apply(consumedPartitionGroup.getFirst()).getIntermediateResult();
            IntermediateDataSetID id = intermediateResult.getId();
            arrayList.add(new InputGateDeploymentDescriptor(id, intermediateResult.getResultType(), this.executionVertexInputInfoRetriever.apply(id).getSubpartitionIndexRange(), consumedPartitionGroup.size(), getConsumedPartitionShuffleDescriptors(intermediateResult, consumedPartitionGroup)));
        }
        for (Map.Entry<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> entry : this.consumedClusterPartitionShuffleDescriptors.entrySet()) {
            arrayList.add(new InputGateDeploymentDescriptor(entry.getKey(), ResultPartitionType.BLOCKING_PERSISTENT, 0, entry.getValue()));
        }
        return arrayList;
    }

    private List<TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptorAndIndex[]>> getConsumedPartitionShuffleDescriptors(IntermediateResult intermediateResult, ConsumedPartitionGroup consumedPartitionGroup) throws IOException {
        CachedShuffleDescriptors cachedShuffleDescriptors = intermediateResult.getCachedShuffleDescriptors(consumedPartitionGroup);
        if (cachedShuffleDescriptors == null) {
            cachedShuffleDescriptors = intermediateResult.cacheShuffleDescriptors(consumedPartitionGroup, computeConsumedPartitionShuffleDescriptors(consumedPartitionGroup));
        }
        cachedShuffleDescriptors.serializeShuffleDescriptors(this::serializeAndTryOffloadShuffleDescriptor);
        return cachedShuffleDescriptors.getAllSerializedShuffleDescriptors();
    }

    private ShuffleDescriptorAndIndex[] computeConsumedPartitionShuffleDescriptors(ConsumedPartitionGroup consumedPartitionGroup) {
        ShuffleDescriptorAndIndex[] shuffleDescriptorAndIndexArr = new ShuffleDescriptorAndIndex[consumedPartitionGroup.size()];
        int i = 0;
        Iterator<IntermediateResultPartitionID> it = consumedPartitionGroup.iterator();
        while (it.hasNext()) {
            shuffleDescriptorAndIndexArr[i] = new ShuffleDescriptorAndIndex(getConsumedPartitionShuffleDescriptor(this.resultPartitionRetriever.apply(it.next()), this.partitionDeploymentConstraint, this.nonFinishedHybridPartitionShouldBeUnknown), i);
            i++;
        }
        return shuffleDescriptorAndIndexArr;
    }

    private TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptorAndIndex[]> serializeAndTryOffloadShuffleDescriptor(ShuffleDescriptorAndIndex[] shuffleDescriptorAndIndexArr) throws IOException {
        Either tryOffload = BlobWriter.tryOffload(CompressedSerializedValue.fromObject(shuffleDescriptorAndIndexArr), this.jobID, this.blobWriter);
        return tryOffload.isLeft() ? new TaskDeploymentDescriptor.NonOffloaded((SerializedValue) tryOffload.left()) : new TaskDeploymentDescriptor.Offloaded((PermanentBlobKey) tryOffload.right());
    }

    public static TaskDeploymentDescriptorFactory fromExecution(Execution execution) throws IOException, ClusterDatasetCorruptedException {
        ExecutionVertex vertex = execution.getVertex();
        InternalExecutionGraphAccessor executionGraphAccessor = vertex.getExecutionGraphAccessor();
        try {
            Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> clusterPartitionShuffleDescriptors = getClusterPartitionShuffleDescriptors(vertex);
            ExecutionAttemptID attemptId = execution.getAttemptId();
            TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation = getSerializedJobInformation(executionGraphAccessor);
            TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> serializedTaskInformation = getSerializedTaskInformation(vertex.getJobVertex().getTaskInformationOrBlobKey());
            JobID jobID = executionGraphAccessor.getJobID();
            PartitionLocationConstraint partitionLocationConstraint = executionGraphAccessor.getPartitionLocationConstraint();
            List<ConsumedPartitionGroup> allConsumedPartitionGroups = vertex.getAllConsumedPartitionGroups();
            executionGraphAccessor.getClass();
            Function function = executionGraphAccessor::getResultPartitionOrThrow;
            BlobWriter blobWriter = executionGraphAccessor.getBlobWriter();
            boolean isNonFinishedHybridPartitionShouldBeUnknown = executionGraphAccessor.isNonFinishedHybridPartitionShouldBeUnknown();
            vertex.getClass();
            return new TaskDeploymentDescriptorFactory(attemptId, serializedJobInformation, serializedTaskInformation, jobID, partitionLocationConstraint, allConsumedPartitionGroups, function, blobWriter, isNonFinishedHybridPartitionShouldBeUnknown, clusterPartitionShuffleDescriptors, vertex::getExecutionVertexInputInfo);
        } catch (Throwable th) {
            throw new ClusterDatasetCorruptedException(th, vertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume());
        }
    }

    private static Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) {
        InternalExecutionGraphAccessor executionGraphAccessor = executionVertex.getExecutionGraphAccessor();
        List<IntermediateDataSetID> intermediateDataSetIdsToConsume = executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume();
        HashMap hashMap = new HashMap();
        for (IntermediateDataSetID intermediateDataSetID : intermediateDataSetIdsToConsume) {
            List<ShuffleDescriptor> clusterPartitionShuffleDescriptors = executionGraphAccessor.getClusterPartitionShuffleDescriptors(intermediateDataSetID);
            Preconditions.checkState(executionVertex.getTotalNumberOfParallelSubtasks() == clusterPartitionShuffleDescriptors.size(), "The parallelism (%s) of the cache consuming job vertex is different from the number of shuffle descriptors (%s) of the intermediate data set", new Object[]{Integer.valueOf(executionVertex.getTotalNumberOfParallelSubtasks()), Integer.valueOf(clusterPartitionShuffleDescriptors.size())});
            hashMap.put(intermediateDataSetID, new ShuffleDescriptorAndIndex[]{new ShuffleDescriptorAndIndex(clusterPartitionShuffleDescriptors.get(executionVertex.getParallelSubtaskIndex()), 0)});
        }
        return hashMap;
    }

    private static TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> getSerializedJobInformation(InternalExecutionGraphAccessor internalExecutionGraphAccessor) {
        Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey = internalExecutionGraphAccessor.getJobInformationOrBlobKey();
        return jobInformationOrBlobKey.isLeft() ? new TaskDeploymentDescriptor.NonOffloaded((SerializedValue) jobInformationOrBlobKey.left()) : new TaskDeploymentDescriptor.Offloaded((PermanentBlobKey) jobInformationOrBlobKey.right());
    }

    private static TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> getSerializedTaskInformation(Either<SerializedValue<TaskInformation>, PermanentBlobKey> either) {
        return either.isLeft() ? new TaskDeploymentDescriptor.NonOffloaded((SerializedValue) either.left()) : new TaskDeploymentDescriptor.Offloaded((PermanentBlobKey) either.right());
    }

    public static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(IntermediateResultPartition intermediateResultPartition, PartitionLocationConstraint partitionLocationConstraint, boolean z) {
        Execution partitionProducer = intermediateResultPartition.getProducer().getPartitionProducer();
        return getConsumedPartitionShuffleDescriptor(new ResultPartitionID(intermediateResultPartition.getPartitionId(), partitionProducer.getAttemptId()), intermediateResultPartition.getResultType(), intermediateResultPartition.hasDataAllProduced(), partitionProducer.getState(), partitionLocationConstraint, partitionProducer.getResultPartitionDeploymentDescriptor(intermediateResultPartition.getPartitionId()).orElse(null), z);
    }

    @VisibleForTesting
    static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(ResultPartitionID resultPartitionID, ResultPartitionType resultPartitionType, boolean z, ExecutionState executionState, PartitionLocationConstraint partitionLocationConstraint, @Nullable ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor, boolean z2) {
        if ((!resultPartitionType.canBePipelinedConsumed() && !z) || resultPartitionDeploymentDescriptor == null || !isProducerAvailable(executionState)) {
            if (partitionLocationConstraint == PartitionLocationConstraint.CAN_BE_UNKNOWN) {
                return new UnknownShuffleDescriptor(resultPartitionID);
            }
            throw handleConsumedPartitionShuffleDescriptorErrors(resultPartitionID, resultPartitionType, z, executionState);
        }
        if (!resultPartitionType.isHybridResultPartition() || !z2 || executionState == ExecutionState.FINISHED) {
            return resultPartitionDeploymentDescriptor.getShuffleDescriptor();
        }
        Preconditions.checkState(partitionLocationConstraint == PartitionLocationConstraint.CAN_BE_UNKNOWN, "partition location constraint should allow unknown shuffle descriptor when nonFinishedHybridPartitionShouldBeUnknown is true.");
        return new UnknownShuffleDescriptor(resultPartitionID);
    }

    private static RuntimeException handleConsumedPartitionShuffleDescriptorErrors(ResultPartitionID resultPartitionID, ResultPartitionType resultPartitionType, boolean z, ExecutionState executionState) {
        return new IllegalStateException(isProducerFailedOrCanceled(executionState) ? "Trying to consume an input partition whose producer has been canceled or failed. The producer is in state " + executionState + ScopeFormat.SCOPE_SEPARATOR : String.format("Trying to consume an input partition whose producer is not ready (result type: %s, hasAllDataProduced: %s, producer state: %s, partition id: %s).", resultPartitionType, Boolean.valueOf(z), executionState, resultPartitionID));
    }

    private static boolean isProducerAvailable(ExecutionState executionState) {
        return executionState == ExecutionState.RUNNING || executionState == ExecutionState.INITIALIZING || executionState == ExecutionState.FINISHED || executionState == ExecutionState.SCHEDULED || executionState == ExecutionState.DEPLOYING;
    }

    private static boolean isProducerFailedOrCanceled(ExecutionState executionState) {
        return executionState == ExecutionState.CANCELING || executionState == ExecutionState.CANCELED || executionState == ExecutionState.FAILED;
    }
}
