/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.ExecutionUtils;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.LocalInputPreferredSlotSharingStrategy;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SlotSharingStrategy;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;

/**
 * Static helpers for tests that build a {@link DefaultScheduler}, drive the state of its
 * executions, and interact with its {@link CheckpointCoordinator}.
 *
 * <p>The class is not instantiable; all members are static.
 */
public class SchedulerTestingUtils {
    /** Checkpoint timeout applied by {@link #enableCheckpointing(JobGraph, StateBackend, CheckpointStorage, long, boolean)}. */
    private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 600000L;
    /** Retry interval handed to {@code CommonTestUtils.waitUntilCondition} by the {@code waitFor*} helpers. */
    private static final long RETRY_INTERVAL_MILLIS = 10L;
    /** Retry attempt count handed to {@code CommonTestUtils.waitUntilCondition} by the {@code waitFor*} helpers. */
    private static final int RETRY_ATTEMPTS = 6000;
    /** Allocation timeout used when a slot allocator factory is created without an explicit one. */
    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(300L);

    private SchedulerTestingUtils() {
    }

    /**
     * Builds a {@link DefaultScheduler} for the given job graph using a {@link DefaultSchedulerBuilder}
     * with no further customization.
     */
    public static DefaultScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor, ScheduledExecutorService executorService) throws Exception {
        return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, executorService).build();
    }

    /** Enables checkpointing on the job graph without a state backend or checkpoint storage. */
    public static void enableCheckpointing(JobGraph jobGraph) {
        enableCheckpointing(jobGraph, null, null);
    }

    /**
     * Enables checkpointing with a checkpoint interval of {@link Long#MAX_VALUE} and with
     * checkpoints after tasks finish disabled.
     */
    public static void enableCheckpointing(JobGraph jobGraph, @Nullable StateBackend stateBackend, @Nullable CheckpointStorage checkpointStorage) {
        enableCheckpointing(jobGraph, stateBackend, checkpointStorage, Long.MAX_VALUE, false);
    }

    /**
     * Installs {@link JobCheckpointingSettings} on the job graph.
     *
     * @param jobGraph job graph whose snapshot settings are replaced
     * @param stateBackend state backend to serialize into the settings, or {@code null} for none
     * @param checkpointStorage checkpoint storage to serialize into the settings, or {@code null} for none
     * @param checkpointInterval checkpoint interval passed to the coordinator configuration
     * @param enableCheckpointsAfterTasksFinish passed through to the coordinator configuration
     * @throws RuntimeException if the state backend or the checkpoint storage cannot be serialized
     */
    public static void enableCheckpointing(JobGraph jobGraph, @Nullable StateBackend stateBackend, @Nullable CheckpointStorage checkpointStorage, long checkpointInterval, boolean enableCheckpointsAfterTasksFinish) {
        CheckpointCoordinatorConfiguration config =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setCheckpointInterval(checkpointInterval)
                        .setCheckpointTimeout(DEFAULT_CHECKPOINT_TIMEOUT_MS)
                        .setMinPauseBetweenCheckpoints(0L)
                        .setMaxConcurrentCheckpoints(1)
                        .setCheckpointRetentionPolicy(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)
                        .setExactlyOnce(false)
                        .setUnalignedCheckpointsEnabled(false)
                        .setTolerableCheckpointFailureNumber(0)
                        .setCheckpointIdOfIgnoredInFlightData(0L)
                        .setEnableCheckpointsAfterTasksFinish(enableCheckpointsAfterTasksFinish)
                        .build();

        SerializedValue<StateBackend> serializedStateBackend =
                serializeIfPresent(stateBackend, "state backend");
        SerializedValue<CheckpointStorage> serializedCheckpointStorage =
                serializeIfPresent(checkpointStorage, "checkpoint storage");

        jobGraph.setSnapshotSettings(
                new JobCheckpointingSettings(
                        config,
                        serializedStateBackend,
                        TernaryBoolean.UNDEFINED,
                        serializedCheckpointStorage,
                        null));
    }

    /**
     * Wraps {@code value} in a {@link SerializedValue}, or returns {@code null} when it is absent.
     *
     * @param description human-readable name appended to "could not serialize " on failure
     * @throws RuntimeException wrapping the {@link IOException} raised during serialization
     */
    @Nullable
    private static <T> SerializedValue<T> serializeIfPresent(@Nullable T value, String description) {
        if (value == null) {
            return null;
        }
        try {
            return new SerializedValue<>(value);
        } catch (IOException e) {
            throw new RuntimeException("could not serialize " + description, e);
        }
    }

    /**
     * Returns the attempt id of the current execution attempt of every execution vertex in the
     * archived execution graph obtained from {@code scheduler.requestJob()}.
     */
    public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(SchedulerNG scheduler) {
        return StreamSupport.stream(
                        scheduler.requestJob()
                                .getArchivedExecutionGraph()
                                .getAllExecutionVertices()
                                .spliterator(),
                        false)
                .map(vertex -> vertex.getCurrentExecutionAttempt().getAttemptId())
                .collect(Collectors.toList());
    }

    /** Returns the state of the current execution attempt of the given subtask. */
    public static ExecutionState getExecutionState(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
        ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
        return ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getState();
    }

    /** Reports the current attempt of the given subtask as {@code FAILED} with a test exception. */
    public static void failExecution(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
        ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
        scheduler.updateTaskExecutionState(
                new TaskExecutionState(attemptID, ExecutionState.FAILED, new Exception("test task failure")));
    }

    /** Reports the current attempt of the given subtask as {@code CANCELED}, attaching a test exception. */
    public static void canceledExecution(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
        ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
        scheduler.updateTaskExecutionState(
                new TaskExecutionState(attemptID, ExecutionState.CANCELED, new Exception("test task failure")));
    }

    /** Reports the current attempt of the given subtask as being in {@code executionState}. */
    public static void setExecutionToState(ExecutionState executionState, DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
        ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptID, executionState));
    }

    /**
     * Reports every current execution attempt as {@code INITIALIZING} and then as {@code RUNNING}.
     */
    public static void setAllExecutionsToRunning(SchedulerNG scheduler) {
        getAllCurrentExecutionAttempts(scheduler).forEach(attemptId -> {
            scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.INITIALIZING));
            scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.RUNNING));
        });
    }

    /**
     * Reports every current execution attempt as {@code CANCELED} and asserts that the scheduler
     * accepted each update.
     */
    public static void setAllExecutionsToCancelled(DefaultScheduler scheduler) {
        for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) {
            boolean switchedToCanceled =
                    scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.CANCELED));
            // The description must name the state actually requested, otherwise a failure is misleading.
            Assertions.assertThat(switchedToCanceled).as("could not switch task to CANCELED").isTrue();
        }
    }

    /**
     * Sends an {@link AcknowledgeCheckpoint} for {@code checkpointId} directly to the checkpoint
     * coordinator on behalf of every current execution attempt.
     *
     * @throws CheckpointException propagated from {@code receiveAcknowledgeMessage}
     */
    public static void acknowledgePendingCheckpoint(DefaultScheduler scheduler, long checkpointId) throws CheckpointException {
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
        JobID jid = scheduler.getJobId();
        for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) {
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, attemptId, checkpointId);
            checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
        }
    }

    /**
     * Acknowledges {@code checkpointId} through the scheduler for every current execution attempt,
     * reporting a {@link TaskStateSnapshot} built from {@code subtaskStateMap} and fresh
     * {@link CheckpointMetrics}.
     */
    public static void acknowledgePendingCheckpoint(SchedulerNG scheduler, int checkpointId, Map<OperatorID, OperatorSubtaskState> subtaskStateMap) {
        Collection<ExecutionAttemptID> attempts = getAllCurrentExecutionAttempts(scheduler);
        // The job id does not change between attempts, so look it up once instead of per attempt.
        JobID jobId = scheduler.requestJob().getJobId();
        attempts.forEach(
                executionAttemptID ->
                        scheduler.acknowledgeCheckpoint(
                                jobId,
                                executionAttemptID,
                                (long) checkpointId,
                                new CheckpointMetrics(),
                                new TaskStateSnapshot(subtaskStateMap)));
    }

    /** Triggers a checkpoint on the scheduler's checkpoint coordinator via {@code triggerCheckpoint(false)}. */
    public static CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
        return checkpointCoordinator.triggerCheckpoint(false);
    }

    /**
     * Asserts that exactly one checkpoint is pending, waits until that checkpoint reports no
     * non-acknowledged operator coordinators, and then acknowledges it through the scheduler for
     * every current execution attempt with a {@code null} state snapshot.
     *
     * <p>The wait has no timeout; if the calling thread is interrupted the interrupt flag is
     * restored and the test is failed.
     */
    public static void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints())
                .as("Coordinator has not exactly one pending checkpoint")
                .isOne();

        PendingCheckpoint pc = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();

        // Poll until the pending checkpoint no longer waits for any operator coordinator.
        while (pc.getNumberOfNonAcknowledgedOperatorCoordinators() > 0) {
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                Assertions.fail("interrupted");
            }
        }

        getAllCurrentExecutionAttempts(scheduler)
                .forEach(
                        attemptId ->
                                scheduler.acknowledgeCheckpoint(
                                        pc.getJobId(),
                                        attemptId,
                                        pc.getCheckpointID(),
                                        new CheckpointMetrics(),
                                        null));
    }

    /** Returns the checkpoint coordinator exposed by the scheduler. */
    public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase scheduler) {
        return scheduler.getCheckpointCoordinator();
    }

    /**
     * Waits, via {@code CommonTestUtils.waitUntilCondition} with {@link #RETRY_INTERVAL_MILLIS} and
     * {@link #RETRY_ATTEMPTS}, until the scheduler reports job status {@code RUNNING}.
     */
    public static void waitForJobStatusRunning(SchedulerNG scheduler) throws Exception {
        CommonTestUtils.waitUntilCondition(
                () -> scheduler.requestJobStatus() == JobStatus.RUNNING,
                RETRY_INTERVAL_MILLIS,
                RETRY_ATTEMPTS);
    }

    /**
     * Waits, via {@code CommonTestUtils.waitUntilCondition} with {@link #RETRY_INTERVAL_MILLIS} and
     * {@link #RETRY_ATTEMPTS}, until the checkpoint stats report at least one in-progress checkpoint.
     */
    public static void waitForCheckpointInProgress(SchedulerNG scheduler) throws Exception {
        CommonTestUtils.waitUntilCondition(
                () -> scheduler.requestCheckpointStats().getCounts().getNumberOfInProgressCheckpoints() > 0,
                RETRY_INTERVAL_MILLIS,
                RETRY_ATTEMPTS);
    }

    /**
     * Waits, via {@code CommonTestUtils.waitUntilCondition} with {@link #RETRY_INTERVAL_MILLIS} and
     * {@link #RETRY_ATTEMPTS}, until the checkpoint stats report at least one completed checkpoint.
     */
    public static void waitForCompletedCheckpoint(SchedulerNG scheduler) throws Exception {
        CommonTestUtils.waitUntilCondition(
                () -> scheduler.requestCheckpointStats().getCounts().getNumberOfCompletedCheckpoints() > 0L,
                RETRY_INTERVAL_MILLIS,
                RETRY_ATTEMPTS);
    }

    /** Resolves the {@link ExecutionJobVertex} of a job vertex through its subtask 0. */
    private static ExecutionJobVertex getJobVertex(DefaultScheduler scheduler, JobVertexID jobVertexId) {
        ExecutionVertexID id = new ExecutionVertexID(jobVertexId, 0);
        return scheduler.getExecutionVertex(id).getJobVertex();
    }

    /** Returns the attempt id of the current execution attempt of the given subtask. */
    public static ExecutionAttemptID getAttemptId(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
        ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
        assert (ejv != null);
        return ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
    }

    /**
     * Creates a slot-sharing allocator factory backed by
     * {@code TestingPhysicalSlotProvider.createWithInfiniteSlotCreation()}.
     */
    public static SlotSharingExecutionSlotAllocatorFactory newSlotSharingExecutionSlotAllocatorFactory() {
        return newSlotSharingExecutionSlotAllocatorFactory(TestingPhysicalSlotProvider.createWithInfiniteSlotCreation());
    }

    /** Creates a slot-sharing allocator factory with the default allocation timeout. */
    public static SlotSharingExecutionSlotAllocatorFactory newSlotSharingExecutionSlotAllocatorFactory(PhysicalSlotProvider physicalSlotProvider) {
        return newSlotSharingExecutionSlotAllocatorFactory(physicalSlotProvider, DEFAULT_TIMEOUT);
    }

    /**
     * Creates a slot-sharing allocator factory using a {@link TestingPhysicalSlotRequestBulkChecker}
     * and a {@link LocalInputPreferredSlotSharingStrategy.Factory}.
     */
    public static SlotSharingExecutionSlotAllocatorFactory newSlotSharingExecutionSlotAllocatorFactory(PhysicalSlotProvider physicalSlotProvider, Duration allocationTimeout) {
        return new SlotSharingExecutionSlotAllocatorFactory(
                physicalSlotProvider,
                true,
                new TestingPhysicalSlotRequestBulkChecker(),
                allocationTimeout,
                new LocalInputPreferredSlotSharingStrategy.Factory());
    }

    /**
     * Builds a batch job in which {@code producer} feeds every vertex in {@code consumers} through
     * one shared {@code BLOCKING} data set, creates a scheduler for it, deploys and finishes the
     * producer's tasks, and then deploys each consumer's tasks.
     *
     * <p>Execution-graph mutations are run on {@code mainThreadExecutor}; after each deployment
     * the method waits for the task deployment descriptors of that vertex to be created.
     *
     * @param isAdaptive whether to build the adaptive batch scheduler; when {@code true} each job
     *     vertex is explicitly initialized before its tasks are deployed
     * @return the scheduler that owns the deployed execution graph
     */
    public static SchedulerBase createSchedulerAndDeploy(boolean isAdaptive, JobID jobId, JobVertex producer, JobVertex[] consumers, DistributionPattern distributionPattern, BlobWriter blobWriter, ComponentMainThreadExecutor mainThreadExecutor, ScheduledExecutorService ioExecutor, JobMasterPartitionTracker partitionTracker, ScheduledExecutorService scheduledExecutor, Configuration jobMasterConfiguration) throws Exception {
        List<JobVertex> vertices = new ArrayList<>(Collections.singletonList(producer));
        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
        for (JobVertex consumer : consumers) {
            JobVertexConnectionUtils.connectNewDataSetAsInput(
                    consumer, producer, distributionPattern, ResultPartitionType.BLOCKING, dataSetId, false);
            vertices.add(consumer);
        }

        DefaultScheduler scheduler =
                createScheduler(
                        isAdaptive,
                        jobId,
                        vertices,
                        blobWriter,
                        mainThreadExecutor,
                        ioExecutor,
                        partitionTracker,
                        scheduledExecutor,
                        jobMasterConfiguration);
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        TestingLogicalSlotBuilder slotBuilder = new TestingLogicalSlotBuilder();

        // Deploy the producer and mark it finished before any consumer is deployed.
        runUnsafe(
                () -> {
                    initializeExecutionJobVertex(producer.getID(), executionGraph, isAdaptive);
                    deployTasks(executionGraph, producer.getID(), slotBuilder);
                },
                mainThreadExecutor);
        ExecutionUtils.waitForTaskDeploymentDescriptorsCreation(
                Objects.requireNonNull(executionGraph.getJobVertex(producer.getID())).getTaskVertices());
        runUnsafe(
                () -> ExecutionGraphTestUtils.finishJobVertex(executionGraph, producer.getID()),
                mainThreadExecutor);

        for (JobVertex consumer : consumers) {
            runUnsafe(
                    () -> {
                        initializeExecutionJobVertex(consumer.getID(), executionGraph, isAdaptive);
                        deployTasks(executionGraph, consumer.getID(), slotBuilder);
                    },
                    mainThreadExecutor);
            ExecutionUtils.waitForTaskDeploymentDescriptorsCreation(
                    Objects.requireNonNull(executionGraph.getJobVertex(consumer.getID())).getTaskVertices());
        }
        return scheduler;
    }

    /**
     * Initializes the given job vertex in the execution graph and announces it as newly
     * initialized. Does nothing unless {@code adaptiveSchedulerEnabled} is {@code true}.
     *
     * @throws RuntimeException wrapping any {@link JobException} raised during initialization
     */
    private static void initializeExecutionJobVertex(JobVertexID jobVertex, ExecutionGraph executionGraph, boolean adaptiveSchedulerEnabled) {
        if (!adaptiveSchedulerEnabled) {
            return;
        }
        try {
            executionGraph.initializeJobVertex(executionGraph.getJobVertex(jobVertex), System.currentTimeMillis());
            executionGraph.notifyNewlyInitializedJobVertices(
                    Collections.singletonList(executionGraph.getJobVertex(jobVertex)));
        } catch (JobException exception) {
            throw new RuntimeException(exception);
        }
    }

    /**
     * Builds a batch job graph from the given vertices and creates either the adaptive batch
     * scheduler or the default scheduler for it, depending on {@code isAdaptive}.
     */
    private static DefaultScheduler createScheduler(boolean isAdaptive, JobID jobId, List<JobVertex> jobVertices, BlobWriter blobWriter, ComponentMainThreadExecutor mainThreadExecutor, ScheduledExecutorService ioExecutor, JobMasterPartitionTracker partitionTracker, ScheduledExecutorService scheduledExecutor, Configuration jobMasterConfiguration) throws Exception {
        JobGraph jobGraph =
                JobGraphBuilder.newBatchJobGraphBuilder().setJobId(jobId).addJobVertices(jobVertices).build();
        DefaultSchedulerBuilder builder =
                new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, scheduledExecutor)
                        .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L))
                        .setBlobWriter(blobWriter)
                        .setIoExecutor(ioExecutor)
                        .setPartitionTracker(partitionTracker)
                        .setJobMasterConfiguration(jobMasterConfiguration);
        return isAdaptive ? builder.buildAdaptiveBatchJobScheduler() : builder.build();
    }

    /**
     * For every execution vertex of the given job vertex: creates a testing slot, registers the
     * produced partitions at that slot's task manager location, moves the current attempt to
     * {@code SCHEDULED}, assigns the slot and deploys the vertex.
     */
    private static void deployTasks(ExecutionGraph executionGraph, JobVertexID jobVertexID, TestingLogicalSlotBuilder slotBuilder) throws JobException, ExecutionException, InterruptedException {
        for (ExecutionVertex vertex : Objects.requireNonNull(executionGraph.getJobVertex(jobVertexID)).getTaskVertices()) {
            TestingLogicalSlot slot = slotBuilder.createTestingLogicalSlot();
            Execution execution = vertex.getCurrentExecutionAttempt();
            execution.registerProducedPartitions(slot.getTaskManagerLocation()).get();
            execution.transitionState(ExecutionState.SCHEDULED);
            vertex.tryAssignResource(slot);
            vertex.deploy();
        }
    }

    /**
     * Creates a {@code FINISHED} task execution state whose {@link IOMetrics} are all zero except
     * for the supplied result partition bytes.
     */
    public static TaskExecutionState createFinishedTaskExecutionState(ExecutionAttemptID attemptId, Map<IntermediateResultPartitionID, ResultPartitionBytes> resultPartitionBytes) {
        return new TaskExecutionState(
                attemptId,
                ExecutionState.FINISHED,
                null,
                null,
                new IOMetrics(0L, 0L, 0L, 0L, 0L, 0.0, 0L, resultPartitionBytes));
    }

    /** Creates a {@code FINISHED} task execution state with no result partition bytes. */
    public static TaskExecutionState createFinishedTaskExecutionState(ExecutionAttemptID attemptId) {
        return createFinishedTaskExecutionState(attemptId, Collections.emptyMap());
    }

    /** Creates a {@code FAILED} task execution state carrying the given failure cause. */
    public static TaskExecutionState createFailedTaskExecutionState(ExecutionAttemptID attemptId, Throwable failureCause) {
        return new TaskExecutionState(attemptId, ExecutionState.FAILED, failureCause);
    }

    /** Creates a {@code FAILED} task execution state with a placeholder failure cause. */
    public static TaskExecutionState createFailedTaskExecutionState(ExecutionAttemptID attemptId) {
        return createFailedTaskExecutionState(attemptId, new Exception("Expected failure cause"));
    }

    /** Creates a {@code CANCELED} task execution state. */
    public static TaskExecutionState createCanceledTaskExecutionState(ExecutionAttemptID attemptId) {
        return new TaskExecutionState(attemptId, ExecutionState.CANCELED);
    }

    /**
     * Runs {@code callback} on {@code executor} and blocks until it completes.
     *
     * @throws RuntimeException if the callback throws or the asynchronous run fails
     */
    private static void runUnsafe(RunnableWithException callback, ComponentMainThreadExecutor executor) {
        try {
            CompletableFuture.runAsync(
                            () -> {
                                try {
                                    callback.run();
                                } catch (Throwable e) {
                                    throw new RuntimeException("Exception shouldn't happen here.", e);
                                }
                            },
                            executor)
                    .join();
        } catch (Exception e) {
            throw new RuntimeException("Exception shouldn't happen here.", e);
        }
    }
}

