package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorTestContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskChainedSourcesCheckpointingTest;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.CompletingCheckpointResponder;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.class */
public class MultipleInputStreamTaskTest {
    private static final List<String> LIFE_CYCLE_EVENTS = new ArrayList();

    @Parameterized.Parameter
    public boolean objectReuse;

    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$CopyProxySerializer.class */
    private static class CopyProxySerializer extends TypeSerializer<Integer> {
        SharedReference<List<Integer>> copiedElementsRef;

        public CopyProxySerializer(SharedReference<List<Integer>> sharedReference) {
            this.copiedElementsRef = sharedReference;
        }

        public boolean isImmutableType() {
            return false;
        }

        public TypeSerializer<Integer> duplicate() {
            return new CopyProxySerializer(this.copiedElementsRef);
        }

        /* renamed from: createInstance, reason: merged with bridge method [inline-methods] */
        public Integer m149createInstance() {
            return IntSerializer.INSTANCE.createInstance();
        }

        public Integer copy(Integer num) {
            this.copiedElementsRef.applySync(list -> {
                return Boolean.valueOf(list.add(num));
            });
            return IntSerializer.INSTANCE.copy(num);
        }

        public Integer copy(Integer num, Integer num2) {
            this.copiedElementsRef.applySync(list -> {
                return Boolean.valueOf(list.add(num));
            });
            return IntSerializer.INSTANCE.copy(num, num2);
        }

        public int getLength() {
            return IntSerializer.INSTANCE.getLength();
        }

        public void serialize(Integer num, DataOutputView dataOutputView) throws IOException {
            IntSerializer.INSTANCE.serialize(num, dataOutputView);
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Integer m148deserialize(DataInputView dataInputView) throws IOException {
            return IntSerializer.INSTANCE.deserialize(dataInputView);
        }

        public Integer deserialize(Integer num, DataInputView dataInputView) throws IOException {
            return IntSerializer.INSTANCE.deserialize(num, dataInputView);
        }

        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
            throw new UnsupportedOperationException();
        }

        public boolean equals(Object obj) {
            throw new UnsupportedOperationException();
        }

        public int hashCode() {
            throw new UnsupportedOperationException();
        }

        public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$DuplicatingOperator.class */
    static class DuplicatingOperator extends AbstractStreamOperatorV2<String> implements MultipleInputStreamOperator<String> {

        /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$DuplicatingOperator$DuplicatingInput.class */
        class DuplicatingInput extends AbstractInput<String, String> {
            public DuplicatingInput(AbstractStreamOperatorV2<String> abstractStreamOperatorV2, int i) {
                super(abstractStreamOperatorV2, i);
            }

            public void processElement(StreamRecord<String> streamRecord) throws Exception {
                this.output.collect(streamRecord);
                this.output.collect(streamRecord);
            }
        }

        public DuplicatingOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            super(streamOperatorParameters, 3);
        }

