package org.apache.flink.streaming.runtime.tasks;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.SharedResources;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.SerializedValue;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;

@Timeout(value = 10, unit = TimeUnit.SECONDS)
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.class */
class SynchronousCheckpointITCase {
    private static final LinkedBlockingQueue<Event> EVENT_QUEUE = new LinkedBlockingQueue<>();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase$Event.class */
    private enum Event {
        TASK_IS_RUNNING,
        PRE_TRIGGER_CHECKPOINT,
        PRE_NOTIFY_CHECKPOINT_COMPLETE,
        POST_NOTIFY_CHECKPOINT_COMPLETE,
        POST_TRIGGER_CHECKPOINT
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase$SynchronousCheckpointTestingTask.class */
    public static class SynchronousCheckpointTestingTask extends StreamTask {
        private boolean isRunning;

        public SynchronousCheckpointTestingTask(Environment environment) throws Exception {
            super(environment);
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (!this.isRunning) {
                this.isRunning = true;
                SynchronousCheckpointITCase.EVENT_QUEUE.put(Event.TASK_IS_RUNNING);
            }
            if (!isCanceled()) {
                controller.suspendDefaultAction();
            } else {
                controller.suspendDefaultAction();
                this.mailboxProcessor.suspend();
            }
        }

        public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            try {
                SynchronousCheckpointITCase.EVENT_QUEUE.put(Event.PRE_TRIGGER_CHECKPOINT);
                CompletableFuture<Boolean> triggerCheckpointAsync = super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
                SynchronousCheckpointITCase.EVENT_QUEUE.put(Event.POST_TRIGGER_CHECKPOINT);
                return triggerCheckpointAsync;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        public Future<Void> notifyCheckpointCompleteAsync(long j) {
            try {
                SynchronousCheckpointITCase.EVENT_QUEUE.put(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE);
                Future<Void> notifyCheckpointCompleteAsync = super.notifyCheckpointCompleteAsync(j);
                SynchronousCheckpointITCase.EVENT_QUEUE.put(Event.POST_NOTIFY_CHECKPOINT_COMPLETE);
                return notifyCheckpointCompleteAsync;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        public Future<Void> notifyCheckpointAbortAsync(long j, long j2) {
            return CompletableFuture.completedFuture(null);
        }

        protected void init() {
        }

        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetricsBuilder checkpointMetricsBuilder) {
            throw new UnsupportedOperationException("Should not be called");
        }

        public void abortCheckpointOnBarrier(long j, CheckpointException checkpointException) {
            throw new UnsupportedOperationException("Should not be called");
        }

        protected void cleanUpInternal() {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase$TaskCleaner.class */
    private static class TaskCleaner implements AutoCloseable {
        private final Task task;

        private TaskCleaner(Task task) {
            this.task = task;
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            this.task.cancelExecution();
            this.task.getExecutingThread().join(5000L);
        }
    }

    SynchronousCheckpointITCase() {
    }

    @Test
    void taskDispatcherThreadPoolAllowsForSynchronousCheckpoints() throws Exception {
        Task createTask = createTask(SynchronousCheckpointTestingTask.class);
        TaskCleaner taskCleaner = new TaskCleaner(createTask);
        try {
            createTask.startTaskThread();
            Assertions.assertThat(EVENT_QUEUE.take()).isEqualTo(Event.TASK_IS_RUNNING);
            Assertions.assertThat(EVENT_QUEUE).isEmpty();
            Assertions.assertThat(createTask.getExecutionState()).isEqualTo(ExecutionState.RUNNING);
            createTask.triggerCheckpointBarrier(42L, 156865867234L, new CheckpointOptions(SavepointType.suspend(SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault()));
            Assertions.assertThat(EVENT_QUEUE.take()).isEqualTo(Event.PRE_TRIGGER_CHECKPOINT);
            Assertions.assertThat(EVENT_QUEUE.take()).isEqualTo(Event.POST_TRIGGER_CHECKPOINT);
            Assertions.assertThat(EVENT_QUEUE).isEmpty();
            createTask.notifyCheckpointComplete(42L);
            Assertions.assertThat(EVENT_QUEUE.take()).isEqualTo(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE);
            Assertions.assertThat(EVENT_QUEUE.take()).isEqualTo(Event.POST_NOTIFY_CHECKPOINT_COMPLETE);
            Assertions.assertThat(EVENT_QUEUE).isEmpty();
            Assertions.assertThat(createTask.getExecutionState()).isEqualTo(ExecutionState.RUNNING);
            taskCleaner.close();
        } catch (Throwable th) {
            try {
                taskCleaner.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private Task createTask(Class<? extends TaskInvokable> cls) throws Exception {
        PartitionProducerStateChecker partitionProducerStateChecker = (PartitionProducerStateChecker) Mockito.mock(PartitionProducerStateChecker.class);
        Executor executor = (Executor) Mockito.mock(Executor.class);
        NettyShuffleEnvironment build = new NettyShuffleEnvironmentBuilder().build();
        TaskMetricGroup createUnregisteredTaskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
        JobInformation jobInformation = new JobInformation(new JobID(), JobType.STREAMING, "Job Name", new SerializedValue(new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(new JobVertexID(), "Test Task", 1, 1, cls.getName(), new Configuration());
        return new Task(jobInformation, taskInformation, ExecutionGraphTestUtils.createExecutionAttemptId(taskInformation.getJobVertexId()), new AllocationID(), Collections.emptyList(), Collections.emptyList(), (MemoryManager) Mockito.mock(MemoryManager.class), new SharedResources(), (IOManager) Mockito.mock(IOManager.class), build, new KvStateService(new KvStateRegistry(), (KvStateServer) null, (KvStateClientProxy) null), (BroadcastVariableManager) Mockito.mock(BroadcastVariableManager.class), new TaskEventDispatcher(), ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, new TestTaskStateManager(), (TaskManagerActions) Mockito.mock(TaskManagerActions.class), (InputSplitProvider) Mockito.mock(InputSplitProvider.class), (CheckpointResponder) Mockito.mock(CheckpointResponder.class), new NoOpTaskOperatorEventGateway(), new TestGlobalAggregateManager(), TestingClassLoaderLease.newBuilder().build(), (FileCache) Mockito.mock(FileCache.class), new TestingTaskManagerRuntimeInfo(), createUnregisteredTaskMetricGroup, partitionProducerStateChecker, executor, new ChannelStateWriteRequestExecutorFactory(jobInformation.getJobId()));
    }
}
