package org.apache.flink.runtime.checkpoint;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.class */
public class CheckpointCoordinatorFailureTest extends TestLogger {

    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest$FailingCompletedCheckpointStore.class */
    /**
     * {@link CompletedCheckpointStore} test double whose {@code addCheckpoint} always throws the
     * exception handed to the constructor.
     *
     * <p>{@code getLatestCheckpoint}, {@code shutdown} and {@code getAllCheckpoints} are not
     * supported and throw {@link UnsupportedOperationException}; the remaining accessors return
     * fixed values.
     */
    public static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
        /** Message used by every operation this test double does not support. */
        private static final String NOT_IMPLEMENTED = "Not implemented.";

        /** Exception rethrown on every {@code addCheckpoint} call. */
        private final Exception failureOnAdd;

        /**
         * @param failureOnAdd exception that {@code addCheckpoint} throws on each invocation
         */
        public FailingCompletedCheckpointStore(Exception failureOnAdd) {
            this.failureOnAdd = failureOnAdd;
        }

        /** Always fails with the configured exception; all arguments are ignored. */
        public void addCheckpoint(CompletedCheckpoint ignoredCheckpoint, CheckpointsCleaner ignoredCleaner, Runnable ignoredCallback) throws Exception {
            throw failureOnAdd;
        }

        /** Unsupported by this test double. */
        public CompletedCheckpoint getLatestCheckpoint() throws Exception {
            throw new UnsupportedOperationException(NOT_IMPLEMENTED);
        }

        /** Unsupported by this test double. */
        public void shutdown(JobStatus ignoredStatus, CheckpointsCleaner ignoredCleaner) throws Exception {
            throw new UnsupportedOperationException(NOT_IMPLEMENTED);
        }

