package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategyTest;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphCheckpointPlanCalculatorContext;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.class */
class CheckpointCoordinatorMasterHooksTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest$LongSerializer.class */
    private static final class LongSerializer implements SimpleVersionedSerializer<Long> {
        static final int VERSION = 5;

        private LongSerializer() {
        }

        public int getVersion() {
            return VERSION;
        }

        public byte[] serialize(Long l) throws IOException {
            byte[] bArr = new byte[8];
            ByteBuffer.wrap(bArr).order(ByteOrder.LITTLE_ENDIAN).putLong(0, l.longValue());
            return bArr;
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Long m21deserialize(int i, byte[] bArr) throws IOException {
            Assertions.assertThat(i).isEqualTo(VERSION);
            Assertions.assertThat(bArr.length).isEqualTo(8);
            return Long.valueOf(ByteBuffer.wrap(bArr).order(ByteOrder.LITTLE_ENDIAN).getLong(0));
        }
    }

    CheckpointCoordinatorMasterHooksTest() {
    }

    @Test
    void testDeduplicateOnRegister() throws Exception {
        CheckpointCoordinator instantiateCheckpointCoordinator = instantiateCheckpointCoordinator(new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(new JobVertexID()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()));
        MasterTriggerRestoreHook masterTriggerRestoreHook = (MasterTriggerRestoreHook) Mockito.mock(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook.getIdentifier()).thenReturn("test id");
        MasterTriggerRestoreHook masterTriggerRestoreHook2 = (MasterTriggerRestoreHook) Mockito.mock(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook2.getIdentifier()).thenReturn("test id");
        MasterTriggerRestoreHook masterTriggerRestoreHook3 = (MasterTriggerRestoreHook) Mockito.mock(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook3.getIdentifier()).thenReturn("anotherId");
        Assertions.assertThat(instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook)).isTrue();
        Assertions.assertThat(instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook2)).isFalse();
        Assertions.assertThat(instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook3)).isTrue();
    }

    @Test
    void testNullOrInvalidId() throws Exception {
        CheckpointCoordinator instantiateCheckpointCoordinator = instantiateCheckpointCoordinator(new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(new JobVertexID()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()));
        try {
            instantiateCheckpointCoordinator.addMasterHook((MasterTriggerRestoreHook) null);
            Assertions.fail("expected an exception");
        } catch (NullPointerException e) {
        }
        try {
            instantiateCheckpointCoordinator.addMasterHook((MasterTriggerRestoreHook) Mockito.mock(MasterTriggerRestoreHook.class));
            Assertions.fail("expected an exception");
        } catch (IllegalArgumentException e2) {
        }
        try {
            MasterTriggerRestoreHook masterTriggerRestoreHook = (MasterTriggerRestoreHook) Mockito.mock(MasterTriggerRestoreHook.class);
            Mockito.when(masterTriggerRestoreHook.getIdentifier()).thenReturn("        ");
            instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook);
            Assertions.fail("expected an exception");
        } catch (IllegalArgumentException e3) {
        }
    }

    @Test
    void testHookReset() throws Exception {
        MasterTriggerRestoreHook masterTriggerRestoreHook = (MasterTriggerRestoreHook) mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook.getIdentifier()).thenReturn("id1");
        MasterTriggerRestoreHook masterTriggerRestoreHook2 = (MasterTriggerRestoreHook) mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook2.getIdentifier()).thenReturn("id2");
        CheckpointCoordinator instantiateCheckpointCoordinator = instantiateCheckpointCoordinator(new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(new JobVertexID()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()));
        instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook);
        instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook2);
        instantiateCheckpointCoordinator.restoreLatestCheckpointedStateToAll(Collections.emptySet(), false);
        ((MasterTriggerRestoreHook) Mockito.verify(masterTriggerRestoreHook, Mockito.times(1))).reset();
        ((MasterTriggerRestoreHook) Mockito.verify(masterTriggerRestoreHook2, Mockito.times(1))).reset();
        instantiateCheckpointCoordinator.shutdown();
        ((MasterTriggerRestoreHook) Mockito.verify(masterTriggerRestoreHook, Mockito.times(1))).close();
        ((MasterTriggerRestoreHook) Mockito.verify(masterTriggerRestoreHook2, Mockito.times(1))).close();
    }

    @Test
    void testHooksAreCalledOnTrigger() throws Exception {
        byte[] serialize = new CheckpointCoordinatorTestingUtils.StringSerializer().serialize("the-test-string-state");
        byte[] serialize2 = new LongSerializer().serialize((Long) 987654321L);
        MasterTriggerRestoreHook masterTriggerRestoreHook = (MasterTriggerRestoreHook) mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook.getIdentifier()).thenReturn("id1");
        Mockito.when(masterTriggerRestoreHook.createCheckpointDataSerializer()).thenReturn(new CheckpointCoordinatorTestingUtils.StringSerializer());
        Mockito.when(masterTriggerRestoreHook.triggerCheckpoint(Mockito.anyLong(), Mockito.anyLong(), (Executor) Mockito.any(Executor.class))).thenReturn(CompletableFuture.completedFuture("the-test-string-state"));
        MasterTriggerRestoreHook masterTriggerRestoreHook2 = (MasterTriggerRestoreHook) mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook2.getIdentifier()).thenReturn("id2");
        Mockito.when(masterTriggerRestoreHook2.createCheckpointDataSerializer()).thenReturn(new LongSerializer());
        Mockito.when(masterTriggerRestoreHook2.triggerCheckpoint(Mockito.anyLong(), Mockito.anyLong(), (Executor) Mockito.any(Executor.class))).thenReturn(CompletableFuture.completedFuture(987654321L));
        MasterTriggerRestoreHook masterTriggerRestoreHook3 = (MasterTriggerRestoreHook) mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook3.getIdentifier()).thenReturn("some-id");
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        CheckpointCoordinator instantiateCheckpointCoordinator = instantiateCheckpointCoordinator(build, manuallyTriggeredScheduledExecutor);
        instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook);
        instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook3);
        instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook2);
        CompletableFuture triggerCheckpoint = instantiateCheckpointCoordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(triggerCheckpoint).isNotCompletedExceptionally();
        Assertions.assertThat(instantiateCheckpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
        ((MasterTriggerRestoreHook) Mockito.verify(masterTriggerRestoreHook, Mockito.times(1))).triggerCheckpoint(Mockito.anyLong(), Mockito.anyLong(), (Executor) Mockito.any(Executor.class));
        ((MasterTriggerRestoreHook) Mockito.verify(masterTriggerRestoreHook2, Mockito.times(1))).triggerCheckpoint(Mockito.anyLong(), Mockito.anyLong(), (Executor) Mockito.any(Executor.class));
        ((MasterTriggerRestoreHook) Mockito.verify(masterTriggerRestoreHook3, Mockito.times(1))).triggerCheckpoint(Mockito.anyLong(), Mockito.anyLong(), (Executor) Mockito.any(Executor.class));
        instantiateCheckpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(build.getJobID(), build.getJobVertex(jobVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId(), ((PendingCheckpoint) instantiateCheckpointCoordinator.getPendingCheckpoints().values().iterator().next()).getCheckpointID()), "Unknown location");
        Assertions.assertThat(instantiateCheckpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(instantiateCheckpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
        Collection<MasterState> masterHookStates = instantiateCheckpointCoordinator.getCheckpointStore().getLatestCheckpoint().getMasterHookStates();
        Assertions.assertThat(masterHookStates.size()).isEqualTo(2);
        for (MasterState masterState : masterHookStates) {
            if (masterState.name().equals("id1")) {
                Assertions.assertThat(masterState.bytes()).isEqualTo(serialize);
                Assertions.assertThat(masterState.version()).isEqualTo(77);
            } else if (masterState.name().equals("id2")) {
                Assertions.assertThat(masterState.bytes()).isEqualTo(serialize2);
                Assertions.assertThat(masterState.version()).isEqualTo(5);
            } else {
                Assertions.fail("unrecognized state name: " + masterState.name());
            }
        }
    }

    @Test
    void testHooksAreCalledOnRestore() throws Exception {
        List asList = Arrays.asList(new MasterState("id1", new CheckpointCoordinatorTestingUtils.StringSerializer().serialize("the-test-string-state"), 77), new MasterState("id2", new LongSerializer().serialize((Long) 987654321L), 5));
        MasterTriggerRestoreHook masterTriggerRestoreHook = (MasterTriggerRestoreHook) mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook.getIdentifier()).thenReturn("id1");
        Mockito.when(masterTriggerRestoreHook.createCheckpointDataSerializer()).thenReturn(new CheckpointCoordinatorTestingUtils.StringSerializer());
        Mockito.when(masterTriggerRestoreHook.triggerCheckpoint(Mockito.anyLong(), Mockito.anyLong(), (Executor) Mockito.any(Executor.class))).thenThrow(new Throwable[]{new Exception("not expected")});
        MasterTriggerRestoreHook masterTriggerRestoreHook2 = (MasterTriggerRestoreHook) mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook2.getIdentifier()).thenReturn("id2");
        Mockito.when(masterTriggerRestoreHook2.createCheckpointDataSerializer()).thenReturn(new LongSerializer());
        Mockito.when(masterTriggerRestoreHook2.triggerCheckpoint(Mockito.anyLong(), Mockito.anyLong(), (Executor) Mockito.any(Executor.class))).thenThrow(new Throwable[]{new Exception("not expected")});
        MasterTriggerRestoreHook masterTriggerRestoreHook3 = (MasterTriggerRestoreHook) mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook3.getIdentifier()).thenReturn("some-id");
        CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(new JobID(), 13L, 123L, 125L, Collections.emptyMap(), asList, CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), new TestCompletedCheckpointStorageLocation(), (CompletedCheckpointStats) null);
        CheckpointCoordinator instantiateCheckpointCoordinator = instantiateCheckpointCoordinator(new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(new JobVertexID()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()));
        instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook);
        instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook3);
        instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook2);
        instantiateCheckpointCoordinator.getCheckpointStore().addCheckpointAndSubsumeOldestOne(completedCheckpoint, new CheckpointsCleaner(), () -> {
        });
        instantiateCheckpointCoordinator.restoreLatestCheckpointedStateToAll(Collections.emptySet(), false);
        ((MasterTriggerRestoreHook) Mockito.verify(masterTriggerRestoreHook, Mockito.times(1))).restoreCheckpoint(Matchers.eq(13L), Matchers.eq("the-test-string-state"));
        ((MasterTriggerRestoreHook) Mockito.verify(masterTriggerRestoreHook2, Mockito.times(1))).restoreCheckpoint(Matchers.eq(13L), Long.valueOf(Matchers.eq(987654321L)));
        ((MasterTriggerRestoreHook) Mockito.verify(masterTriggerRestoreHook3, Mockito.times(1))).restoreCheckpoint(Matchers.eq(13L), Matchers.isNull(Void.class));
    }

    @Test
    void checkUnMatchedStateOnRestore() throws Exception {
        List asList = Arrays.asList(new MasterState("id1", new CheckpointCoordinatorTestingUtils.StringSerializer().serialize("the-test-string-state"), 77), new MasterState("id2", new LongSerializer().serialize((Long) 987654321L), 5));
        MasterTriggerRestoreHook masterTriggerRestoreHook = (MasterTriggerRestoreHook) mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook.getIdentifier()).thenReturn("id1");
        Mockito.when(masterTriggerRestoreHook.createCheckpointDataSerializer()).thenReturn(new CheckpointCoordinatorTestingUtils.StringSerializer());
        Mockito.when(masterTriggerRestoreHook.triggerCheckpoint(Mockito.anyLong(), Mockito.anyLong(), (Executor) Mockito.any(Executor.class))).thenThrow(new Throwable[]{new Exception("not expected")});
        MasterTriggerRestoreHook masterTriggerRestoreHook2 = (MasterTriggerRestoreHook) mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook2.getIdentifier()).thenReturn("some-id");
        CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(new JobID(), 44L, 123L, 125L, Collections.emptyMap(), asList, CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), new TestCompletedCheckpointStorageLocation(), (CompletedCheckpointStats) null);
        CheckpointCoordinator instantiateCheckpointCoordinator = instantiateCheckpointCoordinator(new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(new JobVertexID()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()));
        instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook);
        instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook2);
        instantiateCheckpointCoordinator.getCheckpointStore().addCheckpointAndSubsumeOldestOne(completedCheckpoint, new CheckpointsCleaner(), () -> {
        });
        try {
            instantiateCheckpointCoordinator.restoreLatestCheckpointedStateToAll(Collections.emptySet(), false);
            Assertions.fail("exception expected");
        } catch (IllegalStateException e) {
        }
        instantiateCheckpointCoordinator.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true);
        ((MasterTriggerRestoreHook) Mockito.verify(masterTriggerRestoreHook, Mockito.times(1))).restoreCheckpoint(Matchers.eq(44L), Matchers.eq("the-test-string-state"));
        ((MasterTriggerRestoreHook) Mockito.verify(masterTriggerRestoreHook2, Mockito.times(1))).restoreCheckpoint(Matchers.eq(44L), Matchers.isNull(Void.class));
    }

    @Test
    void ensureRegisteredAtHookTime() throws Exception {
        ExecutionGraph build = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(new JobVertexID()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        final CheckpointCoordinator instantiateCheckpointCoordinator = instantiateCheckpointCoordinator(build, manuallyTriggeredScheduledExecutor);
        MasterTriggerRestoreHook masterTriggerRestoreHook = (MasterTriggerRestoreHook) mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(masterTriggerRestoreHook.getIdentifier()).thenReturn("id");
        Mockito.when(masterTriggerRestoreHook.triggerCheckpoint(Mockito.anyLong(), Mockito.anyLong(), (Executor) Mockito.any(Executor.class))).thenAnswer(new Answer<CompletableFuture<Void>>() { // from class: org.apache.flink.runtime.checkpoint.CheckpointCoordinatorMasterHooksTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public CompletableFuture<Void> m20answer(InvocationOnMock invocationOnMock) throws Throwable {
                Assertions.assertThat(instantiateCheckpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
                Assertions.assertThat(instantiateCheckpointCoordinator.getPendingCheckpoints()).containsKey(Long.valueOf(((Long) invocationOnMock.getArguments()[0]).longValue()));
                return null;
            }
        });
        instantiateCheckpointCoordinator.addMasterHook(masterTriggerRestoreHook);
        CompletableFuture triggerCheckpoint = instantiateCheckpointCoordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(triggerCheckpoint).isNotCompletedExceptionally();
    }

    @Test
    void testSerializationFailsOnTrigger() {
    }

    @Test
    void testHookCallFailsOnTrigger() {
    }

    @Test
    void testDeserializationFailsOnRestore() {
    }

    @Test
    void testHookCallFailsOnRestore() {
    }

    @Test
    void testTypeIncompatibleWithSerializerOnStore() {
    }

    @Test
    void testTypeIncompatibleWithHookOnRestore() {
    }

    private CheckpointCoordinator instantiateCheckpointCoordinator(ExecutionGraph executionGraph) {
        return instantiateCheckpointCoordinator(executionGraph, new ManuallyTriggeredScheduledExecutor());
    }

    private CheckpointCoordinator instantiateCheckpointCoordinator(ExecutionGraph executionGraph, ScheduledExecutor scheduledExecutor) {
        return new CheckpointCoordinator(executionGraph.getJobID(), new CheckpointCoordinatorConfiguration(10000000L, 600000L, 0L, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, 0, 0L), Collections.emptyList(), new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(10), new MemoryStateBackend(), Executors.directExecutor(), new CheckpointsCleaner(), scheduledExecutor, new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE), new DefaultCheckpointPlanCalculator(executionGraph.getJobID(), new ExecutionGraphCheckpointPlanCalculatorContext(executionGraph), executionGraph.getVerticesTopologically(), false), new CheckpointStatsTracker(1, new WatermarkStrategyTest.DummyMetricGroup(), new JobID()));
    }

    private static <T> T mockGeneric(Class<?> cls) {
        return (T) Mockito.mock(cls);
    }
}
