package org.apache.flink.runtime.checkpoint;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.util.FlinkRuntimeException;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/VertexFinishedStateChecker.class */
/**
 * Validates that the finished state recorded for the operators of a set of job vertices is
 * consistent with the topology of the job graph.
 *
 * <p>For every vertex the finished state is derived from the supplied operator states. A vertex
 * that is fully or partially finished additionally has constraints on the finished state of the
 * vertices producing its inputs; a violation is reported as a {@link FlinkRuntimeException}.
 */
public class VertexFinishedStateChecker {
    /** The vertices whose finished state is validated. */
    private final Set<ExecutionJobVertex> vertices;

    /** The operator states the finished status is derived from, keyed by operator id. */
    private final Map<OperatorID, OperatorState> operatorStates;

    /**
     * The finished state of a vertex, as derived from the state of its operators.
     *
     * <ul>
     *   <li>{@code FULLY_FINISHED}: the operator state reports itself as fully finished.
     *   <li>{@code PARTIALLY_FINISHED}: not fully finished, but at least one subtask state is
     *       finished.
     *   <li>{@code ALL_RUNNING}: no subtask state is finished, or there is no state for the
     *       operator at all.
     * </ul>
     *
     * <p>NOTE(review): the decompiler reports that this type was originally package-private; the
     * widened visibility is kept so that existing references keep compiling.
     */
    @VisibleForTesting
    public enum VertexFinishedState {
        ALL_RUNNING,
        PARTIALLY_FINISHED,
        FULLY_FINISHED
    }

    /**
     * Memoizes the finished state per job vertex id so that each vertex is evaluated at most once
     * per cache instance, even when it is reached repeatedly as a predecessor.
     *
     * <p>NOTE(review): the decompiler reports that this class was originally private; the widened
     * visibility is kept so that existing references keep compiling.
     */
    public static class VerticesFinishedStatusCache {
        private final Map<OperatorID, OperatorState> operatorStates;
        private final Map<JobVertexID, VertexFinishedState> finishedCache;

        private VerticesFinishedStatusCache(Map<OperatorID, OperatorState> map) {
            this.finishedCache = new HashMap<>();
            this.operatorStates = map;
        }

        /**
         * Returns the finished state of the given vertex, computing and caching it on first use.
         *
         * @param executionJobVertex the vertex to look up
         * @return the finished state shared by all operators of the vertex
         * @throws FlinkRuntimeException if the operators of the vertex do not agree on a single
         *     finished state
         */
        public VertexFinishedState getOrUpdate(ExecutionJobVertex executionJobVertex) {
            return this.finishedCache.computeIfAbsent(
                    executionJobVertex.getJobVertexId(),
                    ignored -> calculateFinishedState(executionJobVertex, this.operatorStates));
        }

        /**
         * Derives the finished state of a vertex from the states of all of its operators.
         *
         * @throws FlinkRuntimeException if the operators do not yield exactly one distinct state
         */
        private VertexFinishedState calculateFinishedState(
                ExecutionJobVertex executionJobVertex, Map<OperatorID, OperatorState> map) {
            Set<VertexFinishedState> distinctStates =
                    executionJobVertex.getOperatorIDs().stream()
                            .map(operatorIDPair -> checkOperatorFinishedStatus(map, operatorIDPair))
                            .collect(Collectors.toSet());
            if (distinctStates.size() != 1) {
                throw new FlinkRuntimeException(
                        "Can not restore vertex "
                                + executionJobVertex.getName()
                                + "("
                                + executionJobVertex.getJobVertexId()
                                + ") which contain mixed operator finished state: "
                                + distinctStates.stream().sorted().collect(Collectors.toList()));
            }
            return distinctStates.iterator().next();
        }

        /**
         * Determines the finished state of a single operator.
         *
         * <p>The user-defined operator id is used for the lookup only if the state map actually
         * contains it; otherwise the generated operator id is used. An operator without any state
         * is treated as {@link VertexFinishedState#ALL_RUNNING}.
         */
        private VertexFinishedState checkOperatorFinishedStatus(
                Map<OperatorID, OperatorState> map, OperatorIDPair operatorIDPair) {
            // The bound method reference also rejects a null map eagerly with a
            // NullPointerException, so no separate null check is needed.
            OperatorID operatorId =
                    operatorIDPair
                            .getUserDefinedOperatorID()
                            .filter(map::containsKey)
                            .orElse(operatorIDPair.getGeneratedOperatorID());

            OperatorState operatorState = map.get(operatorId);
            if (operatorState == null) {
                return VertexFinishedState.ALL_RUNNING;
            }
            if (operatorState.isFullyFinished()) {
                return VertexFinishedState.FULLY_FINISHED;
            }
            boolean anySubtaskFinished =
                    operatorState.getSubtaskStates().values().stream()
                            .anyMatch(subtaskState -> subtaskState.isFinished());
            return anySubtaskFinished
                    ? VertexFinishedState.PARTIALLY_FINISHED
                    : VertexFinishedState.ALL_RUNNING;
        }
    }

