/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
import org.junit.Assert;
import org.junit.Test;

/**
 * Unit tests for {@link RecreateOnResetOperatorCoordinator} and its nested
 * {@link RecreateOnResetOperatorCoordinator.QuiesceableContext}.
 */
public class RecreateOnResetOperatorCoordinatorTest {
    private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
    private static final int NUM_SUBTASKS = 1;

    /**
     * A context that has not been quiesced forwards sent events and job failures to the wrapped
     * context and reports the wrapped context's operator id and parallelism.
     */
    @Test
    public void testQuiesceableContextNotQuiesced() throws TaskNotRunningException {
        MockOperatorCoordinatorContext context =
                new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
        RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext =
                new RecreateOnResetOperatorCoordinator.QuiesceableContext(context);

        TestingEvent event = new TestingEvent();
        quiesceableContext.sendEvent(event, 0);
        quiesceableContext.failJob(new Exception());

        Assert.assertEquals(OPERATOR_ID, quiesceableContext.getOperatorId());
        Assert.assertEquals(NUM_SUBTASKS, quiesceableContext.currentParallelism());
        Assert.assertEquals(
                Collections.singletonList(event), context.getEventsToOperatorBySubtaskId(0));
        Assert.assertTrue(context.isJobFailed());
    }

    /**
     * After {@code quiesce()}, sent events and job failures no longer reach the wrapped context,
     * while operator id and parallelism are still reported.
     */
    @Test
    public void testQuiescedContext() throws TaskNotRunningException {
        MockOperatorCoordinatorContext context =
                new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
        RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext =
                new RecreateOnResetOperatorCoordinator.QuiesceableContext(context);

        quiesceableContext.quiesce();
        quiesceableContext.sendEvent(new TestingEvent(), 0);
        quiesceableContext.failJob(new Exception());

        Assert.assertEquals(OPERATOR_ID, quiesceableContext.getOperatorId());
        Assert.assertEquals(NUM_SUBTASKS, quiesceableContext.currentParallelism());
        Assert.assertTrue(context.getEventsToOperator().isEmpty());
        Assert.assertFalse(context.isJobFailed());
    }

    /**
     * A reset quiesces the context that was active before the reset and hands the restored state
     * to the internal coordinator that is active afterwards, not to the previous one.
     */
    @Test
    public void testResetToCheckpoint() throws Exception {
        TestingCoordinatorProvider provider = new TestingCoordinatorProvider(null);
        MockOperatorCoordinatorContext context =
                new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
        RecreateOnResetOperatorCoordinator coordinator = createCoordinator(provider, context);

        RecreateOnResetOperatorCoordinator.QuiesceableContext contextBeforeReset =
                coordinator.getQuiesceableContext();
        TestingOperatorCoordinator internalCoordinatorBeforeReset =
                getInternalCoordinator(coordinator);

        byte[] stateToRestore = new byte[0];
        coordinator.resetToCheckpoint(1L, stateToRestore);
        coordinator.waitForAllAsyncCallsFinish();

        Assert.assertTrue(contextBeforeReset.isQuiesced());
        Assert.assertNull(internalCoordinatorBeforeReset.getLastRestoredCheckpointState());

        TestingOperatorCoordinator internalCoordinatorAfterReset =
                getInternalCoordinator(coordinator);
        // Arrays do not override equals(), so this checks that the very same array was passed on.
        Assert.assertEquals(
                stateToRestore, internalCoordinatorAfterReset.getLastRestoredCheckpointState());
    }

    /**
     * With a 1 ms closing timeout and a close latch that is never counted down, the job is
     * expected to be marked as failed within five seconds of the reset.
     */
    @Test
    public void testResetToCheckpointTimeout() throws Exception {
        final long closingTimeoutMs = 1L;
        // The latch is never released; presumably this keeps close() of the old coordinator
        // blocked so that the closing timeout fires -- see TestingOperatorCoordinator.
        TestingCoordinatorProvider provider =
                new TestingCoordinatorProvider(new CountDownLatch(1));
        MockOperatorCoordinatorContext context =
                new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
        RecreateOnResetOperatorCoordinator coordinator =
                (RecreateOnResetOperatorCoordinator) provider.create(context, closingTimeoutMs);

        coordinator.resetToCheckpoint(2L, new byte[0]);

        CommonTestUtils.waitUtil(
                context::isJobFailed,
                Duration.ofSeconds(5L),
                "The job should fail due to resetToCheckpoint() timeout.");
    }

