package org.apache.flink.runtime.scheduler.strategy;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.scheduler.DeploymentOption;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.class */
public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
    private static final Predicate<SchedulingExecutionVertex<?, ?>> IS_IN_CREATED_EXECUTION_STATE = schedulingExecutionVertex -> {
        return ExecutionState.CREATED == schedulingExecutionVertex.getState();
    };
    private final SchedulerOperations schedulerOperations;
    private final SchedulingTopology<?, ?> schedulingTopology;
    private final Map<ExecutionVertexID, DeploymentOption> deploymentOptions = new HashMap();
    private final InputDependencyConstraintChecker inputConstraintChecker = new InputDependencyConstraintChecker();

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy$Factory.class */
    public static class Factory implements SchedulingStrategyFactory {
        @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory
        public SchedulingStrategy createInstance(SchedulerOperations schedulerOperations, SchedulingTopology<?, ?> schedulingTopology) {
            return new LazyFromSourcesSchedulingStrategy(schedulerOperations, schedulingTopology);
        }
    }

    public LazyFromSourcesSchedulingStrategy(SchedulerOperations schedulerOperations, SchedulingTopology<?, ?> schedulingTopology) {
        this.schedulerOperations = (SchedulerOperations) Preconditions.checkNotNull(schedulerOperations);
        this.schedulingTopology = (SchedulingTopology) Preconditions.checkNotNull(schedulingTopology);
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void startScheduling() {
        DeploymentOption deploymentOption = new DeploymentOption(true);
        DeploymentOption deploymentOption2 = new DeploymentOption(false);
        for (V v : this.schedulingTopology.getVertices()) {
            DeploymentOption deploymentOption3 = deploymentOption2;
            for (R r : v.getProducedResults()) {
                if (r.getResultType().isPipelined()) {
                    deploymentOption3 = deploymentOption;
                }
                this.inputConstraintChecker.addSchedulingResultPartition(r);
            }
            this.deploymentOptions.put(v.getId(), deploymentOption3);
        }
        allocateSlotsAndDeployExecutionVertices(this.schedulingTopology.getVertices());
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void restartTasks(Set<ExecutionVertexID> set) {
        Stream<ExecutionVertexID> stream = set.stream();
        SchedulingTopology<?, ?> schedulingTopology = this.schedulingTopology;
        schedulingTopology.getClass();
        Stream flatMap = stream.map(schedulingTopology::getVertexOrThrow).flatMap(schedulingExecutionVertex -> {
            return IterableUtils.toStream(schedulingExecutionVertex.getProducedResults());
        });
        InputDependencyConstraintChecker inputDependencyConstraintChecker = this.inputConstraintChecker;
        inputDependencyConstraintChecker.getClass();
        flatMap.forEach(inputDependencyConstraintChecker::resetSchedulingResultPartition);
        allocateSlotsAndDeployExecutionVertices(SchedulingStrategyUtils.getVerticesFromIds(this.schedulingTopology, set));
    }

    /* JADX WARN: Type inference failed for: r0v4, types: [org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex] */
    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void onExecutionStateChange(ExecutionVertexID executionVertexID, ExecutionState executionState) {
        if (ExecutionState.FINISHED.equals(executionState)) {
            allocateSlotsAndDeployExecutionVertices((Set) IterableUtils.toStream(this.schedulingTopology.getVertexOrThrow(executionVertexID).getProducedResults()).filter(schedulingResultPartition -> {
                return schedulingResultPartition.getResultType().isBlocking();
            }).flatMap(schedulingResultPartition2 -> {
                return this.inputConstraintChecker.markSchedulingResultPartitionFinished(schedulingResultPartition2).stream();
            }).flatMap(schedulingResultPartition3 -> {
                return IterableUtils.toStream(schedulingResultPartition3.getConsumers());
            }).collect(Collectors.toSet()));
        }
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition, java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v8, types: [org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex] */
    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void onPartitionConsumable(ExecutionVertexID executionVertexID, ResultPartitionID resultPartitionID) {
        ?? resultPartitionOrThrow = this.schedulingTopology.getResultPartitionOrThrow(resultPartitionID.getPartitionId());
        if (resultPartitionOrThrow.getResultType().isPipelined()) {
            if (!Iterables.contains(this.schedulingTopology.getVertexOrThrow(executionVertexID).getProducedResults(), (Object) resultPartitionOrThrow)) {
                throw new IllegalStateException("partition " + resultPartitionID + " is not the produced partition of " + executionVertexID);
            }
            allocateSlotsAndDeployExecutionVertices(resultPartitionOrThrow.getConsumers());
        }
    }

    private void allocateSlotsAndDeployExecutionVertices(Iterable<? extends SchedulingExecutionVertex<?, ?>> iterable) {
        Set set = (Set) IterableUtils.toStream(iterable).filter(IS_IN_CREATED_EXECUTION_STATE.and(isInputConstraintSatisfied())).map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet());
        SchedulingTopology<?, ?> schedulingTopology = this.schedulingTopology;
        Map<ExecutionVertexID, DeploymentOption> map = this.deploymentOptions;
        map.getClass();
        this.schedulerOperations.allocateSlotsAndDeploy(SchedulingStrategyUtils.createExecutionVertexDeploymentOptionsInTopologicalOrder(schedulingTopology, set, (v1) -> {
            return r2.get(v1);
        }));
    }

    private Predicate<SchedulingExecutionVertex<?, ?>> isInputConstraintSatisfied() {
        InputDependencyConstraintChecker inputDependencyConstraintChecker = this.inputConstraintChecker;
        inputDependencyConstraintChecker.getClass();
        return inputDependencyConstraintChecker::check;
    }
}
