/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategyTest;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculatorContext;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator;
import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphCheckpointPlanCalculatorContext;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class CheckpointCoordinatorMasterHooksTest {
    /**
     * Registers three hooks, two of which report the same identifier, and checks that the
     * coordinator accepts the first hook, rejects the second one carrying the duplicate
     * identifier, and accepts the third one with a different identifier.
     */
    @Test
    public void testDeduplicateOnRegister() throws Exception {
        final String sharedIdentifier = "test id";

        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .build();
        final CheckpointCoordinator coordinator =
                this.instantiateCheckpointCoordinator(executionGraph);

        final MasterTriggerRestoreHook firstHook = Mockito.mock(MasterTriggerRestoreHook.class);
        Mockito.when(firstHook.getIdentifier()).thenReturn(sharedIdentifier);

        final MasterTriggerRestoreHook duplicateHook = Mockito.mock(MasterTriggerRestoreHook.class);
        Mockito.when(duplicateHook.getIdentifier()).thenReturn(sharedIdentifier);

        final MasterTriggerRestoreHook distinctHook = Mockito.mock(MasterTriggerRestoreHook.class);
        Mockito.when(distinctHook.getIdentifier()).thenReturn("anotherId");

        // Registration order matters: the duplicate is only a duplicate once the first is in.
        Assert.assertTrue(coordinator.addMasterHook(firstHook));
        Assert.assertFalse(coordinator.addMasterHook(duplicateHook));
        Assert.assertTrue(coordinator.addMasterHook(distinctHook));
    }

    /**
     * Checks that registering a {@code null} hook fails with a {@link NullPointerException} and
     * that hooks whose identifier is unset (plain mock) or whitespace-only fail with an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testNullOrInvalidId() throws Exception {
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .build();
        final CheckpointCoordinator coordinator =
                this.instantiateCheckpointCoordinator(executionGraph);

        // Case 1: null hook.
        try {
            coordinator.addMasterHook(null);
            Assert.fail("expected an exception");
        } catch (NullPointerException expected) {
            // this is the outcome the test is looking for
        }

        // Case 2: a mock without any stubbing for its identifier.
        try {
            coordinator.addMasterHook(Mockito.mock(MasterTriggerRestoreHook.class));
            Assert.fail("expected an exception");
        } catch (IllegalArgumentException expected) {
            // this is the outcome the test is looking for
        }

        // Case 3: an identifier made up of blanks only.
        try {
            final MasterTriggerRestoreHook blankIdHook =
                    Mockito.mock(MasterTriggerRestoreHook.class);
            Mockito.when(blankIdHook.getIdentifier()).thenReturn("        ");
            coordinator.addMasterHook(blankIdHook);
            Assert.fail("expected an exception");
        } catch (IllegalArgumentException expected) {
            // this is the outcome the test is looking for
        }
    }

    /**
     * Checks that restoring the latest checkpointed state calls {@code reset()} exactly once on
     * every registered hook, and that shutting the coordinator down afterwards calls
     * {@code close()} exactly once on each of them.
     */
    @Test
    public void testHookReset() throws Exception {
        final String id1 = "id1";
        final String id2 = "id2";

        final MasterTriggerRestoreHook firstHook =
                CheckpointCoordinatorMasterHooksTest.mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(firstHook.getIdentifier()).thenReturn(id1);

        final MasterTriggerRestoreHook secondHook =
                CheckpointCoordinatorMasterHooksTest.mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(secondHook.getIdentifier()).thenReturn(id2);

        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .build();
        final CheckpointCoordinator coordinator =
                this.instantiateCheckpointCoordinator(executionGraph);
        coordinator.addMasterHook(firstHook);
        coordinator.addMasterHook(secondHook);

        // Restoring must reset every hook once.
        coordinator.restoreLatestCheckpointedStateToAll(Collections.emptySet(), false);
        Mockito.verify(firstHook, Mockito.times(1)).reset();
        Mockito.verify(secondHook, Mockito.times(1)).reset();

        // Shutting down must close every hook once.
        coordinator.shutdown();
        Mockito.verify(firstHook, Mockito.times(1)).close();
        Mockito.verify(secondHook, Mockito.times(1)).close();
    }

    /**
     * Triggers a checkpoint with two stateful hooks and one hook without stubbed state registered,
     * then acknowledges it and inspects the completed checkpoint.
     *
     * <p>Verifies that every hook's {@code triggerCheckpoint} is invoked exactly once and that the
     * completed checkpoint carries exactly two {@link MasterState} entries, one per stateful hook,
     * holding the serialized bytes and the serializer version.
     */
    @Test
    public void testHooksAreCalledOnTrigger() throws Exception {
        final String id1 = "id1";
        final String id2 = "id2";

        final String state1 = "the-test-string-state";
        final byte[] state1serialized =
                new CheckpointCoordinatorTestingUtils.StringSerializer().serialize(state1);

        final long state2 = 987654321L;
        final byte[] state2serialized = new LongSerializer().serialize(state2);

        final MasterTriggerRestoreHook statefulHook1 =
                CheckpointCoordinatorMasterHooksTest.mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(statefulHook1.getIdentifier()).thenReturn(id1);
        Mockito.when((Object) statefulHook1.createCheckpointDataSerializer())
                .thenReturn(new CheckpointCoordinatorTestingUtils.StringSerializer());
        Mockito.when(
                        (Object)
                                statefulHook1.triggerCheckpoint(
                                        Mockito.anyLong(),
                                        Mockito.anyLong(),
                                        Mockito.any(Executor.class)))
                .thenReturn(CompletableFuture.completedFuture(state1));

        final MasterTriggerRestoreHook statefulHook2 =
                CheckpointCoordinatorMasterHooksTest.mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(statefulHook2.getIdentifier()).thenReturn(id2);
        Mockito.when((Object) statefulHook2.createCheckpointDataSerializer())
                .thenReturn(new LongSerializer());
        Mockito.when(
                        (Object)
                                statefulHook2.triggerCheckpoint(
                                        Mockito.anyLong(),
                                        Mockito.anyLong(),
                                        Mockito.any(Executor.class)))
                .thenReturn(CompletableFuture.completedFuture(state2));

        // This hook only has an identifier; its trigger result and serializer stay unstubbed.
        final MasterTriggerRestoreHook statelessHook =
                CheckpointCoordinatorMasterHooksTest.mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(statelessHook.getIdentifier()).thenReturn("some-id");

        final JobVertexID jobVertexId = new JobVertexID();
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexId)
                        .build();
        final ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor =
                new ManuallyTriggeredScheduledExecutor();
        final CheckpointCoordinator cc =
                this.instantiateCheckpointCoordinator(graph, manuallyTriggeredScheduledExecutor);

        cc.addMasterHook(statefulHook1);
        cc.addMasterHook(statelessHook);
        cc.addMasterHook(statefulHook2);

        // Trigger a checkpoint and run everything queued on the manually driven executor.
        final CompletableFuture<?> checkpointFuture = cc.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assert.assertFalse(checkpointFuture.isCompletedExceptionally());
        Assert.assertEquals(1L, cc.getNumberOfPendingCheckpoints());

        Mockito.verify(statefulHook1, Mockito.times(1))
                .triggerCheckpoint(
                        Mockito.anyLong(), Mockito.anyLong(), Mockito.any(Executor.class));
        Mockito.verify(statefulHook2, Mockito.times(1))
                .triggerCheckpoint(
                        Mockito.anyLong(), Mockito.anyLong(), Mockito.any(Executor.class));
        Mockito.verify(statelessHook, Mockito.times(1))
                .triggerCheckpoint(
                        Mockito.anyLong(), Mockito.anyLong(), Mockito.any(Executor.class));

        // Acknowledge the single pending checkpoint on behalf of the only task.
        final ExecutionAttemptID attemptID =
                graph.getJobVertex(jobVertexId)
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt()
                        .getAttemptId();
        final long checkpointId =
                ((PendingCheckpoint) cc.getPendingCheckpoints().values().iterator().next())
                        .getCheckpointId();
        cc.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptID, checkpointId),
                "Unknown location");
        Assert.assertEquals(0L, cc.getNumberOfPendingCheckpoints());
        Assert.assertEquals(1L, cc.getNumberOfRetainedSuccessfulCheckpoints());

        final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint();

        // Typed collection: iterating a raw Collection yields Object, so the
        // MasterState-typed loop below would not compile against it.
        final Collection<MasterState> masterStates = chk.getMasterHookStates();
        Assert.assertEquals(2L, masterStates.size());

        for (MasterState ms : masterStates) {
            if (ms.name().equals(id1)) {
                Assert.assertArrayEquals(state1serialized, ms.bytes());
                // NOTE(review): 77 is presumably the StringSerializer version - confirm.
                Assert.assertEquals(77L, ms.version());
            } else if (ms.name().equals(id2)) {
                Assert.assertArrayEquals(state2serialized, ms.bytes());
                Assert.assertEquals(LongSerializer.VERSION, ms.version());
            } else {
                Assert.fail("unrecognized state name: " + ms.name());
            }
        }
    }

    /**
     * Stores a completed checkpoint carrying master state for two hook identifiers, restores it,
     * and checks that each registered hook's {@code restoreCheckpoint} is called exactly once
     * with the checkpoint id and its deserialized state ({@code null} for the hook that has no
     * stored state).
     */
    @Test
    public void testHooksAreCalledOnRestore() throws Exception {
        final String id1 = "id1";
        final String id2 = "id2";

        final String state1 = "the-test-string-state";
        final byte[] state1serialized =
                new CheckpointCoordinatorTestingUtils.StringSerializer().serialize(state1);

        final long state2 = 987654321L;
        final byte[] state2serialized = new LongSerializer().serialize(state2);

        final List<MasterState> masterHookStates =
                Arrays.asList(
                        new MasterState(id1, state1serialized, 77),
                        new MasterState(id2, state2serialized, 5));

        // Triggering is not part of this scenario, so both stateful hooks fail loudly if it is.
        final MasterTriggerRestoreHook statefulHook1 =
                CheckpointCoordinatorMasterHooksTest.mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(statefulHook1.getIdentifier()).thenReturn(id1);
        Mockito.when((Object) statefulHook1.createCheckpointDataSerializer())
                .thenReturn(new CheckpointCoordinatorTestingUtils.StringSerializer());
        Mockito.when(
                        (Object)
                                statefulHook1.triggerCheckpoint(
                                        Mockito.anyLong(),
                                        Mockito.anyLong(),
                                        Mockito.any(Executor.class)))
                .thenThrow(new Throwable[] {new Exception("not expected")});

        final MasterTriggerRestoreHook statefulHook2 =
                CheckpointCoordinatorMasterHooksTest.mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(statefulHook2.getIdentifier()).thenReturn(id2);
        Mockito.when((Object) statefulHook2.createCheckpointDataSerializer())
                .thenReturn(new LongSerializer());
        Mockito.when(
                        (Object)
                                statefulHook2.triggerCheckpoint(
                                        Mockito.anyLong(),
                                        Mockito.anyLong(),
                                        Mockito.any(Executor.class)))
                .thenThrow(new Throwable[] {new Exception("not expected")});

        final MasterTriggerRestoreHook statelessHook =
                CheckpointCoordinatorMasterHooksTest.mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(statelessHook.getIdentifier()).thenReturn("some-id");

        final JobID jid = new JobID();
        final long checkpointId = 13L;

        final CompletedCheckpoint checkpoint =
                new CompletedCheckpoint(
                        jid,
                        checkpointId,
                        123L,
                        125L,
                        Collections.emptyMap(),
                        masterHookStates,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        new TestCompletedCheckpointStorageLocation(),
                        null);

        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .build();
        final CheckpointCoordinator cc = this.instantiateCheckpointCoordinator(graph);

        cc.addMasterHook(statefulHook1);
        cc.addMasterHook(statelessHook);
        cc.addMasterHook(statefulHook2);

        cc.getCheckpointStore()
                .addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {});
        cc.restoreLatestCheckpointedStateToAll(Collections.emptySet(), false);

        Mockito.verify(statefulHook1, Mockito.times(1))
                .restoreCheckpoint(Matchers.eq(checkpointId), Matchers.eq((Object) state1));
        Mockito.verify(statefulHook2, Mockito.times(1))
                .restoreCheckpoint(Matchers.eq(checkpointId), (Object) Matchers.eq(state2));
        Mockito.verify(statelessHook, Mockito.times(1))
                .restoreCheckpoint(Matchers.eq(checkpointId), Matchers.isNull(Void.class));
    }

    /**
     * Stores a completed checkpoint with master state for two identifiers while only one matching
     * hook is registered. Restoring with the boolean flag set to {@code false} must fail with an
     * {@link IllegalStateException}; restoring with the flag set to {@code true} must go through
     * and call {@code restoreCheckpoint} once on each registered hook.
     */
    @Test
    public void checkUnMatchedStateOnRestore() throws Exception {
        final String id1 = "id1";
        final String id2 = "id2";

        final String state1 = "the-test-string-state";
        final byte[] state1serialized =
                new CheckpointCoordinatorTestingUtils.StringSerializer().serialize(state1);

        final long state2 = 987654321L;
        final byte[] state2serialized = new LongSerializer().serialize(state2);

        final List<MasterState> masterHookStates =
                Arrays.asList(
                        new MasterState(id1, state1serialized, 77),
                        new MasterState(id2, state2serialized, 5));

        final MasterTriggerRestoreHook statefulHook =
                CheckpointCoordinatorMasterHooksTest.mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(statefulHook.getIdentifier()).thenReturn(id1);
        Mockito.when((Object) statefulHook.createCheckpointDataSerializer())
                .thenReturn(new CheckpointCoordinatorTestingUtils.StringSerializer());
        Mockito.when(
                        (Object)
                                statefulHook.triggerCheckpoint(
                                        Mockito.anyLong(),
                                        Mockito.anyLong(),
                                        Mockito.any(Executor.class)))
                .thenThrow(new Throwable[] {new Exception("not expected")});

        final MasterTriggerRestoreHook statelessHook =
                CheckpointCoordinatorMasterHooksTest.mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(statelessHook.getIdentifier()).thenReturn("some-id");

        final JobID jid = new JobID();
        final long checkpointId = 44L;

        final CompletedCheckpoint checkpoint =
                new CompletedCheckpoint(
                        jid,
                        checkpointId,
                        123L,
                        125L,
                        Collections.emptyMap(),
                        masterHookStates,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        new TestCompletedCheckpointStorageLocation(),
                        null);

        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .build();
        final CheckpointCoordinator cc = this.instantiateCheckpointCoordinator(graph);

        cc.addMasterHook(statefulHook);
        cc.addMasterHook(statelessHook);

        cc.getCheckpointStore()
                .addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {});

        // First attempt: flag is false, so the state stored under "id2" has no taker.
        try {
            cc.restoreLatestCheckpointedStateToAll(Collections.emptySet(), false);
            Assert.fail("exception expected");
        } catch (IllegalStateException expected) {
            // this is the outcome the test is looking for
        }

        // Second attempt: flag is true, the restore is expected to succeed.
        cc.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true);

        Mockito.verify(statefulHook, Mockito.times(1))
                .restoreCheckpoint(Matchers.eq(checkpointId), Matchers.eq((Object) state1));
        Mockito.verify(statelessHook, Mockito.times(1))
                .restoreCheckpoint(Matchers.eq(checkpointId), Matchers.isNull(Void.class));
    }

    /**
     * Checks, from inside the hook's {@code triggerCheckpoint} call, that the coordinator already
     * lists exactly one pending checkpoint and that it is registered under the checkpoint id
     * handed to the hook.
     */
    @Test
    public void ensureRegisteredAtHookTime() throws Exception {
        final String id = "id";

        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .build();
        final ManuallyTriggeredScheduledExecutor manualExecutor =
                new ManuallyTriggeredScheduledExecutor();
        final CheckpointCoordinator cc =
                this.instantiateCheckpointCoordinator(graph, manualExecutor);

        // Runs while the hook is being triggered; the first invocation argument is the
        // checkpoint id.
        final Answer<CompletableFuture<Void>> registrationCheck =
                invocation -> {
                    Assert.assertEquals(1L, cc.getNumberOfPendingCheckpoints());
                    final long triggeredId = (Long) invocation.getArguments()[0];
                    Assert.assertNotNull(cc.getPendingCheckpoints().get(triggeredId));
                    return null;
                };

        final MasterTriggerRestoreHook hook =
                CheckpointCoordinatorMasterHooksTest.mockGeneric(MasterTriggerRestoreHook.class);
        Mockito.when(hook.getIdentifier()).thenReturn(id);
        Mockito.when(
                        (Object)
                                hook.triggerCheckpoint(
                                        Mockito.anyLong(),
                                        Mockito.anyLong(),
                                        Mockito.any(Executor.class)))
                .thenAnswer(registrationCheck);

        cc.addMasterHook(hook);

        final CompletableFuture<?> checkpointFuture = cc.triggerCheckpoint(false);
        manualExecutor.triggerAll();
        Assert.assertFalse(checkpointFuture.isCompletedExceptionally());
    }

    /**
     * Placeholder: the body is empty, so this test exercises nothing and always passes.
     *
     * <p>TODO(review): judging by the name, this is presumably meant to cover a hook state
     * serializer failing while a checkpoint is triggered - confirm and implement.
     */
    @Test
    public void testSerializationFailsOnTrigger() {
    }

    /**
     * Placeholder: the body is empty, so this test exercises nothing and always passes.
     *
     * <p>TODO(review): judging by the name, this is presumably meant to cover a hook throwing
     * from its trigger call - confirm and implement.
     */
    @Test
    public void testHookCallFailsOnTrigger() {
    }

    /**
     * Placeholder: the body is empty, so this test exercises nothing and always passes.
     *
     * <p>TODO(review): judging by the name, this is presumably meant to cover a hook state
     * serializer failing to deserialize during restore - confirm and implement.
     */
    @Test
    public void testDeserializationFailsOnRestore() {
    }

    /**
     * Placeholder: the body is empty, so this test exercises nothing and always passes.
     *
     * <p>TODO(review): judging by the name, this is presumably meant to cover a hook throwing
     * from its restore call - confirm and implement.
     */
    @Test
    public void testHookCallFailsOnRestore() {
    }

    /**
     * Placeholder: the body is empty, so this test exercises nothing and always passes.
     *
     * <p>TODO(review): judging by the name, this is presumably meant to cover a hook returning
     * state whose type does not match its serializer - confirm and implement.
     */
    @Test
    public void testTypeIncompatibleWithSerializerOnStore() {
    }

    /**
     * Placeholder: the body is empty, so this test exercises nothing and always passes.
     *
     * <p>TODO(review): judging by the name, this is presumably meant to cover restored state
     * whose type does not match what the hook accepts - confirm and implement.
     */
    @Test
    public void testTypeIncompatibleWithHookOnRestore() {
    }

    /**
     * Builds a coordinator for the given graph, backed by a fresh
     * {@link ManuallyTriggeredScheduledExecutor} that the caller does not get a handle on.
     *
     * @param executionGraph graph the coordinator is created for
     * @return the newly created coordinator
     */
    private CheckpointCoordinator instantiateCheckpointCoordinator(ExecutionGraph executionGraph) {
        final ScheduledExecutor timer = new ManuallyTriggeredScheduledExecutor();
        return this.instantiateCheckpointCoordinator(executionGraph, timer);
    }

    /**
     * Builds a coordinator for the given graph using standalone (in-process) id counter and
     * checkpoint store, a memory state backend as checkpoint storage, a direct executor, and the
     * supplied scheduled executor as timer.
     *
     * @param graph graph providing the job id, the vertices, and the execution vertices
     * @param testingScheduledExecutor scheduled executor handed to the coordinator
     * @return the newly created coordinator
     */
    private CheckpointCoordinator instantiateCheckpointCoordinator(ExecutionGraph graph, ScheduledExecutor testingScheduledExecutor) {
        final CheckpointCoordinatorConfiguration chkConfig =
                new CheckpointCoordinatorConfiguration(
                        10000000L,
                        600000L,
                        0L,
                        1,
                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                        true,
                        false,
                        0,
                        0L);
        final Executor directExecutor = Executors.directExecutor();

        // Collaborators are created in the same order as the constructor arguments below.
        final JobID jobId = graph.getJobID();
        final CheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();
        final CompletedCheckpointStore completedStore = new StandaloneCompletedCheckpointStore(10);
        final CheckpointStorage storage = new MemoryStateBackend();
        final CheckpointsCleaner cleaner = new CheckpointsCleaner();
        final CheckpointFailureManager failureManager =
                new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE);
        final CheckpointPlanCalculatorContext planContext =
                new ExecutionGraphCheckpointPlanCalculatorContext(graph);
        final CheckpointPlanCalculator planCalculator =
                new DefaultCheckpointPlanCalculator(
                        jobId, planContext, graph.getVerticesTopologically(), false);
        final ExecutionAttemptMappingProvider attemptMapping =
                new ExecutionAttemptMappingProvider(graph.getAllExecutionVertices());
        final MetricGroup metricGroup = new WatermarkStrategyTest.DummyMetricGroup();
        final CheckpointStatsTracker statsTracker = new CheckpointStatsTracker(1, metricGroup);

        return new CheckpointCoordinator(
                jobId,
                chkConfig,
                Collections.emptyList(),
                idCounter,
                completedStore,
                storage,
                directExecutor,
                cleaner,
                testingScheduledExecutor,
                failureManager,
                planCalculator,
                attemptMapping,
                statsTracker);
    }

    /**
     * Creates a Mockito mock of the given class and returns it under whatever type the call site
     * expects, so generic types can be mocked without a cast at every use.
     *
     * @param clazz class to mock
     * @param <T> type the caller wants the mock to be seen as; not checked against {@code clazz}
     * @return the mock, cast unchecked to {@code T}
     */
    @SuppressWarnings("unchecked")
    private static <T> T mockGeneric(Class<?> clazz) {
        // Unchecked by design: the caller picks T, the mock is built from the raw class.
        return (T) Mockito.mock(clazz);
    }

    private static final class LongSerializer
    implements SimpleVersionedSerializer<Long> {
        static final int VERSION = 5;

        private LongSerializer() {
        }

        public int getVersion() {
            return 5;
        }

        public byte[] serialize(Long checkpointData) throws IOException {
            byte[] bytes = new byte[8];
            ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).putLong(0, checkpointData);
            return bytes;
        }

        public Long deserialize(int version, byte[] serialized) throws IOException {
            Assert.assertEquals((long)5L, (long)version);
            Assert.assertEquals((long)8L, (long)serialized.length);
            return ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
        }
    }
}

