package org.apache.flink.runtime.checkpoint;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.class */
public class CheckpointCoordinatorTriggeringTest extends TestLogger {
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
    private CheckpointFailureManager failureManager;

    @Before
    public void setUp() throws Exception {
        this.failureManager = new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE);
        this.manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    @Test
    public void testPeriodicTriggering() {
        try {
            JobID jobID = new JobID();
            final long currentTimeMillis = System.currentTimeMillis();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID2);
            ExecutionVertex mockExecutionVertex3 = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID3);
            final AtomicInteger atomicInteger = new AtomicInteger();
            ((Execution) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTriggeringTest.1
                private long lastId = -1;
                private long lastTs = -1;

                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Void m13answer(InvocationOnMock invocationOnMock) throws Throwable {
                    long longValue = ((Long) invocationOnMock.getArguments()[0]).longValue();
                    long longValue2 = ((Long) invocationOnMock.getArguments()[1]).longValue();
                    Assert.assertTrue(longValue > this.lastId);
                    Assert.assertTrue(longValue2 >= this.lastTs);
                    Assert.assertTrue(longValue2 >= currentTimeMillis);
                    this.lastId = longValue;
                    this.lastTs = longValue2;
                    atomicInteger.incrementAndGet();
                    return null;
                }
            }).when(mockExecutionVertex.getCurrentExecutionAttempt())).triggerCheckpoint(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (CheckpointOptions) ArgumentMatchers.any(CheckpointOptions.class));
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, new CheckpointCoordinatorConfiguration(10L, 200000L, 0L, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, 0), new ExecutionVertex[]{mockExecutionVertex}, new ExecutionVertex[]{mockExecutionVertex2}, new ExecutionVertex[]{mockExecutionVertex3}, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), new MemoryStateBackend(), Executors.directExecutor(), this.manuallyTriggeredScheduledExecutor, SharedStateRegistry.DEFAULT_FACTORY, this.failureManager);
            checkpointCoordinator.startCheckpointScheduler();
            do {
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            } while (atomicInteger.get() < 5);
            Assert.assertEquals(5L, atomicInteger.get());
            checkpointCoordinator.stopCheckpointScheduler();
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            Assert.assertEquals(5L, atomicInteger.get());
            atomicInteger.set(0);
            checkpointCoordinator.startCheckpointScheduler();
            do {
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            } while (atomicInteger.get() < 5);
            Assert.assertEquals(5L, atomicInteger.get());
            checkpointCoordinator.stopCheckpointScheduler();
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            Assert.assertEquals(5L, atomicInteger.get());
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMinTimeBetweenCheckpointsInterval() throws Exception {
        JobID jobID = new JobID();
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(executionAttemptID);
        Execution currentExecutionAttempt = mockExecutionVertex.getCurrentExecutionAttempt();
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        ((Execution) Mockito.doAnswer(invocationOnMock -> {
            linkedBlockingQueue.add((Long) invocationOnMock.getArguments()[0]);
            return null;
        }).when(currentExecutionAttempt)).triggerCheckpoint(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (CheckpointOptions) ArgumentMatchers.any(CheckpointOptions.class));
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, new CheckpointCoordinatorConfiguration(12L, 200000L, 50L, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, 0), new ExecutionVertex[]{mockExecutionVertex}, new ExecutionVertex[]{mockExecutionVertex}, new ExecutionVertex[]{mockExecutionVertex}, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2), new MemoryStateBackend(), Executors.directExecutor(), this.manuallyTriggeredScheduledExecutor, SharedStateRegistry.DEFAULT_FACTORY, this.failureManager);
        try {
            checkpointCoordinator.startCheckpointScheduler();
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            Assert.assertEquals(1L, ((Long) linkedBlockingQueue.take()).longValue());
            AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jobID, executionAttemptID, 1L);
            long nanoTime = System.nanoTime();
            checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
            this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            while (linkedBlockingQueue.isEmpty()) {
                Thread.sleep(12L);
                this.manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
            }
            Long l = (Long) linkedBlockingQueue.take();
            long nanoTime2 = System.nanoTime();
            Assert.assertEquals(2L, l.longValue());
            long j = (nanoTime2 - nanoTime) / 1000000;
            if (j + 1 < 50) {
                Assert.fail("checkpoint came too early: delay was " + j + " but should have been at least 50");
            }
        } finally {
            checkpointCoordinator.stopCheckpointScheduler();
            checkpointCoordinator.shutdown(JobStatus.FINISHED);
        }
    }

    @Test
    public void testStopPeriodicScheduler() throws Exception {
        ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTestingUtils.mockExecutionVertex(new ExecutionAttemptID());
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(new JobID(), new CheckpointCoordinatorConfiguration(600000L, 600000L, 0L, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, 0), new ExecutionVertex[]{mockExecutionVertex}, new ExecutionVertex[]{mockExecutionVertex}, new ExecutionVertex[]{mockExecutionVertex}, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), this.manuallyTriggeredScheduledExecutor, SharedStateRegistry.DEFAULT_FACTORY, this.failureManager);
        try {
            checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (String) null, true, false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assert.fail("The triggerCheckpoint call expected an exception");
        } catch (CheckpointException e) {
            Assert.assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason());
        }
        try {
            checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), (String) null, false, false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
        } catch (CheckpointException e2) {
            Assert.fail("Unexpected exception : " + e2.getCheckpointFailureReason().message());
        }
    }
}
