package org.apache.flink.runtime.checkpoint;

import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCheckpointPlan.class */
public class DefaultCheckpointPlan implements CheckpointPlan {
    private final List<Execution> tasksToTrigger;
    private final List<Execution> tasksToWaitFor;
    private final List<ExecutionVertex> tasksToCommitTo;
    private final List<Execution> finishedTasks;
    private final boolean mayHaveFinishedTasks;
    private final Map<JobVertexID, ExecutionJobVertex> fullyFinishedOrFinishedOnRestoreVertices = new HashMap();
    private final IdentityHashMap<ExecutionJobVertex, Integer> vertexOperatorsFinishedTasksCount;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultCheckpointPlan(List<Execution> list, List<Execution> list2, List<ExecutionVertex> list3, List<Execution> list4, List<ExecutionJobVertex> list5, boolean z) {
        this.tasksToTrigger = (List) Preconditions.checkNotNull(list);
        this.tasksToWaitFor = (List) Preconditions.checkNotNull(list2);
        this.tasksToCommitTo = (List) Preconditions.checkNotNull(list3);
        this.finishedTasks = (List) Preconditions.checkNotNull(list4);
        this.mayHaveFinishedTasks = z;
        list5.forEach(executionJobVertex -> {
            this.fullyFinishedOrFinishedOnRestoreVertices.put(executionJobVertex.getJobVertexId(), executionJobVertex);
        });
        this.vertexOperatorsFinishedTasksCount = new IdentityHashMap<>();
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointPlan
    public List<Execution> getTasksToTrigger() {
        return this.tasksToTrigger;
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointPlan
    public List<Execution> getTasksToWaitFor() {
        return this.tasksToWaitFor;
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointPlan
    public List<ExecutionVertex> getTasksToCommitTo() {
        return this.tasksToCommitTo;
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointPlan
    public List<Execution> getFinishedTasks() {
        return this.finishedTasks;
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointPlan
    public Collection<ExecutionJobVertex> getFullyFinishedJobVertex() {
        return this.fullyFinishedOrFinishedOnRestoreVertices.values();
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointPlan
    public boolean mayHaveFinishedTasks() {
        return this.mayHaveFinishedTasks;
    }

    @Override // org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider
    public void reportTaskFinishedOnRestore(ExecutionVertex executionVertex) {
        this.fullyFinishedOrFinishedOnRestoreVertices.putIfAbsent(executionVertex.getJobvertexId(), executionVertex.getJobVertex());
    }

    @Override // org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider
    public void reportTaskHasFinishedOperators(ExecutionVertex executionVertex) {
        this.vertexOperatorsFinishedTasksCount.compute(executionVertex.getJobVertex(), (executionJobVertex, num) -> {
            return Integer.valueOf(num == null ? 1 : num.intValue() + 1);
        });
    }

    @Override // org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider
    public void fulfillFinishedTaskStatus(Map<OperatorID, OperatorState> map) {
        if (this.mayHaveFinishedTasks) {
            HashMap hashMap = new HashMap();
            for (Execution execution : this.finishedTasks) {
                JobVertexID jobvertexId = execution.getVertex().getJobvertexId();
                if (!this.fullyFinishedOrFinishedOnRestoreVertices.containsKey(jobvertexId)) {
                    hashMap.put(jobvertexId, execution.getVertex().getJobVertex());
                }
            }
            checkNoPartlyFinishedVertexUsedUnionListState(hashMap, map);
            checkNoPartlyOperatorsFinishedVertexUsedUnionListState(hashMap, map);
            fulfillFullyFinishedOrFinishedOnRestoreOperatorStates(map);
            fulfillSubtaskStateForPartiallyFinishedOperators(map);
        }
    }

    private void checkNoPartlyFinishedVertexUsedUnionListState(Map<JobVertexID, ExecutionJobVertex> map, Map<OperatorID, OperatorState> map2) {
        for (ExecutionJobVertex executionJobVertex : map.values()) {
            if (hasUsedUnionListState(executionJobVertex, map2)) {
                throw new FinishedTaskStateProvider.PartialFinishingNotSupportedByStateException(String.format("The vertex %s (id = %s) has used UnionListState, but part of its tasks are FINISHED.", executionJobVertex.getName(), executionJobVertex.getJobVertexId()));
            }
        }
    }

    private void checkNoPartlyOperatorsFinishedVertexUsedUnionListState(Map<JobVertexID, ExecutionJobVertex> map, Map<OperatorID, OperatorState> map2) {
        for (Map.Entry<ExecutionJobVertex, Integer> entry : this.vertexOperatorsFinishedTasksCount.entrySet()) {
            ExecutionJobVertex key = entry.getKey();
            if (!map.containsKey(key.getJobVertexId()) && entry.getValue().intValue() != key.getParallelism() && hasUsedUnionListState(key, map2)) {
                throw new FinishedTaskStateProvider.PartialFinishingNotSupportedByStateException(String.format("The vertex %s (id = %s) has used UnionListState, but part of its tasks has called operators' finish method.", key.getName(), key.getJobVertexId()));
            }
        }
    }

    private boolean hasUsedUnionListState(ExecutionJobVertex executionJobVertex, Map<OperatorID, OperatorState> map) {
        Iterator<OperatorIDPair> it = executionJobVertex.getOperatorIDs().iterator();
        while (it.hasNext()) {
            OperatorState operatorState = map.get(it.next().getGeneratedOperatorID());
            if (operatorState != null) {
                for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
                    if (Stream.concat(operatorSubtaskState.getManagedOperatorState().stream(), operatorSubtaskState.getRawOperatorState().stream()).filter((v0) -> {
                        return Objects.nonNull(v0);
                    }).flatMap(operatorStateHandle -> {
                        return operatorStateHandle.getStateNameToPartitionOffsets().values().stream();
                    }).anyMatch(stateMetaInfo -> {
                        return stateMetaInfo.getDistributionMode() == OperatorStateHandle.Mode.UNION;
                    })) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    private void fulfillFullyFinishedOrFinishedOnRestoreOperatorStates(Map<OperatorID, OperatorState> map) {
        for (ExecutionJobVertex executionJobVertex : this.fullyFinishedOrFinishedOnRestoreVertices.values()) {
            for (OperatorIDPair operatorIDPair : executionJobVertex.getOperatorIDs()) {
                OperatorState operatorState = map.get(operatorIDPair.getGeneratedOperatorID());
                Preconditions.checkState(operatorState == null || !operatorState.hasSubtaskStates(), "There should be no states or only coordinator state reported for fully finished operators");
                map.put(operatorIDPair.getGeneratedOperatorID(), new FullyFinishedOperatorState(operatorIDPair.getUserDefinedOperatorName(), operatorIDPair.getUserDefinedOperatorUid(), operatorIDPair.getGeneratedOperatorID(), executionJobVertex.getParallelism(), executionJobVertex.getMaxParallelism()));
            }
        }
    }

    private void fulfillSubtaskStateForPartiallyFinishedOperators(Map<OperatorID, OperatorState> map) {
        for (Execution execution : this.finishedTasks) {
            ExecutionJobVertex jobVertex = execution.getVertex().getJobVertex();
            for (OperatorIDPair operatorIDPair : jobVertex.getOperatorIDs()) {
                OperatorState operatorState = map.get(operatorIDPair.getGeneratedOperatorID());
                if (operatorState == null || !operatorState.isFullyFinished()) {
                    if (operatorState == null) {
                        operatorState = new OperatorState(operatorIDPair.getUserDefinedOperatorName(), operatorIDPair.getUserDefinedOperatorUid(), operatorIDPair.getGeneratedOperatorID(), jobVertex.getParallelism(), jobVertex.getMaxParallelism());
                        map.put(operatorIDPair.getGeneratedOperatorID(), operatorState);
                    }
                    operatorState.putState(execution.getParallelSubtaskIndex(), FinishedOperatorSubtaskState.INSTANCE);
                }
            }
        }
    }
}
