package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.LifeCycleMonitor;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.SerializedValue;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.class */
class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
    private static final int NUM_RECORDS = 10;
    private static final OperatorID OPERATOR_ID = new OperatorID();
    public static final CheckpointStorageLocationReference SAVEPOINT_LOCATION = new CheckpointStorageLocationReference("Savepoint".getBytes());
    public static final CheckpointStorageLocationReference CHECKPOINT_LOCATION = new CheckpointStorageLocationReference("Checkpoint".getBytes());

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest$LifeCycleMonitorSource.class */
    static class LifeCycleMonitorSource extends MockSource {
        public LifeCycleMonitorSource(Boundedness boundedness, int i) {
            super(boundedness, i);
        }

        public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext sourceReaderContext) {
            return new LifeCycleMonitorSourceReader();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest$LifeCycleMonitorSourceReader.class */
    static class LifeCycleMonitorSourceReader extends MockSourceReader {
        private final LifeCycleMonitor lifeCycleMonitor = new LifeCycleMonitor();

        LifeCycleMonitorSourceReader() {
        }

        public void start() {
            this.lifeCycleMonitor.incrementCallTime(LifeCycleMonitor.LifeCyclePhase.OPEN);
            super.start();
        }

        public InputStatus pollNext(ReaderOutput<Integer> readerOutput) throws Exception {
            this.lifeCycleMonitor.incrementCallTime(LifeCycleMonitor.LifeCyclePhase.PROCESS_ELEMENT);
            return super.pollNext(readerOutput);
        }

        public void close() throws Exception {
            this.lifeCycleMonitor.incrementCallTime(LifeCycleMonitor.LifeCyclePhase.CLOSE);
            super.close();
        }

        public LifeCycleMonitor getLifeCycleMonitor() {
            return this.lifeCycleMonitor;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest$TestingExternallyInducedSource.class */
    private static class TestingExternallyInducedSource extends MockSource {
        private static final long serialVersionUID = 3078454109555893721L;
        private final TestingExternallyInducedSourceReader reader;

        private TestingExternallyInducedSource(TestingExternallyInducedSourceReader testingExternallyInducedSourceReader) {
            super(Boundedness.CONTINUOUS_UNBOUNDED, 1);
            this.reader = testingExternallyInducedSourceReader;
        }

        public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext sourceReaderContext) {
            return this.reader;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest$TestingExternallyInducedSourceReader.class */
    public static class TestingExternallyInducedSourceReader implements ExternallyInducedSourceReader<Integer, MockSourceSplit>, Serializable {
        private static final long CHECKPOINT_ID = 1234;
        private final int numEventsBeforeCheckpoint;
        private final int totalNumEvents;
        private int numEmittedEvents = 0;
        private boolean checkpointed = false;
        private int checkpointedAt = -1;
        private long checkpointedId;

        TestingExternallyInducedSourceReader(int i, int i2) {
            this.numEventsBeforeCheckpoint = i;
            this.totalNumEvents = i2;
        }

        public Optional<Long> shouldTriggerCheckpoint() {
            return (this.numEmittedEvents != this.numEventsBeforeCheckpoint || this.checkpointed) ? Optional.empty() : Optional.of(Long.valueOf(CHECKPOINT_ID));
        }

        public void start() {
        }

        public InputStatus pollNext(ReaderOutput<Integer> readerOutput) throws Exception {
            if (this.numEmittedEvents == this.numEventsBeforeCheckpoint - 1) {
                this.numEmittedEvents++;
                return InputStatus.NOTHING_AVAILABLE;
            }
            if (this.numEmittedEvents >= this.totalNumEvents) {
                return InputStatus.END_OF_INPUT;
            }
            this.numEmittedEvents++;
            return InputStatus.MORE_AVAILABLE;
        }

        public List<MockSourceSplit> snapshotState(long j) {
            this.checkpointed = true;
            this.checkpointedAt = this.numEmittedEvents;
            this.checkpointedId = j;
            return Collections.emptyList();
        }

        public CompletableFuture<Void> isAvailable() {
            return CompletableFuture.completedFuture(null);
        }

        public void addSplits(List<MockSourceSplit> list) {
        }

        public void notifyNoMoreSplits() {
        }

        public void close() throws Exception {
        }
    }

    SourceOperatorStreamTaskTest() {
    }

    @Test
    void testMetrics() throws Exception {
        testMetrics(SourceOperatorStreamTask::new, new SourceOperatorFactory(new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()), abstractDoubleAssert -> {
            abstractDoubleAssert.isLessThanOrEqualTo(1000000.0d);
        });
    }

    @Test
    void testSnapshotAndRestore() throws Exception {
        executeAndWaitForCheckpoint(2L, executeAndWaitForCheckpoint(1L, null, IntStream.range(0, NUM_RECORDS)), IntStream.range(NUM_RECORDS, 20));
    }

    @Test
    void testSnapshotAndAdvanceToEndOfEventTime() throws Exception {
        StreamTaskMailboxTestHarness<Integer> createTestHarness = createTestHarness(1L, null);
        Throwable th = null;
        try {
            try {
                getAndMaybeAssignSplit(createTestHarness);
                CheckpointOptions checkpointOptions = new CheckpointOptions(SavepointType.terminate(SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault());
                triggerCheckpointWaitForFinish(createTestHarness, 1L, checkpointOptions);
                LinkedList linkedList = new LinkedList();
                linkedList.add(Watermark.MAX_WATERMARK);
                linkedList.add(new EndOfData(StopMode.DRAIN));
                linkedList.add(new CheckpointBarrier(1L, 1L, checkpointOptions));
                TestHarnessUtil.assertOutputEquals("Output was not correct.", linkedList, createTestHarness.getOutput());
                if (createTestHarness != null) {
                    if (0 == 0) {
                        createTestHarness.close();
                        return;
                    }
                    try {
                        createTestHarness.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTestHarness != null) {
                if (th != null) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testEmittingMaxWatermarkAfterReadingAllRecords() throws Exception {
        StreamTaskMailboxTestHarness<Integer> createTestHarness = createTestHarness();
        Throwable th = null;
        try {
            createTestHarness.processAll();
            createTestHarness.finishProcessing();
            LinkedList linkedList = new LinkedList();
            linkedList.add(Watermark.MAX_WATERMARK);
            linkedList.add(new EndOfData(StopMode.DRAIN));
            Assertions.assertThat(createTestHarness.getOutput()).containsExactlyElementsOf(linkedList);
            if (createTestHarness != null) {
                if (0 == 0) {
                    createTestHarness.close();
                    return;
                }
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (0 != 0) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testNotEmittingMaxWatermarkAfterCancelling() throws Exception {
        StreamTaskMailboxTestHarness<Integer> createTestHarness = createTestHarness();
        Throwable th = null;
        try {
            createTestHarness.getStreamTask().cancel();
            createTestHarness.finishProcessing();
            Assertions.assertThat(createTestHarness.getOutput()).hasSize(0);
            if (createTestHarness != null) {
                if (0 == 0) {
                    createTestHarness.close();
                    return;
                }
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (0 != 0) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }

    static Stream<?> provideExternallyInducedParameters() {
        return Stream.of((Object[]) new CheckpointOptions[]{CheckpointOptions.alignedNoTimeout(SavepointType.savepoint(SavepointFormatType.CANONICAL), SAVEPOINT_LOCATION), CheckpointOptions.alignedNoTimeout(SavepointType.terminate(SavepointFormatType.CANONICAL), SAVEPOINT_LOCATION), CheckpointOptions.alignedNoTimeout(SavepointType.suspend(SavepointFormatType.CANONICAL), SAVEPOINT_LOCATION), CheckpointOptions.alignedNoTimeout(CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION), CheckpointOptions.alignedWithTimeout(CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION, 123L), CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION), CheckpointOptions.notExactlyOnce(CheckpointType.CHECKPOINT, CHECKPOINT_LOCATION)}).flatMap(checkpointOptions -> {
            return Stream.of(new Object[]{checkpointOptions, true}, new Object[]{checkpointOptions, false});
        });
    }

    /* JADX WARN: Removed duplicated region for block: B:13:0x0142  */
    /* JADX WARN: Removed duplicated region for block: B:25:0x0194 A[ORIG_RETURN, RETURN] */
    @org.junit.jupiter.params.provider.MethodSource({"provideExternallyInducedParameters"})
    @org.junit.jupiter.params.ParameterizedTest
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    void testExternallyInducedSource(org.apache.flink.runtime.checkpoint.CheckpointOptions r13, boolean r14) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 405
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTaskTest.testExternallyInducedSource(org.apache.flink.runtime.checkpoint.CheckpointOptions, boolean):void");
    }

    @Test
    void testSkipExecutionIfFinishedOnRestore() throws Exception {
        TaskStateSnapshot taskStateSnapshot = TaskStateSnapshot.FINISHED_ON_RESTORE;
        StreamOperatorFactory<?> sourceOperatorFactory = new SourceOperatorFactory<>(new LifeCycleMonitorSource(Boundedness.CONTINUOUS_UNBOUNDED, NUM_RECORDS), WatermarkStrategy.noWatermarks());
        ArrayList arrayList = new ArrayList();
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).setTaskStateSnapshot(1L, taskStateSnapshot).addAdditionalOutput(new RecordOrEventCollectingResultPartitionWriter<StreamElement>(arrayList, new StreamElementSerializer(IntSerializer.INSTANCE)) { // from class: org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTaskTest.1
            public void notifyEndOfData(StopMode stopMode) throws IOException {
                broadcastEvent(new EndOfData(stopMode), false);
            }
        }).setupOperatorChain(sourceOperatorFactory).chain((OneInputStreamOperator) new TestFinishedOnRestoreStreamOperator(), (TypeSerializer) StringSerializer.INSTANCE).finish()).build();
        Throwable th = null;
        try {
            build.getStreamTask().invoke();
            build.processAll();
            Assertions.assertThat(arrayList).containsExactly(new Object[]{Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)});
            build.getStreamTask().getMainOperator().getSourceReader().getLifeCycleMonitor().assertCallTimes(0, LifeCycleMonitor.LifeCyclePhase.values());
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testTriggeringStopWithSavepointWithDrain() throws Exception {
        StreamOperatorFactory<?> sourceOperatorFactory = new SourceOperatorFactory<>(new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2), WatermarkStrategy.noWatermarks());
        final CompletableFuture completableFuture = new CompletableFuture();
        StreamTaskMailboxTestHarness build = new StreamTaskMailboxTestHarnessBuilder(SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).setupOutputForSingletonOperatorChain(sourceOperatorFactory).setCheckpointResponder(new TestCheckpointResponder() { // from class: org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTaskTest.2
            public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics, TaskStateSnapshot taskStateSnapshot) {
                super.acknowledgeCheckpoint(jobID, executionAttemptID, j, checkpointMetrics, taskStateSnapshot);
                completableFuture.complete(null);
            }
        }).build();
        Throwable th = null;
        try {
            try {
                CompletableFuture triggerCheckpointAsync = build.streamTask.triggerCheckpointAsync(new CheckpointMetaData(2L, 2L), CheckpointOptions.alignedNoTimeout(SavepointType.terminate(SavepointFormatType.CANONICAL), SAVEPOINT_LOCATION));
                completableFuture.whenComplete((bool, th2) -> {
                    build.streamTask.notifyCheckpointCompleteAsync(2L);
                });
                build.waitForTaskCompletion();
                build.finishProcessing();
                Assertions.assertThat(triggerCheckpointAsync.isDone()).isTrue();
                Assertions.assertThat((Boolean) triggerCheckpointAsync.get()).isTrue();
                Assertions.assertThat(completableFuture.isDone()).isTrue();
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                }
            } catch (Throwable th4) {
                th = th4;
                throw th4;
            }
        } catch (Throwable th5) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    build.close();
                }
            }
            throw th5;
        }
    }

    private TaskStateSnapshot executeAndWaitForCheckpoint(long j, TaskStateSnapshot taskStateSnapshot, IntStream intStream) throws Exception {
        StreamTaskMailboxTestHarness<Integer> createTestHarness = createTestHarness(j, taskStateSnapshot);
        Throwable th = null;
        try {
            try {
                addRecords(getAndMaybeAssignSplit(createTestHarness), NUM_RECORDS);
                createTestHarness.processAll();
                CheckpointOptions forCheckpointWithDefaultLocation = CheckpointOptions.forCheckpointWithDefaultLocation();
                triggerCheckpointWaitForFinish(createTestHarness, j, forCheckpointWithDefaultLocation);
                LinkedList linkedList = new LinkedList();
                intStream.forEach(i -> {
                    linkedList.offer(new StreamRecord(Integer.valueOf(i), Long.MIN_VALUE));
                });
                linkedList.add(new CheckpointBarrier(j, j, forCheckpointWithDefaultLocation));
                Assertions.assertThat(createTestHarness.taskStateManager.getReportedCheckpointId()).isEqualTo(j);
                TestHarnessUtil.assertOutputEquals("Output was not correct.", linkedList, createTestHarness.getOutput());
                TaskStateSnapshot lastJobManagerTaskStateSnapshot = createTestHarness.taskStateManager.getLastJobManagerTaskStateSnapshot();
                if (createTestHarness != null) {
                    if (0 != 0) {
                        try {
                            createTestHarness.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createTestHarness.close();
                    }
                }
                return lastJobManagerTaskStateSnapshot;
            } finally {
            }
        } catch (Throwable th3) {
            if (createTestHarness != null) {
                if (th != null) {
                    try {
                        createTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createTestHarness.close();
                }
            }
            throw th3;
        }
    }

    private void triggerCheckpointWaitForFinish(StreamTaskMailboxTestHarness<Integer> streamTaskMailboxTestHarness, long j, CheckpointOptions checkpointOptions) throws Exception {
        streamTaskMailboxTestHarness.taskStateManager.getWaitForReportLatch().reset();
        CompletableFuture triggerCheckpointAsync = streamTaskMailboxTestHarness.getStreamTask().triggerCheckpointAsync(new CheckpointMetaData(j, j), checkpointOptions);
        getSourceReaderFromTask(streamTaskMailboxTestHarness).markAvailable();
        triggerCheckpointAsync.getClass();
        processUntil(streamTaskMailboxTestHarness, triggerCheckpointAsync::isDone);
        Future notifyCheckpointCompleteAsync = streamTaskMailboxTestHarness.getStreamTask().notifyCheckpointCompleteAsync(j);
        notifyCheckpointCompleteAsync.getClass();
        processUntil(streamTaskMailboxTestHarness, notifyCheckpointCompleteAsync::isDone);
        streamTaskMailboxTestHarness.taskStateManager.getWaitForReportLatch().await();
    }

    private void processUntil(StreamTaskMailboxTestHarness streamTaskMailboxTestHarness, Supplier<Boolean> supplier) throws Exception {
        do {
            streamTaskMailboxTestHarness.getStreamTask().runMailboxStep();
        } while (!supplier.get().booleanValue());
    }

    private StreamTaskMailboxTestHarness<Integer> createTestHarness() throws Exception {
        return createTestHarness(0L, null);
    }

    private StreamTaskMailboxTestHarness<Integer> createTestHarness(long j, TaskStateSnapshot taskStateSnapshot) throws Exception {
        return createTestHarness(new MockSource(Boundedness.BOUNDED, 1), j, taskStateSnapshot);
    }

    private StreamTaskMailboxTestHarness<Integer> createTestHarness(MockSource mockSource, long j, TaskStateSnapshot taskStateSnapshot) throws Exception {
        StreamOperatorFactory<?> sourceOperatorFactory = new SourceOperatorFactory<>(mockSource, WatermarkStrategy.noWatermarks());
        StreamTaskMailboxTestHarnessBuilder streamTaskMailboxTestHarnessBuilder = new StreamTaskMailboxTestHarnessBuilder(SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);
        if (taskStateSnapshot != null) {
            streamTaskMailboxTestHarnessBuilder.setTaskStateSnapshot(j, taskStateSnapshot);
        }
        return streamTaskMailboxTestHarnessBuilder.setCollectNetworkEvents().setupOutputForSingletonOperatorChain(sourceOperatorFactory, OPERATOR_ID).build();
    }

    private MockSourceSplit getAndMaybeAssignSplit(StreamTaskMailboxTestHarness<Integer> streamTaskMailboxTestHarness) throws Exception {
        List assignedSplits = getSourceReaderFromTask(streamTaskMailboxTestHarness).getAssignedSplits();
        if (assignedSplits.isEmpty()) {
            streamTaskMailboxTestHarness.getStreamTask().dispatchOperatorEvent(OPERATOR_ID, new SerializedValue(new AddSplitEvent(Collections.singletonList(new MockSourceSplit(0, 0)), new MockSourceSplitSerializer())));
            while (assignedSplits.isEmpty()) {
                streamTaskMailboxTestHarness.getStreamTask().runMailboxStep();
            }
            getSourceReaderFromTask(streamTaskMailboxTestHarness).markAvailable();
        }
        return (MockSourceSplit) assignedSplits.get(0);
    }

    private void addRecords(MockSourceSplit mockSourceSplit, int i) {
        int index = mockSourceSplit.index();
        for (int i2 = index; i2 < index + i; i2++) {
            mockSourceSplit.addRecord(i2);
        }
    }

    private MockSourceReader getSourceReaderFromTask(StreamTaskMailboxTestHarness<Integer> streamTaskMailboxTestHarness) {
        return streamTaskMailboxTestHarness.getStreamTask().mainOperator.getSourceReader();
    }
}
