package org.apache.flink.streaming.api.operators.async;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.flink.streaming.util.retryable.RetryPredicates;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

@Timeout(value = 100, unit = TimeUnit.SECONDS)
/* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.class */
class AsyncWaitOperatorTest {
    // Timeout handed to the async operators and harnesses created in this test; createChainedVertex
    // pairs it with TimeUnit.MILLISECONDS.
    private static final long TIMEOUT = 1000;

    // Shared-objects extension instance registered with JUnit 5 for this test class.
    @RegisterExtension
    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
    // Fixed-delay retry strategy that retries when RetryPredicates.EMPTY_RESULT_PREDICATE matches a result.
    // NOTE(review): the builder arguments (2, 10) are presumably max attempts and delay in ms — confirm.
    private static AsyncRetryStrategy emptyResultFixedDelayRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(2, 10).ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE).build();
    // Fixed-delay retry strategy (same builder arguments) that retries when RetryPredicates.HAS_EXCEPTION_PREDICATE matches.
    private static AsyncRetryStrategy exceptionRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(2, 10).ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE).build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$AlwaysTimeoutWithDefaultValueAsyncFunction.class */
    public static class AlwaysTimeoutWithDefaultValueAsyncFunction extends RichAsyncFunction<Integer, Integer> {
        private static final long serialVersionUID = 1;
        private static Map<Integer, Integer> tryCounts = new HashMap();

        private AlwaysTimeoutWithDefaultValueAsyncFunction() {
        }

        @VisibleForTesting
        public int getTryCount(Integer num) {
            return tryCounts.getOrDefault(num, 0).intValue();
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            tryCounts = new HashMap();
        }

        public void asyncInvoke(Integer num, ResultFuture<Integer> resultFuture) {
            tryCounts.merge(num, 1, (v0, v1) -> {
                return Integer.sum(v0, v1);
            });
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(501L);
                    resultFuture.completeExceptionally(new Exception("Dummy error"));
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }

        public void timeout(Integer num, ResultFuture<Integer> resultFuture) {
            resultFuture.complete(Collections.singletonList(-1));
        }

        public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) throws Exception {
            asyncInvoke((Integer) obj, (ResultFuture<Integer>) resultFuture);
        }

        public /* bridge */ /* synthetic */ void timeout(Object obj, ResultFuture resultFuture) throws Exception {
            timeout((Integer) obj, (ResultFuture<Integer>) resultFuture);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$CollectableFuturesAsyncFunction.class */
    public static class CollectableFuturesAsyncFunction<IN> implements AsyncFunction<IN, IN> {
        private static final long serialVersionUID = -4214078239227288637L;
        private final SharedReference<List<ResultFuture<?>>> resultFutures;

        private CollectableFuturesAsyncFunction(SharedReference<List<ResultFuture<?>>> sharedReference) {
            this.resultFutures = sharedReference;
        }

        public void asyncInvoke(IN in, ResultFuture<IN> resultFuture) throws Exception {
            ((List) this.resultFutures.get()).add(resultFuture);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$ControllableAsyncFunction.class */
    private static class ControllableAsyncFunction<IN> implements AsyncFunction<IN, IN> {
        private static final long serialVersionUID = -4214078239267288636L;
        private transient CompletableFuture<Void> trigger;

        private ControllableAsyncFunction(CompletableFuture<Void> completableFuture) {
            this.trigger = (CompletableFuture) Preconditions.checkNotNull(completableFuture);
        }

        public void asyncInvoke(IN in, ResultFuture<IN> resultFuture) throws Exception {
            this.trigger.thenAccept(r5 -> {
                resultFuture.complete(Collections.singleton(in));
            });
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$IgnoreTimeoutLazyAsyncFunction.class */
    private static class IgnoreTimeoutLazyAsyncFunction extends LazyAsyncFunction {
        private static final long serialVersionUID = 1428714561365346128L;

        private IgnoreTimeoutLazyAsyncFunction() {
        }

        public void timeout(Integer num, ResultFuture<Integer> resultFuture) throws Exception {
            resultFuture.complete(Collections.singletonList(Integer.valueOf(num.intValue() * 3)));
        }

        public /* bridge */ /* synthetic */ void timeout(Object obj, ResultFuture resultFuture) throws Exception {
            timeout((Integer) obj, (ResultFuture<Integer>) resultFuture);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$IllWrittenOddInputEmptyResultAsyncFunction.class */
    private static class IllWrittenOddInputEmptyResultAsyncFunction extends MyAbstractAsyncFunction<Integer> {
        private static final long serialVersionUID = 1;

        private IllWrittenOddInputEmptyResultAsyncFunction() {
        }

        public void asyncInvoke(final Integer num, final ResultFuture<Integer> resultFuture) throws Exception {
            executorService.submit(new Runnable() { // from class: org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.IllWrittenOddInputEmptyResultAsyncFunction.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        Thread.sleep(3L);
                        if (num.intValue() % 2 != 1) {
                            resultFuture.complete(Collections.singletonList(Integer.valueOf(num.intValue() * 2)));
                            return;
                        }
                        for (int i = 0; i < 10; i++) {
                            resultFuture.complete(Collections.EMPTY_LIST);
                        }
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }

        public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) throws Exception {
            asyncInvoke((Integer) obj, (ResultFuture<Integer>) resultFuture);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$InputReusedAsyncFunction.class */
    private static class InputReusedAsyncFunction extends MyAbstractAsyncFunction<Tuple1<Integer>> {
        private static final long serialVersionUID = 8627909616410487720L;

        private InputReusedAsyncFunction() {
        }

        public void asyncInvoke(final Tuple1<Integer> tuple1, final ResultFuture<Integer> resultFuture) throws Exception {
            executorService.submit(new Runnable() { // from class: org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.InputReusedAsyncFunction.1
                @Override // java.lang.Runnable
                public void run() {
                    resultFuture.complete(Collections.singletonList(Integer.valueOf(((Integer) tuple1.f0).intValue() * 2)));
                }
            });
        }

        public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) throws Exception {
            asyncInvoke((Tuple1<Integer>) obj, (ResultFuture<Integer>) resultFuture);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$LazyAsyncFunction.class */
    /**
     * Async function that holds back every result until {@link #countDown()} is called, then
     * completes each pending future with the unmodified input.
     *
     * <p>The latch is static and is replaced with a fresh one-count latch whenever an instance is
     * constructed.
     */
    public static class LazyAsyncFunction extends MyAsyncFunction {
        private static final long serialVersionUID = 3537791752703154670L;
        private static CountDownLatch latch;

        public LazyAsyncFunction() {
            latch = new CountDownLatch(1);
        }

        /**
         * Submits a task to the shared executor that waits on the latch before completing.
         *
         * @param num the input element, emitted unchanged
         * @param resultFuture the future completed once the latch is released (or the wait is interrupted)
         */
        @Override // org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.MyAsyncFunction
        public void asyncInvoke(final Integer num, final ResultFuture<Integer> resultFuture) throws Exception {
            executorService.submit(() -> {
                try {
                    latch.await();
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: the result is still completed below even when the wait
                    // is cut short by an interrupt.
                }
                resultFuture.complete(Collections.singletonList(num));
            });
        }

        /** Releases every task currently waiting on the latch. */
        public static void countDown() {
            latch.countDown();
        }

        // The erased asyncInvoke(Object, ResultFuture) bridge is generated by the compiler; a
        // hand-written copy clashes by erasure with the inherited generic method.
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$MyAbstractAsyncFunction.class */
    public static abstract class MyAbstractAsyncFunction<IN> extends RichAsyncFunction<IN, Integer> {
        private static final long serialVersionUID = 8522411971886428444L;
        private static final long TERMINATION_TIMEOUT = 5000;
        private static final int THREAD_POOL_SIZE = 10;
        static ExecutorService executorService;
        static int counter = 0;

        private MyAbstractAsyncFunction() {
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            synchronized (MyAbstractAsyncFunction.class) {
                if (counter == 0) {
                    executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
                }
                counter++;
            }
        }

        public void close() throws Exception {
            super.close();
            freeExecutor();
        }

        private void freeExecutor() {
            synchronized (MyAbstractAsyncFunction.class) {
                counter--;
                if (counter == 0) {
                    executorService.shutdown();
                    try {
                        if (!executorService.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.MILLISECONDS)) {
                            executorService.shutdownNow();
                        }
                    } catch (InterruptedException e) {
                        executorService.shutdownNow();
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$MyAsyncFunction.class */
    public static class MyAsyncFunction extends MyAbstractAsyncFunction<Integer> {
        private static final long serialVersionUID = -1504699677704123889L;

        private MyAsyncFunction() {
        }

        public void asyncInvoke(final Integer num, final ResultFuture<Integer> resultFuture) throws Exception {
            executorService.submit(new Runnable() { // from class: org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.MyAsyncFunction.1
                @Override // java.lang.Runnable
                public void run() {
                    resultFuture.complete(Collections.singletonList(Integer.valueOf(num.intValue() * 2)));
                }
            });
        }

        public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) throws Exception {
            asyncInvoke((Integer) obj, (ResultFuture<Integer>) resultFuture);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$NoOpAsyncFunction.class */
    public static class NoOpAsyncFunction<IN, OUT> implements AsyncFunction<IN, OUT> {
        private static final long serialVersionUID = -3060481953330480694L;

        private NoOpAsyncFunction() {
        }

        public void asyncInvoke(IN in, ResultFuture<OUT> resultFuture) throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$OddInputEmptyResultAsyncFunction.class */
    private static class OddInputEmptyResultAsyncFunction extends MyAbstractAsyncFunction<Integer> {
        private static final long serialVersionUID = 1;

        private OddInputEmptyResultAsyncFunction() {
        }

        public void asyncInvoke(final Integer num, final ResultFuture<Integer> resultFuture) throws Exception {
            executorService.submit(new Runnable() { // from class: org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.OddInputEmptyResultAsyncFunction.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        Thread.sleep(3L);
                        if (num.intValue() % 2 == 1) {
                            resultFuture.complete(Collections.EMPTY_LIST);
                        } else {
                            resultFuture.complete(Collections.singletonList(Integer.valueOf(num.intValue() * 2)));
                        }
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }

        public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) throws Exception {
            asyncInvoke((Integer) obj, (ResultFuture<Integer>) resultFuture);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$StreamRecordComparator.class */
    /**
     * Orders harness output for the sorted-output assertions: stream records are compared by
     * timestamp first and by their {@code Integer} value second.
     *
     * <p>Any comparison that involves a {@link Watermark} yields {@code 0}, i.e. watermarks are
     * treated as equal to everything. This is not a consistent total order and is only meant for
     * the assertions in this test.
     */
    public class StreamRecordComparator implements Comparator<Object> {
        private StreamRecordComparator() {
        }

        /**
         * Compares two output elements.
         *
         * @param obj the first element, a {@code StreamRecord<Integer>} or a {@code Watermark}
         * @param obj2 the second element, a {@code StreamRecord<Integer>} or a {@code Watermark}
         * @return {@code 0} if either element is a watermark; otherwise the sign of the timestamp
         *     comparison, falling back to the value comparison for equal timestamps
         * @throws ClassCastException if a non-watermark element is not a stream record
         */
        @Override // java.util.Comparator
        public int compare(Object obj, Object obj2) {
            if (obj instanceof Watermark || obj2 instanceof Watermark) {
                return 0;
            }
            @SuppressWarnings("unchecked") // test output only ever holds Integer records here
            final StreamRecord<Integer> left = (StreamRecord<Integer>) obj;
            @SuppressWarnings("unchecked")
            final StreamRecord<Integer> right = (StreamRecord<Integer>) obj2;

            // Long.compare instead of casting the long difference to int, which can truncate and
            // flip the sign for timestamps that are far apart.
            final int byTimestamp = Long.compare(left.getTimestamp(), right.getTimestamp());
            if (byTimestamp != 0) {
                return byTimestamp;
            }
            return left.getValue().compareTo(right.getValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$TimeoutAfterCompletionTestFunction.class */
    private static class TimeoutAfterCompletionTestFunction implements AsyncFunction<Integer, Integer> {
        static final AtomicBoolean TIMED_OUT = new AtomicBoolean(false);
        static final CountDownLatch COMPLETION_TRIGGER = new CountDownLatch(1);

        private TimeoutAfterCompletionTestFunction() {
        }

        public void asyncInvoke(Integer num, ResultFuture<Integer> resultFuture) {
            ForkJoinPool.commonPool().submit(() -> {
                COMPLETION_TRIGGER.await();
                resultFuture.complete(Collections.singletonList(num));
                return null;
            });
        }

        public void timeout(Integer num, ResultFuture<Integer> resultFuture) {
            TIMED_OUT.set(true);
        }

        public /* bridge */ /* synthetic */ void timeout(Object obj, ResultFuture resultFuture) throws Exception {
            timeout((Integer) obj, (ResultFuture<Integer>) resultFuture);
        }

        public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) throws Exception {
            asyncInvoke((Integer) obj, (ResultFuture<Integer>) resultFuture);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest$UserExceptionAsyncFunction.class */
    public static class UserExceptionAsyncFunction implements AsyncFunction<Integer, Integer> {
        private static final long serialVersionUID = 6326568632967110990L;

        private UserExceptionAsyncFunction() {
        }

        public void asyncInvoke(Integer num, ResultFuture<Integer> resultFuture) throws Exception {
            resultFuture.completeExceptionally(new Exception("Test exception"));
        }

        public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) throws Exception {
            asyncInvoke((Integer) obj, (ResultFuture<Integer>) resultFuture);
        }
    }

    /** Package-private no-argument constructor; performs no initialisation of its own. */
    AsyncWaitOperatorTest() {
        // nothing to set up here
    }

    /** Runs the shared event-time scenario in {@code ORDERED} output mode. */
    @Test
    void testEventTimeOrdered() throws Exception {
        final AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.ORDERED;
        testEventTime(mode);
    }

    /** Runs the shared event-time scenario in {@code UNORDERED} output mode. */
    @Test
    void testWaterMarkUnordered() throws Exception {
        final AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.UNORDERED;
        testEventTime(mode);
    }

    /**
     * Feeds records 1 and 2, a watermark at 2, and record 3 through an operator running
     * {@link MyAsyncFunction} and verifies the emitted elements.
     *
     * <p>In {@code ORDERED} mode the output must equal the expected queue exactly. Otherwise the
     * watermark must sit at index 2 and the record for input 3 at index 3, and the output is then
     * compared after sorting with {@link StreamRecordComparator}.
     *
     * @param outputMode the output mode the operator is created with
     */
    private void testEventTime(AsyncDataStream.OutputMode outputMode) throws Exception {
        final OneInputStreamOperatorTestHarness harness = createTestHarness(new MyAsyncFunction(), TIMEOUT, 2, outputMode);
        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        harness.open();
        synchronized (harness.getCheckpointLock()) {
            harness.processElement(new StreamRecord<>(1, 1L));
            harness.processElement(new StreamRecord<>(2, 2L));
            harness.processWatermark(new Watermark(2L));
            harness.processElement(new StreamRecord<>(3, 3L));
        }
        synchronized (harness.getCheckpointLock()) {
            harness.endInput();
            harness.close();
        }

        // MyAsyncFunction doubles each value; timestamps are carried over unchanged.
        expected.add(new StreamRecord<>(2, 1L));
        expected.add(new StreamRecord<>(4, 2L));
        expected.add(new Watermark(2L));
        expected.add(new StreamRecord<>(6, 3L));

        if (outputMode == AsyncDataStream.OutputMode.ORDERED) {
            TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expected, harness.getOutput());
        } else {
            final Object[] emitted = harness.getOutput().toArray();
            Assertions.assertThat(emitted[2])
                    .as("Watermark should be at index 2")
                    .isEqualTo(new Watermark(2L));
            Assertions.assertThat(emitted[3])
                    .as("StreamRecord 3 should be at the end")
                    .isEqualTo(new StreamRecord<>(6, 3L));
            TestHarnessUtil.assertOutputEqualsSorted("Output for StreamRecords does not match", expected, harness.getOutput(), new StreamRecordComparator());
        }
    }

    /** Runs the shared processing-time scenario in {@code ORDERED} output mode. */
    @Test
    void testProcessingTimeOrdered() throws Exception {
        final AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.ORDERED;
        testProcessingTime(mode);
    }

    /** Runs the shared processing-time scenario in {@code UNORDERED} output mode. */
    @Test
    void testProcessingUnordered() throws Exception {
        final AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.UNORDERED;
        testProcessingTime(mode);
    }

    /**
     * Pushes the records 1..8 (timestamp equal to value) through an operator running
     * {@link MyAsyncFunction} and expects the doubled values 2..16 with the original timestamps.
     *
     * <p>{@code ORDERED} mode requires an exact match; any other mode compares the output after
     * sorting with {@link StreamRecordComparator}.
     *
     * @param outputMode the output mode the operator is created with
     */
    private void testProcessingTime(AsyncDataStream.OutputMode outputMode) throws Exception {
        final int elementCount = 8;
        final OneInputStreamOperatorTestHarness harness = createTestHarness(new MyAsyncFunction(), TIMEOUT, 6, outputMode);
        final ArrayDeque<Object> expected = new ArrayDeque<>();

        harness.open();
        synchronized (harness.getCheckpointLock()) {
            for (int value = 1; value <= elementCount; value++) {
                harness.processElement(new StreamRecord<>(value, (long) value));
            }
        }

        for (int value = 1; value <= elementCount; value++) {
            expected.add(new StreamRecord<>(2 * value, (long) value));
        }

        synchronized (harness.getCheckpointLock()) {
            harness.endInput();
            harness.close();
        }

        if (outputMode != AsyncDataStream.OutputMode.ORDERED) {
            TestHarnessUtil.assertOutputEqualsSorted("UNORDERED Output was not correct.", expected, harness.getOutput(), new StreamRecordComparator());
            return;
        }
        TestHarnessUtil.assertOutputEquals("ORDERED Output was not correct.", expected, harness.getOutput());
    }

    /**
     * Runs the chained vertex built by {@code createChainedVertex} (two {@link MyAsyncFunction}
     * operators with a {@code +1} map in between) inside a task harness and checks, after sorting,
     * that inputs 5..9 come out as 22, 26, 30, 34, 38 with their timestamps preserved.
     */
    @Test
    void testOperatorChainWithProcessingTime() throws Exception {
        final long initialTime = 0L;
        final int[] inputs = {5, 6, 7, 8, 9};
        final int[] outputs = {22, 26, 30, 34, 38};

        JobVertex chainedVertex = createChainedVertex(new MyAsyncFunction(), new MyAsyncFunction());
        OneInputStreamTaskTestHarness harness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();
        harness.taskConfig = chainedVertex.getConfiguration();

        // Reuse the operator factory stored in the chained vertex's configuration.
        final StreamConfig chainedConfig = new StreamConfig(chainedVertex.getConfiguration());
        harness.getStreamConfig().setStreamOperatorFactory(chainedConfig.getStreamOperatorFactory(AsyncWaitOperatorTest.class.getClassLoader()));

        harness.invoke();
        harness.waitForTaskRunning();
        for (int i = 0; i < inputs.length; i++) {
            harness.processElement(new StreamRecord<>(inputs[i], initialTime + i));
        }
        harness.endInput();
        harness.waitForTaskCompletion();

        final LinkedList<Object> expected = new LinkedList<>();
        for (int i = 0; i < outputs.length; i++) {
            expected.add(new StreamRecord<>(outputs[i], initialTime + i));
        }
        TestHarnessUtil.assertOutputEqualsSorted("Test for chained operator with AsyncWaitOperator failed", expected, harness.getOutput(), new StreamRecordComparator());
    }

    /**
     * Builds a small job and returns its middle vertex.
     *
     * <p>The pipeline is: source of 1, 2, 3 → {@code orderedWait(asyncFunction, ..., 6)} → map that
     * adds the value set in {@code open} (1) → {@code unorderedWait(asyncFunction2, ..., 3)} →
     * identity map that starts a new chain → discarding sink. The job graph is asserted to have
     * exactly three vertices.
     *
     * @param asyncFunction the function used by the ordered wait
     * @param asyncFunction2 the function used by the unordered wait
     * @return the vertex at index 1 of the topologically sorted job graph
     */
    private JobVertex createChainedVertex(AsyncFunction<Integer, Integer> asyncFunction, AsyncFunction<Integer, Integer> asyncFunction2) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        AsyncDataStream.unorderedWait(
                        AsyncDataStream.orderedWait(
                                        env.fromData(new Integer[]{1, 2, 3}),
                                        asyncFunction,
                                        TIMEOUT,
                                        TimeUnit.MILLISECONDS,
                                        6)
                                .map(new RichMapFunction<Integer, Integer>() { // from class: org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.1
                                    private static final long serialVersionUID = 1;
                                    private Integer initialValue = null;

                                    public void open(OpenContext openContext) throws Exception {
                                        this.initialValue = 1;
                                    }

                                    public Integer map(Integer num) throws Exception {
                                        return Integer.valueOf(this.initialValue.intValue() + num.intValue());
                                    }
                                }),
                        asyncFunction2,
                        TIMEOUT,
                        TimeUnit.MILLISECONDS,
                        3)
                .map(new MapFunction<Integer, Integer>() { // from class: org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest.2
                    private static final long serialVersionUID = 5162085254238405527L;

                    public Integer map(Integer num) throws Exception {
                        return num;
                    }
                })
                .startNewChain()
                .sinkTo(new DiscardingSink<>());

        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        Assertions.assertThat(jobGraph.getVerticesSortedTopologicallyFromSources()).hasSize(3);
        return jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
    }

    /**
     * Snapshots an operator while elements are still pending and restores that snapshot into a
     * second task.
     *
     * <p>The first task uses {@link LazyAsyncFunction}, whose results are held back until
     * {@code LazyAsyncFunction.countDown()} is called, so the checkpoint is triggered before any of
     * the four inputs has completed. The second task uses {@link MyAsyncFunction} and is only fed
     * elements 5..8, yet the expected output also contains the doubled values for 1..4.
     */
    @Test
    void testStateSnapshotAndRestore() throws Exception {
        // --- first run: ORDERED operator over LazyAsyncFunction ---
        OneInputStreamTaskTestHarness oneInputStreamTaskTestHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        oneInputStreamTaskTestHarness.setupOutputForSingletonOperatorChain();
        AsyncWaitOperatorFactory asyncWaitOperatorFactory = new AsyncWaitOperatorFactory(new LazyAsyncFunction(), TIMEOUT, 4, AsyncDataStream.OutputMode.ORDERED);
        StreamConfig streamConfig = oneInputStreamTaskTestHarness.getStreamConfig();
        // Fixed operator ID, reused for the second harness below.
        OperatorID operatorID = new OperatorID(42L, 4711L);
        streamConfig.setStreamOperatorFactory(asyncWaitOperatorFactory);
        streamConfig.setOperatorID(operatorID);
        TestTaskStateManager taskStateManager = oneInputStreamTaskTestHarness.getTaskStateManager();
        oneInputStreamTaskTestHarness.invoke();
        oneInputStreamTaskTestHarness.waitForTaskRunning();
        OneInputStreamTask mo52getTask = oneInputStreamTaskTestHarness.mo52getTask();
        oneInputStreamTaskTestHarness.processElement(new StreamRecord(1, 1L));
        oneInputStreamTaskTestHarness.processElement(new StreamRecord(2, 2L));
        oneInputStreamTaskTestHarness.processElement(new StreamRecord(3, 3L));
        oneInputStreamTaskTestHarness.processElement(new StreamRecord(4, 4L));
        oneInputStreamTaskTestHarness.waitForInputProcessing();
        // Checkpoint 1 is taken while the latch is still closed, then we wait for the state report.
        mo52getTask.triggerCheckpointAsync(new CheckpointMetaData(1L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation());
        taskStateManager.getWaitForReportLatch().await();
        Assertions.assertThat(taskStateManager.getReportedCheckpointId()).isEqualTo(1L);
        // Release the pending LazyAsyncFunction tasks so the first task can finish.
        LazyAsyncFunction.countDown();
        oneInputStreamTaskTestHarness.endInput();
        oneInputStreamTaskTestHarness.waitForTaskCompletion();
        // --- second run: new harness initialised from the reported snapshot ---
        TaskStateSnapshot lastJobManagerTaskStateSnapshot = taskStateManager.getLastJobManagerTaskStateSnapshot();
        OneInputStreamTaskTestHarness oneInputStreamTaskTestHarness2 = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        oneInputStreamTaskTestHarness2.setTaskStateSnapshot(1L, lastJobManagerTaskStateSnapshot);
        oneInputStreamTaskTestHarness2.setupOutputForSingletonOperatorChain();
        oneInputStreamTaskTestHarness2.getStreamConfig().setStreamOperatorFactory(new AsyncWaitOperatorFactory(new MyAsyncFunction(), TIMEOUT, 6, AsyncDataStream.OutputMode.ORDERED));
        oneInputStreamTaskTestHarness2.getStreamConfig().setOperatorID(operatorID);
        oneInputStreamTaskTestHarness2.invoke();
        oneInputStreamTaskTestHarness2.waitForTaskRunning();
        OneInputStreamTask mo52getTask2 = oneInputStreamTaskTestHarness2.mo52getTask();
        oneInputStreamTaskTestHarness2.processElement(new StreamRecord(5, 5L));
        oneInputStreamTaskTestHarness2.processElement(new StreamRecord(6, 6L));
        oneInputStreamTaskTestHarness2.processElement(new StreamRecord(7, 7L));
        // Another checkpoint (same metadata values 1, 1) is triggered mid-stream and awaited via get().
        mo52getTask2.triggerCheckpointAsync(new CheckpointMetaData(1L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation()).get();
        oneInputStreamTaskTestHarness2.processElement(new StreamRecord(8, 8L));
        oneInputStreamTaskTestHarness2.endInput();
        oneInputStreamTaskTestHarness2.waitForTaskCompletion();
        // Expected: doubled values for 1..4 (fed only to the first harness) followed by 5..8.
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new StreamRecord(2, 1L));
        concurrentLinkedQueue.add(new StreamRecord(4, 2L));
        concurrentLinkedQueue.add(new StreamRecord(6, 3L));
        concurrentLinkedQueue.add(new StreamRecord(8, 4L));
        concurrentLinkedQueue.add(new StreamRecord(10, 5L));
        concurrentLinkedQueue.add(new StreamRecord(12, 6L));
        concurrentLinkedQueue.add(new StreamRecord(14, 7L));
        concurrentLinkedQueue.add(new StreamRecord(16, 8L));
        // Drop checkpoint barriers from the output so that only records are compared.
        oneInputStreamTaskTestHarness2.getOutput().removeIf(obj -> {
            return obj instanceof CheckpointBarrier;
        });
        TestHarnessUtil.assertOutputEquals("StateAndRestored Test Output was not correct.", concurrentLinkedQueue, oneInputStreamTaskTestHarness2.getOutput());
    }

    /**
     * Feeds four elements through an ORDERED operator while re-using a single {@code Tuple1} and a
     * single {@code StreamRecord} instance (object reuse is enabled on the execution config).
     *
     * <p>The expected output is the records (2, 1L), (4, 2L), (6, 3L) and (8, 4L), i.e. each result
     * must reflect the value and timestamp the shared instances held when the element was handed
     * to the operator, not what they were mutated to afterwards.
     */
    @Test
    void testObjectReused() throws Exception {
        final int elementCount = 4;

        AsyncWaitOperatorFactory operatorFactory = new AsyncWaitOperatorFactory(new InputReusedAsyncFunction(), TIMEOUT, 4, AsyncDataStream.OutputMode.ORDERED);
        TupleSerializer inputSerializer = new TupleSerializer(Tuple1.class, new TypeSerializer[]{IntSerializer.INSTANCE});
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(operatorFactory, inputSerializer);
        harness.getExecutionConfig().enableObjectReuse();

        // The very same tuple and record objects are mutated and re-submitted for every element.
        Tuple1 sharedTuple = new Tuple1();
        StreamRecord sharedRecord = new StreamRecord(sharedTuple, -1L);

        harness.setup();
        harness.open();

        synchronized (harness.getCheckpointLock()) {
            for (int value = 1; value <= elementCount; value++) {
                sharedTuple.setFields(value);
                sharedRecord.setTimestamp(value);
                harness.processElement(sharedRecord);
            }
        }

        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
        for (int value = 1; value <= elementCount; value++) {
            expectedOutput.add(new StreamRecord(2 * value, (long) value));
        }

        synchronized (harness.getCheckpointLock()) {
            harness.endInput();
            harness.close();
        }

        TestHarnessUtil.assertOutputEquals("StateAndRestoredWithObjectReuse Test Output was not correct.", expectedOutput, harness.getOutput());
    }

    /**
     * Timeout scenario with a plain {@code LazyAsyncFunction}: a {@code TimeoutException} must be
     * reported as external failure cause and only the record (2, 5L) is expected in the output.
     */
    @Test
    void testAsyncTimeoutFailure() throws Exception {
        LazyAsyncFunction function = new LazyAsyncFunction();
        Optional<Class<? extends Throwable>> expectedFailure = Optional.of(TimeoutException.class);
        StreamRecord<Integer> expectedRecord = new StreamRecord<>(2, 5L);
        testAsyncTimeout(function, expectedFailure, expectedRecord);
    }

    /**
     * Timeout scenario with an {@code IgnoreTimeoutLazyAsyncFunction}: no failure type is checked
     * and the output is expected to be the records (3, 0L) followed by (2, 5L).
     */
    @Test
    void testAsyncTimeoutIgnore() throws Exception {
        IgnoreTimeoutLazyAsyncFunction function = new IgnoreTimeoutLazyAsyncFunction();
        Optional<Class<? extends Throwable>> noExpectedFailure = Optional.empty();
        StreamRecord<Integer> firstRecord = new StreamRecord<>(3, 0L);
        StreamRecord<Integer> secondRecord = new StreamRecord<>(2, 5L);
        testAsyncTimeout(function, noExpectedFailure, firstRecord, secondRecord);
    }

    /**
     * Shared driver for the async timeout tests.
     *
     * <p>Builds an ORDERED harness with a timeout argument of 10 and a capacity argument of 2,
     * submits record (1, 0L) at processing time 0 and record (2, 5L) at processing time 5, then
     * advances the processing time to 11 before releasing the lazy functions via
     * {@code LazyAsyncFunction.countDown()}. Presumably this lets only the first element's timeout
     * elapse (TODO confirm against the operator's timer registration).
     *
     * @param lazyAsyncFunction the function under test
     * @param optional if present, the throwable type that must be found in the external failure
     *     cause reported to the mock environment; if empty, no failure check is performed
     * @param streamRecordArr the records expected in the harness output, in order
     * @throws Exception if the harness fails
     */
    private void testAsyncTimeout(LazyAsyncFunction lazyAsyncFunction, Optional<Class<? extends Throwable>> optional, StreamRecord<Integer>... streamRecordArr) throws Exception {
        OneInputStreamOperatorTestHarness harness = createTestHarness(lazyAsyncFunction, 10L, 2, AsyncDataStream.OutputMode.ORDERED);
        MockEnvironment mockEnvironment = harness.getEnvironment();
        // Any throwable is acceptable to the environment; the concrete type is verified below.
        mockEnvironment.setExpectedExternalFailureCause(Throwable.class);

        harness.open();
        harness.setProcessingTime(0L);
        synchronized (harness.getCheckpointLock()) {
            harness.processElement(new StreamRecord(1, 0L));
            harness.setProcessingTime(5L);
            harness.processElement(new StreamRecord(2, 5L));
        }

        harness.setProcessingTime(11L);
        LazyAsyncFunction.countDown();

        synchronized (harness.getCheckpointLock()) {
            harness.endInput();
            harness.close();
        }

        ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue(Arrays.asList(streamRecordArr));
        TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, harness.getOutput());

        if (!optional.isPresent()) {
            return;
        }
        Assertions.assertThat(mockEnvironment.getActualExternalFailureCause()).isPresent();
        Throwable reportedFailure = (Throwable) mockEnvironment.getActualExternalFailureCause().get();
        Assertions.assertThat(ExceptionUtils.findThrowable(reportedFailure, optional.get())).isPresent();
    }

    /**
     * Processes a single element (42, 1L) in UNORDERED mode, closes the harness, and verifies that
     * the output holds exactly the record (84, 1L) and that the processing time service has no
     * active timers left.
     */
    @Test
    void testTimeoutCleanup() throws Exception {
        OneInputStreamOperatorTestHarness harness = createTestHarness(new MyAsyncFunction(), TIMEOUT, 1, AsyncDataStream.OutputMode.UNORDERED);
        harness.open();

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(42, 1L);
        }

        // The lock is released and re-acquired between processing and shutdown, as in the two
        // separate phases of this scenario.
        synchronized (harness.getCheckpointLock()) {
            harness.endInput();
            harness.close();
        }

        StreamRecord expectedRecord = new StreamRecord(84, 1L);
        Assertions.assertThat(harness.getOutput()).containsOnly(new Object[]{expectedRecord});
        Assertions.assertThat(harness.getProcessingTimeService().getNumActiveTimers()).isZero();
    }

    /**
     * Verifies that the timeout handler is not invoked for an element that has already completed.
     *
     * <p>One record is processed, a no-op timer is registered at "current processing time +
     * TIMEOUT", and the function's completion latch is released. After that timer has fired and
     * all pending mail has been processed, the output must contain only the record with value 1
     * and {@code TimeoutAfterCompletionTestFunction.TIMED_OUT} must still be false.
     *
     * <p>The harness is managed by try-with-resources, so it is closed on both the success and the
     * failure path, and a failure while closing is attached as a suppressed exception.
     */
    @Test
    void testTimeoutAfterComplete() throws Exception {
        StreamOperatorFactory<?> operatorFactory = new AsyncWaitOperatorFactory(new TimeoutAfterCompletionTestFunction(), TIMEOUT, 1, AsyncDataStream.OutputMode.UNORDERED);
        try (StreamTaskMailboxTestHarness harness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO).setupOutputForSingletonOperatorChain(operatorFactory).build()) {
            harness.processElement(new StreamRecord(1));

            // Helper timer scheduled for the same instant as the element's timeout; waiting on it
            // below makes sure that instant has passed before the assertions run.
            ScheduledFuture timeoutInstantReached = harness.getTimerService().registerTimer(harness.getTimerService().getCurrentProcessingTime() + TIMEOUT, timestamp -> {
            });

            TimeoutAfterCompletionTestFunction.COMPLETION_TRIGGER.countDown();
            timeoutInstantReached.get();
            harness.processAll();

            Assertions.assertThat(harness.getOutput()).containsOnly(new Object[]{new StreamRecord(1)});
            Assertions.assertThat(TimeoutAfterCompletionTestFunction.TIMED_OUT).isFalse();
        }
    }

    /** Runs the user-exception scenario in ORDERED mode with {@code NO_RETRY_STRATEGY}. */
    @Test
    void testOrderedWaitUserExceptionHandling() throws Exception {
        AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.ORDERED;
        AsyncRetryStrategy retryStrategy = AsyncRetryStrategies.NO_RETRY_STRATEGY;
        testUserExceptionHandling(mode, retryStrategy);
    }

    /** Runs the user-exception scenario in ORDERED mode with {@code exceptionRetryStrategy}. */
    @Test
    void testOrderedWaitUserExceptionHandlingWithRetry() throws Exception {
        AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.ORDERED;
        AsyncRetryStrategy retryStrategy = exceptionRetryStrategy;
        testUserExceptionHandling(mode, retryStrategy);
    }

    /** Runs the user-exception scenario in UNORDERED mode with {@code NO_RETRY_STRATEGY}. */
    @Test
    void testUnorderedWaitUserExceptionHandling() throws Exception {
        AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.UNORDERED;
        AsyncRetryStrategy retryStrategy = AsyncRetryStrategies.NO_RETRY_STRATEGY;
        testUserExceptionHandling(mode, retryStrategy);
    }

    /** Runs the user-exception scenario in UNORDERED mode with {@code exceptionRetryStrategy}. */
    @Test
    void testUnorderedWaitUserExceptionHandlingWithRetry() throws Exception {
        AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.UNORDERED;
        AsyncRetryStrategy retryStrategy = exceptionRetryStrategy;
        testUserExceptionHandling(mode, retryStrategy);
    }

    /**
     * Shared driver for the user-exception tests: processes one element (1, 1L) with a
     * {@code UserExceptionAsyncFunction}, closes the harness, and asserts that an external failure
     * cause was reported to the environment.
     *
     * @param outputMode output mode the operator is created with
     * @param asyncRetryStrategy retry strategy the operator is created with
     * @throws Exception if the harness fails
     */
    private void testUserExceptionHandling(AsyncDataStream.OutputMode outputMode, AsyncRetryStrategy asyncRetryStrategy) throws Exception {
        OneInputStreamOperatorTestHarness harness = createTestHarnessWithRetry(new UserExceptionAsyncFunction(), TIMEOUT, 2, outputMode, asyncRetryStrategy);
        // Accept any throwable as failure cause; only its presence is asserted at the end.
        harness.getEnvironment().setExpectedExternalFailureCause(Throwable.class);
        harness.open();

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(1, 1L);
        }

        synchronized (harness.getCheckpointLock()) {
            harness.endInput();
            harness.close();
        }

        Assertions.assertThat(harness.getEnvironment().getActualExternalFailureCause()).isPresent();
    }

    /** Runs the timeout-exception scenario in ORDERED mode with {@code NO_RETRY_STRATEGY}. */
    @Test
    void testOrderedWaitTimeoutHandling() throws Exception {
        AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.ORDERED;
        AsyncRetryStrategy retryStrategy = AsyncRetryStrategies.NO_RETRY_STRATEGY;
        testTimeoutExceptionHandling(mode, retryStrategy);
    }

    /**
     * Runs the timeout-exception scenario in ORDERED mode with
     * {@code emptyResultFixedDelayRetryStrategy}.
     */
    @Test
    void testOrderedWaitTimeoutHandlingWithRetry() throws Exception {
        AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.ORDERED;
        AsyncRetryStrategy retryStrategy = emptyResultFixedDelayRetryStrategy;
        testTimeoutExceptionHandling(mode, retryStrategy);
    }

    /** Runs the timeout-exception scenario in UNORDERED mode with {@code NO_RETRY_STRATEGY}. */
    @Test
    void testUnorderedWaitTimeoutHandling() throws Exception {
        AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.UNORDERED;
        AsyncRetryStrategy retryStrategy = AsyncRetryStrategies.NO_RETRY_STRATEGY;
        testTimeoutExceptionHandling(mode, retryStrategy);
    }

    /**
     * Runs the timeout-exception scenario in UNORDERED mode with
     * {@code emptyResultFixedDelayRetryStrategy}.
     */
    @Test
    void testUnorderedWaitTimeoutHandlingWithRetry() throws Exception {
        AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.UNORDERED;
        AsyncRetryStrategy retryStrategy = emptyResultFixedDelayRetryStrategy;
        testTimeoutExceptionHandling(mode, retryStrategy);
    }

    /**
     * Shared driver for the timeout-exception tests: processes one element (1, 1L) with a
     * {@code NoOpAsyncFunction} and a timeout argument of 10, advances the processing time to 10,
     * and closes the harness.
     *
     * <p>NOTE(review): there is no explicit assertion here; the check presumably happens through
     * the expected external failure cause configured on the environment — confirm.
     *
     * @param outputMode output mode the operator is created with
     * @param asyncRetryStrategy retry strategy the operator is created with
     * @throws Exception if the harness fails
     */
    private void testTimeoutExceptionHandling(AsyncDataStream.OutputMode outputMode, AsyncRetryStrategy asyncRetryStrategy) throws Exception {
        final long timeoutMillis = 10L;

        OneInputStreamOperatorTestHarness harness = createTestHarnessWithRetry(new NoOpAsyncFunction(), timeoutMillis, 2, outputMode, asyncRetryStrategy);
        harness.getEnvironment().setExpectedExternalFailureCause(Throwable.class);
        harness.open();

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(1, 1L);
        }

        // Move the clock to the same value that was passed as the timeout argument.
        harness.setProcessingTime(10L);

        synchronized (harness.getCheckpointLock()) {
            harness.close();
        }
    }

    /**
     * Snapshots an operator after ten elements (0..9) have been submitted with a capacity argument
     * of ten, restores a second operator from that snapshot, and verifies that the restored
     * operator emits exactly the values 0..9 in order.
     *
     * <p>The first harness is closed in a {@code finally} block that covers only the snapshot
     * phase. It is therefore closed exactly once, and a failure in the later restore phase is
     * reported as-is rather than triggering a second {@code close()} on an already closed harness
     * that could mask the original error.
     */
    @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
    @Test
    void testRestartWithFullQueue() throws Exception {
        final int capacity = 10;

        // Phase 1: submit the elements and take the snapshot while the function is still blocked
        // on the not-yet-completed future.
        CompletableFuture completableFuture = new CompletableFuture();
        OneInputStreamOperatorTestHarness snapshotHarness = createTestHarness(new ControllableAsyncFunction(completableFuture), TIMEOUT, capacity, AsyncDataStream.OutputMode.ORDERED);
        snapshotHarness.open();

        OperatorSubtaskState snapshot;
        List<Integer> expectedOutput = new ArrayList<>(capacity);
        try {
            synchronized (snapshotHarness.getCheckpointLock()) {
                for (int i = 0; i < capacity; i++) {
                    snapshotHarness.processElement(Integer.valueOf(i), 0L);
                    expectedOutput.add(Integer.valueOf(i));
                }
            }
            synchronized (snapshotHarness.getCheckpointLock()) {
                snapshot = snapshotHarness.snapshot(0L, 0L);
            }
            // Complete the controlling future before the harness is closed below.
            completableFuture.complete(null);
        } finally {
            synchronized (snapshotHarness.getCheckpointLock()) {
                snapshotHarness.close();
            }
        }

        // Phase 2: restore from the snapshot with a function whose future is already completed.
        OneInputStreamOperatorTestHarness recoverHarness = createTestHarness(new ControllableAsyncFunction(CompletableFuture.completedFuture(null)), TIMEOUT, capacity, AsyncDataStream.OutputMode.ORDERED);
        recoverHarness.initializeState(snapshot);
        synchronized (recoverHarness.getCheckpointLock()) {
            recoverHarness.open();
        }
        synchronized (recoverHarness.getCheckpointLock()) {
            recoverHarness.endInput();
            recoverHarness.close();
        }

        List actualOutput = (List) recoverHarness.getOutput().stream().map(obj -> {
            return (Integer) ((StreamRecord) obj).getValue();
        }).collect(Collectors.toList());
        Assertions.assertThat(actualOutput).isEqualTo(expectedOutput);
    }

    /** Runs the drain scenario with {@code NO_RETRY_STRATEGY}. */
    @Test
    void testIgnoreAsyncOperatorRecordsOnDrain() throws Exception {
        AsyncRetryStrategy retryStrategy = AsyncRetryStrategies.NO_RETRY_STRATEGY;
        testIgnoreAsyncOperatorRecordsOnDrain(retryStrategy);
    }

    /** Runs the drain scenario with {@code emptyResultFixedDelayRetryStrategy}. */
    @Test
    void testIgnoreAsyncOperatorRecordsOnDrainWithRetry() throws Exception {
        AsyncRetryStrategy retryStrategy = emptyResultFixedDelayRetryStrategy;
        testIgnoreAsyncOperatorRecordsOnDrain(retryStrategy);
    }

    /**
     * Shared driver for the drain tests: processes two records, completes the result futures
     * collected by the {@code CollectableFuturesAsyncFunction} in reverse order with empty result
     * lists, calls {@code finishProcessing()}, and asserts that the harness output is empty.
     *
     * <p>The harness is managed by try-with-resources, so it is closed on every path and a failure
     * while closing is attached as a suppressed exception.
     *
     * @param asyncRetryStrategy retry strategy the operator is created with
     * @throws Exception if the harness fails
     */
    private void testIgnoreAsyncOperatorRecordsOnDrain(AsyncRetryStrategy asyncRetryStrategy) throws Exception {
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO);
        // Shared list into which the function under test collects its result futures.
        SharedReference collectedFutures = this.sharedObjects.add(new ArrayList());
        StreamOperatorFactory<?> operatorFactory = new AsyncWaitOperatorFactory(new CollectableFuturesAsyncFunction(collectedFutures), TIMEOUT, 5, AsyncDataStream.OutputMode.ORDERED, asyncRetryStrategy);

        try (StreamTaskMailboxTestHarness harness = builder.setupOutputForSingletonOperatorChain(operatorFactory).build()) {
            harness.processElement(new StreamRecord(1));
            harness.processElement(new StreamRecord(2));

            // Complete the most recently collected future first.
            for (Object pendingFuture : Lists.reverse((List) collectedFutures.get())) {
                ((ResultFuture) pendingFuture).complete(Collections.emptyList());
            }

            harness.finishProcessing();
            Assertions.assertThat(harness.getOutput()).isEmpty();
        }
    }

    /** Runs the retry scenario in ORDERED mode with an {@code OddInputEmptyResultAsyncFunction}. */
    @Test
    void testProcessingTimeOrderedWithRetry() throws Exception {
        RichAsyncFunction function = new OddInputEmptyResultAsyncFunction();
        testProcessingTimeWithRetry(AsyncDataStream.OutputMode.ORDERED, function);
    }

    /** Runs the retry scenario in UNORDERED mode with an {@code OddInputEmptyResultAsyncFunction}. */
    @Test
    void testProcessingTimeUnorderedWithRetry() throws Exception {
        RichAsyncFunction function = new OddInputEmptyResultAsyncFunction();
        testProcessingTimeWithRetry(AsyncDataStream.OutputMode.UNORDERED, function);
    }

    /**
     * Runs the retry scenario in UNORDERED mode with an
     * {@code IllWrittenOddInputEmptyResultAsyncFunction}.
     */
    @Test
    void testProcessingTimeRepeatedCompleteUnorderedWithRetry() throws Exception {
        RichAsyncFunction function = new IllWrittenOddInputEmptyResultAsyncFunction();
        testProcessingTimeWithRetry(AsyncDataStream.OutputMode.UNORDERED, function);
    }

    /**
     * Runs the retry scenario in ORDERED mode with an
     * {@code IllWrittenOddInputEmptyResultAsyncFunction}.
     */
    @Test
    void testProcessingTimeRepeatedCompleteOrderedWithRetry() throws Exception {
        RichAsyncFunction function = new IllWrittenOddInputEmptyResultAsyncFunction();
        testProcessingTimeWithRetry(AsyncDataStream.OutputMode.ORDERED, function);
    }

    /**
     * Shared driver for the retry tests: submits the records (1, 1L) through (6, 6L) to an
     * operator configured with {@code emptyResultFixedDelayRetryStrategy} and expects exactly the
     * records (4, 2L), (8, 4L) and (12, 6L) in the output.
     *
     * <p>In ORDERED mode the output must match in order; otherwise it is compared after sorting
     * with a {@code StreamRecordComparator}.
     *
     * <p>The harness is managed by try-with-resources, so it is closed on every path and a failure
     * while closing is attached as a suppressed exception.
     *
     * @param outputMode output mode the operator is created with
     * @param richAsyncFunction the async function under test
     * @throws Exception if the harness fails or the waiting thread is interrupted
     */
    private void testProcessingTimeWithRetry(AsyncDataStream.OutputMode outputMode, RichAsyncFunction richAsyncFunction) throws Exception {
        StreamOperatorFactory<?> operatorFactory = new AsyncWaitOperatorFactory(richAsyncFunction, TIMEOUT, 6, outputMode, emptyResultFixedDelayRetryStrategy);
        try (StreamTaskMailboxTestHarness harness = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO).setupOutputForSingletonOperatorChain(operatorFactory).build()) {
            ArrayDeque expectedOutput = new ArrayDeque();

            for (int value = 1; value <= 6; value++) {
                harness.processElement(new StreamRecord(value, (long) value));
            }

            expectedOutput.add(new StreamRecord(4, 2L));
            expectedOutput.add(new StreamRecord(8, 4L));
            expectedOutput.add(new StreamRecord(12, 6L));

            // Poll until all expected records have arrived.
            // NOTE(review): this loop has no deadline of its own; if a record never arrives the
            // test only ends through an external timeout.
            while (harness.getOutput().size() < expectedOutput.size()) {
                harness.processAll();
                Thread.sleep(100L);
            }

            if (outputMode == AsyncDataStream.OutputMode.ORDERED) {
                TestHarnessUtil.assertOutputEquals("ORDERED Output was not correct.", expectedOutput, harness.getOutput());
            } else {
                TestHarnessUtil.assertOutputEqualsSorted("UNORDERED Output was not correct.", expectedOutput, harness.getOutput(), new StreamRecordComparator());
            }
        }
    }

    /** Runs the always-timeout retry scenario in UNORDERED mode. */
    @Test
    void testProcessingTimeWithTimeoutFunctionUnorderedWithRetry() throws Exception {
        AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.UNORDERED;
        testProcessingTimeAlwaysTimeoutFunctionWithRetry(mode);
    }

    /** Runs the always-timeout retry scenario in ORDERED mode. */
    @Test
    void testProcessingTimeWithTimeoutFunctionOrderedWithRetry() throws Exception {
        AsyncDataStream.OutputMode mode = AsyncDataStream.OutputMode.ORDERED;
        testProcessingTimeAlwaysTimeoutFunctionWithRetry(mode);
    }

    /**
     * Shared driver for the always-timeout retry tests: submits the records (1, 1L) and (2, 2L)
     * to an operator using an {@code AlwaysTimeoutWithDefaultValueAsyncFunction} and a fixed-delay
     * retry strategy built with the arguments (5, 100L) that retries on exceptions.
     *
     * <p>The expected output is (-1, 1L) and (-1, 2L), and the function's try count for each of
     * the two inputs must be at most 2. In ORDERED mode the output must match in order; otherwise
     * it is compared after sorting with a {@code StreamRecordComparator}.
     *
     * <p>The harness is managed by try-with-resources, so it is closed on every path and a failure
     * while closing is attached as a suppressed exception.
     *
     * @param outputMode output mode the operator is created with
     * @throws Exception if the harness fails or the waiting thread is interrupted
     */
    private void testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncDataStream.OutputMode outputMode) throws Exception {
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput(BasicTypeInfo.INT_TYPE_INFO);
        AsyncRetryStrategies.FixedDelayRetryStrategy retryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(5, 100L).ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE).build();
        AlwaysTimeoutWithDefaultValueAsyncFunction asyncFunction = new AlwaysTimeoutWithDefaultValueAsyncFunction();
        StreamOperatorFactory<?> operatorFactory = new AsyncWaitOperatorFactory(asyncFunction, TIMEOUT, 10, outputMode, retryStrategy);

        try (StreamTaskMailboxTestHarness harness = builder.setupOutputForSingletonOperatorChain(operatorFactory).build()) {
            ArrayDeque expectedOutput = new ArrayDeque();

            harness.processElement(new StreamRecord(1, 1L));
            harness.processElement(new StreamRecord(2, 2L));

            expectedOutput.add(new StreamRecord(-1, 1L));
            expectedOutput.add(new StreamRecord(-1, 2L));

            // Poll until both expected records have arrived.
            // NOTE(review): this loop has no deadline of its own; if a record never arrives the
            // test only ends through an external timeout.
            while (harness.getOutput().size() < expectedOutput.size()) {
                harness.processAll();
                Thread.sleep(100L);
            }

            if (outputMode == AsyncDataStream.OutputMode.ORDERED) {
                TestHarnessUtil.assertOutputEquals("ORDERED Output was not correct.", expectedOutput, harness.getOutput());
            } else {
                TestHarnessUtil.assertOutputEqualsSorted("UNORDERED Output was not correct.", expectedOutput, harness.getOutput(), new StreamRecordComparator());
            }

            Assertions.assertThat(asyncFunction.getTryCount(1)).isLessThanOrEqualTo(2);
            Assertions.assertThat(asyncFunction.getTryCount(2)).isLessThanOrEqualTo(2);
        }
    }

    /**
     * Creates a one-input operator test harness around an {@code AsyncWaitOperatorFactory} that
     * has no retry strategy argument, using {@code IntSerializer.INSTANCE} for the input.
     *
     * @param asyncFunction the async function wrapped by the operator
     * @param j second factory argument; callers in this class pass a timeout value such as
     *     {@code TIMEOUT} or {@code 10L}
     * @param i third factory argument; presumably the operator's queue capacity (TODO confirm)
     * @param outputMode output mode handed to the factory
     * @return the newly created harness (neither set up nor opened)
     * @throws Exception if the harness cannot be created
     */
    private static <OUT> OneInputStreamOperatorTestHarness<Integer, OUT> createTestHarness(AsyncFunction<Integer, OUT> asyncFunction, long j, int i, AsyncDataStream.OutputMode outputMode) throws Exception {
        AsyncWaitOperatorFactory operatorFactory = new AsyncWaitOperatorFactory(asyncFunction, j, i, outputMode);
        return new OneInputStreamOperatorTestHarness<>(operatorFactory, IntSerializer.INSTANCE);
    }

    /**
     * Creates a one-input operator test harness around an {@code AsyncWaitOperatorFactory} that
     * is configured with the given retry strategy, using {@code IntSerializer.INSTANCE} for the
     * input.
     *
     * @param asyncFunction the async function wrapped by the operator
     * @param j second factory argument; callers in this class pass a timeout value such as
     *     {@code TIMEOUT} or {@code 10L}
     * @param i third factory argument; presumably the operator's queue capacity (TODO confirm)
     * @param outputMode output mode handed to the factory
     * @param asyncRetryStrategy retry strategy handed to the factory
     * @return the newly created harness (neither set up nor opened)
     * @throws Exception if the harness cannot be created
     */
    private static <OUT> OneInputStreamOperatorTestHarness<Integer, OUT> createTestHarnessWithRetry(AsyncFunction<Integer, OUT> asyncFunction, long j, int i, AsyncDataStream.OutputMode outputMode, AsyncRetryStrategy<OUT> asyncRetryStrategy) throws Exception {
        AsyncWaitOperatorFactory operatorFactory = new AsyncWaitOperatorFactory(asyncFunction, j, i, outputMode, asyncRetryStrategy);
        return new OneInputStreamOperatorTestHarness<>(operatorFactory, IntSerializer.INSTANCE);
    }
}
