/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.TestFinishedOnRestoreStreamOperator;
import org.apache.flink.streaming.util.CompletingCheckpointResponder;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class StreamTaskFinalCheckpointsTest {
    // Wait period in milliseconds (500).
    // NOTE(review): nothing in the visible part of this file references this constant; the
    // literal 500L passed to Thread.sleep(...) in the checkpoint responders looks like this
    // value inlined by the decompiler — confirm before replacing those literals.
    private static final long CONCURRENT_EVENT_WAIT_PERIOD_MS = 500L;

    /** No-argument, package-private constructor; it performs no initialization. */
    StreamTaskFinalCheckpointsTest() {}

    /**
     * Finishes the operator chain by hand and then checks that a checkpoint triggered through a
     * barrier afterwards is still reported to the task state manager.
     */
    @Test
    void testCheckpointDoneOnFinishedOperator() throws Exception {
        FinishingOperator operator = new FinishingOperator();
        StreamTaskMailboxTestHarnessBuilder harnessBuilder =
                new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.INT_TYPE_INFO);
        StreamTaskMailboxTestHarness harness =
                harnessBuilder.setupOutputForSingletonOperatorChain((StreamOperator<?>) operator).build();

        // Auto-processing is switched off before the single record is fed in.
        harness.setAutoProcess(false);
        harness.processElement(new StreamRecord((Object) 1));

        // Finish the operators directly on the chain, in DRAIN mode.
        harness.streamTask.operatorChain.finishOperators(
                harness.streamTask.getActionExecutor(), StopMode.DRAIN);
        Assertions.assertThat(FinishingOperator.finished).isTrue();

        // Reset the report latch, trigger checkpoint 2 and block until it has been reported.
        harness.getTaskStateManager().getWaitForReportLatch().reset();
        CheckpointMetricsBuilder metrics =
                new CheckpointMetricsBuilder()
                        .setBytesProcessedDuringAlignment(0L)
                        .setAlignmentDurationNanos(0L);
        harness.streamTask.triggerCheckpointOnBarrier(
                new CheckpointMetaData(2L, 0L),
                CheckpointOptions.forCheckpointWithDefaultLocation(),
                metrics);
        harness.getTaskStateManager().getWaitForReportLatch().await();

        Assertions.assertThat(harness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(2L);
    }

    /**
     * Ends the input of a one-input task wired to two pipelined-bounded partitions and asserts
     * that neither partition has any queued buffers afterwards.
     *
     * <p>NOTE(review): the method name says checkpointing is not enabled, yet the harness is
     * built with {@code CheckpointingOptions.CHECKPOINTING_INTERVAL} set to one second — confirm
     * which of the two is intended.
     */
    @Test
    void testNotWaitingForAllRecordsProcessedIfCheckpointNotEnabled() throws Exception {
        ResultPartitionWriter[] partitions = new ResultPartitionWriter[2];
        try {
            for (int idx = 0; idx < partitions.length; idx++) {
                partitions[idx] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                partitions[idx].setup();
            }
            try (StreamTaskMailboxTestHarness testHarness =
                    ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                                    .addJobConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(1L))
                                    .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO)
                                    .addAdditionalOutput(partitions)
                                    .setupOperatorChain((StreamOperator<?>) new EmptyOperator())
                                    .finishForSingletonOperatorChain((TypeSerializer) StringSerializer.INSTANCE, (StreamPartitioner) new BroadcastPartitioner()))
                            .build()) {
                testHarness.endInput();

                // Nothing may be queued in either partition after the input has ended.
                for (ResultPartitionWriter partition : partitions) {
                    Assertions.assertThat(((PipelinedResultPartition) partition).getNumberOfQueuedBuffers()).isZero();
                }
            }
        } finally {
            // Slots stay null if partition creation failed part-way through the loop above.
            for (ResultPartitionWriter partition : partitions) {
                if (partition != null) {
                    partition.close();
                }
            }
        }
    }

    /**
     * Triggers checkpoints 2, 4 and 6 while the input channels are ended step by step, and
     * verifies that each one is reported and that checkpoint 6 is the one notified as completed
     * once the harness has finished processing.
     */
    @Test
    void testWaitingForFinalCheckpoint() throws Exception {
        ResultPartition[] partitions = new ResultPartition[2];
        try {
            for (int idx = 0; idx < partitions.length; idx++) {
                partitions[idx] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                partitions[idx].setup();
            }
            int lastCheckpointId = 6;
            CompletingCheckpointResponder responder = new CompletingCheckpointResponder();
            try (StreamTaskMailboxTestHarness<String> testHarness = this.createTestHarness(partitions, responder, false)) {
                // Checkpoint 2: no end-of-data event has been processed yet.
                CompletableFuture<Boolean> pendingCheckpoint = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 2L);
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(testHarness, pendingCheckpoint);
                Assertions.assertThat(testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(2L);

                // Checkpoint 4: EndOfData and EndOfPartition were processed with indices (0, 0).
                testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0, 0);
                testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 0);
                pendingCheckpoint = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 4L);
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(testHarness, pendingCheckpoint);
                Assertions.assertThat(testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(4L);

                // Last checkpoint: the same two events were processed with indices (0, 1) and (0, 2).
                testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0, 1);
                testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0, 2);
                testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 1);
                testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 2);
                pendingCheckpoint = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, lastCheckpointId);

                // When the trigger future completes, tell every partition that subpartition 0
                // has had all of its data processed.
                pendingCheckpoint.thenAccept(unused -> {
                    for (ResultPartition partition : partitions) {
                        partition.onSubpartitionAllDataProcessed(0);
                    }
                });

                testHarness.finishProcessing();
                Assertions.assertThat(pendingCheckpoint).isDone();
                testHarness.getTaskStateManager().getWaitForReportLatch().await();
                Assertions.assertThat(testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo((long) lastCheckpointId);
                Assertions.assertThat(testHarness.getTaskStateManager().getNotifiedCompletedCheckpointId()).isEqualTo((long) lastCheckpointId);

                // Expected queue size per partition is 4 (presumably three barriers plus one
                // end-of-data event — confirm).
                for (ResultPartition partition : partitions) {
                    Assertions.assertThat(partition.getNumberOfQueuedBuffers()).isEqualTo(4);
                }
            }
        } finally {
            // Slots stay null if partition creation failed part-way through the loop above.
            for (ResultPartition partition : partitions) {
                if (partition != null) {
                    partition.close();
                }
            }
        }
    }

    /**
     * Builds a harness without additional output partitions and with unaligned checkpoints
     * switched off, by delegating to the three-argument overload.
     */
    private StreamTaskMailboxTestHarness<String> createTestHarness(CompletingCheckpointResponder checkpointResponder) throws Exception {
        final boolean enableUnalignedCheckpoint = false;
        return this.createTestHarness(null, checkpointResponder, enableUnalignedCheckpoint);
    }

    /**
     * Builds a one-input string harness with three input channels, a one-second checkpointing
     * interval and an {@code EmptyOperator}, and wires the responder's handlers to the created
     * task.
     *
     * @param partitionWriters additional outputs to register, or {@code null} for none
     * @param checkpointResponder responder installed on the harness; its complete/abort handlers
     *     are pointed at the new stream task
     * @param enableUnalignedCheckpoint value written to {@code CheckpointingOptions.ENABLE_UNALIGNED}
     * @return the built harness
     */
    private StreamTaskMailboxTestHarness<String> createTestHarness(@Nullable ResultPartition[] partitionWriters, CompletingCheckpointResponder checkpointResponder, boolean enableUnalignedCheckpoint) throws Exception {
        StreamTaskMailboxTestHarnessBuilder builder =
                new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        if (partitionWriters != null) {
            builder = builder.addAdditionalOutput((ResultPartitionWriter[]) partitionWriters);
        }
        StreamTaskMailboxTestHarness<String> testHarness =
                ((StreamTaskMailboxTestHarnessBuilder) builder
                                .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO, 3)
                                .addJobConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(1L))
                                .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, enableUnalignedCheckpoint)
                                .setCheckpointResponder(checkpointResponder)
                                .setupOperatorChain((StreamOperator<?>) new EmptyOperator())
                                .finishForSingletonOperatorChain((TypeSerializer) StringSerializer.INSTANCE))
                        .build();

        // The handlers can only be installed now because they need the task built above.
        checkpointResponder.setHandlers(
                completedId -> testHarness.streamTask.notifyCheckpointCompleteAsync(completedId),
                (abortedId, latestCompletedId) -> testHarness.streamTask.notifyCheckpointAbortAsync(abortedId, latestCompletedId));
        return testHarness;
    }

    /**
     * Triggers checkpoints 1, 2 and 3 after the task has completed and verifies that checkpoint
     * 3 ends up as both the reported and the notified-complete checkpoint.
     *
     * <p>The responder is handed the single id 3 through {@code completeCheckpoints} (presumably
     * so that only that checkpoint is reported as completed — confirm against
     * {@code CompletingCheckpointResponder}).
     */
    @Test
    void testWaitingForFinalCheckpointNotTheFirsNotifiedComplete() throws Exception {
        ResultPartition[] partitions = new ResultPartition[2];
        try {
            for (int idx = 0; idx < partitions.length; idx++) {
                partitions[idx] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                partitions[idx].setup();
            }
            CompletingCheckpointResponder responder = new CompletingCheckpointResponder();
            try (StreamTaskMailboxTestHarness<String> testHarness = this.createTestHarness(partitions, responder, false)) {
                responder.completeCheckpoints(Collections.singletonList(3L));
                testHarness.waitForTaskCompletion();

                // Checkpoint 1; when its trigger future completes, every partition is told that
                // subpartition 0 has had all of its data processed.
                CompletableFuture<Boolean> checkpointOne = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 1L);
                checkpointOne.thenAccept(unused -> {
                    for (ResultPartition partition : partitions) {
                        partition.onSubpartitionAllDataProcessed(0);
                    }
                });
                testHarness.processAll();
                testHarness.getTaskStateManager().getWaitForReportLatch().await();

                // Checkpoint 2.
                StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 2L);
                testHarness.processAll();
                testHarness.getTaskStateManager().getWaitForReportLatch().await();

                // Checkpoint 3, followed by finishing the harness.
                StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 3L);
                testHarness.processAll();
                testHarness.finishProcessing();
                testHarness.getTaskStateManager().getWaitForReportLatch().await();

                Assertions.assertThat(testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(3L);
                Assertions.assertThat(testHarness.getTaskStateManager().getNotifiedCompletedCheckpointId()).isEqualTo(3L);
                for (ResultPartition partition : partitions) {
                    Assertions.assertThat(partition.getNumberOfQueuedBuffers()).isEqualTo(4);
                }
            }
        } finally {
            // Slots stay null if partition creation failed part-way through the loop above.
            for (ResultPartition partition : partitions) {
                if (partition != null) {
                    partition.close();
                }
            }
        }
    }

    /**
     * Triggers checkpoint 6 and then a terminate-type savepoint 7 on a completed task, and
     * verifies that the savepoint id is what ends up reported and notified as completed.
     *
     * <p>The custom responder ignores every acknowledgement except the one for the savepoint id;
     * for that one it first acknowledges id 6, sleeps 500 ms, and then acknowledges id 7.
     */
    @Test
    void testTriggerStopWithSavepointWhenWaitingForFinalCheckpoint() throws Exception {
        ResultPartition[] partitions = new ResultPartition[2];
        try {
            for (int idx = 0; idx < partitions.length; idx++) {
                partitions[idx] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                partitions[idx].setup();
            }
            final int finalCheckpointId = 6;
            final int syncSavepointId = 7;
            CompletingCheckpointResponder responder = new CompletingCheckpointResponder() {

                @Override
                public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
                    if (checkpointId != (long) syncSavepointId) {
                        // Acknowledgements for any other id are dropped.
                        return;
                    }
                    super.acknowledgeCheckpoint(jobID, executionAttemptID, finalCheckpointId, checkpointMetrics, subtaskState);
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException e) {
                        throw new FlinkRuntimeException(e);
                    }
                    super.acknowledgeCheckpoint(jobID, executionAttemptID, syncSavepointId, checkpointMetrics, subtaskState);
                }
            };
            try (StreamTaskMailboxTestHarness<String> testHarness = this.createTestHarness(partitions, responder, false)) {
                testHarness.waitForTaskCompletion();

                CompletableFuture<Boolean> finalCheckpoint = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, finalCheckpointId);
                // When the trigger future completes, tell every partition that subpartition 0
                // has had all of its data processed.
                finalCheckpoint.thenAccept(unused -> {
                    for (ResultPartition partition : partitions) {
                        partition.onSubpartitionAllDataProcessed(0);
                    }
                });
                CompletableFuture<Boolean> savepoint = StreamTaskFinalCheckpointsTest.triggerStopWithSavepointDrain(testHarness, syncSavepointId);

                testHarness.finishProcessing();
                Assertions.assertThat(finalCheckpoint).isDone();
                Assertions.assertThat(savepoint).isDone();
                testHarness.getTaskStateManager().getWaitForReportLatch().await();
                Assertions.assertThat(testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo((long) syncSavepointId);
                Assertions.assertThat(testHarness.getTaskStateManager().getNotifiedCompletedCheckpointId()).isEqualTo((long) syncSavepointId);
                for (ResultPartition partition : partitions) {
                    Assertions.assertThat(partition.getNumberOfQueuedBuffers()).isEqualTo(3);
                }
            }
        } finally {
            // Slots stay null if partition creation failed part-way through the loop above.
            for (ResultPartition partition : partitions) {
                if (partition != null) {
                    partition.close();
                }
            }
        }
    }

    /** Runs the source-task stop-with-savepoint scenario with {@code drain} set to true. */
    @Test
    void testTriggerStopWithSavepointWhenWaitingForFinalCheckpointOnSourceTask() throws Exception {
        final boolean drain = true;
        this.doTestTriggerStopWithSavepointWhenWaitingForFinalCheckpointOnSourceTask(drain);
    }

    /** Runs the source-task stop-with-savepoint scenario with {@code drain} set to false. */
    @Test
    void testTriggerStopWithSavepointNoDrainWhenWaitingForFinalCheckpointOnSourceTask() throws Exception {
        final boolean drain = false;
        this.doTestTriggerStopWithSavepointWhenWaitingForFinalCheckpointOnSourceTask(drain);
    }

    /**
     * Shared body of the two source-task tests: runs a {@code SourceStreamTask} around an
     * {@code ImmediatelyFinishingSource}, triggers checkpoint 6 followed by savepoint 7, and
     * verifies that the savepoint id is the one reported and notified as completed.
     *
     * <p>The custom responder ignores every acknowledgement except the one for the savepoint id;
     * for that one it first acknowledges id 6, sleeps 500 ms, and then acknowledges id 7.
     *
     * @param drain {@code true} to trigger the savepoint through
     *     {@code triggerStopWithSavepointDrain}, {@code false} to use
     *     {@code triggerStopWithSavepointNoDrain}
     */
    private void doTestTriggerStopWithSavepointWhenWaitingForFinalCheckpointOnSourceTask(boolean drain) throws Exception {
        final int finalCheckpointId = 6;
        final int syncSavepointId = 7;
        CompletingCheckpointResponder responder = new CompletingCheckpointResponder() {

            @Override
            public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
                if (checkpointId != (long) syncSavepointId) {
                    // Acknowledgements for any other id are dropped.
                    return;
                }
                super.acknowledgeCheckpoint(jobID, executionAttemptID, finalCheckpointId, checkpointMetrics, subtaskState);
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    throw new FlinkRuntimeException(e);
                }
                super.acknowledgeCheckpoint(jobID, executionAttemptID, syncSavepointId, checkpointMetrics, subtaskState);
            }
        };
        try (StreamTaskMailboxTestHarness<String> testHarness =
                new StreamTaskMailboxTestHarnessBuilder(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addJobConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(1L))
                        .setCheckpointResponder(responder)
                        .setupOutputForSingletonOperatorChain((StreamOperator<?>) new StreamSource((SourceFunction) new ImmediatelyFinishingSource()))
                        .build()) {
            responder.setHandlers(
                    completedId -> testHarness.streamTask.notifyCheckpointCompleteAsync(completedId),
                    (abortedId, latestCompletedId) -> testHarness.streamTask.notifyCheckpointAbortAsync(abortedId, latestCompletedId));

            testHarness.streamTask.runMailboxLoop();

            CompletableFuture<Boolean> finalCheckpoint = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, finalCheckpointId);
            CompletableFuture<Boolean> savepoint;
            if (drain) {
                savepoint = StreamTaskFinalCheckpointsTest.triggerStopWithSavepointDrain(testHarness, syncSavepointId);
            } else {
                savepoint = StreamTaskFinalCheckpointsTest.triggerStopWithSavepointNoDrain(testHarness, syncSavepointId);
            }

            testHarness.finishProcessing();
            Assertions.assertThat(finalCheckpoint).isDone();
            Assertions.assertThat(savepoint).isDone();
            testHarness.getTaskStateManager().getWaitForReportLatch().await();
            Assertions.assertThat(testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo((long) syncSavepointId);
            Assertions.assertThat(testHarness.getTaskStateManager().getNotifiedCompletedCheckpointId()).isEqualTo((long) syncSavepointId);
        }
    }

    /** Runs the finished-channels scenario with aligned, non-timeout checkpoint options. */
    @Test
    void testTriggeringAlignedNoTimeoutCheckpointWithFinishedChannels() throws Exception {
        CheckpointOptions options =
                CheckpointOptions.alignedNoTimeout(
                        CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
        this.testTriggeringCheckpointWithFinishedChannels(options);
    }

    /** Runs the finished-channels scenario with unaligned checkpoint options. */
    @Test
    void testTriggeringUnalignedCheckpointWithFinishedChannels() throws Exception {
        CheckpointOptions options =
                CheckpointOptions.unaligned(
                        CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
        this.testTriggeringCheckpointWithFinishedChannels(options);
    }

    /**
     * Runs the finished-channels scenario with aligned checkpoint options created through
     * {@code alignedWithTimeout}, passing 10 as the timeout argument.
     */
    @Test
    void testTriggeringAlignedWithTimeoutCheckpointWithFinishedChannels() throws Exception {
        CheckpointOptions options =
                CheckpointOptions.alignedWithTimeout(
                        CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), 10L);
        this.testTriggeringCheckpointWithFinishedChannels(options);
    }

    /**
     * Shared body of the finished-channels tests: triggers checkpoints 2, 4 and 6 with the given
     * options while the input channels are ended step by step, and verifies that every
     * checkpoint is reported and that no input channel's resume action ever ran.
     *
     * @param checkpointOptions options used for all three checkpoints; unaligned checkpoints are
     *     enabled on the harness when the options are unaligned or timeoutable
     */
    private void testTriggeringCheckpointWithFinishedChannels(CheckpointOptions checkpointOptions) throws Exception {
        ResultPartition[] partitions = new ResultPartition[2];
        try {
            for (int idx = 0; idx < partitions.length; idx++) {
                partitions[idx] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                partitions[idx].setup();
            }
            try (StreamTaskMailboxTestHarness<String> testHarness =
                    this.createTestHarness(
                            partitions,
                            new CompletingCheckpointResponder(),
                            checkpointOptions.isUnalignedCheckpoint() || checkpointOptions.isTimeoutable())) {
                // Count, per channel index, how often the channel's resume action is invoked.
                int channelCount = testHarness.inputGates[0].getInputGate().getNumberOfInputChannels();
                int[] resumedCount = new int[channelCount];
                for (int channel = 0; channel < channelCount; channel++) {
                    TestInputChannel inputChannel = (TestInputChannel) testHarness.inputGates[0].getInputGate().getChannel(channel);
                    inputChannel.setActionOnResumed(() -> resumedCount[inputChannel.getChannelIndex()]++);
                }

                // Checkpoint 2: no end-of-data event has been processed yet.
                CompletableFuture<Boolean> pendingCheckpoint = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 2L, checkpointOptions);
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(testHarness, pendingCheckpoint);
                Assertions.assertThat(testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(2L);
                Assertions.assertThat(resumedCount).containsExactly(new int[]{0, 0, 0});

                // Checkpoint 4: EndOfData and EndOfPartition were processed with indices (0, 0).
                testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0, 0);
                testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 0);
                pendingCheckpoint = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 4L, checkpointOptions);
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(testHarness, pendingCheckpoint);
                Assertions.assertThat(testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(4L);
                Assertions.assertThat(resumedCount).containsExactly(new int[]{0, 0, 0});

                // Checkpoint 6: the same two events were processed with indices (0, 1) and (0, 2).
                testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0, 1);
                testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0, 2);
                testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 1);
                testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 2);
                pendingCheckpoint = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 6L, checkpointOptions);

                // When the trigger future completes, tell every partition that subpartition 0
                // has had all of its data processed.
                pendingCheckpoint.thenAccept(unused -> {
                    for (ResultPartition partition : partitions) {
                        partition.onSubpartitionAllDataProcessed(0);
                    }
                });

                testHarness.finishProcessing();
                Assertions.assertThat(pendingCheckpoint).isDone();
                testHarness.getTaskStateManager().getWaitForReportLatch().await();
                Assertions.assertThat(testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(6L);
                Assertions.assertThat(resumedCount).containsExactly(new int[]{0, 0, 0});
                for (ResultPartition partition : partitions) {
                    Assertions.assertThat(partition.getNumberOfQueuedBuffers()).isEqualTo(4);
                }
            }
        } finally {
            // Slots stay null if partition creation failed part-way through the loop above.
            for (ResultPartition partition : partitions) {
                if (partition != null) {
                    partition.close();
                }
            }
        }
    }

    /**
     * Verifies the {@code isTaskFinished()} flag of the snapshots reported to the job manager:
     * false for checkpoint 2, taken before any end-of-data event, and true for checkpoint 4,
     * taken after {@code EndOfData} has been processed on the single input channel.
     */
    @Test
    void testReportOperatorsFinishedInCheckpoint() throws Exception {
        ResultPartition[] partitions = new ResultPartition[2];
        try {
            for (int idx = 0; idx < partitions.length; idx++) {
                partitions[idx] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                partitions[idx].setup();
            }
            CompletingCheckpointResponder responder = new CompletingCheckpointResponder();
            try (StreamTaskMailboxTestHarness<String> testHarness =
                    ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                                    .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO, 1)
                                    .addAdditionalOutput((ResultPartitionWriter[]) partitions)
                                    .setCheckpointResponder(responder)
                                    .addJobConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(1L))
                                    .setupOperatorChain((StreamOperator<?>) new StatefulOperator())
                                    .finishForSingletonOperatorChain((TypeSerializer) StringSerializer.INSTANCE))
                            .build()) {
                responder.setHandlers(
                        completedId -> testHarness.streamTask.notifyCheckpointCompleteAsync(completedId),
                        (abortedId, latestCompletedId) -> testHarness.streamTask.notifyCheckpointAbortAsync(abortedId, latestCompletedId));

                // Checkpoint 2: taken before any end-of-data event.
                CompletableFuture<Boolean> pendingCheckpoint = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 2L);
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(testHarness, pendingCheckpoint);
                Assertions.assertThat(testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(2L);
                Assertions.assertThat(((TaskStateSnapshot) testHarness.getTaskStateManager().getJobManagerTaskStateSnapshotsByCheckpointId().get(2L)).isTaskFinished()).isFalse();

                // Checkpoint 4: taken after EndOfData was processed with indices (0, 0).
                testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0, 0);
                pendingCheckpoint = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 4L);
                // When the trigger future completes, tell every partition that subpartition 0
                // has had all of its data processed.
                pendingCheckpoint.thenAccept(unused -> {
                    for (ResultPartition partition : partitions) {
                        partition.onSubpartitionAllDataProcessed(0);
                    }
                });

                testHarness.processAll();
                testHarness.finishProcessing();
                Assertions.assertThat(pendingCheckpoint).isDone();
                testHarness.getTaskStateManager().getWaitForReportLatch().await();
                Assertions.assertThat(((TaskStateSnapshot) testHarness.getTaskStateManager().getJobManagerTaskStateSnapshotsByCheckpointId().get(4L)).isTaskFinished()).isTrue();
            }
        } finally {
            // Slots stay null if partition creation failed part-way through the loop above.
            for (ResultPartition partition : partitions) {
                if (partition != null) {
                    partition.close();
                }
            }
        }
    }

    /**
     * Triggers a checkpoint using {@code CheckpointOptions.forCheckpointWithDefaultLocation()}.
     *
     * @param testHarness harness whose task receives the trigger
     * @param checkpointId id of the checkpoint to trigger
     * @return the future returned by the three-argument overload
     */
    static CompletableFuture<Boolean> triggerCheckpoint(StreamTaskMailboxTestHarness<String> testHarness, long checkpointId) {
        CheckpointOptions defaultOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
        return StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, checkpointId, defaultOptions);
    }

    /**
     * Resets the harness's report latch and asynchronously triggers a checkpoint on its task.
     *
     * @param testHarness harness whose task receives the trigger
     * @param checkpointId id of the checkpoint; the second {@code CheckpointMetaData} argument is
     *     this id multiplied by 1000
     * @param checkpointOptions options passed through to {@code triggerCheckpointAsync}
     * @return the future returned by {@code triggerCheckpointAsync}
     */
    static CompletableFuture<Boolean> triggerCheckpoint(StreamTaskMailboxTestHarness<String> testHarness, long checkpointId, CheckpointOptions checkpointOptions) {
        // Reset first so that a later await() on the latch waits for this checkpoint's report.
        testHarness.getTaskStateManager().getWaitForReportLatch().reset();
        CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, checkpointId * 1000L);
        return testHarness.getStreamTask().triggerCheckpointAsync(metaData, checkpointOptions);
    }

    /**
     * Triggers a savepoint of type {@code SavepointType.terminate} in canonical format.
     *
     * @param testHarness harness whose task receives the trigger
     * @param checkpointId id of the savepoint
     * @return the future returned by {@code triggerStopWithSavepoint}
     */
    static CompletableFuture<Boolean> triggerStopWithSavepointDrain(StreamTaskMailboxTestHarness<String> testHarness, long checkpointId) {
        SavepointType terminateType = SavepointType.terminate(SavepointFormatType.CANONICAL);
        return StreamTaskFinalCheckpointsTest.triggerStopWithSavepoint(testHarness, checkpointId, terminateType);
    }

    /**
     * Triggers a savepoint of type {@code SavepointType.suspend} in canonical format.
     *
     * @param testHarness harness whose task receives the trigger
     * @param checkpointId id of the savepoint
     * @return the future returned by {@code triggerStopWithSavepoint}
     */
    static CompletableFuture<Boolean> triggerStopWithSavepointNoDrain(StreamTaskMailboxTestHarness<String> testHarness, long checkpointId) {
        SavepointType suspendType = SavepointType.suspend(SavepointFormatType.CANONICAL);
        return StreamTaskFinalCheckpointsTest.triggerStopWithSavepoint(testHarness, checkpointId, suspendType);
    }

    /**
     * Resets the harness's report latch and asynchronously triggers a savepoint of the given
     * type, using aligned, non-timeout options at the default storage location.
     *
     * @param testHarness harness whose task receives the trigger
     * @param checkpointId id of the savepoint; the second {@code CheckpointMetaData} argument is
     *     this id multiplied by 1000
     * @param checkpointType savepoint type placed into the checkpoint options
     * @return the future returned by {@code triggerCheckpointAsync}
     */
    static CompletableFuture<Boolean> triggerStopWithSavepoint(StreamTaskMailboxTestHarness<String> testHarness, long checkpointId, SavepointType checkpointType) {
        // Reset first so that a later await() on the latch waits for this savepoint's report.
        testHarness.getTaskStateManager().getWaitForReportLatch().reset();
        CheckpointMetaData metaData = new CheckpointMetaData(checkpointId, checkpointId * 1000L);
        CheckpointOptions savepointOptions =
                CheckpointOptions.alignedNoTimeout(
                        (SnapshotType) checkpointType, CheckpointStorageLocationReference.getDefault());
        return testHarness.getStreamTask().triggerCheckpointAsync(metaData, savepointOptions);
    }

    /**
     * Processes one harness step at a time until the given future is done, then blocks on the
     * task state manager's report latch.
     *
     * @param testHarness harness to step
     * @param checkpointFuture future polled with {@code isDone()} before each step
     */
    static void processMailTillCheckpointSucceeds(StreamTaskMailboxTestHarness<String> testHarness, Future<Boolean> checkpointFuture) throws Exception {
        while (true) {
            if (checkpointFuture.isDone()) {
                break;
            }
            testHarness.processSingleStep();
        }
        testHarness.getTaskStateManager().getWaitForReportLatch().await();
    }

    /**
     * Triggers checkpoints 1 and 2 through barriers on a completed task and verifies that
     * checkpoint 2 is the reported one once the harness has finished processing.
     *
     * <p>The custom responder forwards every acknowledgement except the one for checkpoint 2;
     * for that one it only sleeps 500 ms and never calls the superclass.
     */
    @Test
    void testWaitingForPendingCheckpointsOnFinished() throws Exception {
        final long delayedCheckpointId = 2L;
        CompletingCheckpointResponder responder = new CompletingCheckpointResponder() {

            @Override
            public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
                if (checkpointId != delayedCheckpointId) {
                    super.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, subtaskState);
                    return;
                }
                // The delayed checkpoint is held up here and is not forwarded afterwards.
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    throw new FlinkRuntimeException(e);
                }
            }
        };
        try (StreamTaskMailboxTestHarness<String> harness = this.createTestHarness(responder)) {
            harness.waitForTaskCompletion();

            harness.streamTask.triggerCheckpointOnBarrier(
                    new CheckpointMetaData(1L, 101L),
                    CheckpointOptions.forCheckpointWithDefaultLocation(),
                    new CheckpointMetricsBuilder().setBytesProcessedDuringAlignment(0L).setAlignmentDurationNanos(0L));
            harness.streamTask.triggerCheckpointOnBarrier(
                    new CheckpointMetaData(delayedCheckpointId, 101L),
                    CheckpointOptions.forCheckpointWithDefaultLocation(),
                    new CheckpointMetricsBuilder().setBytesProcessedDuringAlignment(0L).setAlignmentDurationNanos(0L));

            harness.processAll();
            harness.finishProcessing();
            Assertions.assertThat(harness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(delayedCheckpointId);
        }
    }

    /**
     * Builds a {@code OneInputStreamTask} harness whose task state snapshot is
     * {@code TaskStateSnapshot.FINISHED_ON_RESTORE} and whose operator chain consists of two
     * {@code TestFinishedOnRestoreStreamOperator} instances, then checks what the task emits.
     *
     * <p>The test triggers checkpoint 2 through {@code triggerCheckpointOnBarrier}, waits for the
     * report latch, asserts that checkpoint 2 was reported, sends a completion notification for
     * checkpoint 2 and an abort notification, feeds {@code Watermark.MAX_WATERMARK} three times and
     * finally asserts that the collected output is exactly the checkpoint barrier,
     * {@code Watermark.MAX_WATERMARK} and {@code EndOfData(StopMode.DRAIN)}.
     *
     * <p>NOTE(review): presumably {@code TestFinishedOnRestoreStreamOperator} fails when one of its
     * lifecycle methods is invoked - confirm against its definition, which is not visible here.
     */
    @Test
    void testOperatorSkipLifeCycleIfFinishedOnRestore() throws Exception {
        // NOTE(review): the "3" passed to addInput is presumably the channel count; the elements below are sent with last argument 0..2 - confirm.
        try (StreamTaskMailboxTestHarness harness = ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 3).setCollectNetworkEvents().setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE).setupOperatorChain((StreamOperator<?>)new TestFinishedOnRestoreStreamOperator()).chain((OneInputStreamOperator)new TestFinishedOnRestoreStreamOperator(), (TypeSerializer)StringSerializer.INSTANCE).finish()).build();){
            harness.processAll();
            // Re-arm the latch so the await() below only returns for the checkpoint triggered here.
            harness.getTaskStateManager().getWaitForReportLatch().reset();
            CheckpointMetaData checkpointMetaData = new CheckpointMetaData(2L, 2L);
            CheckpointOptions checkpointOptions = new CheckpointOptions((SnapshotType)CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
            harness.streamTask.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, new CheckpointMetricsBuilder().setBytesProcessedDuringAlignment(0L).setAlignmentDurationNanos(0L));
            harness.getTaskStateManager().getWaitForReportLatch().await();
            Assertions.assertThat((long)harness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(2L);
            // Deliver checkpoint notifications to the task; they are processed by processAll() below.
            harness.streamTask.notifyCheckpointCompleteAsync(2L);
            // NOTE(review): arguments are presumably (abortedCheckpointId, latestCompletedCheckpointId) - confirm.
            harness.streamTask.notifyCheckpointAbortAsync(3L, 2L);
            harness.processAll();
            // Send the maximum watermark with last argument 0, 1 and 2.
            harness.processElement(Watermark.MAX_WATERMARK, 0, 0);
            harness.processElement(Watermark.MAX_WATERMARK, 0, 1);
            harness.processElement(Watermark.MAX_WATERMARK, 0, 2);
            harness.waitForTaskCompletion();
            harness.finishProcessing();
            // Exactly three items are expected in the output, in this order.
            Assertions.assertThat(harness.getOutput()).containsExactly(new Object[]{new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions), Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)});
        }
    }

    /**
     * Builds a {@code OneInputStreamTask} harness with a one-second checkpointing interval and
     * unaligned checkpoints enabled, restored from {@code TaskStateSnapshot.FINISHED_ON_RESTORE},
     * and feeds it the same unaligned barrier for checkpoint 2 three times.
     *
     * <p>After either the acknowledge latch or the declined latch of the checkpoint responder has
     * triggered, the test asserts that the acknowledged checkpoint ids are exactly {@code [2]} and
     * that no checkpoint was declined.
     */
    @Test
    void testWaitingForUnalignedChannelStatesIfFinishedOnRestore() throws Exception {
        OperatorID operatorId = new OperatorID();
        try (StreamTaskMailboxTestHarness harness = ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addJobConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(1L)).addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, true).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 3).setCollectNetworkEvents().setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE).setupOperatorChain((StreamOperator<?>)new TestFinishedOnRestoreStreamOperator()).chain(operatorId, (OneInputStreamOperator)new TestFinishedOnRestoreStreamOperator(operatorId), (TypeSerializer)StringSerializer.INSTANCE).finish()).build();){
            harness.processAll();
            TestCheckpointResponder checkpointResponder = harness.getCheckpointResponder();
            // Install fresh latches so that only responses caused by the barriers below are observed.
            checkpointResponder.setAcknowledgeLatch(new OneShotLatch());
            checkpointResponder.setDeclinedLatch(new OneShotLatch());
            CheckpointBarrier unalignedBarrier = new CheckpointBarrier(2L, 2L, CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
            harness.processEvent((AbstractEvent)unalignedBarrier, 0, 0);
            // NOTE(review): the pause between the first barrier and the remaining two presumably gives
            // asynchronous work started by the first barrier time to progress - the code does not state why; confirm.
            Thread.sleep(500L);
            harness.processEvent((AbstractEvent)unalignedBarrier, 0, 1);
            harness.processEvent((AbstractEvent)unalignedBarrier, 0, 2);
            // Wait for whichever response arrives first; the assertions below decide which one is acceptable.
            CommonTestUtils.waitUntilCondition(() -> checkpointResponder.getAcknowledgeLatch().isTriggered() || checkpointResponder.getDeclinedLatch().isTriggered());
            Assertions.assertThat(checkpointResponder.getAcknowledgeReports().stream().map(TestCheckpointResponder.AbstractReport::getCheckpointId).collect(Collectors.toList())).containsExactly((Object[])new Long[]{2L});
            Assertions.assertThat(checkpointResponder.getDeclineReports().stream().map(TestCheckpointResponder.AbstractReport::getCheckpointId).collect(Collectors.toList())).isEmpty();
        }
    }

    private static class StatefulOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private ListState<Integer> state;

        private StatefulOperator() {
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.state = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor("test", Integer.class));
        }

        public void processElement(StreamRecord<String> element) throws Exception {
        }
    }

    private static class FinishingOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        static boolean finished = false;

        private FinishingOperator() {
        }

        public void processElement(StreamRecord<String> element) throws Exception {
        }

        public void finish() throws Exception {
            finished = true;
        }
    }

    private static class EmptyOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private EmptyOperator() {
        }

        public void processElement(StreamRecord<String> element) throws Exception {
        }
    }

    private static class ImmediatelyFinishingSource
    implements SourceFunction<String> {
        private ImmediatelyFinishingSource() {
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        }

        public void cancel() {
        }
    }
}