        public List<Input> getInputs() {
            return Arrays.asList(new DuplicatingInput(this, 1), new DuplicatingInput(this, 2), new DuplicatingInput(this, 3));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$LifeCycleTrackingMap.class */
    static class LifeCycleTrackingMap<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T>, BoundedOneInput {
        public static final String OPEN = "LifeCycleTrackingMap#open";
        public static final String CLOSE = "LifeCycleTrackingMap#close";
        public static final String END_INPUT = "LifeCycleTrackingMap#endInput";

        LifeCycleTrackingMap() {
        }

        public void processElement(StreamRecord<T> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }

        public void open() throws Exception {
            MultipleInputStreamTaskTest.LIFE_CYCLE_EVENTS.add(OPEN);
            super.open();
        }

        public void close() throws Exception {
            MultipleInputStreamTaskTest.LIFE_CYCLE_EVENTS.add(CLOSE);
            super.close();
        }

        public void endInput() throws Exception {
            MultipleInputStreamTaskTest.LIFE_CYCLE_EVENTS.add(END_INPUT);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$LifeCycleTrackingMapToStringMultipleInputOperator.class */
    static class LifeCycleTrackingMapToStringMultipleInputOperator extends MapToStringMultipleInputOperator implements BoundedMultiInput {
        public static final String OPEN = "MultipleInputOperator#open";
        public static final String CLOSE = "MultipleInputOperator#close";
        public static final String FINISH = "MultipleInputOperator#finish";
        public static final String END_INPUT = "MultipleInputOperator#endInput";
        private static final long serialVersionUID = 1;

        public LifeCycleTrackingMapToStringMultipleInputOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            super(streamOperatorParameters, 3);
        }

        @Override // org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskTest.MapToStringMultipleInputOperator
        public void open() throws Exception {
            MultipleInputStreamTaskTest.LIFE_CYCLE_EVENTS.add(OPEN);
            super.open();
        }

        @Override // org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskTest.MapToStringMultipleInputOperator
        public void close() throws Exception {
            MultipleInputStreamTaskTest.LIFE_CYCLE_EVENTS.add(CLOSE);
            super.close();
        }

        public void endInput(int i) {
            MultipleInputStreamTaskTest.LIFE_CYCLE_EVENTS.add(END_INPUT);
        }

        @Override // org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskTest.MapToStringMultipleInputOperator
        public void finish() throws Exception {
            MultipleInputStreamTaskTest.LIFE_CYCLE_EVENTS.add(FINISH);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$LifeCycleTrackingMapToStringMultipleInputOperatorFactory.class */
    static class LifeCycleTrackingMapToStringMultipleInputOperatorFactory extends AbstractStreamOperatorFactory<String> {
        LifeCycleTrackingMapToStringMultipleInputOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            return new LifeCycleTrackingMapToStringMultipleInputOperator(streamOperatorParameters);
        }

        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            return LifeCycleTrackingMapToStringMultipleInputOperator.class;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$LifeCycleTrackingMockSource.class */
    public static class LifeCycleTrackingMockSource extends MockSource {
        public LifeCycleTrackingMockSource(Boundedness boundedness, int i) {
            super(boundedness, i);
        }

        public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext sourceReaderContext) {
            LifeCycleTrackingMockSourceReader lifeCycleTrackingMockSourceReader = new LifeCycleTrackingMockSourceReader();
            this.createdReaders.add(lifeCycleTrackingMockSourceReader);
            return lifeCycleTrackingMockSourceReader;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$LifeCycleTrackingMockSourceReader.class */
    static class LifeCycleTrackingMockSourceReader extends MockSourceReader {
        public static final String START = "SourceReader#start";
        public static final String CLOSE = "SourceReader#close";

        LifeCycleTrackingMockSourceReader() {
        }

        public void start() {
            MultipleInputStreamTaskTest.LIFE_CYCLE_EVENTS.add(START);
            super.start();
        }

        public void close() throws Exception {
            MultipleInputStreamTaskTest.LIFE_CYCLE_EVENTS.add(CLOSE);
            super.close();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$MapToStringMultipleInputOperator.class */
    protected static class MapToStringMultipleInputOperator extends AbstractStreamOperatorV2<String> implements MultipleInputStreamOperator<String> {
        private static final long serialVersionUID = 1;
        private final int numberOfInputs;
        private final boolean emitOnFinish;
        private boolean openCalled;
        private boolean closeCalled;

        /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$MapToStringMultipleInputOperator$MapToStringInput.class */
        public class MapToStringInput<T> extends AbstractInput<T, String> {
            public MapToStringInput(AbstractStreamOperatorV2<String> abstractStreamOperatorV2, int i) {
                super(abstractStreamOperatorV2, i);
            }

            public void processElement(StreamRecord<T> streamRecord) throws Exception {
                if (!MapToStringMultipleInputOperator.this.openCalled) {
                    Assert.fail("Open was not called before run.");
                }
                if (streamRecord.hasTimestamp()) {
                    this.output.collect(new StreamRecord(streamRecord.getValue().toString(), streamRecord.getTimestamp()));
                } else {
                    this.output.collect(new StreamRecord(streamRecord.getValue().toString()));
                }
            }
        }

        public MapToStringMultipleInputOperator(StreamOperatorParameters<String> streamOperatorParameters, int i) {
            this(streamOperatorParameters, i, false);
        }

        public MapToStringMultipleInputOperator(StreamOperatorParameters<String> streamOperatorParameters, int i, boolean z) {
            super(streamOperatorParameters, i);
            this.numberOfInputs = i;
            this.emitOnFinish = z;
        }

        public void open() throws Exception {
            super.open();
            if (this.closeCalled) {
                Assert.fail("Close called before open.");
            }
            this.openCalled = true;
        }

        public void finish() throws Exception {
            if (this.emitOnFinish) {
                this.output.collect(new StreamRecord("FINISH"));
            }
        }

        public void close() throws Exception {
            super.close();
            if (!this.openCalled) {
                Assert.fail("Open was not called before close.");
            }
            this.closeCalled = true;
        }

        public List<Input> getInputs() {
            Preconditions.checkArgument(this.numberOfInputs <= 4);
            return Arrays.asList(new MapToStringInput(this, 1), new MapToStringInput(this, 2), new MapToStringInput(this, 3), new MapToStringInput(this, 4)).subList(0, this.numberOfInputs);
        }

        public boolean wasCloseCalled() {
            return this.closeCalled;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$MapToStringMultipleInputOperatorFactory.class */
    public static class MapToStringMultipleInputOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private final int numberOfInputs;
        private final boolean emitOnFinish;

        public MapToStringMultipleInputOperatorFactory(int i) {
            this(i, false);
        }

        public MapToStringMultipleInputOperatorFactory(int i, boolean z) {
            this.numberOfInputs = i;
            this.emitOnFinish = z;
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            return new MapToStringMultipleInputOperator(streamOperatorParameters, this.numberOfInputs, this.emitOnFinish);
        }

        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            return MapToStringMultipleInputOperator.class;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest$RecordToWatermarkGenerator.class */
    public static class RecordToWatermarkGenerator implements WatermarkGenerator<Integer>, Serializable {
        private RecordToWatermarkGenerator() {
        }

        public void onEvent(Integer num, long j, WatermarkOutput watermarkOutput) {
            watermarkOutput.emitWatermark(new Watermark(num.intValue()));
        }

        public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        }
    }

    @Parameterized.Parameters(name = "objectReuse = {0}")
    public static Boolean[] parameters() {
        return new Boolean[]{true, false};
    }

    @Before
    public void setUp() {
        LIFE_CYCLE_EVENTS.clear();
    }

    @Test
    public void testBasicProcessing() throws Exception {
        StreamTaskMailboxTestHarness<String> buildTestHarness = buildTestHarness(this.objectReuse);
        Throwable th = null;
        try {
            ArrayDeque arrayDeque = new ArrayDeque();
            addSourceRecords(buildTestHarness, 1, 42, 43);
            arrayDeque.add(new StreamRecord("42", Long.MIN_VALUE));
            arrayDeque.add(new StreamRecord("43", Long.MIN_VALUE));
            buildTestHarness.processElement(new StreamRecord("Hello", 0 + 1), 0);
            arrayDeque.add(new StreamRecord("Hello", 0 + 1));
            buildTestHarness.processElement(new StreamRecord(Double.valueOf(42.44d), 0 + 3), 1);
            arrayDeque.add(new StreamRecord("42.44", 0 + 3));
            buildTestHarness.endInput();
            buildTestHarness.waitForTaskCompletion();
            MatcherAssert.assertThat(buildTestHarness.getOutput(), Matchers.containsInAnyOrder(arrayDeque.toArray()));
            if (buildTestHarness != null) {
                if (0 == 0) {
                    buildTestHarness.close();
                    return;
                }
                try {
                    buildTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (buildTestHarness != null) {
                if (0 != 0) {
                    try {
                        buildTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    buildTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCopyForObjectReuse() throws Exception {
        SharedReference add = this.sharedObjects.add(new ArrayList());
        StreamTaskMailboxTestHarness build = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(applyObjectReuse(this.objectReuse)).addInput(BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory(new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()), new CopyProxySerializer(add)).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory(3)).build();
        Throwable th = null;
        try {
            addSourceRecords(build, 1, 42, 43);
            build.endInput();
            build.waitForTaskCompletion();
            if (this.objectReuse) {
                Assert.assertTrue(((List) add.get()).isEmpty());
            } else {
                MatcherAssert.assertThat(add.get(), Matchers.containsInAnyOrder(new Integer[]{42, 43}));
            }
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCheckpointBarriers() throws Exception {
        StreamTaskMailboxTestHarness build = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO, 2).addInput(BasicTypeInfo.INT_TYPE_INFO, 2).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory(3)).build();
        Throwable th = null;
        try {
            ArrayDeque arrayDeque = new ArrayDeque();
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
            build.processElement(new StreamRecord("Ciao-0-0", 0L), 0, 1);
            arrayDeque.add(new StreamRecord("Ciao-0-0", 0L));
            build.processElement(new StreamRecord(11, 0L), 1, 1);
            build.processElement(new StreamRecord(Double.valueOf(1.0d), 0L), 2, 0);
            arrayDeque.add(new StreamRecord("11", 0L));
            arrayDeque.add(new StreamRecord("1.0", 0L));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 0);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 1);
            arrayDeque.add(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testOvertakingCheckpointBarriers() throws Exception {
        StreamTaskMailboxTestHarness build = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO, 2).addInput(BasicTypeInfo.INT_TYPE_INFO, 2).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory(3)).build();
        Throwable th = null;
        try {
            ArrayDeque arrayDeque = new ArrayDeque();
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
            build.processElement(new StreamRecord("Witam-0-1", 0L), 0, 1);
            build.processElement(new StreamRecord(42, 0L), 1, 1);
            build.processElement(new StreamRecord(Double.valueOf(1.0d), 0L), 2, 1);
            arrayDeque.add(new StreamRecord("Witam-0-1", 0L));
            arrayDeque.add(new StreamRecord("42", 0L));
            arrayDeque.add(new StreamRecord("1.0", 0L));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            build.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
            build.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
            build.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
            build.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 0);
            build.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 1);
            build.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
            arrayDeque.add(new CancelCheckpointMarker(0L));
            arrayDeque.add(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 0);
            build.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 1);
            build.waitForTaskCompletion();
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testMetrics() throws Exception {
        final HashMap hashMap = new HashMap();
        TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() { // from class: org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskTest.1
            public InternalOperatorMetricGroup getOrAddOperator(OperatorID operatorID, String str) {
                InternalOperatorMetricGroup orAddOperator = super.getOrAddOperator(operatorID, str);
                hashMap.put(str, orAddOperator);
                return orAddOperator;
            }
        };
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(applyObjectReuse(this.objectReuse)).addInput(BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory(new LifeCycleTrackingMockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).setupOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory(3)).name("MainOperator").chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish()).setTaskMetricGroup(taskMetricGroup).build();
        Throwable th = null;
        try {
            try {
                Assert.assertTrue(hashMap.containsKey("MainOperator"));
                OperatorMetricGroup operatorMetricGroup = (OperatorMetricGroup) hashMap.get("MainOperator");
                Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
                Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
                for (int i = 0; i < 3; i++) {
                    addSourceRecords(build, 1, 42);
                }
                for (int i2 = 0; i2 < 5; i2++) {
                    build.processElement(new StreamRecord("hello"), 0, 0);
                }
                for (int i3 = 0; i3 < 2; i3++) {
                    build.processElement(new StreamRecord("hello"), 1, 0);
                }
                int i4 = 5 + 2;
                Assert.assertEquals(i4 + 3, operatorMetricGroup.getIOMetricGroup().getNumRecordsInCounter().getCount());
                Assert.assertEquals(i4, numRecordsInCounter.getCount());
                Assert.assertEquals(r0 * 2 * 2 * 2, numRecordsOutCounter.getCount());
                build.waitForTaskCompletion();
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testLifeCycleOrder() throws Exception {
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(applyObjectReuse(this.objectReuse)).addInput(BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory(new LifeCycleTrackingMockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO).setupOperatorChain((StreamOperatorFactory<?>) new LifeCycleTrackingMapToStringMultipleInputOperatorFactory()).chain(new LifeCycleTrackingMap(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish()).build();
        Throwable th = null;
        try {
            build.waitForTaskCompletion();
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    build.close();
                }
            }
            MatcherAssert.assertThat(LIFE_CYCLE_EVENTS, Matchers.contains(new String[]{LifeCycleTrackingMap.OPEN, LifeCycleTrackingMapToStringMultipleInputOperator.OPEN, LifeCycleTrackingMockSourceReader.START, LifeCycleTrackingMapToStringMultipleInputOperator.END_INPUT, LifeCycleTrackingMapToStringMultipleInputOperator.END_INPUT, LifeCycleTrackingMapToStringMultipleInputOperator.END_INPUT, LifeCycleTrackingMapToStringMultipleInputOperator.FINISH, LifeCycleTrackingMap.END_INPUT, LifeCycleTrackingMap.CLOSE, LifeCycleTrackingMapToStringMultipleInputOperator.CLOSE, LifeCycleTrackingMockSourceReader.CLOSE}));
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testInputFairness() throws Exception {
        StreamTaskMailboxTestHarness build = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory(3)).build();
        Throwable th = null;
        try {
            ArrayDeque arrayDeque = new ArrayDeque();
            build.setAutoProcess(false);
            build.processElement(new StreamRecord("0"), 0);
            build.processElement(new StreamRecord("1"), 0);
            build.processElement(new StreamRecord("2"), 0);
            build.processElement(new StreamRecord("3"), 0);
            build.processElement(new StreamRecord("0"), 2);
            build.processElement(new StreamRecord("1"), 2);
            build.processAll();
            arrayDeque.add(new StreamRecord("0"));
            arrayDeque.add(new StreamRecord("0"));
            arrayDeque.add(new StreamRecord("1"));
            arrayDeque.add(new StreamRecord("1"));
            arrayDeque.add(new StreamRecord("2"));
            arrayDeque.add(new StreamRecord("3"));
            MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testWatermark() throws Exception {
        StreamTaskMailboxTestHarness<String> buildWatermarkTestHarness = buildWatermarkTestHarness(2, false);
        Throwable th = null;
        try {
            ArrayDeque arrayDeque = new ArrayDeque();
            buildWatermarkTestHarness.processElement(new org.apache.flink.streaming.api.watermark.Watermark(0), 0, 0);
            buildWatermarkTestHarness.processElement(new org.apache.flink.streaming.api.watermark.Watermark(0), 0, 1);
            addSourceRecords(buildWatermarkTestHarness, 1, 0);
            arrayDeque.add(new StreamRecord("0", Long.MIN_VALUE));
            buildWatermarkTestHarness.processElement(new org.apache.flink.streaming.api.watermark.Watermark(0), 1, 0);
            MatcherAssert.assertThat(buildWatermarkTestHarness.getOutput(), Matchers.contains(arrayDeque.toArray()));
            buildWatermarkTestHarness.processElement(new org.apache.flink.streaming.api.watermark.Watermark(0), 1, 1);
            arrayDeque.add(new org.apache.flink.streaming.api.watermark.Watermark(0));
            MatcherAssert.assertThat(buildWatermarkTestHarness.getOutput(), Matchers.contains(arrayDeque.toArray()));
            buildWatermarkTestHarness.processElement(new StreamRecord("Hello", 0), 0, 0);
            buildWatermarkTestHarness.processElement(new StreamRecord(Double.valueOf(42.0d), 0), 1, 1);
            arrayDeque.add(new StreamRecord("Hello", 0));
            arrayDeque.add(new StreamRecord("42.0", 0));
            MatcherAssert.assertThat(buildWatermarkTestHarness.getOutput(), Matchers.contains(arrayDeque.toArray()));
            buildWatermarkTestHarness.processElement(new org.apache.flink.streaming.api.watermark.Watermark(0 + 4), 0, 0);
            buildWatermarkTestHarness.processElement(new org.apache.flink.streaming.api.watermark.Watermark(0 + 3), 0, 1);
            addSourceRecords(buildWatermarkTestHarness, 1, 0 + 3);
            arrayDeque.add(new StreamRecord("" + (0 + 3), Long.MIN_VALUE));
            buildWatermarkTestHarness.processElement(new org.apache.flink.streaming.api.watermark.Watermark(0 + 3), 1, 0);
            buildWatermarkTestHarness.processElement(new org.apache.flink.streaming.api.watermark.Watermark(0 + 2), 1, 1);
            arrayDeque.add(new org.apache.flink.streaming.api.watermark.Watermark(0 + 2));
            MatcherAssert.assertThat(buildWatermarkTestHarness.getOutput(), Matchers.contains(arrayDeque.toArray()));
            buildWatermarkTestHarness.processElement(new org.apache.flink.streaming.api.watermark.Watermark(0 + 4), 1, 1);
            arrayDeque.add(new org.apache.flink.streaming.api.watermark.Watermark(0 + 3));
            MatcherAssert.assertThat(buildWatermarkTestHarness.getOutput(), Matchers.contains(arrayDeque.toArray()));
            buildWatermarkTestHarness.processElement(new org.apache.flink.streaming.api.watermark.Watermark(0 + 4), 0, 1);
            addSourceRecords(buildWatermarkTestHarness, 1, 0 + 4);
            arrayDeque.add(new StreamRecord("" + (0 + 4), Long.MIN_VALUE));
            buildWatermarkTestHarness.processElement(new org.apache.flink.streaming.api.watermark.Watermark(0 + 4), 1, 0);
            arrayDeque.add(new org.apache.flink.streaming.api.watermark.Watermark(0 + 4));
            MatcherAssert.assertThat(buildWatermarkTestHarness.getOutput(), Matchers.contains(arrayDeque.toArray()));
            Assert.assertEquals(5L, TestHarnessUtil.getRawElementsFromOutput(buildWatermarkTestHarness.getOutput()).size());
            if (buildWatermarkTestHarness != null) {
                if (0 == 0) {
                    buildWatermarkTestHarness.close();
                    return;
                }
                try {
                    buildWatermarkTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (buildWatermarkTestHarness != null) {
                if (0 != 0) {
                    try {
                        buildWatermarkTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    buildWatermarkTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testWatermarkAndWatermarkStatusForwarding() throws Exception {
        StreamTaskMailboxTestHarness<String> buildWatermarkTestHarness = buildWatermarkTestHarness(2, true);
        Throwable th = null;
        try {
            try {
                ArrayDeque arrayDeque = new ArrayDeque();
                buildWatermarkTestHarness.processElement(WatermarkStatus.IDLE, 0, 1);
                buildWatermarkTestHarness.processElement(new org.apache.flink.streaming.api.watermark.Watermark(0 + 6), 0, 0);
                buildWatermarkTestHarness.processElement(new org.apache.flink.streaming.api.watermark.Watermark(0 + 5), 1, 1);
                buildWatermarkTestHarness.processElement(WatermarkStatus.IDLE, 1, 0);
                arrayDeque.add(new org.apache.flink.streaming.api.watermark.Watermark(0 + 5));
                MatcherAssert.assertThat(buildWatermarkTestHarness.getOutput(), Matchers.contains(arrayDeque.toArray()));
                buildWatermarkTestHarness.processElement(WatermarkStatus.IDLE, 1, 1);
                arrayDeque.add(new org.apache.flink.streaming.api.watermark.Watermark(0 + 6));
                MatcherAssert.assertThat(buildWatermarkTestHarness.getOutput(), Matchers.contains(arrayDeque.toArray()));
                buildWatermarkTestHarness.processElement(WatermarkStatus.IDLE, 0, 0);
                arrayDeque.add(WatermarkStatus.IDLE);
                MatcherAssert.assertThat(buildWatermarkTestHarness.getOutput(), Matchers.contains(arrayDeque.toArray()));
                addSourceRecords(buildWatermarkTestHarness, 1, 0 + 10);
                arrayDeque.add(WatermarkStatus.ACTIVE);
                arrayDeque.add(new StreamRecord("" + (0 + 10), Long.MIN_VALUE));
                arrayDeque.add(new org.apache.flink.streaming.api.watermark.Watermark(0 + 10));
                arrayDeque.add(WatermarkStatus.IDLE);
                buildWatermarkTestHarness.processAll();
                MatcherAssert.assertThat(buildWatermarkTestHarness.getOutput(), Matchers.contains(arrayDeque.toArray()));
                buildWatermarkTestHarness.processElement(WatermarkStatus.ACTIVE, 0, 1);
                arrayDeque.add(WatermarkStatus.ACTIVE);
                MatcherAssert.assertThat(buildWatermarkTestHarness.getOutput(), Matchers.contains(arrayDeque.toArray()));
                if (buildWatermarkTestHarness != null) {
                    if (0 == 0) {
                        buildWatermarkTestHarness.close();
                        return;
                    }
                    try {
                        buildWatermarkTestHarness.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (buildWatermarkTestHarness != null) {
                if (th != null) {
                    try {
                        buildWatermarkTestHarness.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    buildWatermarkTestHarness.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testAdvanceToEndOfEventTime() throws Exception {
        StreamTaskMailboxTestHarness<String> buildWatermarkTestHarness = buildWatermarkTestHarness(2, false);
        Throwable th = null;
        try {
            buildWatermarkTestHarness.processElement(org.apache.flink.streaming.api.watermark.Watermark.MAX_WATERMARK, 0, 0);
            buildWatermarkTestHarness.processElement(org.apache.flink.streaming.api.watermark.Watermark.MAX_WATERMARK, 0, 1);
            buildWatermarkTestHarness.getStreamTask().advanceToEndOfEventTime();
            buildWatermarkTestHarness.processElement(org.apache.flink.streaming.api.watermark.Watermark.MAX_WATERMARK, 1, 0);
            MatcherAssert.assertThat(buildWatermarkTestHarness.getOutput(), CoreMatchers.not(Matchers.contains(new Object[]{org.apache.flink.streaming.api.watermark.Watermark.MAX_WATERMARK})));
            buildWatermarkTestHarness.processElement(org.apache.flink.streaming.api.watermark.Watermark.MAX_WATERMARK, 1, 1);
            MatcherAssert.assertThat(buildWatermarkTestHarness.getOutput(), Matchers.contains(new Object[]{org.apache.flink.streaming.api.watermark.Watermark.MAX_WATERMARK}));
            if (buildWatermarkTestHarness != null) {
                if (0 == 0) {
                    buildWatermarkTestHarness.close();
                    return;
                }
                try {
                    buildWatermarkTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (buildWatermarkTestHarness != null) {
                if (0 != 0) {
                    try {
                        buildWatermarkTestHarness.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    buildWatermarkTestHarness.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testWatermarkMetrics() throws Exception {
        final OperatorID operatorID = new OperatorID();
        final OperatorID operatorID2 = new OperatorID();
        final InterceptingOperatorMetricGroup interceptingOperatorMetricGroup = new InterceptingOperatorMetricGroup();
        final InterceptingOperatorMetricGroup interceptingOperatorMetricGroup2 = new InterceptingOperatorMetricGroup();
        TaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() { // from class: org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskTest.2
            public InternalOperatorMetricGroup getOrAddOperator(OperatorID operatorID3, String str) {
                return operatorID3.equals(operatorID) ? interceptingOperatorMetricGroup : operatorID3.equals(operatorID2) ? interceptingOperatorMetricGroup2 : super.getOrAddOperator(operatorID3, str);
            }
        };
        StreamTaskMailboxTestHarness<String> build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(applyObjectReuse(this.objectReuse)).addInput(BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory(new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2, true, false), WatermarkStrategy.forGenerator(context -> {
            return new RecordToWatermarkGenerator();
        })), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO).setupOperatorChain(operatorID, (StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory(3)).chain(operatorID2, new OneInputStreamTaskTest.WatermarkMetricOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish()).setTaskMetricGroup(taskMetricGroup).build();
        Throwable th = null;
        try {
            Gauge gauge = taskMetricGroup.get("currentInputWatermark");
            Gauge gauge2 = interceptingOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(1));
            Gauge gauge3 = interceptingOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(2));
            Gauge gauge4 = interceptingOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(3));
            Gauge gauge5 = interceptingOperatorMetricGroup.get("currentInputWatermark");
            Gauge gauge6 = interceptingOperatorMetricGroup.get("currentOutputWatermark");
            Gauge gauge7 = interceptingOperatorMetricGroup2.get("currentInputWatermark");
            Gauge gauge8 = interceptingOperatorMetricGroup2.get("currentOutputWatermark");
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge5.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge2.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge3.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge4.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge6.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge7.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge8.getValue()).longValue());
            build.processElement(new org.apache.flink.streaming.api.watermark.Watermark(1L), 0);
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge5.getValue()).longValue());
            Assert.assertEquals(1L, ((Long) gauge2.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge3.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge4.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge6.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge7.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge8.getValue()).longValue());
            addSourceRecords(build, 1, 2);
            build.processAll();
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge5.getValue()).longValue());
            Assert.assertEquals(1L, ((Long) gauge2.getValue()).longValue());
            Assert.assertEquals(2L, ((Long) gauge3.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge4.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge6.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge7.getValue()).longValue());
            Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge8.getValue()).longValue());
            build.processElement(new org.apache.flink.streaming.api.watermark.Watermark(2L), 1);
            Assert.assertEquals(1L, ((Long) gauge.getValue()).longValue());
            Assert.assertEquals(1L, ((Long) gauge5.getValue()).longValue());
            Assert.assertEquals(1L, ((Long) gauge2.getValue()).longValue());
            Assert.assertEquals(2L, ((Long) gauge3.getValue()).longValue());
            Assert.assertEquals(2L, ((Long) gauge4.getValue()).longValue());
            Assert.assertEquals(1L, ((Long) gauge6.getValue()).longValue());
            Assert.assertEquals(1L, ((Long) gauge7.getValue()).longValue());
            Assert.assertEquals(2L, ((Long) gauge8.getValue()).longValue());
            build.processElement(new org.apache.flink.streaming.api.watermark.Watermark(4L), 0);
            addSourceRecords(build, 1, 3);
            build.processAll();
            Assert.assertEquals(2L, ((Long) gauge.getValue()).longValue());
            Assert.assertEquals(2L, ((Long) gauge5.getValue()).longValue());
            Assert.assertEquals(4L, ((Long) gauge2.getValue()).longValue());
            Assert.assertEquals(3L, ((Long) gauge3.getValue()).longValue());
            Assert.assertEquals(2L, ((Long) gauge4.getValue()).longValue());
            Assert.assertEquals(2L, ((Long) gauge6.getValue()).longValue());
            Assert.assertEquals(2L, ((Long) gauge7.getValue()).longValue());
            Assert.assertEquals(4L, ((Long) gauge8.getValue()).longValue());
            finishAddingRecords(build, 1);
            build.endInput();
            build.waitForTaskCompletion();
            build.finishProcessing();
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCheckpointBarrierMetrics() throws Exception {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        StreamTaskMailboxTestHarness build = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO, 2).addInput(BasicTypeInfo.INT_TYPE_INFO, 2).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory(3)).setTaskMetricGroup(StreamTaskTestHarness.createTaskMetricGroup(concurrentHashMap)).build();
        Throwable th = null;
        try {
            try {
                MatcherAssert.assertThat(concurrentHashMap, IsMapContaining.hasKey("checkpointAlignmentTime"));
                MatcherAssert.assertThat(concurrentHashMap, IsMapContaining.hasKey("checkpointStartDelayNanos"));
                build.endInput();
                build.waitForTaskCompletion();
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testLatencyMarker() throws Exception {
        StreamTaskMailboxTestHarness build = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory(3)).setTaskMetricGroup(StreamTaskTestHarness.createTaskMetricGroup(new ConcurrentHashMap())).build();
        Throwable th = null;
        try {
            try {
                ArrayDeque arrayDeque = new ArrayDeque();
                LatencyMarker latencyMarker = new LatencyMarker(42L, new OperatorID(), 0);
                build.processElement(latencyMarker);
                arrayDeque.add(latencyMarker);
                MatcherAssert.assertThat(build.getOutput(), Matchers.contains(arrayDeque.toArray()));
                build.endInput();
                build.waitForTaskCompletion();
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTriggeringAlignedNoTimeoutCheckpointWithFinishedChannels() throws Exception {
        testTriggeringCheckpointWithFinishedChannels(CheckpointOptions.alignedNoTimeout(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()));
    }

    @Test
    public void testTriggeringUnalignedCheckpointWithFinishedChannels() throws Exception {
        testTriggeringCheckpointWithFinishedChannels(CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault()));
    }

    @Test
    public void testTriggeringAlignedWithTimeoutCheckpointWithFinishedChannels() throws Exception {
        testTriggeringCheckpointWithFinishedChannels(CheckpointOptions.alignedWithTimeout(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), 10L));
    }

    private void testTriggeringCheckpointWithFinishedChannels(CheckpointOptions checkpointOptions) throws Exception {
        ResultPartition[] resultPartitionArr = new ResultPartition[2];
        for (int i = 0; i < resultPartitionArr.length; i++) {
            try {
                resultPartitionArr[i] = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
                resultPartitionArr[i].setup();
            } finally {
                for (ResultPartition resultPartition : resultPartitionArr) {
                    if (resultPartition != null) {
                        resultPartition.close();
                    }
                }
            }
        }
        CompletingCheckpointResponder completingCheckpointResponder = new CompletingCheckpointResponder();
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.STRING_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO).addAdditionalOutput(resultPartitionArr).setCheckpointResponder(completingCheckpointResponder).modifyStreamConfig(streamConfig -> {
            streamConfig.setCheckpointingEnabled(true);
            streamConfig.setUnalignedCheckpointsEnabled(checkpointOptions.isUnalignedCheckpoint() || checkpointOptions.isTimeoutable());
        }).setupOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory(3)).finishForSingletonOperatorChain(StringSerializer.INSTANCE)).build();
        Throwable th = null;
        try {
            try {
                StreamTask<OUT, ?> streamTask = build.streamTask;
                streamTask.getClass();
                Consumer<Long> consumer = (v1) -> {
                    r1.notifyCheckpointCompleteAsync(v1);
                };
                StreamTask<OUT, ?> streamTask2 = build.streamTask;
                streamTask2.getClass();
                completingCheckpointResponder.setHandlers(consumer, (v1, v2) -> {
                    r2.notifyCheckpointAbortAsync(v1, v2);
                });
                build.getStreamTask().getCheckpointBarrierHandler().get();
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(build, StreamTaskFinalCheckpointsTest.triggerCheckpoint(build, 2L, checkpointOptions));
                Assert.assertEquals(2L, build.getTaskStateManager().getReportedCheckpointId());
                build.processEvent(new EndOfData(StopMode.DRAIN), 0, 0);
                build.processEvent(EndOfPartitionEvent.INSTANCE, 0, 0);
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(build, StreamTaskFinalCheckpointsTest.triggerCheckpoint(build, 4L, checkpointOptions));
                Assert.assertEquals(4L, build.getTaskStateManager().getReportedCheckpointId());
                build.processEvent(new EndOfData(StopMode.DRAIN), 1, 0);
                build.processEvent(new EndOfData(StopMode.DRAIN), 2, 0);
                build.processEvent(EndOfPartitionEvent.INSTANCE, 1, 0);
                build.processEvent(EndOfPartitionEvent.INSTANCE, 2, 0);
                CompletableFuture<Boolean> triggerCheckpoint = StreamTaskFinalCheckpointsTest.triggerCheckpoint(build, 6L, checkpointOptions);
                triggerCheckpoint.thenAccept(bool -> {
                    for (ResultPartition resultPartition2 : resultPartitionArr) {
                        resultPartition2.onSubpartitionAllDataProcessed(0);
                    }
                });
                build.processAll();
                build.finishProcessing();
                Assert.assertTrue(triggerCheckpoint.isDone());
                build.getTaskStateManager().getWaitForReportLatch().await();
                Assert.assertEquals(6L, build.getTaskStateManager().getReportedCheckpointId());
                for (ResultPartition resultPartition2 : resultPartitionArr) {
                    Assert.assertEquals(4L, resultPartition2.getNumberOfQueuedBuffers());
                }
                if (build != null) {
                    if (0 != 0) {
                        try {
                            build.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        build.close();
                    }
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testSkipExecutionsIfFinishedOnRestore() throws Exception {
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).setCollectNetworkEvents().modifyStreamConfig(streamConfig -> {
            streamConfig.setCheckpointingEnabled(true);
        }).modifyExecutionConfig(applyObjectReuse(this.objectReuse)).addInput(BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO).setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE).setupOperatorChain(new OperatorID(), (StreamOperatorFactory<?>) new MultipleInputStreamTaskChainedSourcesCheckpointingTest.LifeCycleMonitorMultipleInputOperatorFactory()).chain((OneInputStreamOperator) new TestFinishedOnRestoreStreamOperator(), (TypeSerializer) StringSerializer.INSTANCE).finish()).build();
        Throwable th = null;
        try {
            try {
                build.processElement(org.apache.flink.streaming.api.watermark.Watermark.MAX_WATERMARK, 0);
                build.processElement(org.apache.flink.streaming.api.watermark.Watermark.MAX_WATERMARK, 1);
                build.processElement(org.apache.flink.streaming.api.watermark.Watermark.MAX_WATERMARK, 2);
                build.waitForTaskCompletion();
                MatcherAssert.assertThat(build.getOutput(), Matchers.contains(new Object[]{org.apache.flink.streaming.api.watermark.Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)}));
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTriggeringStopWithSavepointWithDrain() throws Exception {
        new SourceOperatorFactory(new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2), WatermarkStrategy.noWatermarks());
        final CompletableFuture completableFuture = new CompletableFuture();
        StreamTaskMailboxTestHarness build = ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).setCollectNetworkEvents().modifyStreamConfig(streamConfig -> {
            streamConfig.setCheckpointingEnabled(true);
        }).modifyExecutionConfig(applyObjectReuse(this.objectReuse)).addInput(BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO).setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE).setupOperatorChain((StreamOperatorFactory<?>) new MultipleInputStreamTaskChainedSourcesCheckpointingTest.LifeCycleMonitorMultipleInputOperatorFactory()).finishForSingletonOperatorChain(StringSerializer.INSTANCE)).setCheckpointResponder(new TestCheckpointResponder() { // from class: org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskTest.3
            public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics, TaskStateSnapshot taskStateSnapshot) {
                super.acknowledgeCheckpoint(jobID, executionAttemptID, j, checkpointMetrics, taskStateSnapshot);
                completableFuture.complete(null);
            }
        }).build();
        Throwable th = null;
        try {
            try {
                CompletableFuture triggerCheckpointAsync = build.streamTask.triggerCheckpointAsync(new CheckpointMetaData(2L, 2L), CheckpointOptions.alignedNoTimeout(SavepointType.terminate(SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault()));
                completableFuture.whenComplete((bool, th2) -> {
                    build.streamTask.notifyCheckpointCompleteAsync(2L);
                });
                build.waitForTaskCompletion();
                build.finishProcessing();
                Assert.assertTrue(triggerCheckpointAsync.isDone());
                Assert.assertTrue(((Boolean) triggerCheckpointAsync.get()).booleanValue());
                Assert.assertTrue(completableFuture.isDone());
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                }
            } catch (Throwable th4) {
                th = th4;
                throw th4;
            }
        } catch (Throwable th5) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    build.close();
                }
            }
            throw th5;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Consumer<ExecutionConfig> applyObjectReuse(boolean z) {
        return executionConfig -> {
            if (z) {
                executionConfig.enableObjectReuse();
            } else {
                executionConfig.disableObjectReuse();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static StreamTaskMailboxTestHarness<String> buildTestHarness(boolean z) throws Exception {
        return buildTestHarness(false, z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static StreamTaskMailboxTestHarness<String> buildTestHarness(boolean z, boolean z2) throws Exception {
        return new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(applyObjectReuse(z2)).modifyStreamConfig(streamConfig -> {
            streamConfig.setUnalignedCheckpointsEnabled(z);
        }).modifyStreamConfig(streamConfig2 -> {
            streamConfig2.setAlignedCheckpointTimeout(Duration.ZERO);
        }).addInput(BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory(new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory(3)).build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void addSourceRecords(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, int i, int... iArr) throws Exception {
        addSourceRecords(streamTaskMailboxTestHarness, i, Boundedness.BOUNDED, iArr);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void addSourceRecords(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, int i, Boundedness boundedness, int... iArr) throws Exception {
        OperatorID sourceOperatorID = getSourceOperatorID(streamTaskMailboxTestHarness, i);
        MockSourceSplit mockSourceSplit = new MockSourceSplit(0, 0, boundedness == Boundedness.BOUNDED ? iArr.length : Integer.MAX_VALUE);
        for (int i2 : iArr) {
            mockSourceSplit.addRecord(i2);
        }
        streamTaskMailboxTestHarness.getStreamTask().dispatchOperatorEvent(sourceOperatorID, new SerializedValue(new AddSplitEvent(Collections.singletonList(mockSourceSplit), new MockSourceSplitSerializer())));
    }

    private StreamTaskMailboxTestHarness<String> buildWatermarkTestHarness(int i, boolean z) throws Exception {
        return new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(applyObjectReuse(this.objectReuse)).addInput(BasicTypeInfo.STRING_TYPE_INFO, i).addSourceInput(new SourceOperatorFactory(new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2, true, z), WatermarkStrategy.forGenerator(context -> {
            return new RecordToWatermarkGenerator();
        })), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, i).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory(3)).build();
    }

    private static OperatorID getSourceOperatorID(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, int i) {
        return streamTaskMailboxTestHarness.getStreamTask().operatorChain.getSourceTaskInput(streamTaskMailboxTestHarness.getStreamTask().getConfiguration().getInputs(streamTaskMailboxTestHarness.getClass().getClassLoader())[i]).getOperatorID();
    }

    private void finishAddingRecords(StreamTaskMailboxTestHarness<String> streamTaskMailboxTestHarness, int i) throws Exception {
        streamTaskMailboxTestHarness.getStreamTask().dispatchOperatorEvent(getSourceOperatorID(streamTaskMailboxTestHarness, i), new SerializedValue(new NoMoreSplitsEvent()));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 323248067:
                if (implMethodName.equals("lambda$buildWatermarkTestHarness$33957e3f$1")) {
                    z = false;
                    break;
                }
                break;
            case 780786721:
                if (implMethodName.equals("lambda$testWatermarkMetrics$3cd870cd$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("createWatermarkGenerator") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")) {
                    return context -> {
                        return new RecordToWatermarkGenerator();
                    };
                }
                break;
            case SourceOperatorTestContext.SUBTASK_INDEX /* 1 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("createWatermarkGenerator") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")) {
                    return context2 -> {
                        return new RecordToWatermarkGenerator();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