    /**
     * Creates a checker for the given vertices and operator states.
     *
     * @param set the vertices to validate
     * @param map the operator states, keyed by operator id
     */
    public VertexFinishedStateChecker(Set<ExecutionJobVertex> set, Map<OperatorID, OperatorState> map) {
        this.vertices = set;
        this.operatorStates = map;
    }

    /**
     * Validates the finished state of every vertex against the finished state of its
     * predecessors.
     *
     * @throws FlinkRuntimeException if a vertex has operators with mixed finished states, or if a
     *     fully or partially finished vertex has a predecessor in a state that is not allowed
     */
    public void validateOperatorsFinishedState() {
        VerticesFinishedStatusCache statusCache = new VerticesFinishedStatusCache(this.operatorStates);
        for (ExecutionJobVertex vertex : this.vertices) {
            VertexFinishedState state = statusCache.getOrUpdate(vertex);
            if (state == VertexFinishedState.FULLY_FINISHED) {
                checkPredecessorsOfFullyFinishedVertex(vertex, statusCache);
            } else if (state == VertexFinishedState.PARTIALLY_FINISHED) {
                checkPredecessorsOfPartiallyFinishedVertex(vertex, statusCache);
            }
        }
    }

    /** Requires every predecessor of a fully finished vertex to be fully finished as well. */
    private void checkPredecessorsOfFullyFinishedVertex(
            ExecutionJobVertex executionJobVertex, VerticesFinishedStatusCache verticesFinishedStatusCache) {
        boolean allPredecessorsFinished =
                executionJobVertex.getInputs().stream()
                        .map(IntermediateResult::getProducer)
                        .allMatch(
                                producer ->
                                        verticesFinishedStatusCache.getOrUpdate(producer)
                                                == VertexFinishedState.FULLY_FINISHED);
        if (!allPredecessorsFinished) {
            throw new FlinkRuntimeException(
                    "Illegal JobGraph modification. Cannot run a program with fully finished vertices predeceased with the ones not fully finished. Task vertex "
                            + executionJobVertex.getName()
                            + "("
                            + executionJobVertex.getJobVertexId()
                            + ") has a predecessor not fully finished");
        }
    }

    /**
     * Checks the predecessors of a partially finished vertex: a predecessor connected via an
     * ALL_TO_ALL edge must be fully finished, and a predecessor connected via a POINTWISE edge
     * must not be all running.
     */
    private void checkPredecessorsOfPartiallyFinishedVertex(
            ExecutionJobVertex executionJobVertex, VerticesFinishedStatusCache verticesFinishedStatusCache) {
        // Resolve one distribution pattern per predecessor. Once ALL_TO_ALL has been recorded for
        // a predecessor it is kept, because it imposes the stricter requirement below.
        Map<JobVertexID, DistributionPattern> patternByProducer = new HashMap<>();
        for (JobEdge jobEdge : executionJobVertex.getJobVertex().getInputs()) {
            patternByProducer.compute(
                    jobEdge.getSource().getProducer().getID(),
                    (producerId, recorded) ->
                            recorded == DistributionPattern.ALL_TO_ALL
                                    ? recorded
                                    : jobEdge.getDistributionPattern());
        }

        for (IntermediateResult input : executionJobVertex.getInputs()) {
            ExecutionJobVertex producer = input.getProducer();
            VertexFinishedState producerState = verticesFinishedStatusCache.getOrUpdate(producer);
            DistributionPattern pattern = patternByProducer.get(producer.getJobVertexId());

            if (pattern == DistributionPattern.ALL_TO_ALL
                    && producerState != VertexFinishedState.FULLY_FINISHED) {
                throw new FlinkRuntimeException(
                        "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with running or partially finished ones and connected via the ALL_TO_ALL edges. Task vertex "
                                + executionJobVertex.getName()
                                + "("
                                + executionJobVertex.getJobVertexId()
                                + ") has a "
                                + (producerState == VertexFinishedState.ALL_RUNNING
                                        ? "all running"
                                        : "partially finished")
                                + " predecessor");
            }
            if (pattern == DistributionPattern.POINTWISE
                    && producerState == VertexFinishedState.ALL_RUNNING) {
                throw new FlinkRuntimeException(
                        "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with all running ones. Task vertex "
                                + executionJobVertex.getName()
                                + "("
                                + executionJobVertex.getJobVertexId()
                                + ") has a all running predecessor");
            }
        }
    }
}
