package org.apache.flink.runtime.deployment;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.types.Either;
import org.apache.flink.util.CompressedSerializedValue;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

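/* loaded from: input_file:org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.class */
/**
 * Factory that assembles a {@link TaskDeploymentDescriptor} for an {@link Execution}: it combines
 * the (possibly offloaded) serialized job and task information with the input gate descriptors of
 * the execution's vertex, serializing the consumed shuffle descriptors through a
 * {@link ShuffleDescriptorSerializer}.
 */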
public class TaskDeploymentDescriptorFactory {

    /**
     * Threshold used to decide whether serialized shuffle descriptors are offloaded to the blob
     * server instead of being shipped inline with the deployment descriptor.
     *
     * <p>The default is written as {@code 2048 * 2048}, matching the "2048 shuffle descriptors to
     * 2048 consumers" wording of the description, rather than borrowing the unrelated
     * {@code BatchShuffleReadBufferPool.NUM_BYTES_PER_REQUEST} buffer-size constant, so that a
     * change to the read-buffer size cannot silently change this threshold.
     *
     * <p>NOTE(review): this assumes {@code NUM_BYTES_PER_REQUEST} was 4194304 (4 MiB), i.e. the
     * effective default is unchanged - confirm against {@code BatchShuffleReadBufferPool}.
     */
    @Experimental
    public static final ConfigOption<Integer> OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD = ConfigOptions.key("jobmanager.task-deployment.offload-shuffle-descriptors-to-blob-server.threshold-num").intType().defaultValue(2048 * 2048).withDescription("Threshold for offloading shuffle descriptors to blob server. Once the number of shuffle descriptors exceeds this value, we will offload the shuffle descriptors to blob server. This default value means JobManager need to serialize and transport 2048 shuffle descriptors (almost 32KB) to 2048 consumers (64MB in total)");
    /** Job information, either inline as a serialized value or as a blob key; set once in the constructor. */
    private final TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation;
    /** ID of the job; passed to every created descriptor and to the shuffle descriptor serializer. */
    private final JobID jobID;
    /** Decides whether an unavailable consumed partition yields an unknown descriptor or an error. */
    private final PartitionLocationConstraint partitionDeploymentConstraint;
    /** When true, hybrid partitions whose producer has not FINISHED are described as unknown. */
    private final boolean nonFinishedHybridPartitionShouldBeUnknown;
    /** Serializes shuffle descriptor groups and may offload them to the blob server. */
    private final ShuffleDescriptorSerializer shuffleDescriptorSerializer;

    /* loaded from: input_file:org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory$DefaultShuffleDescriptorSerializer.class */
    /**
     * Default {@link ShuffleDescriptorSerializer}: compresses and serializes a
     * {@link ShuffleDescriptorGroup} and, once the configured threshold is reached, writes the
     * result through the {@link BlobWriter} instead of keeping it inline.
     */
    private static class DefaultShuffleDescriptorSerializer implements ShuffleDescriptorSerializer {
        private final JobID jobID;
        private final BlobWriter blobWriter;
        private final int offloadShuffleDescriptorsThreshold;

        /**
         * @param jobID job the serialized descriptors belong to; must not be null
         * @param blobWriter writer used when the descriptors are offloaded; must not be null
         * @param i offload threshold compared against (descriptor count * {@code i} of
         *     {@link #serializeAndTryOffloadShuffleDescriptor})
         */
        public DefaultShuffleDescriptorSerializer(JobID jobID, BlobWriter blobWriter, int i) {
            this.jobID = Preconditions.checkNotNull(jobID);
            this.blobWriter = Preconditions.checkNotNull(blobWriter);
            this.offloadShuffleDescriptorsThreshold = i;
        }

        /**
         * Serializes the given group and wraps the result as either a non-offloaded serialized
         * value or an offloaded blob key.
         *
         * @param shuffleDescriptorGroup group of shuffle descriptors to serialize
         * @param i multiplier applied to the descriptor count for the offload decision
         *     (presumably the number of consumers the group is sent to - confirm against callers)
         * @return the inline serialized value, or the blob key if the group was offloaded
         * @throws IOException if serialization or offloading fails
         */
        @Override // org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorSerializer
        public TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptorGroup> serializeAndTryOffloadShuffleDescriptor(ShuffleDescriptorGroup shuffleDescriptorGroup, int i) throws IOException {
            final CompressedSerializedValue<ShuffleDescriptorGroup> compressedSerializedValue =
                    CompressedSerializedValue.fromObject(shuffleDescriptorGroup);

            final Either<SerializedValue<ShuffleDescriptorGroup>, PermanentBlobKey> serializedValueOrBlobKey =
                    shouldOffload(shuffleDescriptorGroup.getShuffleDescriptors(), i)
                            ? BlobWriter.offloadWithException(compressedSerializedValue, this.jobID, this.blobWriter)
                            : Either.Left(compressedSerializedValue);

            if (serializedValueOrBlobKey.isLeft()) {
                return new TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
            }
            return new TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
        }

        /**
         * Returns whether (descriptor count * {@code i}) has reached the offload threshold.
         *
         * <p>The product is computed as a {@code long}: an {@code int} multiplication could
         * overflow for large inputs and wrap to a negative value, which would wrongly suppress
         * offloading exactly when the payload is largest.
         */
        private boolean shouldOffload(ShuffleDescriptorAndIndex[] shuffleDescriptorAndIndexArr, int i) {
            return (long) shuffleDescriptorAndIndexArr.length * i >= this.offloadShuffleDescriptorsThreshold;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory$PartitionLocationConstraint.class */
    /**
     * Policy applied when the shuffle descriptor of a consumed partition is not available at the
     * time the consumer's deployment descriptor is built.
     */
    public enum PartitionLocationConstraint {
        // An unavailable consumed partition is an error (an exception is thrown).
        MUST_BE_KNOWN,
        // An unavailable consumed partition is described by an UnknownShuffleDescriptor.
        CAN_BE_UNKNOWN;

        /**
         * Derives the constraint from the job type: BATCH jobs may deploy with unknown partition
         * locations, STREAMING jobs may not.
         *
         * @param jobType type of the job
         * @return the constraint for that job type
         * @throws IllegalArgumentException if the job type is neither BATCH nor STREAMING
         */
        public static PartitionLocationConstraint fromJobType(JobType jobType) {
            switch (jobType) {
                case BATCH:
                    return CAN_BE_UNKNOWN;
                case STREAMING:
                    return MUST_BE_KNOWN;
                default:
                    throw new IllegalArgumentException(String.format("Unknown JobType %s. Cannot derive partition location constraint for it.", jobType));
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory$ShuffleDescriptorAndIndex.class */
    /**
     * Serializable pair of a {@link ShuffleDescriptor} and an integer index. Within this file the
     * index is the descriptor's position in the iteration over a consumed partition group, or 0
     * for cluster partition descriptors.
     */
    public static class ShuffleDescriptorAndIndex implements Serializable {
        private static final long serialVersionUID = 1;
        private final ShuffleDescriptor shuffleDescriptor;
        private final int index;

        /**
         * @param shuffleDescriptor the descriptor to carry (stored as given, not null-checked)
         * @param i the index associated with the descriptor
         */
        public ShuffleDescriptorAndIndex(ShuffleDescriptor shuffleDescriptor, int i) {
            this.shuffleDescriptor = shuffleDescriptor;
            this.index = i;
        }

        /** Returns the carried shuffle descriptor. */
        public ShuffleDescriptor getShuffleDescriptor() {
            return this.shuffleDescriptor;
        }

        /** Returns the index associated with the descriptor. */
        public int getIndex() {
            return this.index;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory$ShuffleDescriptorGroup.class */
    /**
     * Serializable wrapper around an array of {@link ShuffleDescriptorAndIndex}; this is the unit
     * that is serialized (and possibly offloaded) by a {@link ShuffleDescriptorSerializer}.
     */
    public static class ShuffleDescriptorGroup implements Serializable {
        private static final long serialVersionUID = 1;
        private final ShuffleDescriptorAndIndex[] shuffleDescriptors;

        /**
         * @param shuffleDescriptorAndIndexArr descriptors of the group; must not be null. The
         *     array is stored as-is, without a defensive copy.
         */
        public ShuffleDescriptorGroup(ShuffleDescriptorAndIndex[] shuffleDescriptorAndIndexArr) {
            this.shuffleDescriptors = (ShuffleDescriptorAndIndex[]) Preconditions.checkNotNull(shuffleDescriptorAndIndexArr);
        }

        /** Returns the backing array itself (not a copy). */
        public ShuffleDescriptorAndIndex[] getShuffleDescriptors() {
            return this.shuffleDescriptors;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory$ShuffleDescriptorSerializer.class */
    /** Strategy for serializing a {@link ShuffleDescriptorGroup}, optionally offloading the result. */
    public interface ShuffleDescriptorSerializer {
        /**
         * Serializes the given group and returns it either inline or as an offloaded reference.
         *
         * @param shuffleDescriptorGroup the group of shuffle descriptors to serialize
         * @param i an integer the implementation may use for its offload decision (the default
         *     implementation in this file multiplies it by the number of descriptors)
         * @return the serialized group, possibly offloaded
         * @throws IOException if serialization or offloading fails
         */
        TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptorGroup> serializeAndTryOffloadShuffleDescriptor(ShuffleDescriptorGroup shuffleDescriptorGroup, int i) throws IOException;
    }

    public TaskDeploymentDescriptorFactory(Either<SerializedValue<JobInformation>, PermanentBlobKey> either, JobID jobID, PartitionLocationConstraint partitionLocationConstraint, BlobWriter blobWriter, boolean z, int i) {
        this.serializedJobInformation = getSerializedJobInformation(either);
        this.jobID = jobID;
        this.partitionDeploymentConstraint = partitionLocationConstraint;
        this.nonFinishedHybridPartitionShouldBeUnknown = z;
        this.shuffleDescriptorSerializer = new DefaultShuffleDescriptorSerializer(jobID, blobWriter, i);
    }

    /** Returns the job information wrapper (inline or offloaded) that was built in the constructor. */
    public TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> getSerializedJobInformation() {
        return this.serializedJobInformation;
    }

    /**
     * Builds the deployment descriptor for one execution attempt.
     *
     * @param execution the execution to deploy; its vertex supplies task information and inputs
     * @param allocationID allocation the task is deployed into
     * @param jobManagerTaskRestore state to restore from, or null
     * @param collection descriptors of the partitions produced by the task; copied into a new list
     * @return the assembled descriptor
     * @throws IOException if task information or shuffle descriptors cannot be obtained/serialized
     * @throws ClusterDatasetCorruptedException if the consumed cluster partitions cannot be resolved
     */
    public TaskDeploymentDescriptor createDeploymentDescriptor(Execution execution, AllocationID allocationID, @Nullable JobManagerTaskRestore jobManagerTaskRestore, Collection<ResultPartitionDeploymentDescriptor> collection) throws IOException, ClusterDatasetCorruptedException {
        final ExecutionVertex executionVertex = execution.getVertex();
        final TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> serializedTaskInformation =
                getSerializedTaskInformation(executionVertex.getJobVertex().getTaskInformationOrBlobKey());

        // Remaining arguments are evaluated inline, left to right, exactly as before.
        return new TaskDeploymentDescriptor(
                this.jobID,
                this.serializedJobInformation,
                serializedTaskInformation,
                execution.getAttemptId(),
                allocationID,
                jobManagerTaskRestore,
                new ArrayList<>(collection),
                createInputGateDeploymentDescriptors(executionVertex));
    }

    /**
     * Creates the input gate descriptors of a vertex: one per consumed partition group, followed
     * by one per consumed cluster partition data set.
     *
     * @param executionVertex the vertex whose inputs are described
     * @return the input gate descriptors, consumed partition groups first
     * @throws IOException if shuffle descriptors cannot be serialized
     * @throws ClusterDatasetCorruptedException if looking up the cluster partition shuffle
     *     descriptors fails for any reason
     */
    private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors(ExecutionVertex executionVertex) throws IOException, ClusterDatasetCorruptedException {
        final InternalExecutionGraphAccessor executionGraphAccessor = executionVertex.getExecutionGraphAccessor();
        final List<ConsumedPartitionGroup> consumedPartitionGroups = executionVertex.getAllConsumedPartitionGroups();
        final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<>(consumedPartitionGroups.size());

        for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroups) {
            // All partitions of a group are looked up via its first partition's intermediate result.
            final IntermediateResult consumedResult =
                    executionGraphAccessor.getResultPartitionOrThrow(consumedPartitionGroup.getFirst()).getIntermediateResult();
            final IntermediateDataSetID resultId = consumedResult.getId();
            inputGates.add(
                    new InputGateDeploymentDescriptor(
                            resultId,
                            consumedResult.getResultType(),
                            executionVertex.getExecutionVertexInputInfo(resultId).getSubpartitionIndexRange(),
                            consumedPartitionGroup.size(),
                            getConsumedPartitionShuffleDescriptors(consumedResult, consumedPartitionGroup, executionGraphAccessor)));
        }

        // Only the lookup itself is translated into ClusterDatasetCorruptedException. Failures
        // while building the descriptors below now propagate unchanged instead of being
        // misreported as a corrupted cluster dataset.
        final Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> clusterPartitionShuffleDescriptors;
        try {
            clusterPartitionShuffleDescriptors = getClusterPartitionShuffleDescriptors(executionVertex);
        } catch (Throwable th) {
            throw new ClusterDatasetCorruptedException(th, executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume());
        }

        for (Map.Entry<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> entry : clusterPartitionShuffleDescriptors.entrySet()) {
            // Cluster partitions are always consumed with subpartition index 0 here.
            inputGates.add(new InputGateDeploymentDescriptor(entry.getKey(), ResultPartitionType.BLOCKING_PERSISTENT, 0, entry.getValue()));
        }
        return inputGates;
    }

    /**
     * Returns the serialized shuffle descriptor groups for a consumed partition group, computing
     * and caching the descriptors on the intermediate result if no cache entry exists yet.
     *
     * @param intermediateResult result that owns the descriptor cache
     * @param consumedPartitionGroup the partitions being consumed
     * @param internalExecutionGraphAccessor used to resolve partitions when the cache is empty
     * @return all serialized (possibly offloaded) descriptor groups of the cache entry
     * @throws IOException if serializing the descriptors fails
     */
    private List<TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptorGroup>> getConsumedPartitionShuffleDescriptors(IntermediateResult intermediateResult, ConsumedPartitionGroup consumedPartitionGroup, InternalExecutionGraphAccessor internalExecutionGraphAccessor) throws IOException {
        CachedShuffleDescriptors descriptorCache = intermediateResult.getCachedShuffleDescriptors(consumedPartitionGroup);
        if (descriptorCache == null) {
            // Cache miss: compute the descriptors once and register them on the result.
            final ShuffleDescriptorAndIndex[] freshDescriptors =
                    computeConsumedPartitionShuffleDescriptors(consumedPartitionGroup, internalExecutionGraphAccessor);
            descriptorCache = intermediateResult.cacheShuffleDescriptors(consumedPartitionGroup, freshDescriptors);
        }

        descriptorCache.serializeShuffleDescriptors(this.shuffleDescriptorSerializer);
        return descriptorCache.getAllSerializedShuffleDescriptorGroups();
    }

    /**
     * Resolves every partition of the group to its shuffle descriptor, pairing each descriptor
     * with its position in the group's iteration order.
     *
     * @param consumedPartitionGroup the partitions to resolve
     * @param internalExecutionGraphAccessor used to look up each partition by ID
     * @return one entry per partition, sized by {@code consumedPartitionGroup.size()}
     */
    private ShuffleDescriptorAndIndex[] computeConsumedPartitionShuffleDescriptors(ConsumedPartitionGroup consumedPartitionGroup, InternalExecutionGraphAccessor internalExecutionGraphAccessor) {
        final ShuffleDescriptorAndIndex[] descriptors = new ShuffleDescriptorAndIndex[consumedPartitionGroup.size()];
        final Iterator<IntermediateResultPartitionID> partitionIds = consumedPartitionGroup.iterator();
        for (int position = 0; partitionIds.hasNext(); position++) {
            final IntermediateResultPartition partition =
                    internalExecutionGraphAccessor.getResultPartitionOrThrow(partitionIds.next());
            final ShuffleDescriptor descriptor =
                    getConsumedPartitionShuffleDescriptor(
                            partition,
                            this.partitionDeploymentConstraint,
                            this.nonFinishedHybridPartitionShouldBeUnknown);
            descriptors[position] = new ShuffleDescriptorAndIndex(descriptor, position);
        }
        return descriptors;
    }

    /**
     * Collects, for every intermediate data set the job vertex consumes, the single cluster
     * partition shuffle descriptor that belongs to this vertex's subtask index.
     *
     * @param executionVertex the consuming vertex
     * @return map from data set ID to a one-element array holding the descriptor (index 0)
     * @throws IllegalStateException if the number of descriptors of a data set differs from the
     *     vertex's total number of parallel subtasks
     */
    private static Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) {
        final InternalExecutionGraphAccessor graphAccessor = executionVertex.getExecutionGraphAccessor();
        final List<IntermediateDataSetID> consumedDataSetIds =
                executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume();

        final Map<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> descriptorsByDataSet = new HashMap<>();
        for (IntermediateDataSetID dataSetId : consumedDataSetIds) {
            final List<ShuffleDescriptor> dataSetDescriptors = graphAccessor.getClusterPartitionShuffleDescriptors(dataSetId);

            // Each subtask reads exactly one descriptor, so the counts have to line up.
            final int parallelism = executionVertex.getTotalNumberOfParallelSubtasks();
            final int descriptorCount = dataSetDescriptors.size();
            Preconditions.checkState(
                    parallelism == descriptorCount,
                    "The parallelism (%s) of the cache consuming job vertex is different from the number of shuffle descriptors (%s) of the intermediate data set",
                    Integer.valueOf(parallelism),
                    Integer.valueOf(descriptorCount));

            final ShuffleDescriptor ownDescriptor = dataSetDescriptors.get(executionVertex.getParallelSubtaskIndex());
            descriptorsByDataSet.put(
                    dataSetId,
                    new ShuffleDescriptorAndIndex[] {new ShuffleDescriptorAndIndex(ownDescriptor, 0)});
        }
        return descriptorsByDataSet;
    }

    /**
     * Wraps job information as a non-offloaded value (left side) or an offloaded blob key
     * (right side).
     */
    private static TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> getSerializedJobInformation(Either<SerializedValue<JobInformation>, PermanentBlobKey> either) {
        if (either.isLeft()) {
            return new TaskDeploymentDescriptor.NonOffloaded<>(either.left());
        }
        return new TaskDeploymentDescriptor.Offloaded<>(either.right());
    }

    /**
     * Wraps task information as a non-offloaded value (left side) or an offloaded blob key
     * (right side).
     */
    private static TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> getSerializedTaskInformation(Either<SerializedValue<TaskInformation>, PermanentBlobKey> either) {
        if (either.isLeft()) {
            return new TaskDeploymentDescriptor.NonOffloaded<>(either.left());
        }
        return new TaskDeploymentDescriptor.Offloaded<>(either.right());
    }

    /**
     * Resolves the shuffle descriptor of a consumed partition from the current state of the
     * partition and of its producing execution.
     *
     * @param intermediateResultPartition the consumed partition
     * @param partitionLocationConstraint policy applied if the descriptor is not available
     * @param z whether a hybrid partition of a non-finished producer is described as unknown
     * @return the producer's shuffle descriptor, or an unknown descriptor where permitted
     */
    public static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(IntermediateResultPartition intermediateResultPartition, PartitionLocationConstraint partitionLocationConstraint, boolean z) {
        final Execution producer = intermediateResultPartition.getProducer().getPartitionProducer();

        // Gather the inputs in the same order the single-expression form evaluated them.
        final ResultPartitionID consumedPartitionId =
                new ResultPartitionID(intermediateResultPartition.getPartitionId(), producer.getAttemptId());
        final ResultPartitionType resultType = intermediateResultPartition.getResultType();
        final boolean allDataProduced = intermediateResultPartition.hasDataAllProduced();
        final ExecutionState producerState = producer.getState();
        final ResultPartitionDeploymentDescriptor producedPartition =
                producer.getResultPartitionDeploymentDescriptor(intermediateResultPartition.getPartitionId()).orElse(null);

        return getConsumedPartitionShuffleDescriptor(
                consumedPartitionId,
                resultType,
                allDataProduced,
                producerState,
                partitionLocationConstraint,
                producedPartition,
                z);
    }

    /**
     * Decides which shuffle descriptor a consumer receives for one partition.
     *
     * <p>The partition counts as consumable when its type can be consumed pipelined or {@code z}
     * is true, a deployment descriptor exists, and the producer is in an available state. A
     * non-consumable partition yields an unknown descriptor if the constraint allows it,
     * otherwise an exception. A consumable hybrid partition whose producer is not FINISHED is
     * also reported as unknown when {@code z2} is set.
     *
     * @param resultPartitionID ID of the consumed partition
     * @param resultPartitionType type of the consumed partition
     * @param z whether the partition has produced all its data
     * @param executionState state of the producing execution
     * @param partitionLocationConstraint policy for partitions without an available descriptor
     * @param resultPartitionDeploymentDescriptor the producer's descriptor, or null if absent
     * @param z2 whether non-finished hybrid partitions must be described as unknown
     * @return the producer's shuffle descriptor or an {@link UnknownShuffleDescriptor}
     * @throws IllegalStateException if the partition is not consumable and must be known, or if
     *     {@code z2} requires an unknown descriptor that the constraint does not allow
     */
    @VisibleForTesting
    static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(ResultPartitionID resultPartitionID, ResultPartitionType resultPartitionType, boolean z, ExecutionState executionState, PartitionLocationConstraint partitionLocationConstraint, @Nullable ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor, boolean z2) {
        final boolean canUnknown = partitionLocationConstraint == PartitionLocationConstraint.CAN_BE_UNKNOWN;

        final boolean consumable =
                (resultPartitionType.canBePipelinedConsumed() || z)
                        && resultPartitionDeploymentDescriptor != null
                        && isProducerAvailable(executionState);
        if (!consumable) {
            if (canUnknown) {
                return new UnknownShuffleDescriptor(resultPartitionID);
            }
            throw handleConsumedPartitionShuffleDescriptorErrors(resultPartitionID, resultPartitionType, z, executionState);
        }

        final boolean hideUnfinishedHybrid =
                resultPartitionType.isHybridResultPartition()
                        && z2
                        && executionState != ExecutionState.FINISHED;
        if (hideUnfinishedHybrid) {
            Preconditions.checkState(canUnknown, "partition location constraint should allow unknown shuffle descriptor when nonFinishedHybridPartitionShouldBeUnknown is true.");
            return new UnknownShuffleDescriptor(resultPartitionID);
        }

        return resultPartitionDeploymentDescriptor.getShuffleDescriptor();
    }

    /**
     * Builds (without throwing) the exception that describes why a partition cannot be consumed:
     * either its producer was canceled/failed, or the producer is simply not ready yet.
     */
    private static RuntimeException handleConsumedPartitionShuffleDescriptorErrors(ResultPartitionID resultPartitionID, ResultPartitionType resultPartitionType, boolean z, ExecutionState executionState) {
        final String message;
        if (isProducerFailedOrCanceled(executionState)) {
            message = "Trying to consume an input partition whose producer has been canceled or failed. The producer is in state " + executionState + ".";
        } else {
            message = String.format("Trying to consume an input partition whose producer is not ready (result type: %s, hasAllDataProduced: %s, producer state: %s, partition id: %s).", resultPartitionType, Boolean.valueOf(z), executionState, resultPartitionID);
        }
        return new IllegalStateException(message);
    }

    /**
     * Returns true for the producer states RUNNING, INITIALIZING, FINISHED, SCHEDULED and
     * DEPLOYING; false for every other state (and for null).
     */
    private static boolean isProducerAvailable(ExecutionState executionState) {
        if (executionState == ExecutionState.RUNNING || executionState == ExecutionState.INITIALIZING) {
            return true;
        }
        if (executionState == ExecutionState.FINISHED) {
            return true;
        }
        return executionState == ExecutionState.SCHEDULED || executionState == ExecutionState.DEPLOYING;
    }

    /** Returns true for the producer states CANCELING, CANCELED and FAILED; false otherwise. */
    private static boolean isProducerFailedOrCanceled(ExecutionState executionState) {
        if (executionState == ExecutionState.CANCELING) {
            return true;
        }
        if (executionState == ExecutionState.CANCELED) {
            return true;
        }
        return executionState == ExecutionState.FAILED;
    }
}
