package org.apache.flink.streaming.runtime.tasks;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.testutils.ExceptionallyDoneFuture;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.class */
class AsyncCheckpointRunnableTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest$TestEnvironment.class */
    public static class TestEnvironment extends StreamMockEnvironment {
        CheckpointException cause;

        TestEnvironment() {
            this(new Configuration(), new Configuration(), new ExecutionConfig(), 1L, new MockInputSplitProvider(), 1, new TestTaskStateManager());
        }

        TestEnvironment(Configuration configuration, Configuration configuration2, ExecutionConfig executionConfig, long j, MockInputSplitProvider mockInputSplitProvider, int i, TaskStateManager taskStateManager) {
            super(configuration, configuration2, executionConfig, j, mockInputSplitProvider, i, taskStateManager);
            this.cause = null;
        }

        @Override // org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment
        public void declineCheckpoint(long j, CheckpointException checkpointException) {
            this.cause = checkpointException;
        }

        CheckpointException getCause() {
            return this.cause;
        }
    }

    AsyncCheckpointRunnableTest() {
    }

    @Test
    void testReportIncompleteStats() {
        TestEnvironment testEnvironment = new TestEnvironment();
        new AsyncCheckpointRunnable(new HashMap(), new CheckpointMetaData(1000L, 1L), new CheckpointMetricsBuilder(), 0L, "Task Name", asyncCheckpointRunnable -> {
        }, testEnvironment, (str, th) -> {
        }, false, false, () -> {
            return true;
        }).close();
        Assertions.assertThat(testEnvironment.getTaskStateManager().getReportedCheckpointId()).isEqualTo(1000L);
    }

    @Test
    void testDeclineWithAsyncCheckpointExceptionWhenRunning() {
        testAsyncCheckpointException(true);
    }

    @Test
    void testDeclineWithAsyncCheckpointExceptionWhenNotRunning() {
        testAsyncCheckpointException(false);
    }

    private void testAsyncCheckpointException(boolean z) {
        HashMap hashMap = new HashMap();
        hashMap.put(new OperatorID(), new OperatorSnapshotFutures(ExceptionallyDoneFuture.of(new RuntimeException("Async Checkpoint Exception")), DoneFuture.of(SnapshotResult.empty()), DoneFuture.of(SnapshotResult.empty()), DoneFuture.of(SnapshotResult.empty()), DoneFuture.of(SnapshotResult.empty()), DoneFuture.of(SnapshotResult.empty())));
        TestEnvironment testEnvironment = new TestEnvironment();
        createAsyncRunnable(hashMap, testEnvironment, false, z).run();
        if (z) {
            Assertions.assertThat(testEnvironment.getCause().getCheckpointFailureReason()).isSameAs(CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
        } else {
            Assertions.assertThat(testEnvironment.getCause()).isNull();
        }
    }

    @Test
    void testDeclineAsyncCheckpoint() {
        CheckpointFailureReason checkpointFailureReason = CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM;
        HashMap hashMap = new HashMap();
        hashMap.put(new OperatorID(), new OperatorSnapshotFutures(DoneFuture.of(SnapshotResult.empty()), DoneFuture.of(SnapshotResult.empty()), DoneFuture.of(SnapshotResult.empty()), DoneFuture.of(SnapshotResult.empty()), ExceptionallyDoneFuture.of(new CheckpointException(checkpointFailureReason)), DoneFuture.of(SnapshotResult.empty())));
        TestEnvironment testEnvironment = new TestEnvironment();
        createAsyncRunnable(hashMap, testEnvironment, false, true).run();
        Assertions.assertThat(testEnvironment.getCause().getCheckpointFailureReason()).isSameAs(checkpointFailureReason);
    }

    @Test
    void testReportFinishedOnRestoreTaskSnapshots() {
        TestEnvironment testEnvironment = new TestEnvironment();
        AsyncCheckpointRunnable createAsyncRunnable = createAsyncRunnable(new HashMap(), testEnvironment, true, true);
        createAsyncRunnable.run();
        TestTaskStateManager taskStateManager = testEnvironment.getTaskStateManager();
        Assertions.assertThat(taskStateManager.getReportedCheckpointId()).isEqualTo(createAsyncRunnable.getCheckpointId());
        Assertions.assertThat(taskStateManager.getLastJobManagerTaskStateSnapshot()).isEqualTo(TaskStateSnapshot.FINISHED_ON_RESTORE);
        Assertions.assertThat(taskStateManager.getLastTaskManagerTaskStateSnapshot()).isEqualTo(TaskStateSnapshot.FINISHED_ON_RESTORE);
        Assertions.assertThat(createAsyncRunnable.getFinishedFuture()).isDone();
    }

    private AsyncCheckpointRunnable createAsyncRunnable(Map<OperatorID, OperatorSnapshotFutures> map, TestEnvironment testEnvironment, boolean z, boolean z2) {
        return new AsyncCheckpointRunnable(map, new CheckpointMetaData(1L, 1L), new CheckpointMetricsBuilder().setBytesProcessedDuringAlignment(0L).setAlignmentDurationNanos(0L), 1L, "Task Name", asyncCheckpointRunnable -> {
        }, testEnvironment, (str, th) -> {
        }, z, false, () -> {
            return Boolean.valueOf(z2);
        });
    }
}