        /** Unsupported by this test double. */
        public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
            throw new UnsupportedOperationException(NOT_IMPLEMENTED);
        }

        /** @return the fixed value {@code -1} */
        public int getNumberOfRetainedCheckpoints() {
            return -1;
        }

        /** @return the fixed value {@code 1} */
        public int getMaxNumberOfRetainedCheckpoints() {
            return 1;
        }

        /** @return always {@code false} */
        public boolean requiresExternalizedCheckpoints() {
            return false;
        }
    }

    /**
     * Checks that the coordinator disposes a pending checkpoint, and discards every piece of state
     * acknowledged for it, when the completed checkpoint store fails while adding the checkpoint.
     *
     * <p>The resulting {@link CheckpointException} must also carry
     * {@link CheckpointFailureReason#FINALIZE_CHECKPOINT_FAILURE}, the same reason that
     * {@code testStoringFailureHandling} asserts for a failing store.
     *
     * @throws Exception if building or driving the coordinator fails unexpectedly
     */
    @Test
    public void testFailingCompletedCheckpointStoreAdd() throws Exception {
        JobVertexID jobVertexId = new JobVertexID();
        ManuallyTriggeredScheduledExecutor timer = new ManuallyTriggeredScheduledExecutor();
        ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexId)
                        .build();
        ExecutionVertex vertex = executionGraph.getJobVertex(jobVertexId).getTaskVertices()[0];

        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setExecutionGraph(executionGraph)
                        .setCompletedCheckpointStore(
                                new FailingCompletedCheckpointStore(
                                        new Exception("The failing completed checkpoint store failed again... :-(")))
                        .setTimer(timer)
                        .build();

        // Trigger one checkpoint and run the queued timer work so it becomes pending.
        coordinator.triggerCheckpoint(false);
        timer.triggerAll();

        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());
        PendingCheckpoint pendingCheckpoint =
                coordinator.getPendingCheckpoints().values().iterator().next();
        Assert.assertFalse(pendingCheckpoint.isDisposed());
        long checkpointId = coordinator.getPendingCheckpoints().keySet().iterator().next();

        // One handle of every state kind, so that discarding can be verified for each of them.
        OperatorStateHandle managedOperatorState = Mockito.mock(OperatorStreamStateHandle.class);
        OperatorStateHandle rawOperatorState = Mockito.mock(OperatorStreamStateHandle.class);
        KeyedStateHandle managedKeyedState = Mockito.mock(KeyedStateHandle.class);
        KeyedStateHandle rawKeyedState = Mockito.mock(KeyedStateHandle.class);
        InputChannelStateHandle inputChannelState =
                new InputChannelStateHandle(
                        new InputChannelInfo(0, 1),
                        Mockito.mock(StreamStateHandle.class),
                        Collections.singletonList(1L));
        ResultSubpartitionStateHandle resultSubpartitionState =
                new ResultSubpartitionStateHandle(
                        new ResultSubpartitionInfo(0, 1),
                        Mockito.mock(StreamStateHandle.class),
                        Collections.singletonList(1L));

        OperatorSubtaskState operatorSubtaskState =
                Mockito.spy(
                        OperatorSubtaskState.builder()
                                .setManagedOperatorState(managedOperatorState)
                                .setRawOperatorState(rawOperatorState)
                                .setManagedKeyedState(managedKeyedState)
                                .setRawKeyedState(rawKeyedState)
                                .setInputChannelState(StateObjectCollection.singleton(inputChannelState))
                                .setResultSubpartitionState(StateObjectCollection.singleton(resultSubpartitionState))
                                .build());

        TaskStateSnapshot taskStateSnapshot = Mockito.spy(new TaskStateSnapshot());
        taskStateSnapshot.putSubtaskStateByOperatorID(new OperatorID(), operatorSubtaskState);
        Mockito.when(
                        taskStateSnapshot.getSubtaskStateByOperatorID(
                                OperatorID.fromJobVertexID(vertex.getJobvertexId())))
                .thenReturn(operatorSubtaskState);

        AcknowledgeCheckpoint acknowledgeMessage =
                new AcknowledgeCheckpoint(
                        executionGraph.getJobID(),
                        vertex.getCurrentExecutionAttempt().getAttemptId(),
                        checkpointId,
                        new CheckpointMetrics(),
                        taskStateSnapshot);

        try {
            coordinator.receiveAcknowledgeMessage(acknowledgeMessage, "Unknown location");
            Assert.fail(
                    "Expected a checkpoint exception because the completed checkpoint store could not "
                            + "store the completed checkpoint.");
        } catch (CheckpointException expected) {
            // The failure is expected; check its reason instead of silently dropping it.
            Assert.assertThat(
                    expected.getCheckpointFailureReason(),
                    CoreMatchers.is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
        }

        // The pending checkpoint and everything acknowledged for it must have been discarded.
        Assert.assertTrue(pendingCheckpoint.isDisposed());
        Mockito.verify(operatorSubtaskState).discardState();
        Mockito.verify(operatorSubtaskState.getManagedOperatorState().iterator().next()).discardState();
        Mockito.verify(operatorSubtaskState.getRawOperatorState().iterator().next()).discardState();
        Mockito.verify(operatorSubtaskState.getManagedKeyedState().iterator().next()).discardState();
        Mockito.verify(operatorSubtaskState.getRawKeyedState().iterator().next()).discardState();
        Mockito.verify(operatorSubtaskState.getInputChannelState().iterator().next().getDelegate()).discardState();
        Mockito.verify(operatorSubtaskState.getResultSubpartitionState().iterator().next().getDelegate()).discardState();
    }

    /**
     * Runs the shared store-failure scenario with a {@link FlinkRuntimeException} and expects
     * exactly one cleanup request to be counted.
     */
    @Test
    public void testCleanupForGenericFailure() throws Exception {
        final Exception storeFailure = new FlinkRuntimeException("Expected exception");
        final int expectedCleanupCalls = 1;

        testStoringFailureHandling(storeFailure, expectedCleanupCalls);
    }

    /**
     * Runs the shared store-failure scenario with a {@link PossibleInconsistentStateException} and
     * expects no cleanup request to be counted.
     */
    @Test
    public void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
        final Exception storeFailure = new PossibleInconsistentStateException();
        final int expectedCleanupCalls = 0;

        testStoringFailureHandling(storeFailure, expectedCleanupCalls);
    }

    /**
     * Triggers a savepoint on a coordinator backed by a {@link FailingCompletedCheckpointStore},
     * acknowledges it, and checks both the reported failure reason and how many times
     * {@code cleanCheckpointOnFailedStoring} was invoked on the checkpoints cleaner.
     *
     * @param failure exception the completed checkpoint store throws from {@code addCheckpoint}
     * @param expectedCleanupCalls expected number of {@code cleanCheckpointOnFailedStoring} calls
     * @throws Exception if building or driving the coordinator fails unexpectedly
     */
    private void testStoringFailureHandling(Exception failure, int expectedCleanupCalls) throws Exception {
        final JobVertexID jobVertexId = new JobVertexID();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexId)
                        .build();
        final ExecutionAttemptID attemptId =
                executionGraph
                        .getJobVertex(jobVertexId)
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt()
                        .getAttemptId();

        // Keep the concrete type: getLast() is called below and, as far as the Flink sources
        // show, is a StandaloneCheckpointIDCounter helper rather than part of the
        // CheckpointIDCounter interface (NOTE(review): confirm against the Flink version in use).
        final StandaloneCheckpointIDCounter checkpointIdCounter = new StandaloneCheckpointIDCounter();
        final ManuallyTriggeredScheduledExecutor timer = new ManuallyTriggeredScheduledExecutor();
        final FailingCompletedCheckpointStore failingStore = new FailingCompletedCheckpointStore(failure);

        // Counts cleanup requests issued after the store rejected the checkpoint.
        final AtomicInteger cleanupCalls = new AtomicInteger(0);
        final CheckpointsCleaner countingCleaner =
                new CheckpointsCleaner() {
                    private static final long serialVersionUID = 2029876992397573325L;

                    public void cleanCheckpointOnFailedStoring(CompletedCheckpoint completedCheckpoint, Executor executor) {
                        cleanupCalls.incrementAndGet();
                        super.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
                    }
                };

        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setExecutionGraph(executionGraph)
                        .setCheckpointIDCounter(checkpointIdCounter)
                        .setCheckpointsCleaner(countingCleaner)
                        .setCompletedCheckpointStore(failingStore)
                        .setTimer(timer)
                        .build();

        coordinator.triggerSavepoint(this.tmpFolder.newFolder().getAbsolutePath());
        timer.triggerAll();

        try {
            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(
                            executionGraph.getJobID(), attemptId, checkpointIdCounter.getLast()),
                    "unknown location");
            Assert.fail("CheckpointException should have been thrown.");
        } catch (CheckpointException e) {
            Assert.assertThat(
                    e.getCheckpointFailureReason(),
                    CoreMatchers.is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE));
        }

        Assert.assertThat(cleanupCalls.get(), CoreMatchers.is(expectedCleanupCalls));
    }
}
