/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.EdgeManagerBuildUtil;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.SubtaskAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.adaptivebatch.ExecutionPlanSchedulingContext;
import org.apache.flink.runtime.scheduler.adaptivebatch.NonAdaptiveExecutionPlanSchedulingContext;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.types.Either;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

public class ExecutionJobVertex
implements AccessExecutionJobVertex,
Archiveable<ArchivedExecutionJobVertex> {
    private static final Logger LOG = DefaultExecutionGraph.LOG;
    private final Object stateMonitor = new Object();
    private final InternalExecutionGraphAccessor graph;
    private final JobVertex jobVertex;
    @Nullable
    private ExecutionVertex[] taskVertices;
    @Nullable
    private IntermediateResult[] producedDataSets;
    @Nullable
    private List<IntermediateResult> inputs;
    private final VertexParallelismInformation parallelismInfo;
    private final SlotSharingGroup slotSharingGroup;
    @Nullable
    private final CoLocationGroup coLocationGroup;
    @Nullable
    private InputSplit[] inputSplits;
    private final ResourceProfile resourceProfile;
    private int numExecutionVertexFinished;
    private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = null;
    private final Collection<OperatorCoordinatorHolder> operatorCoordinators;
    @Nullable
    private InputSplitAssigner splitAssigner;

    @VisibleForTesting
    public ExecutionJobVertex(InternalExecutionGraphAccessor graph, JobVertex jobVertex, VertexParallelismInformation parallelismInfo, CoordinatorStore coordinatorStore, JobManagerJobMetricGroup jobManagerJobMetricGroup) throws JobException {
        if (graph == null || jobVertex == null) {
            throw new NullPointerException();
        }
        this.graph = graph;
        this.jobVertex = jobVertex;
        this.parallelismInfo = parallelismInfo;
        if (this.parallelismInfo.getParallelism() > this.parallelismInfo.getMaxParallelism()) {
            throw new JobException(String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.", jobVertex.getName(), this.parallelismInfo.getParallelism(), this.parallelismInfo.getMaxParallelism()));
        }
        this.resourceProfile = ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);
        this.slotSharingGroup = (SlotSharingGroup)Preconditions.checkNotNull((Object)jobVertex.getSlotSharingGroup());
        this.coLocationGroup = jobVertex.getCoLocationGroup();
        List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders = this.getJobVertex().getOperatorCoordinators();
        if (coordinatorProviders.isEmpty()) {
            this.operatorCoordinators = Collections.emptyList();
        } else {
            ArrayList<OperatorCoordinatorHolder> coordinators = new ArrayList<OperatorCoordinatorHolder>(coordinatorProviders.size());
            try {
                for (SerializedValue<OperatorCoordinator.Provider> provider : coordinatorProviders) {
                    coordinators.add(this.createOperatorCoordinatorHolder(provider, graph.getUserClassLoader(), coordinatorStore, jobManagerJobMetricGroup));
                }
            }
            catch (Exception | LinkageError e) {
                IOUtils.closeAllQuietly(coordinators);
                throw new JobException("Cannot instantiate the coordinator for operator " + this.getName(), e);
            }
            this.operatorCoordinators = Collections.unmodifiableList(coordinators);
        }
    }

    @VisibleForTesting
    protected void initialize(int executionHistorySizeLimit, Duration timeout, long createTimestamp, SubtaskAttemptNumberStore initialAttemptCounts) throws JobException {
        this.initialize(executionHistorySizeLimit, timeout, createTimestamp, initialAttemptCounts, NonAdaptiveExecutionPlanSchedulingContext.INSTANCE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void initialize(int executionHistorySizeLimit, Duration timeout, long createTimestamp, SubtaskAttemptNumberStore initialAttemptCounts, ExecutionPlanSchedulingContext executionPlanSchedulingContext) throws JobException {
        block10: {
            int i;
            Preconditions.checkState((this.parallelismInfo.getParallelism() > 0 ? 1 : 0) != 0);
            Preconditions.checkState((!this.isInitialized() ? 1 : 0) != 0);
            this.taskVertices = new ExecutionVertex[this.parallelismInfo.getParallelism()];
            this.inputs = new ArrayList<IntermediateResult>(this.jobVertex.getInputs().size());
            this.producedDataSets = new IntermediateResult[this.jobVertex.getNumberOfProducedIntermediateDataSets()];
            for (i = 0; i < this.jobVertex.getProducedDataSets().size(); ++i) {
                IntermediateDataSet result = this.jobVertex.getProducedDataSets().get(i);
                this.producedDataSets[i] = new IntermediateResult(result, this, this.parallelismInfo.getParallelism(), result.getResultType(), executionPlanSchedulingContext);
            }
            for (i = 0; i < this.parallelismInfo.getParallelism(); ++i) {
                ExecutionVertex vertex;
                this.taskVertices[i] = vertex = this.createExecutionVertex(this, i, this.producedDataSets, timeout, createTimestamp, executionHistorySizeLimit, initialAttemptCounts.getAttemptCount(i));
            }
            for (IntermediateResult ir : this.producedDataSets) {
                if (ir.getNumberOfAssignedPartitions() == this.parallelismInfo.getParallelism()) continue;
                throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
            }
            try {
                InputSplitSource<?> splitSource = this.jobVertex.getInputSplitSource();
                if (splitSource != null) {
                    Thread currentThread = Thread.currentThread();
                    ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
                    currentThread.setContextClassLoader(this.graph.getUserClassLoader());
                    try {
                        this.inputSplits = splitSource.createInputSplits(this.parallelismInfo.getParallelism());
                        if (this.inputSplits != null) {
                            this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
                        }
                        break block10;
                    }
                    finally {
                        currentThread.setContextClassLoader(oldContextClassLoader);
                    }
                }
                this.inputSplits = null;
            }
            catch (Throwable t) {
                throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
            }
        }
    }

    protected ExecutionVertex createExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets, Duration timeout, long createTimestamp, int executionHistorySizeLimit, int initialAttemptCount) {
        return new ExecutionVertex(jobVertex, subTaskIndex, producedDataSets, timeout, createTimestamp, executionHistorySizeLimit, initialAttemptCount);
    }

    protected OperatorCoordinatorHolder createOperatorCoordinatorHolder(SerializedValue<OperatorCoordinator.Provider> provider, ClassLoader classLoader, CoordinatorStore coordinatorStore, JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
        return OperatorCoordinatorHolder.create(provider, this, classLoader, coordinatorStore, false, this.getTaskInformation(), jobManagerJobMetricGroup);
    }

    public boolean isInitialized() {
        return this.taskVertices != null;
    }

    public boolean isParallelismDecided() {
        return this.parallelismInfo.getParallelism() > 0;
    }

    public List<OperatorIDPair> getOperatorIDs() {
        return this.jobVertex.getOperatorIDs();
    }

    public void setMaxParallelism(int maxParallelism) {
        this.parallelismInfo.setMaxParallelism(maxParallelism);
    }

    public InternalExecutionGraphAccessor getGraph() {
        return this.graph;
    }

    public void setParallelism(int parallelism) {
        this.parallelismInfo.setParallelism(parallelism);
    }

    public JobVertex getJobVertex() {
        return this.jobVertex;
    }

    @Override
    public String getName() {
        return this.getJobVertex().getName();
    }

    @Override
    public int getParallelism() {
        return this.parallelismInfo.getParallelism();
    }

    @Override
    public int getMaxParallelism() {
        return this.parallelismInfo.getMaxParallelism();
    }

    @Override
    public ResourceProfile getResourceProfile() {
        return this.resourceProfile;
    }

    public boolean canRescaleMaxParallelism(int desiredMaxParallelism) {
        return this.parallelismInfo.canRescaleMaxParallelism(desiredMaxParallelism);
    }

    public JobID getJobId() {
        return this.graph.getJobID();
    }

    @Override
    public JobVertexID getJobVertexId() {
        return this.jobVertex.getID();
    }

    public ExecutionVertex[] getTaskVertices() {
        if (this.taskVertices == null) {
            LOG.debug("Trying to get execution vertices of an uninitialized job vertex " + String.valueOf(this.getJobVertexId()));
            return new ExecutionVertex[0];
        }
        return this.taskVertices;
    }

    public IntermediateResult[] getProducedDataSets() {
        Preconditions.checkState((boolean)this.isInitialized());
        return this.producedDataSets;
    }

    public InputSplitAssigner getSplitAssigner() {
        Preconditions.checkState((boolean)this.isInitialized());
        return this.splitAssigner;
    }

    @Override
    public SlotSharingGroup getSlotSharingGroup() {
        return this.slotSharingGroup;
    }

    @Nullable
    public CoLocationGroup getCoLocationGroup() {
        return this.coLocationGroup;
    }

    public List<IntermediateResult> getInputs() {
        Preconditions.checkState((boolean)this.isInitialized());
        return this.inputs;
    }

    public Collection<OperatorCoordinatorHolder> getOperatorCoordinators() {
        Preconditions.checkState((boolean)this.isInitialized());
        return this.operatorCoordinators;
    }

    public List<SourceCoordinator<?, ?>> getSourceCoordinators() {
        ArrayList sourceCoordinators = new ArrayList();
        for (OperatorCoordinatorHolder oph : this.operatorCoordinators) {
            if (!(oph.coordinator() instanceof RecreateOnResetOperatorCoordinator)) continue;
            RecreateOnResetOperatorCoordinator opc = (RecreateOnResetOperatorCoordinator)oph.coordinator();
            try {
                if (!(opc.getInternalCoordinator() instanceof SourceCoordinator)) continue;
                sourceCoordinators.add((SourceCoordinator)opc.getInternalCoordinator());
            }
            catch (Throwable e) {
                throw new RuntimeException("Unexpected error occurred when get sourceCoordinators.", e);
            }
        }
        return sourceCoordinators;
    }

    int getNumExecutionVertexFinished() {
        return this.numExecutionVertexFinished;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
        Object object = this.stateMonitor;
        synchronized (object) {
            if (this.taskInformationOrBlobKey == null) {
                BlobWriter blobWriter = this.graph.getBlobWriter();
                TaskInformation taskInformation = this.getTaskInformation();
                this.taskInformationOrBlobKey = BlobWriter.serializeAndTryOffload(taskInformation, this.getJobId(), blobWriter);
            }
            return this.taskInformationOrBlobKey;
        }
    }

    public TaskInformation getTaskInformation() {
        return new TaskInformation(this.jobVertex.getID(), this.jobVertex.getName(), this.parallelismInfo.getParallelism(), this.parallelismInfo.getMaxParallelism(), this.jobVertex.getInvokableClassName(), this.jobVertex.getConfiguration());
    }

    @Override
    public ExecutionState getAggregateState() {
        int[] num = new int[ExecutionState.values().length];
        for (ExecutionVertex vertex : this.getTaskVertices()) {
            int n = vertex.getExecutionState().ordinal();
            num[n] = num[n] + 1;
        }
        return ExecutionJobVertex.getAggregateJobVertexState(num, this.parallelismInfo.getParallelism());
    }

    public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
        Preconditions.checkState((boolean)this.isInitialized());
        List<JobEdge> inputs = this.jobVertex.getInputs();
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", this.jobVertex.getID(), this.jobVertex.getName(), inputs.size()));
        }
        for (int num = 0; num < inputs.size(); ++num) {
            IntermediateResult ires;
            JobEdge edge = inputs.get(num);
            if (LOG.isDebugEnabled()) {
                if (edge.getSource() == null) {
                    LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.", num, this.jobVertex.getID(), this.jobVertex.getName(), edge.getSourceId()));
                } else {
                    LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).", num, this.jobVertex.getID(), this.jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName()));
                }
            }
            if ((ires = intermediateDataSets.get(edge.getSourceId())) == null) {
                throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID " + String.valueOf(edge.getSourceId()));
            }
            this.inputs.add(ires);
            EdgeManagerBuildUtil.connectVertexToResult(this, ires);
        }
    }

    public void cancel() {
        for (ExecutionVertex ev : this.getTaskVertices()) {
            ev.cancel();
        }
    }

    public CompletableFuture<Void> cancelWithFuture() {
        return FutureUtils.waitForAll(this.mapExecutionVertices(ExecutionVertex::cancel));
    }

    public CompletableFuture<Void> suspend() {
        return FutureUtils.waitForAll(this.mapExecutionVertices(ExecutionVertex::suspend));
    }

    @Nonnull
    private Collection<CompletableFuture<?>> mapExecutionVertices(Function<ExecutionVertex, CompletableFuture<?>> mapFunction) {
        return Arrays.stream(this.getTaskVertices()).map(mapFunction).collect(Collectors.toList());
    }

    public void fail(Throwable t) {
        for (ExecutionVertex ev : this.getTaskVertices()) {
            ev.fail(t);
        }
    }

    void executionVertexFinished() {
        Preconditions.checkState((boolean)this.isInitialized());
        ++this.numExecutionVertexFinished;
        if (this.numExecutionVertexFinished == this.parallelismInfo.getParallelism()) {
            this.getGraph().jobVertexFinished();
        }
    }

    void executionVertexUnFinished() {
        Preconditions.checkState((boolean)this.isInitialized());
        if (this.numExecutionVertexFinished == this.parallelismInfo.getParallelism()) {
            this.getGraph().jobVertexUnFinished();
        }
        --this.numExecutionVertexFinished;
    }

    public boolean isFinished() {
        return this.isParallelismDecided() && this.numExecutionVertexFinished == this.parallelismInfo.getParallelism();
    }

    @Override
    public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
        HashMap userAccumulators = new HashMap();
        for (ExecutionVertex vertex : this.getTaskVertices()) {
            Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
            if (next == null) continue;
            AccumulatorHelper.mergeInto(userAccumulators, next);
        }
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators);
    }

    public ArchivedExecutionJobVertex archive() {
        return new ArchivedExecutionJobVertex(this);
    }

    public static ExecutionState getAggregateJobVertexState(int[] verticesPerState, int parallelism) {
        if (verticesPerState == null || verticesPerState.length != ExecutionState.values().length) {
            throw new IllegalArgumentException("Must provide an array as large as there are execution states.");
        }
        if (verticesPerState[ExecutionState.FAILED.ordinal()] > 0) {
            return ExecutionState.FAILED;
        }
        if (verticesPerState[ExecutionState.CANCELING.ordinal()] > 0) {
            return ExecutionState.CANCELING;
        }
        if (verticesPerState[ExecutionState.CANCELED.ordinal()] > 0) {
            return ExecutionState.CANCELED;
        }
        if (verticesPerState[ExecutionState.INITIALIZING.ordinal()] > 0) {
            return ExecutionState.INITIALIZING;
        }
        if (verticesPerState[ExecutionState.RUNNING.ordinal()] > 0) {
            return ExecutionState.RUNNING;
        }
        if (verticesPerState[ExecutionState.FINISHED.ordinal()] > 0) {
            return verticesPerState[ExecutionState.FINISHED.ordinal()] == parallelism ? ExecutionState.FINISHED : ExecutionState.RUNNING;
        }
        return ExecutionState.CREATED;
    }

    public static class Factory {
        ExecutionJobVertex createExecutionJobVertex(InternalExecutionGraphAccessor graph, JobVertex jobVertex, VertexParallelismInformation parallelismInfo, CoordinatorStore coordinatorStore, JobManagerJobMetricGroup jobManagerJobMetricGroup) throws JobException {
            return new ExecutionJobVertex(graph, jobVertex, parallelismInfo, coordinatorStore, jobManagerJobMetricGroup);
        }
    }
}

