package org.apache.flink.streaming.runtime.tasks;

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.Preconditions;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.class */
public class SynchronousCheckpointTest {
    private StreamTaskUnderTest streamTaskUnderTest;
    private CompletableFuture<Void> taskInvocation;
    private LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest$Event.class */
    private enum Event {
        TASK_INITIALIZED
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest$StreamTaskUnderTest.class */
    public static class StreamTaskUnderTest extends StreamTaskTest.NoOpStreamTask {
        private Queue<Event> eventQueue;
        private volatile boolean stopped;

        StreamTaskUnderTest(Environment environment, Queue<Event> queue) {
            super(environment);
            this.eventQueue = (Queue) Preconditions.checkNotNull(queue);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask
        public void init() {
            this.eventQueue.add(Event.TASK_INITIALIZED);
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (this.stopped || isCanceled()) {
                controller.allActionsCompleted();
            }
        }

        void stopTask() {
            this.stopped = true;
        }
    }

    @Before
    public void setupTestEnvironment() throws InterruptedException {
        this.taskInvocation = CompletableFuture.runAsync(() -> {
            this.streamTaskUnderTest = createTask(this.eventQueue);
            try {
                this.streamTaskUnderTest.invoke();
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        }, Executors.newSingleThreadExecutor());
        Assert.assertThat(this.eventQueue.take(), Matchers.is(Event.TASK_INITIALIZED));
    }

    @Test(timeout = 20000)
    public void synchronousCheckpointBlocksUntilNotificationForCorrectCheckpointComes() throws Exception {
        launchSynchronousSavepointAndWaitForSyncSavepointIdToBeSet();
        Assert.assertTrue(this.streamTaskUnderTest.getSynchronousSavepointId().isPresent());
        this.streamTaskUnderTest.notifyCheckpointCompleteAsync(41L).get();
        Assert.assertTrue(this.streamTaskUnderTest.getSynchronousSavepointId().isPresent());
        this.streamTaskUnderTest.notifyCheckpointCompleteAsync(42L).get();
        Assert.assertFalse(this.streamTaskUnderTest.getSynchronousSavepointId().isPresent());
        this.streamTaskUnderTest.stopTask();
        waitUntilMainExecutionThreadIsFinished();
        Assert.assertFalse(this.streamTaskUnderTest.isCanceled());
    }

    @Test(timeout = 10000)
    public void cancelShouldAlsoCancelPendingSynchronousCheckpoint() throws Throwable {
        launchSynchronousSavepointAndWaitForSyncSavepointIdToBeSet();
        Assert.assertTrue(this.streamTaskUnderTest.getSynchronousSavepointId().isPresent());
        this.streamTaskUnderTest.cancel();
        waitUntilMainExecutionThreadIsFinished();
        Assert.assertTrue(this.streamTaskUnderTest.isCanceled());
    }

    private void launchSynchronousSavepointAndWaitForSyncSavepointIdToBeSet() throws InterruptedException {
        this.streamTaskUnderTest.triggerCheckpointAsync(new CheckpointMetaData(42L, System.currentTimeMillis()), new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()), false);
        waitForSyncSavepointIdToBeSet(this.streamTaskUnderTest);
    }

    private void waitUntilMainExecutionThreadIsFinished() {
        try {
            this.taskInvocation.get();
        } catch (Exception e) {
            Assert.assertThat(e.getCause(), Matchers.is(Matchers.instanceOf(CancelTaskException.class)));
        }
    }

    private void waitForSyncSavepointIdToBeSet(StreamTask streamTask) throws InterruptedException {
        while (!streamTask.getSynchronousSavepointId().isPresent()) {
            Thread.sleep(10L);
            if (this.taskInvocation.isDone()) {
                Assert.fail("Task has been terminated too early");
            }
        }
    }

    private static StreamTaskUnderTest createTask(Queue<Event> queue) {
        return new StreamTaskUnderTest(new DummyEnvironment("test", 1, 0), queue);
    }
}
