package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.SpeculativeExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategyFactoryLoader;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.event.FileSystemJobEventStore;
import org.apache.flink.runtime.jobmaster.event.JobEventManager;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.DefaultExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.runtime.scheduler.SimpleExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.strategy.AllFinishedInputConsumableDecider;
import org.apache.flink.runtime.scheduler.strategy.DefaultInputConsumableDecider;
import org.apache.flink.runtime.scheduler.strategy.InputConsumableDecider;
import org.apache.flink.runtime.scheduler.strategy.PartialFinishedInputConsumableDecider;
import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.util.SlotSelectionStrategyUtils;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.class */
public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
    private static final Logger LOG = LoggerFactory.getLogger(AdaptiveBatchSchedulerFactory.class);

    @Override // org.apache.flink.runtime.scheduler.SchedulerNGFactory
    public SchedulerNG createInstance(Logger logger, ExecutionPlan executionPlan, Executor executor, Configuration configuration, SlotPoolService slotPoolService, ScheduledExecutorService scheduledExecutorService, ClassLoader classLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, Duration duration, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, Duration duration2, ShuffleMaster<?> shuffleMaster, JobMasterPartitionTracker jobMasterPartitionTracker, ExecutionDeploymentTracker executionDeploymentTracker, long j, ComponentMainThreadExecutor componentMainThreadExecutor, FatalErrorHandler fatalErrorHandler, JobStatusListener jobStatusListener, Collection<FailureEnricher> collection, BlocklistOperations blocklistOperations) throws Exception {
        ExecutionConfig executionConfig;
        if (executionPlan instanceof JobGraph) {
            executionConfig = executionPlan.getSerializedExecutionConfig().deserializeValue(classLoader);
        } else {
            if (!(executionPlan instanceof StreamGraph)) {
                throw new FlinkException("Unsupported execution plan " + executionPlan.getClass().getCanonicalName());
            }
            executionConfig = ((StreamGraph) executionPlan).getExecutionConfig();
        }
        ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory = createExecutionSlotAllocatorFactory(configuration, (SlotPool) slotPoolService.castInto(SlotPool.class).orElseThrow(() -> {
            return new IllegalStateException("The AdaptiveBatchScheduler requires a SlotPool.");
        }));
        RestartBackoffTimeStrategy create = RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(executionPlan.getJobConfiguration(), configuration, executionPlan.isCheckpointingEnabled()).create();
        logger.info("Using restart back off time strategy {} for {} ({}).", new Object[]{create, executionPlan.getName(), executionPlan.getJobID()});
        return createScheduler(logger, executionPlan, executionConfig, executor, configuration, scheduledExecutorService, classLoader, checkpointRecoveryFactory, duration, blobWriter, jobManagerJobMetricGroup, shuffleMaster, jobMasterPartitionTracker, executionDeploymentTracker, j, componentMainThreadExecutor, jobStatusListener, collection, blocklistOperations, new DefaultExecutionOperations(), createExecutionSlotAllocatorFactory, create, new ScheduledExecutorServiceAdapter(scheduledExecutorService), DefaultVertexParallelismAndInputInfosDecider.from(getDefaultMaxParallelism(configuration, executionConfig), ((Double) executionPlan.getJobConfiguration().get(BatchExecutionOptionsInternal.ADAPTIVE_SKEWED_OPTIMIZATION_SKEWED_FACTOR)).doubleValue(), ((MemorySize) executionPlan.getJobConfiguration().get(BatchExecutionOptionsInternal.ADAPTIVE_SKEWED_OPTIMIZATION_SKEWED_THRESHOLD)).getBytes(), configuration), ((Boolean) configuration.get(BatchExecutionOptions.JOB_RECOVERY_ENABLED)).booleanValue() && shuffleMaster.supportsBatchSnapshot() ? new DefaultBatchJobRecoveryHandler(new JobEventManager(new FileSystemJobEventStore(executionPlan.getJobID(), configuration)), configuration, executionPlan.getJobID()) : new DummyBatchJobRecoveryHandler());
    }

    @VisibleForTesting
    public static AdaptiveBatchScheduler createScheduler(Logger logger, ExecutionPlan executionPlan, ExecutionConfig executionConfig, Executor executor, Configuration configuration, ScheduledExecutorService scheduledExecutorService, ClassLoader classLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, Duration duration, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, ShuffleMaster<?> shuffleMaster, JobMasterPartitionTracker jobMasterPartitionTracker, ExecutionDeploymentTracker executionDeploymentTracker, long j, ComponentMainThreadExecutor componentMainThreadExecutor, JobStatusListener jobStatusListener, Collection<FailureEnricher> collection, BlocklistOperations blocklistOperations, ExecutionOperations executionOperations, ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ScheduledExecutor scheduledExecutor, VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider, BatchJobRecoveryHandler batchJobRecoveryHandler) throws Exception {
        Preconditions.checkState(executionPlan.getJobType() == JobType.BATCH, "Adaptive batch scheduler only supports batch jobs");
        checkAllExchangesAreSupported(executionPlan);
        boolean booleanValue = ((Boolean) configuration.get(BatchExecutionOptions.SPECULATIVE_ENABLED)).booleanValue();
        JobManagerOptions.HybridPartitionDataConsumeConstraint orDecideHybridPartitionDataConsumeConstraint = getOrDecideHybridPartitionDataConsumeConstraint(configuration, booleanValue);
        DefaultExecutionGraphFactory defaultExecutionGraphFactory = new DefaultExecutionGraphFactory(configuration, classLoader, executionDeploymentTracker, scheduledExecutorService, executor, duration, jobManagerJobMetricGroup, blobWriter, shuffleMaster, jobMasterPartitionTracker, true, createExecutionJobVertexFactory(booleanValue), orDecideHybridPartitionDataConsumeConstraint == JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS);
        VertexwiseSchedulingStrategy.Factory factory = new VertexwiseSchedulingStrategy.Factory(loadInputConsumableDeciderFactory(orDecideHybridPartitionDataConsumeConstraint));
        int defaultMaxParallelism = getDefaultMaxParallelism(configuration, executionConfig);
        AdaptiveExecutionHandler create = AdaptiveExecutionHandlerFactory.create(executionPlan, batchJobRecoveryHandler instanceof DefaultBatchJobRecoveryHandler, classLoader, scheduledExecutorService);
        return new AdaptiveBatchScheduler(logger, create, executor, configuration, componentMainThreadExecutor2 -> {
        }, scheduledExecutor, classLoader, new CheckpointsCleaner(), checkpointRecoveryFactory, jobManagerJobMetricGroup, factory, FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(configuration), restartBackoffTimeStrategy, executionOperations, new ExecutionVertexVersioner(), executionSlotAllocatorFactory, j, componentMainThreadExecutor, jobStatusListener, collection, defaultExecutionGraphFactory, shuffleMaster, duration, vertexParallelismAndInputInfosDecider, defaultMaxParallelism, blocklistOperations, orDecideHybridPartitionDataConsumeConstraint, batchJobRecoveryHandler, create.createExecutionPlanSchedulingContext(defaultMaxParallelism));
    }

    public static InputConsumableDecider.Factory loadInputConsumableDeciderFactory(JobManagerOptions.HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint) {
        switch (hybridPartitionDataConsumeConstraint) {
            case ALL_PRODUCERS_FINISHED:
                return AllFinishedInputConsumableDecider.Factory.INSTANCE;
            case ONLY_FINISHED_PRODUCERS:
                return PartialFinishedInputConsumableDecider.Factory.INSTANCE;
            case UNFINISHED_PRODUCERS:
                return DefaultInputConsumableDecider.Factory.INSTANCE;
            default:
                throw new IllegalStateException(hybridPartitionDataConsumeConstraint + "is not supported.");
        }
    }

    public static JobManagerOptions.HybridPartitionDataConsumeConstraint getOrDecideHybridPartitionDataConsumeConstraint(Configuration configuration, boolean z) {
        JobManagerOptions.HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint = (JobManagerOptions.HybridPartitionDataConsumeConstraint) configuration.getOptional(JobManagerOptions.HYBRID_PARTITION_DATA_CONSUME_CONSTRAINT).orElseGet(() -> {
            JobManagerOptions.HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint2 = z ? JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS : JobManagerOptions.HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS;
            LOG.info("Set {} to {} as it is not configured", JobManagerOptions.HYBRID_PARTITION_DATA_CONSUME_CONSTRAINT.key(), hybridPartitionDataConsumeConstraint2.name());
            return hybridPartitionDataConsumeConstraint2;
        });
        if (z) {
            Preconditions.checkState(hybridPartitionDataConsumeConstraint != JobManagerOptions.HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS, "For speculative execution, only supports consume finished partition now.");
        }
        return hybridPartitionDataConsumeConstraint;
    }

    private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory(Configuration configuration, SlotPool slotPool) {
        return new SimpleExecutionSlotAllocator.Factory(new PhysicalSlotProviderImpl(SlotSelectionStrategyUtils.selectSlotSelectionStrategy(JobType.BATCH, configuration), slotPool), false);
    }

    private static ExecutionJobVertex.Factory createExecutionJobVertexFactory(boolean z) {
        return z ? new SpeculativeExecutionJobVertex.Factory() : new ExecutionJobVertex.Factory();
    }

    private static void checkAllExchangesAreSupported(ExecutionPlan executionPlan) {
        String format = String.format("At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure '%s' to '%s' or '%s/%s'. ", ExecutionOptions.BATCH_SHUFFLE_MODE.key(), BatchShuffleMode.ALL_EXCHANGES_BLOCKING, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
        if (!(executionPlan instanceof JobGraph)) {
            Iterator<StreamNode> it = ((StreamGraph) executionPlan).getStreamNodes().iterator();
            while (it.hasNext()) {
                Iterator<StreamEdge> it2 = it.next().getOutEdges().iterator();
                while (it2.hasNext()) {
                    Preconditions.checkState(!it2.next().getExchangeMode().equals(StreamExchangeMode.PIPELINED), format);
                }
            }
            return;
        }
        Iterator<JobVertex> it3 = ((JobGraph) executionPlan).getVertices().iterator();
        while (it3.hasNext()) {
            for (IntermediateDataSet intermediateDataSet : it3.next().getProducedDataSets()) {
                Preconditions.checkState(intermediateDataSet.getResultType().isBlockingOrBlockingPersistentResultPartition() || intermediateDataSet.getResultType().isHybridResultPartition(), format);
            }
        }
    }

    static int getDefaultMaxParallelism(Configuration configuration, ExecutionConfig executionConfig) {
        return ((Integer) configuration.getOptional(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM).orElse(Integer.valueOf(executionConfig.getParallelism() == -1 ? BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM.defaultValue().intValue() : executionConfig.getParallelism()))).intValue();
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNGFactory
    public JobManagerOptions.SchedulerType getSchedulerType() {
        return JobManagerOptions.SchedulerType.AdaptiveBatch;
    }
}
