package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.LifeCycleMonitor;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTaskTest;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.CompletingCheckpointResponder;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.class */
class MultipleInputStreamTaskChainedSourcesCheckpointingTest {
    private static final int MAX_STEPS = 100;

    @Parameter
    private boolean objectReuse;
    private final CheckpointMetaData metaData = new CheckpointMetaData(1, System.currentTimeMillis());

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest$LifeCycleMonitorMultipleInputOperator.class */
    static class LifeCycleMonitorMultipleInputOperator extends TestFinishedOnRestoreStreamOperator implements MultipleInputStreamOperator<String> {

        /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest$LifeCycleMonitorMultipleInputOperator$TestFinishedOnRestoreInput.class */
        private static class TestFinishedOnRestoreInput implements Input {
            private TestFinishedOnRestoreInput() {
            }

            public void processElement(StreamRecord streamRecord) throws Exception {
                throw new IllegalStateException("This should never be called");
            }

            public void processWatermark(Watermark watermark) throws Exception {
                throw new IllegalStateException("This should never be called");
            }

            public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
                throw new IllegalStateException("This should never be called");
            }

            public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
                throw new IllegalStateException("This should never be called");
            }

            public void setKeyContextElement(StreamRecord streamRecord) throws Exception {
                throw new IllegalStateException("This should never be called");
            }
        }

        public List<Input> getInputs() {
            ArrayList arrayList = new ArrayList();
            arrayList.add(new TestFinishedOnRestoreInput());
            arrayList.add(new TestFinishedOnRestoreInput());
            arrayList.add(new TestFinishedOnRestoreInput());
            return arrayList;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest$LifeCycleMonitorMultipleInputOperatorFactory.class */
    static class LifeCycleMonitorMultipleInputOperatorFactory extends AbstractStreamOperatorFactory<String> {
        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            return new LifeCycleMonitorMultipleInputOperator();
        }

        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            return LifeCycleMonitorMultipleInputOperator.class;
        }
    }

    MultipleInputStreamTaskChainedSourcesCheckpointingTest() {
    }

    @Parameters(name = "objectReuse = {0}")
    private static Collection<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    @TestTemplate
    void testSourceCheckpointFirst() throws Exception {
        StreamTaskMailboxTestHarness<String> buildTestHarness = MultipleInputStreamTaskTest.buildTestHarness(this.objectReuse);
        try {
            buildTestHarness.setAutoProcess(false);
            ArrayDeque arrayDeque = new ArrayDeque();
            CheckpointBarrier createBarrier = createBarrier(buildTestHarness);
            addRecordsAndBarriers(buildTestHarness, createBarrier);
            CompletableFuture triggerCheckpointAsync = buildTestHarness.getStreamTask().triggerCheckpointAsync(this.metaData, createBarrier.getCheckpointOptions());
            Objects.requireNonNull(triggerCheckpointAsync);
            processSingleStepUntil(buildTestHarness, triggerCheckpointAsync::isDone);
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            ArrayList arrayList = new ArrayList(buildTestHarness.getOutput());
            Assertions.assertThat(arrayList.subList(0, arrayDeque.size())).containsExactlyInAnyOrderElementsOf(arrayDeque);
            Assertions.assertThat(arrayList.get(arrayDeque.size())).isEqualTo(createBarrier);
            if (buildTestHarness != null) {
                buildTestHarness.close();
            }
        } catch (Throwable th) {
            if (buildTestHarness != null) {
                try {
                    buildTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @TestTemplate
    void testSourceCheckpointFirstUnaligned() throws Exception {
        StreamTaskMailboxTestHarness<String> buildTestHarness = MultipleInputStreamTaskTest.buildTestHarness(true, this.objectReuse);
        try {
            buildTestHarness.setAutoProcess(false);
            ArrayDeque arrayDeque = new ArrayDeque();
            addRecords(buildTestHarness);
            CheckpointBarrier createBarrier = createBarrier(buildTestHarness);
            CompletableFuture triggerCheckpointAsync = buildTestHarness.getStreamTask().triggerCheckpointAsync(this.metaData, createBarrier.getCheckpointOptions());
            Objects.requireNonNull(triggerCheckpointAsync);
            processSingleStepUntil(buildTestHarness, triggerCheckpointAsync::isDone);
            Assertions.assertThat(buildTestHarness.getOutput()).containsExactly(new Object[]{createBarrier});
            buildTestHarness.processAll();
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            Assertions.assertThat(new ArrayList(buildTestHarness.getOutput()).subList(1, arrayDeque.size() + 1)).containsExactlyInAnyOrderElementsOf(arrayDeque);
            if (buildTestHarness != null) {
                buildTestHarness.close();
            }
        } catch (Throwable th) {
            if (buildTestHarness != null) {
                try {
                    buildTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @TestTemplate
    void testSourceCheckpointLast() throws Exception {
        StreamTaskMailboxTestHarness<String> buildTestHarness = MultipleInputStreamTaskTest.buildTestHarness(this.objectReuse);
        try {
            buildTestHarness.setAutoProcess(false);
            ArrayDeque arrayDeque = new ArrayDeque();
            CheckpointBarrier createBarrier = createBarrier(buildTestHarness);
            addRecordsAndBarriers(buildTestHarness, createBarrier);
            buildTestHarness.processAll();
            CompletableFuture triggerCheckpointAsync = buildTestHarness.getStreamTask().triggerCheckpointAsync(this.metaData, createBarrier.getCheckpointOptions());
            Objects.requireNonNull(triggerCheckpointAsync);
            processSingleStepUntil(buildTestHarness, triggerCheckpointAsync::isDone);
            arrayDeque.add(new StreamRecord("42", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("42", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("42", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            ArrayList arrayList = new ArrayList(buildTestHarness.getOutput());
            Assertions.assertThat(arrayList.subList(0, arrayDeque.size())).containsExactlyInAnyOrderElementsOf(arrayDeque);
            Assertions.assertThat(arrayList.get(arrayDeque.size())).isEqualTo(createBarrier);
            if (buildTestHarness != null) {
                buildTestHarness.close();
            }
        } catch (Throwable th) {
            if (buildTestHarness != null) {
                try {
                    buildTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @TestTemplate
    void testSourceCheckpointLastUnaligned() throws Exception {
        StreamTaskMailboxTestHarness<String> buildTestHarness = MultipleInputStreamTaskTest.buildTestHarness(true, this.objectReuse);
        try {
            buildTestHarness.setAutoProcess(false);
            ArrayDeque arrayDeque = new ArrayDeque();
            addNetworkRecords(buildTestHarness);
            CheckpointBarrier createBarrier = createBarrier(buildTestHarness);
            addBarriers(buildTestHarness, createBarrier);
            buildTestHarness.processAll();
            MultipleInputStreamTaskTest.addSourceRecords(buildTestHarness, 1, 1337, 1337, 1337);
            buildTestHarness.processAll();
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            arrayDeque.add(createBarrier);
            Assertions.assertThat(buildTestHarness.getOutput()).containsExactlyInAnyOrderElementsOf(arrayDeque);
            if (buildTestHarness != null) {
                buildTestHarness.close();
            }
        } catch (Throwable th) {
            if (buildTestHarness != null) {
                try {
                    buildTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @TestTemplate
    void testStopWithSavepointDrainWaitsForSourcesFinish() throws Exception {
        StreamTaskMailboxTestHarness<String> build = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).setCollectNetworkEvents().modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse)).modifyStreamConfig(streamConfig -> {
            streamConfig.setCheckpointingEnabled(true);
        }).addInput(BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory(new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks()), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO).addSourceInput(new SourceOperatorFactory(new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks()), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MultipleInputStreamTaskTest.MapToStringMultipleInputOperatorFactory(4, true)).build();
        try {
            build.setAutoProcess(false);
            ArrayDeque arrayDeque = new ArrayDeque();
            CheckpointBarrier createStopWithSavepointDrainBarrier = createStopWithSavepointDrainBarrier();
            build.processElement(new StreamRecord("44", Long.MIN_VALUE), 0);
            build.processEvent(new EndOfData(StopMode.DRAIN), 0);
            build.processEvent(createStopWithSavepointDrainBarrier, 0);
            build.processElement(new StreamRecord(Double.valueOf(47.0d), Long.MIN_VALUE), 1);
            build.processEvent(new EndOfData(StopMode.DRAIN), 1);
            build.processEvent(createStopWithSavepointDrainBarrier, 1);
            MultipleInputStreamTaskTest.addSourceRecords(build, 1, Boundedness.CONTINUOUS_UNBOUNDED, 1, 2);
            MultipleInputStreamTaskTest.addSourceRecords(build, 2, Boundedness.CONTINUOUS_UNBOUNDED, 3, 4);
            build.processAll();
            CompletableFuture triggerCheckpointAsync = build.getStreamTask().triggerCheckpointAsync(this.metaData, createStopWithSavepointDrainBarrier.getCheckpointOptions());
            Objects.requireNonNull(triggerCheckpointAsync);
            processSingleStepUntil(build, triggerCheckpointAsync::isDone);
            arrayDeque.add(new StreamRecord("3", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("47.0", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("44", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("1", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("4", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("2", Long.MIN_VALUE));
            ArrayList arrayList = new ArrayList(build.getOutput());
            Assertions.assertThat(arrayList.subList(0, arrayDeque.size())).containsExactlyInAnyOrderElementsOf(arrayDeque);
            Assertions.assertThat(arrayList.subList(arrayList.size() - 3, arrayList.size())).containsExactly(new Object[]{new StreamRecord("FINISH"), new EndOfData(StopMode.DRAIN), createStopWithSavepointDrainBarrier});
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @TestTemplate
    void testOnlyOneSource() throws Exception {
        StreamTaskMailboxTestHarness<String> build = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse)).addSourceInput(new SourceOperatorFactory(new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MultipleInputStreamTaskTest.MapToStringMultipleInputOperatorFactory(1)).build();
        try {
            build.setAutoProcess(false);
            ArrayDeque arrayDeque = new ArrayDeque();
            MultipleInputStreamTaskTest.addSourceRecords(build, 0, 42, 43, 44);
            processSingleStepUntil(build, () -> {
                return Boolean.valueOf(!build.getOutput().isEmpty());
            });
            arrayDeque.add(new StreamRecord("42", Long.MIN_VALUE));
            CheckpointBarrier createBarrier = createBarrier(build);
            CompletableFuture triggerCheckpointAsync = build.getStreamTask().triggerCheckpointAsync(this.metaData, createBarrier.getCheckpointOptions());
            Objects.requireNonNull(triggerCheckpointAsync);
            processSingleStepUntil(build, triggerCheckpointAsync::isDone);
            ArrayList arrayList = new ArrayList(build.getOutput());
            Assertions.assertThat(arrayList.subList(0, arrayDeque.size())).containsExactlyInAnyOrderElementsOf(arrayDeque);
            Assertions.assertThat(arrayList.get(arrayDeque.size())).isEqualTo(createBarrier);
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @TestTemplate
    void testTriggerAlignedNoTimeoutCheckpointWithFinishedChannelsAndSourceChain() throws Exception {
        testTriggerCheckpointWithFinishedChannelsAndSourceChain(CheckpointOptions.alignedNoTimeout(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()));
    }

    @TestTemplate
    void testTriggerUnalignedCheckpointWithFinishedChannelsAndSourceChain() throws Exception {
        testTriggerCheckpointWithFinishedChannelsAndSourceChain(CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()));
    }

    @TestTemplate
    void testTriggerAlignedWithTimeoutCheckpointWithFinishedChannelsAndSourceChain() throws Exception {
        testTriggerCheckpointWithFinishedChannelsAndSourceChain(CheckpointOptions.alignedWithTimeout(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), 10L));
    }

    private void testTriggerCheckpointWithFinishedChannelsAndSourceChain(CheckpointOptions checkpointOptions) throws Exception {
        ResultPartition[] resultPartitionArr = new ResultPartition[2];
        for (int i = 0; i < resultPartitionArr.length; i++) {
            try {
                resultPartitionArr[i] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                resultPartitionArr[i].setup();
            } finally {
                for (ResultPartition resultPartition : resultPartitionArr) {
                    if (resultPartition != null) {
                        resultPartition.close();
                    }
                }
            }
        }
        CompletingCheckpointResponder completingCheckpointResponder = new CompletingCheckpointResponder();
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyStreamConfig(streamConfig -> {
            streamConfig.setCheckpointingEnabled(true);
            streamConfig.setUnalignedCheckpointsEnabled(checkpointOptions.isUnalignedCheckpoint() || checkpointOptions.isTimeoutable());
        }).modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse)).setCheckpointResponder(completingCheckpointResponder).addInput(BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory(new MultipleInputStreamTaskTest.LifeCycleTrackingMockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks()), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO).addSourceInput(new SourceOperatorFactory(new MultipleInputStreamTaskTest.LifeCycleTrackingMockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks()), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO).addAdditionalOutput(resultPartitionArr).setupOperatorChain((StreamOperatorFactory<?>) new MultipleInputStreamTaskTest.MapToStringMultipleInputOperatorFactory(4)).finishForSingletonOperatorChain(StringSerializer.INSTANCE)).build();
        try {
            StreamTask<OUT, ?> streamTask = build.streamTask;
            Objects.requireNonNull(streamTask);
            Consumer<Long> consumer = (v1) -> {
                r1.notifyCheckpointCompleteAsync(v1);
            };
            StreamTask<OUT, ?> streamTask2 = build.streamTask;
            Objects.requireNonNull(streamTask2);
            completingCheckpointResponder.setHandlers(consumer, (v1, v2) -> {
                r2.notifyCheckpointAbortAsync(v1, v2);
            });
            build.getStreamTask().getCheckpointBarrierHandler().get();
            StreamTaskFinalCheckpointsTest.triggerCheckpoint(build, 2L, checkpointOptions);
            build.processAll();
            build.processEvent(new EndOfData(StopMode.DRAIN), 0, 0);
            build.processEvent(new EndOfData(StopMode.DRAIN), 1, 0);
            build.processEvent(EndOfPartitionEvent.INSTANCE, 0, 0);
            build.processEvent(EndOfPartitionEvent.INSTANCE, 1, 0);
            build.getTaskStateManager().getWaitForReportLatch().await();
            Assertions.assertThat(build.getTaskStateManager().getReportedCheckpointId()).isEqualTo(2L);
            CompletableFuture<Boolean> triggerCheckpoint = StreamTaskFinalCheckpointsTest.triggerCheckpoint(build, 4L, checkpointOptions);
            triggerCheckpoint.thenAccept(bool -> {
                for (ResultPartition resultPartition2 : resultPartitionArr) {
                    resultPartition2.onSubpartitionAllDataProcessed(0);
                }
            });
            build.processAll();
            build.finishProcessing();
            Assertions.assertThat(triggerCheckpoint).isDone();
            build.getTaskStateManager().getWaitForReportLatch().await();
            Assertions.assertThat(build.getTaskStateManager().getReportedCheckpointId()).isEqualTo(4L);
            for (ResultPartition resultPartition2 : resultPartitionArr) {
                Assertions.assertThat(resultPartition2.getNumberOfQueuedBuffers()).isEqualTo(3);
            }
            if (build != null) {
                build.close();
            }
        } finally {
        }
    }

    @TestTemplate
    void testSkipExecutionsIfFinishedOnRestoreWithSourceChained() throws Exception {
        OperatorID operatorID = new OperatorID();
        OperatorID operatorID2 = new OperatorID();
        OperatorID operatorID3 = new OperatorID();
        ArrayList arrayList = new ArrayList();
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyStreamConfig(streamConfig -> {
            streamConfig.setCheckpointingEnabled(true);
        }).modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse)).addInput(BasicTypeInfo.INT_TYPE_INFO).addAdditionalOutput(new RecordOrEventCollectingResultPartitionWriter<StreamElement>(arrayList, new StreamElementSerializer(IntSerializer.INSTANCE)) { // from class: org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskChainedSourcesCheckpointingTest.1
            public void notifyEndOfData(StopMode stopMode) throws IOException {
                broadcastEvent(new EndOfData(stopMode), false);
            }
        }).addSourceInput(operatorID, new SourceOperatorFactory(new SourceOperatorStreamTaskTest.LifeCycleMonitorSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks()), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO).addSourceInput(operatorID2, new SourceOperatorFactory(new SourceOperatorStreamTaskTest.LifeCycleMonitorSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks()), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO).setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE).setupOperatorChain(operatorID3, (StreamOperatorFactory<?>) new LifeCycleMonitorMultipleInputOperatorFactory()).chain(new TestFinishedOnRestoreStreamOperator(), StringSerializer.INSTANCE).finish()).build();
        try {
            build.processElement(Watermark.MAX_WATERMARK);
            Assertions.assertThat(arrayList).isEmpty();
            build.waitForTaskCompletion();
            Assertions.assertThat(arrayList).containsExactly(new Object[]{Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)});
            for (StreamOperatorWrapper streamOperatorWrapper : build.getStreamTask().operatorChain.getAllOperators()) {
                if (streamOperatorWrapper.getStreamOperator() instanceof SourceOperator) {
                    streamOperatorWrapper.getStreamOperator().getSourceReader().getLifeCycleMonitor().assertCallTimes(0, LifeCycleMonitor.LifeCyclePhase.values());
                }
            }
            if (build != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void addRecordsAndBarriers(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, CheckpointBarrier checkpointBarrier) throws Exception {
        addRecords(streamTaskMailboxTestHarness);
        addBarriers(streamTaskMailboxTestHarness, checkpointBarrier);
    }

    private CheckpointBarrier createStopWithSavepointDrainBarrier() {
        return new CheckpointBarrier(this.metaData.getCheckpointId(), this.metaData.getTimestamp(), CheckpointOptions.alignedNoTimeout(SavepointType.terminate(SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault()));
    }

    private CheckpointBarrier createBarrier(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness) {
        StreamConfig configuration = streamTaskMailboxTestHarness.getStreamTask().getConfiguration();
        return new CheckpointBarrier(this.metaData.getCheckpointId(), this.metaData.getTimestamp(), CheckpointOptions.forConfig(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), configuration.isExactlyOnceCheckpointMode(), configuration.isUnalignedCheckpointsEnabled(), configuration.getAlignedCheckpointTimeout().toMillis()));
    }

    private void addBarriers(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, CheckpointBarrier checkpointBarrier) throws Exception {
        streamTaskMailboxTestHarness.processEvent(checkpointBarrier, 0);
        streamTaskMailboxTestHarness.processEvent(checkpointBarrier, 1);
    }

    private void addRecords(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness) throws Exception {
        MultipleInputStreamTaskTest.addSourceRecords(streamTaskMailboxTestHarness, 1, 42, 42, 42);
        addNetworkRecords(streamTaskMailboxTestHarness);
    }

    private void addNetworkRecords(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness) throws Exception {
        streamTaskMailboxTestHarness.processElement(new StreamRecord("44", Long.MIN_VALUE), 0);
        streamTaskMailboxTestHarness.processElement(new StreamRecord("44", Long.MIN_VALUE), 0);
        streamTaskMailboxTestHarness.processElement(new StreamRecord(Double.valueOf(47.0d), Long.MIN_VALUE), 1);
        streamTaskMailboxTestHarness.processElement(new StreamRecord(Double.valueOf(47.0d), Long.MIN_VALUE), 1);
    }

    private void processSingleStepUntil(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, Supplier<Boolean> supplier) throws Exception {
        Assertions.assertThat(supplier.get()).isFalse();
        for (int i = 0; i < MAX_STEPS && !supplier.get().booleanValue(); i++) {
            streamTaskMailboxTestHarness.processSingleStep();
        }
        Assertions.assertThat(supplier.get()).isTrue();
    }
}
