package org.apache.flink.runtime.scheduler;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.scheduler.LocalInputPreferredSlotSharingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TernaryBoolean;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/SchedulerTestingUtils.class */
/**
 * Static helpers for tests that drive a {@link DefaultScheduler}: building a scheduler, enabling
 * checkpointing on a {@link JobGraph}, moving executions between {@link ExecutionState}s, and
 * triggering / acknowledging checkpoints through the scheduler's {@link CheckpointCoordinator}.
 *
 * <p>This class is not instantiable.
 */
public class SchedulerTestingUtils {
    /** Checkpoint timeout (in milliseconds) passed to the checkpoint coordinator configuration. */
    private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 600000;

    /** Timeout used by the slot-allocator factory overloads that do not take an explicit one. */
    private static final Time DEFAULT_TIMEOUT = Time.seconds(300L);

    private SchedulerTestingUtils() {
    }

    /**
     * Builds a {@link DefaultScheduler} via {@link DefaultSchedulerBuilder} using the builder's
     * defaults for everything except the three given arguments.
     *
     * @param jobGraph the job to schedule
     * @param componentMainThreadExecutor executor handed to the builder
     * @param scheduledExecutorService executor service handed to the builder
     * @return the built scheduler
     * @throws Exception if the builder fails to build the scheduler
     */
    public static DefaultScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor, ScheduledExecutorService scheduledExecutorService) throws Exception {
        return new DefaultSchedulerBuilder(jobGraph, componentMainThreadExecutor, scheduledExecutorService).build();
    }

    /**
     * Enables checkpointing on the job graph without a state backend or checkpoint storage.
     *
     * @param jobGraph the job graph whose snapshot settings are replaced
     */
    public static void enableCheckpointing(JobGraph jobGraph) {
        enableCheckpointing(jobGraph, null, null);
    }

    /**
     * Installs {@link JobCheckpointingSettings} on the job graph, optionally carrying a serialized
     * state backend and checkpoint storage.
     *
     * @param jobGraph the job graph whose snapshot settings are replaced
     * @param stateBackend state backend to serialize into the settings, or {@code null} for none
     * @param checkpointStorage checkpoint storage to serialize into the settings, or {@code null}
     *     for none
     * @throws RuntimeException if serializing either argument fails with an {@link IOException}
     */
    public static void enableCheckpointing(JobGraph jobGraph, @Nullable StateBackend stateBackend, @Nullable CheckpointStorage checkpointStorage) {
        // Positional arguments; see the CheckpointCoordinatorConfiguration constructor for their
        // meaning. NOTE(review): the leading Long.MAX_VALUE is presumably the checkpoint interval,
        // i.e. checkpoints only happen when a test triggers them - confirm against that class.
        CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(Long.MAX_VALUE, DEFAULT_CHECKPOINT_TIMEOUT_MS, 0L, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, false, false, 0, 0L);

        SerializedValue<StateBackend> serializedStateBackend = null;
        if (stateBackend != null) {
            try {
                serializedStateBackend = new SerializedValue<>(stateBackend);
            } catch (IOException e) {
                throw new RuntimeException("could not serialize state backend", e);
            }
        }

        SerializedValue<CheckpointStorage> serializedCheckpointStorage = null;
        if (checkpointStorage != null) {
            try {
                serializedCheckpointStorage = new SerializedValue<>(checkpointStorage);
            } catch (IOException e) {
                throw new RuntimeException("could not serialize checkpoint storage", e);
            }
        }

        // The last argument is deliberately null. The raw cast is kept from the original call
        // because the element type of that parameter is not imported in this file.
        jobGraph.setSnapshotSettings(new JobCheckpointingSettings(coordinatorConfig, serializedStateBackend, TernaryBoolean.UNDEFINED, serializedCheckpointStorage, (SerializedValue) null));
    }

    /**
     * Collects the attempt id of the current execution attempt of every execution vertex in the
     * scheduler's archived execution graph.
     *
     * @param defaultScheduler the scheduler to query
     * @return the current attempt ids, one per execution vertex
     */
    public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(DefaultScheduler defaultScheduler) {
        return StreamSupport.stream(defaultScheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().spliterator(), false)
                .map(vertex -> vertex.getCurrentExecutionAttempt().getAttemptId())
                .collect(Collectors.toList());
    }

    /**
     * Returns the state of the current execution attempt of one subtask.
     *
     * @param defaultScheduler the scheduler to query
     * @param jobVertexID the job vertex the subtask belongs to
     * @param i index into the job vertex's task vertices
     * @return the state of that subtask's current execution attempt
     */
    public static ExecutionState getExecutionState(DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        return getJobVertex(defaultScheduler, jobVertexID).getTaskVertices()[i].getCurrentExecutionAttempt().getState();
    }

    /**
     * Reports the current attempt of the given subtask as {@link ExecutionState#FAILED} with a
     * synthetic exception.
     *
     * @param defaultScheduler the scheduler to notify
     * @param jobVertexID the job vertex the subtask belongs to
     * @param i index into the job vertex's task vertices
     */
    public static void failExecution(DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        defaultScheduler.updateTaskExecutionState(new TaskExecutionState(getAttemptId(defaultScheduler, jobVertexID, i), ExecutionState.FAILED, new Exception("test task failure")));
    }

    /**
     * Reports the current attempt of the given subtask as {@link ExecutionState#CANCELED},
     * attaching a synthetic exception.
     *
     * @param defaultScheduler the scheduler to notify
     * @param jobVertexID the job vertex the subtask belongs to
     * @param i index into the job vertex's task vertices
     */
    public static void canceledExecution(DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        defaultScheduler.updateTaskExecutionState(new TaskExecutionState(getAttemptId(defaultScheduler, jobVertexID, i), ExecutionState.CANCELED, new Exception("test task failure")));
    }

    /**
     * Reports the current attempt of the given subtask as being in the given state.
     *
     * @param executionState the state to report
     * @param defaultScheduler the scheduler to notify
     * @param jobVertexID the job vertex the subtask belongs to
     * @param i index into the job vertex's task vertices
     */
    public static void setExecutionToState(ExecutionState executionState, DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        defaultScheduler.updateTaskExecutionState(new TaskExecutionState(getAttemptId(defaultScheduler, jobVertexID, i), executionState));
    }

    /**
     * Reports every current execution attempt first as {@link ExecutionState#INITIALIZING} and
     * then as {@link ExecutionState#RUNNING}. The results of the updates are not checked.
     *
     * @param defaultScheduler the scheduler to notify
     */
    public static void setAllExecutionsToRunning(DefaultScheduler defaultScheduler) {
        for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(defaultScheduler)) {
            defaultScheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.INITIALIZING));
            defaultScheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.RUNNING));
        }
    }

    /**
     * Reports every current execution attempt as {@link ExecutionState#CANCELED} and fails the
     * test (JUnit assertion) if the scheduler rejects any of the updates.
     *
     * @param defaultScheduler the scheduler to notify
     */
    public static void setAllExecutionsToCancelled(DefaultScheduler defaultScheduler) {
        for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(defaultScheduler)) {
            boolean updated = defaultScheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.CANCELED));
            // The message used to say RUNNING although the transition is to CANCELED.
            Assert.assertTrue("could not switch task to CANCELED", updated);
        }
    }

    /**
     * Sends an {@link AcknowledgeCheckpoint} message for the given checkpoint id directly to the
     * checkpoint coordinator, once per current execution attempt.
     *
     * @param defaultScheduler the scheduler whose coordinator receives the acknowledgements
     * @param j id of the checkpoint to acknowledge
     * @throws CheckpointException if the coordinator rejects an acknowledgement
     */
    public static void acknowledgePendingCheckpoint(DefaultScheduler defaultScheduler, long j) throws CheckpointException {
        CheckpointCoordinator coordinator = getCheckpointCoordinator(defaultScheduler);
        JobID jobId = defaultScheduler.getJobId();
        for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(defaultScheduler)) {
            coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptId, j), "Unknown location");
        }
    }

    /**
     * Triggers a checkpoint on the scheduler's checkpoint coordinator.
     *
     * @param defaultScheduler the scheduler whose coordinator is triggered
     * @return the future returned by the coordinator for the triggered checkpoint
     * @throws Exception declared for callers; propagated from the coordinator lookup or trigger
     */
    public static CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler defaultScheduler) throws Exception {
        return getCheckpointCoordinator(defaultScheduler).triggerCheckpoint(false);
    }

    /**
     * Acknowledges the single pending checkpoint through the scheduler for every current execution
     * attempt.
     *
     * <p>Asserts that exactly one checkpoint is pending, then polls (sleeping 1 ms per iteration,
     * without an upper bound) until the pending checkpoint reports no non-acknowledged operator
     * coordinators. If the calling thread is interrupted while waiting, the interrupt flag is
     * restored and the test is failed.
     *
     * @param defaultScheduler the scheduler that owns the pending checkpoint
     */
    public static void acknowledgeCurrentCheckpoint(DefaultScheduler defaultScheduler) {
        CheckpointCoordinator coordinator = getCheckpointCoordinator(defaultScheduler);
        // The message used to be the truncated fragment "Coordinator has not ".
        Assert.assertEquals("Coordinator does not have exactly one pending checkpoint", 1L, coordinator.getNumberOfPendingCheckpoints());

        PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints().values().iterator().next();

        // NOTE(review): presumably the operator coordinators acknowledge from another thread,
        // which is why this has to poll - confirm against CheckpointCoordinator.
        while (pendingCheckpoint.getNumberOfNonAcknowledgedOperatorCoordinators() > 0) {
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                Assert.fail("interrupted");
            }
        }

        for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(defaultScheduler)) {
            defaultScheduler.acknowledgeCheckpoint(pendingCheckpoint.getJobId(), attemptId, pendingCheckpoint.getCheckpointId(), new CheckpointMetrics(), (TaskStateSnapshot) null);
        }
    }

    /**
     * Triggers a checkpoint, acknowledges it for all current execution attempts and returns the
     * completed checkpoint.
     *
     * <p>Fails the test if the coordinator does not have exactly one pending checkpoint after the
     * trigger, or if the checkpoint's completion future is not already completed once all
     * acknowledgements have been delivered.
     *
     * @param defaultScheduler the scheduler to checkpoint
     * @return the completed checkpoint
     * @throws Exception if triggering or acknowledging the checkpoint fails
     */
    public static CompletedCheckpoint takeCheckpoint(DefaultScheduler defaultScheduler) throws Exception {
        CheckpointCoordinator coordinator = getCheckpointCoordinator(defaultScheduler);
        coordinator.triggerCheckpoint(false);
        Assert.assertEquals("test setup inconsistent", 1L, coordinator.getNumberOfPendingCheckpoints());

        PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints().values().iterator().next();
        CompletableFuture<CompletedCheckpoint> completionFuture = pendingCheckpoint.getCompletionFuture();

        acknowledgePendingCheckpoint(defaultScheduler, pendingCheckpoint.getCheckpointId());

        // getNow(null) does not block: a null result means the future has not completed yet.
        CompletedCheckpoint completedCheckpoint = completionFuture.getNow(null);
        Assert.assertNotNull("checkpoint not complete", completedCheckpoint);
        return completedCheckpoint;
    }

    /**
     * Returns the checkpoint coordinator of the given scheduler.
     *
     * @param schedulerBase the scheduler to query
     * @return whatever {@code schedulerBase.getCheckpointCoordinator()} returns
     */
    public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase schedulerBase) {
        return schedulerBase.getCheckpointCoordinator();
    }

    /**
     * Resolves the {@link ExecutionJobVertex} for a job vertex id by looking up the execution
     * vertex with subtask index 0 and asking it for its job vertex.
     */
    private static ExecutionJobVertex getJobVertex(DefaultScheduler defaultScheduler, JobVertexID jobVertexID) {
        return defaultScheduler.getExecutionVertex(new ExecutionVertexID(jobVertexID, 0)).getJobVertex();
    }

    /**
     * Returns the attempt id of the current execution attempt of one subtask.
     *
     * @param defaultScheduler the scheduler to query
     * @param jobVertexID the job vertex the subtask belongs to
     * @param i index into the job vertex's task vertices
     * @return the attempt id of that subtask's current execution attempt
     */
    public static ExecutionAttemptID getAttemptId(DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        ExecutionJobVertex jobVertex = getJobVertex(defaultScheduler, jobVertexID);
        // Only checked when the JVM runs with assertions enabled (-ea).
        assert jobVertex != null;
        return jobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getAttemptId();
    }

    /**
     * Creates a slot-sharing allocator factory backed by
     * {@code TestingPhysicalSlotProvider.createWithInfiniteSlotCreation()} and the default timeout.
     *
     * @return the new factory
     */
    public static SlotSharingExecutionSlotAllocatorFactory newSlotSharingExecutionSlotAllocatorFactory() {
        return newSlotSharingExecutionSlotAllocatorFactory(TestingPhysicalSlotProvider.createWithInfiniteSlotCreation());
    }

    /**
     * Creates a slot-sharing allocator factory backed by the given slot provider and the default
     * timeout.
     *
     * @param physicalSlotProvider the slot provider the factory should use
     * @return the new factory
     */
    public static SlotSharingExecutionSlotAllocatorFactory newSlotSharingExecutionSlotAllocatorFactory(PhysicalSlotProvider physicalSlotProvider) {
        return newSlotSharingExecutionSlotAllocatorFactory(physicalSlotProvider, DEFAULT_TIMEOUT);
    }

    /**
     * Creates a slot-sharing allocator factory with a {@code TestingPhysicalSlotRequestBulkChecker}
     * and a {@code LocalInputPreferredSlotSharingStrategy.Factory}.
     *
     * @param physicalSlotProvider the slot provider the factory should use
     * @param time the timeout handed to the factory
     * @return the new factory
     */
    public static SlotSharingExecutionSlotAllocatorFactory newSlotSharingExecutionSlotAllocatorFactory(PhysicalSlotProvider physicalSlotProvider, Time time) {
        return new SlotSharingExecutionSlotAllocatorFactory(physicalSlotProvider, true, new TestingPhysicalSlotRequestBulkChecker(), time, new LocalInputPreferredSlotSharingStrategy.Factory());
    }
}