    /**
     * Calls issued while a reset is still pending (the close latch has not been released yet) must
     * all show up on the coordinator that is created by the reset.
     */
    @Test
    public void testMethodCallsOnLongResetToCheckpoint() throws Exception {
        final long closingTimeoutMs = Long.MAX_VALUE;
        final long completedCheckpointId = 1234L;
        CountDownLatch blockOnCloseLatch = new CountDownLatch(1);
        TestingCoordinatorProvider provider = new TestingCoordinatorProvider(blockOnCloseLatch);
        MockOperatorCoordinatorContext context =
                new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
        RecreateOnResetOperatorCoordinator coordinator =
                (RecreateOnResetOperatorCoordinator) provider.create(context, closingTimeoutMs);

        byte[] restoredState = new byte[0];
        TestingEvent testingEvent = new TestingEvent();

        coordinator.resetToCheckpoint(2L, restoredState);
        coordinator.handleEventFromOperator(1, testingEvent);
        coordinator.subtaskFailed(1, new Exception("Subtask Failure Exception."));
        coordinator.notifyCheckpointComplete(completedCheckpointId);

        // While the latch is still held, only the initial coordinator has been created.
        Assert.assertEquals(1, provider.getCreatedCoordinators().size());

        blockOnCloseLatch.countDown();
        CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        coordinator.checkpointCoordinator(5678L, checkpointFuture);
        coordinator.waitForAllAsyncCallsFinish();

        TestingOperatorCoordinator internalCoordinatorAfterReset =
                getInternalCoordinator(coordinator);
        Assert.assertEquals(
                checkpointFuture, internalCoordinatorAfterReset.getLastTriggeredCheckpoint());
        Assert.assertEquals(
                provider.getCreatedCoordinators().get(1), internalCoordinatorAfterReset);
        // Identity check: arrays do not override equals().
        Assert.assertEquals(
                restoredState, internalCoordinatorAfterReset.getLastRestoredCheckpointState());
        Assert.assertEquals(
                testingEvent, internalCoordinatorAfterReset.getNextReceivedOperatorEvent());
        Assert.assertEquals(
                Collections.singletonList(1), internalCoordinatorAfterReset.getFailedTasks());
        Assert.assertEquals(
                completedCheckpointId, internalCoordinatorAfterReset.getLastCheckpointComplete());
    }

    /**
     * Issues many back-to-back resets, each preceded by an event, a subtask failure and a
     * checkpoint, and then verifies that every created coordinator only saw the calls belonging
     * to its own iteration.
     */
    @Test(timeout = 30000L)
    public void testConsecutiveResetToCheckpoint() throws Exception {
        final long closingTimeoutMs = Long.MAX_VALUE;
        final int numResets = 1000;
        TestingCoordinatorProvider provider = new TestingCoordinatorProvider();
        MockOperatorCoordinatorContext context =
                new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
        RecreateOnResetOperatorCoordinator coordinator =
                (RecreateOnResetOperatorCoordinator) provider.create(context, closingTimeoutMs);

        for (int i = 0; i < numResets; i++) {
            coordinator.handleEventFromOperator(1, new TestingEvent(i));
            coordinator.subtaskFailed(i, new Exception());

            // The iteration index is encoded in the length of the checkpoint payload ...
            CompletableFuture<byte[]> future = CompletableFuture.completedFuture(new byte[i]);
            coordinator.checkpointCoordinator(i, future);
            final int checkpointId = i;
            future.thenRun(() -> coordinator.notifyCheckpointComplete(checkpointId));

            // ... and the index of the coordinator created by this reset (i + 1) is encoded in
            // the length of the restored state.
            coordinator.resetToCheckpoint(i, new byte[i + 1]);
        }

        coordinator.waitForAllAsyncCallsFinish();

        for (TestingOperatorCoordinator internalCoordinator : provider.getCreatedCoordinators()) {
            // A coordinator without restored state is treated as index 0.
            byte[] lastRestoredState = internalCoordinator.getLastRestoredCheckpointState();
            int indexOfCoordinator = lastRestoredState == null ? 0 : lastRestoredState.length;

            TestingEvent testingEvent =
                    (TestingEvent) internalCoordinator.getNextReceivedOperatorEvent();
            List<Integer> failedTasks = internalCoordinator.getFailedTasks();

            Assert.assertTrue(testingEvent == null || testingEvent.getId() == indexOfCoordinator);
            Assert.assertTrue(
                    failedTasks.isEmpty()
                            || failedTasks.size() == 1
                                    && failedTasks.get(0) == indexOfCoordinator);
            Assert.assertTrue(
                    !internalCoordinator.hasCompleteCheckpoint()
                            || internalCoordinator.getLastCheckpointComplete()
                                    == (long) indexOfCoordinator);
            Assert.assertTrue(
                    !internalCoordinator.hasTriggeredCheckpoint()
                            || internalCoordinator.getLastTriggeredCheckpoint().get().length
                                    == indexOfCoordinator);
        }

        coordinator.close();
        TestingOperatorCoordinator internalCoordinator = getInternalCoordinator(coordinator);
        CommonTestUtils.waitUtil(
                internalCoordinator::isClosed,
                Duration.ofSeconds(5L),
                "Timed out when waiting for the coordinator to close.");
    }

    // NOTE(review): empty placeholder without @Test, so JUnit does not run it. Left as-is.
    public void testFailureInCreateCoordinator() {
    }

    /** Creates the coordinator under test through the given provider. */
    private RecreateOnResetOperatorCoordinator createCoordinator(
            TestingCoordinatorProvider provider, OperatorCoordinator.Context context)
            throws Exception {
        return (RecreateOnResetOperatorCoordinator) provider.create(context);
    }

    /** Returns the coordinator currently wrapped by {@code coordinator}, cast to the test type. */
    private TestingOperatorCoordinator getInternalCoordinator(
            RecreateOnResetOperatorCoordinator coordinator) throws Exception {
        return (TestingOperatorCoordinator) coordinator.getInternalCoordinator();
    }

    /** Operator event carrying an integer id; the no-arg form uses {@code -1}. */
    private static class TestingEvent implements OperatorEvent {
        private static final long serialVersionUID = -3289352911927668275L;
        private final int id;

        private TestingEvent() {
            this(-1);
        }

        private TestingEvent(int id) {
            this.id = id;
        }

        private int getId() {
            return id;
        }
    }

    /**
     * Provider that hands out {@link TestingOperatorCoordinator} instances and records each one
     * in creation order.
     */
    private static class TestingCoordinatorProvider
            extends RecreateOnResetOperatorCoordinator.Provider {
        private static final long serialVersionUID = 4184184580789587013L;
        private final CountDownLatch blockOnCloseLatch;
        private final List<TestingOperatorCoordinator> createdCoordinators;

        /** Creates a provider whose coordinators get no close latch. */
        public TestingCoordinatorProvider() {
            this(null);
        }

        /**
         * @param blockOnCloseLatch latch passed to every created coordinator; may be {@code null}
         */
        public TestingCoordinatorProvider(CountDownLatch blockOnCloseLatch) {
            super(OPERATOR_ID);
            this.blockOnCloseLatch = blockOnCloseLatch;
            this.createdCoordinators = new ArrayList<>();
        }

        @Override
        protected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
            TestingOperatorCoordinator testingCoordinator =
                    new TestingOperatorCoordinator(context, blockOnCloseLatch);
            createdCoordinators.add(testingCoordinator);
            return testingCoordinator;
        }

        /** Returns the live list of all coordinators created so far, in creation order. */
        private List<TestingOperatorCoordinator> getCreatedCoordinators() {
            return createdCoordinators;
        }
    }
}

