/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing.operators;

import java.io.Serializable;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.SimpleAsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncRunnableStreamOperator;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendTestUtils;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.asyncprocessing.AsyncOneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class AbstractAsyncRunnableStreamOperatorTest {
    /**
     * Creates a one-input async test harness for the given operator and configures it with the
     * state backend returned by {@code StateBackendTestUtils.buildAsyncStateBackend} for a fresh
     * {@link HashMapStateBackend}. The harness is returned without being opened.
     *
     * @param maxParalelism maximum parallelism forwarded to the harness factory
     * @param numSubtasks number of subtasks forwarded to the harness factory
     * @param subtaskIndex subtask index forwarded to the harness factory
     * @param testOperator operator the harness should drive
     * @return the configured harness
     * @throws Exception if creating the harness fails
     */
    protected AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> createTestHarness(
            int maxParalelism, int numSubtasks, int subtaskIndex, TestOperator testOperator) throws Exception {
        AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> harness =
                AsyncOneInputStreamOperatorTestHarness.create(testOperator, maxParalelism, numSubtasks, subtaskIndex);
        harness.setStateBackend(StateBackendTestUtils.buildAsyncStateBackend((StateBackend) new HashMapStateBackend()));
        return harness;
    }

    /**
     * Creates a one-input async test harness around a new {@link TestOperator} that uses
     * {@link TestKeySelector} and the requested element order.
     *
     * <p>Delegates to the {@code TestOperator}-taking overload so the harness creation and state
     * backend setup live in one place.
     *
     * @param maxParalelism maximum parallelism forwarded to the harness factory
     * @param numSubtasks number of subtasks forwarded to the harness factory
     * @param subtaskIndex subtask index forwarded to the harness factory
     * @param elementOrder element order handed to the new {@link TestOperator}
     * @return the configured, not yet opened, harness
     * @throws Exception if creating the harness fails
     */
    protected AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> createTestHarness(
            int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder) throws Exception {
        TestOperator testOperator = new TestOperator(new TestKeySelector(), elementOrder);
        return this.createTestHarness(maxParalelism, numSubtasks, subtaskIndex, testOperator);
    }

    /**
     * Opens the operator and checks that it exposes an async execution controller whose mailbox
     * executor priority is greater than -1 and whose async executor is not null.
     */
    @Test
    void testCreateAsyncExecutionController() throws Exception {
        try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> harness =
                this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
            harness.open();
            Assertions.assertThat(harness.getOperator()).isInstanceOf(AbstractAsyncRunnableStreamOperator.class);

            AbstractAsyncRunnableStreamOperator operator = (AbstractAsyncRunnableStreamOperator) harness.getOperator();
            AsyncExecutionController controller = operator.getAsyncExecutionController();
            Assertions.assertThat((Object) controller).isNotNull();

            MailboxExecutorImpl mailboxExecutor = (MailboxExecutorImpl) controller.getMailboxExecutor();
            Assertions.assertThat((int) mailboxExecutor.getPriority()).isGreaterThan(-1);
            Assertions.assertThat((Object) controller.getAsyncExecutor()).isNotNull();
        }
    }

    /**
     * Processes one record under {@link ElementOrder#FIRST_REQUEST_ORDER}. While
     * {@code TestOperator.processElement} is blocked on its monitor the test expects one processed
     * element and a context reference count of exactly 1; after releasing the operator and
     * draining, the reference count must be 0.
     */
    @Test
    void testRecordProcessorWithFirstRequestOrder() throws Exception {
        try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> harness =
                this.createTestHarness(128, 1, 0, ElementOrder.FIRST_REQUEST_ORDER)) {
            harness.open();
            TestOperator operator = (TestOperator) harness.getOperator();
            StreamRecord<Tuple2<Integer, String>> record = new StreamRecord<>(Tuple2.of(5, "5"));
            CompletableFuture<Void> pending = harness.processElementInternal(record);

            // Leave time for the submitted element to reach the blocking wait in processElement.
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
            Assertions.assertThat((int) operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(1);

            // Release the operator and wait until the element has been fully handled.
            operator.proceed();
            pending.get();
            harness.drainAsyncRequests();
            Assertions.assertThat((int) operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
        }
    }

    /**
     * Processes one record under {@link ElementOrder#RECORD_ORDER}. While
     * {@code TestOperator.processElement} is blocked on its monitor the test expects one processed
     * element and a context reference count greater than 1; after releasing the operator and
     * draining, the reference count must be 0.
     */
    @Test
    void testRecordProcessorWithRecordOrder() throws Exception {
        try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> harness =
                this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
            harness.open();
            TestOperator operator = (TestOperator) harness.getOperator();
            StreamRecord<Tuple2<Integer, String>> record = new StreamRecord<>(Tuple2.of(5, "5"));
            CompletableFuture<Void> pending = harness.processElementInternal(record);

            // Leave time for the submitted element to reach the blocking wait in processElement.
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
            Assertions.assertThat((int) operator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);

            // Release the operator and wait until the element has been fully handled.
            operator.proceed();
            pending.get();
            harness.drainAsyncRequests();
            Assertions.assertThat((int) operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
        }
    }

    /**
     * Exercises {@code asyncProcessWithKey} issued from inside {@code processElement}.
     *
     * <p>{@link TestOperatorWithAsyncProcessWithKey#processElement} registers a keyed async action
     * and then blocks on the operator's monitor, so before {@code proceed()} nothing has been
     * counted and the context reference count is expected to be greater than 1. After releasing
     * the operator the test expects a reference count of 0 and exactly one counted element.
     */
    @Test
    void testAsyncProcessWithKey() throws Exception {
        TestOperatorWithAsyncProcessWithKey testOperator = new TestOperatorWithAsyncProcessWithKey(new TestKeySelector(), ElementOrder.RECORD_ORDER);
        // The helper creates the harness and installs the async state backend; the state backend
        // must be configured on an existing harness, i.e. after creation and before open().
        try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, testOperator)) {
            testHarness.open();
            CompletableFuture<Void> future = testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));

            // Leave time for the submitted element to reach the blocking wait in processElement.
            Thread.sleep(1000L);
            Assertions.assertThat(testOperator.getProcessed()).isEqualTo(0);
            Assertions.assertThat((int) testOperator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);

            testOperator.proceed();
            future.get();
            Assertions.assertThat((int) testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            Assertions.assertThat(testOperator.getProcessed()).isEqualTo(1);
        }
    }

    /**
     * Exercises {@code asyncProcess} issued directly from {@code processElement}.
     *
     * <p>{@link TestOperatorWithDirectAsyncProcess#processElement} decrements the counter in the
     * async action and adds 2 in the follow-up callback, so after draining the counter must be 1
     * and the context reference count must be 0.
     */
    @Test
    void testDirectAsyncProcess() throws Exception {
        TestOperatorWithDirectAsyncProcess testOperator = new TestOperatorWithDirectAsyncProcess(new TestKeySelector(), ElementOrder.RECORD_ORDER);
        // The helper creates the harness and installs the async state backend; the state backend
        // must be configured on an existing harness, i.e. after creation and before open().
        try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, testOperator)) {
            testHarness.open();
            // The returned future is not needed: draining below is what the assertions rely on.
            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
            testHarness.drainAsyncRequests();
            Assertions.assertThat((int) testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            Assertions.assertThat(testOperator.getProcessed()).isEqualTo(1);
        }
    }

    /**
     * Issues more keyed async actions per element than the default
     * {@code ASYNC_STATE_TOTAL_BUFFER_SIZE} and checks that all of them run, in submission order.
     *
     * <p>Two elements are processed, each triggering {@code requests} keyed actions, so the
     * counter must end at {@code requests * 2} and the recorded orders must equal the expected
     * orders collected by the operator.
     */
    @Test
    void testManyAsyncProcessWithKey() throws Exception {
        int requests = (Integer) ExecutionOptions.ASYNC_STATE_TOTAL_BUFFER_SIZE.defaultValue() + 1;
        TestOperatorWithMultipleDirectAsyncProcess testOperator = new TestOperatorWithMultipleDirectAsyncProcess(new TestKeySelector(), ElementOrder.RECORD_ORDER, requests);
        // The helper creates the harness and installs the async state backend; the state backend
        // must be configured on an existing harness, i.e. after creation and before open().
        try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, testOperator)) {
            testHarness.open();
            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
            CompletableFuture<Void> future = testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
            testHarness.drainAsyncRequests();
            // Bounded waits so a stuck request fails the test instead of hanging it.
            future.get(10000L, TimeUnit.MILLISECONDS);
            testOperator.getLastProcessedFuture().get(10000L, TimeUnit.MILLISECONDS);
            Assertions.assertThat(testOperator.getProcessed()).isEqualTo(requests * 2);
            Assertions.assertThat(testOperator.getProcessedOrders()).isEqualTo(testOperator.getExpectedProcessedOrders());
        }
    }

    /**
     * Drives the operator by hand (set keyed context, submit one async action, post-process) and
     * checks that the controller reports one in-flight record before draining and none afterwards.
     */
    @Test
    void testCheckpointDrain() throws Exception {
        try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> harness =
                this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
            harness.open();
            AbstractAsyncRunnableStreamOperator operator = (AbstractAsyncRunnableStreamOperator) harness.getOperator();
            SimpleAsyncExecutionController controller = (SimpleAsyncExecutionController) operator.getAsyncExecutionController();

            operator.setAsyncKeyedContextElement(new StreamRecord((Object) Tuple2.of((Object) 5, (Object) "5")), (KeySelector) new TestKeySelector());
            operator.asyncProcess(() -> null);
            operator.postProcessElement();
            Assertions.assertThat((int) controller.getInFlightRecordNum()).isEqualTo(1);

            harness.drainAsyncRequests();
            Assertions.assertThat((int) controller.getInFlightRecordNum()).isEqualTo(0);
        }
    }

    /**
     * Submits a record followed by a latency marker. While the record is still blocked inside
     * {@code TestOperator.processElement} the latency marker must not have been counted; once the
     * operator is released and the marker's future completes, it must have been counted once and
     * the context reference count must be 0.
     */
    @Test
    void testNonRecordProcess() throws Exception {
        try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> harness =
                this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
            harness.open();
            TestOperator operator = (TestOperator) harness.getOperator();
            harness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
            LatencyMarker marker = new LatencyMarker(1234L, new OperatorID(), 0);
            CompletableFuture<Void> markerDone = harness.processLatencyMarkerInternal(marker);

            // Leave time for the submitted element to reach the blocking wait in processElement.
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
            Assertions.assertThat((int) operator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);
            Assertions.assertThat(operator.getLatencyProcessed()).isEqualTo(0);

            operator.proceed();
            markerDone.get();
            Assertions.assertThat((int) operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            Assertions.assertThat(operator.getLatencyProcessed()).isEqualTo(1);
        }
    }

    /**
     * Processes four records for key 1 and then a watermark of 10, and expects the harness output
     * to consist of exactly that watermark.
     *
     * <p>{@link TestOperatorWithAsyncProcessTimer#processElement} only bumps a counter, so the
     * records themselves produce no output.
     */
    @Test
    void testWatermark() throws Exception {
        TestOperatorWithAsyncProcessTimer testOperator = new TestOperatorWithAsyncProcessTimer(new TestKeySelector(), ElementOrder.RECORD_ORDER);
        try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, testOperator)) {
            testHarness.open();
            // Element type Object matches the other tests in this class, which compare against
            // the harness output queue in the same way.
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "1")));
            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "3")));
            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "6")));
            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "9")));
            testHarness.processWatermark(10L);
            expectedOutput.add(new Watermark(10L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }

    @Test
    void testWatermarkHooks() throws Exception {
        KeySelector & Serializable dummyKeySelector = (KeySelector & Serializable)l -> 0;
        WatermarkTestingOperator testOperator = new WatermarkTestingOperator(dummyKeySelector, dummyKeySelector);
        AtomicInteger counter = new AtomicInteger(0);
        testOperator.setPreProcessFunction((FunctionWithException<Watermark, Watermark, Exception>)((FunctionWithException)watermark -> {
            testOperator.asyncProcessWithKey(1L, () -> {
                Assertions.assertThat((Object)testOperator.getCurrentKey()).isEqualTo((Object)1L);
                testOperator.output(watermark.getTimestamp() + 1000L);
            });
            if (counter.incrementAndGet() % 2 == 0) {
                return null;
            }
            return new Watermark(watermark.getTimestamp() + 1L);
        }));
        testOperator.setPostProcessFunction((ThrowingConsumer<Watermark, Exception>)((ThrowingConsumer)watermark -> testOperator.output(watermark.getTimestamp() + 100L)));
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        try (AsyncKeyedTwoInputStreamOperatorTestHarness testHarness = AsyncKeyedTwoInputStreamOperatorTestHarness.create(testOperator, dummyKeySelector, dummyKeySelector, BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);){
            testHarness.setup();
            testHarness.open();
            testHarness.processElement1(1L, 1L);
            testHarness.processElement1(3L, 3L);
            testHarness.processElement1(4L, 4L);
            testHarness.processWatermark1(new Watermark(2L));
            testHarness.processWatermark2(new Watermark(2L));
            expectedOutput.add(new StreamRecord((Object)1002L));
            expectedOutput.add(new StreamRecord((Object)1L));
            expectedOutput.add(new StreamRecord((Object)3L));
            expectedOutput.add(new StreamRecord((Object)103L));
            expectedOutput.add(new Watermark(3L));
            testHarness.processWatermark1(new Watermark(4L));
            testHarness.processWatermark2(new Watermark(4L));
            expectedOutput.add(new StreamRecord((Object)1004L));
            testHarness.processWatermark1(new Watermark(5L));
            testHarness.processWatermark2(new Watermark(5L));
            expectedOutput.add(new StreamRecord((Object)1005L));
            expectedOutput.add(new StreamRecord((Object)4L));
            expectedOutput.add(new StreamRecord((Object)106L));
            expectedOutput.add(new Watermark(6L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Submits a record, a watermark of 205 and an IDLE watermark status. While the record is still
     * blocked inside {@code TestOperator.processElement} the operator's recorded watermark index
     * must still be its initial -1; after releasing the operator the output must be exactly the
     * watermark followed by the IDLE status.
     */
    @Test
    void testWatermarkStatus() throws Exception {
        try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> harness =
                this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
            harness.open();
            TestOperator operator = (TestOperator) harness.getOperator();
            // NOTE(review): the returned processor is never used in this test; the call is kept
            // as-is in case obtaining it matters - confirm before removing.
            ThrowingConsumer processor = RecordProcessorUtils.getRecordProcessor((Input) operator);
            harness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
            harness.processWatermarkInternal(new Watermark(205L));
            CompletableFuture<Void> statusDone = harness.processWatermarkStatusInternal(WatermarkStatus.IDLE);

            // Leave time for the submitted element to reach the blocking wait in processElement.
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
            Assertions.assertThat((int) operator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);
            Assertions.assertThat(operator.watermarkIndex).isEqualTo(-1);
            Assertions.assertThat(operator.watermarkStatus.isIdle()).isTrue();

            operator.proceed();
            statusDone.get();
            Assertions.assertThat((int) operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            Assertions.assertThat(operator.watermarkStatus.isActive()).isFalse();
            Assertions.assertThat(harness.getOutput()).containsExactly(new Object[]{new Watermark(205L), WatermarkStatus.IDLE});
        }
    }

    @Test
    void testIdleWatermarkHandling() throws Exception {
        KeySelector & Serializable dummyKeySelector = (KeySelector & Serializable)l -> 0;
        WatermarkTestingOperator testOperator = new WatermarkTestingOperator(dummyKeySelector, dummyKeySelector);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        try (AsyncKeyedTwoInputStreamOperatorTestHarness testHarness = AsyncKeyedTwoInputStreamOperatorTestHarness.create(testOperator, dummyKeySelector, dummyKeySelector, BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);){
            testHarness.setup();
            testHarness.open();
            testHarness.processElement1(1L, 1L);
            testHarness.processElement1(3L, 3L);
            testHarness.processElement1(4L, 4L);
            testHarness.processWatermark1(new Watermark(1L));
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
            testHarness.processWatermarkStatus2(WatermarkStatus.IDLE);
            expectedOutput.add(new StreamRecord((Object)1L));
            expectedOutput.add(new Watermark(1L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
            testHarness.processWatermark1(new Watermark(3L));
            expectedOutput.add(new StreamRecord((Object)3L));
            expectedOutput.add(new Watermark(3L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
            testHarness.processWatermarkStatus2(WatermarkStatus.ACTIVE);
            testHarness.processWatermark1(new Watermark(4L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }

    public static class TestKeySelector
    implements KeySelector<Tuple2<Integer, String>, Integer> {
        private static final long serialVersionUID = 1L;

        public Integer getKey(Tuple2<Integer, String> value) {
            return (Integer)value.f0;
        }
    }

    private static class WatermarkTestingOperator
    extends AbstractAsyncRunnableStreamOperator<Long>
    implements TwoInputStreamOperator<Long, Long, Long>,
    Triggerable<Integer, VoidNamespace> {
        private transient InternalTimerService<VoidNamespace> timerService;
        private FunctionWithException<Watermark, Watermark, Exception> preProcessFunction;
        private ThrowingConsumer<Watermark, Exception> postProcessFunction;

        public WatermarkTestingOperator(KeySelector<Long, ?> keySelector1, KeySelector<Long, ?> keySelector2) {
            super(keySelector1, keySelector2, Executors.newSingleThreadExecutor(), 1, 100L, 100);
        }

        public void setPreProcessFunction(FunctionWithException<Watermark, Watermark, Exception> preProcessFunction) {
            this.preProcessFunction = preProcessFunction;
        }

        public void setPostProcessFunction(ThrowingConsumer<Watermark, Exception> postProcessFunction) {
            this.postProcessFunction = postProcessFunction;
        }

        public void output(Long o) {
            this.output.collect((Object)new StreamRecord((Object)o));
        }

        public void open() throws Exception {
            super.open();
            this.timerService = this.getInternalTimerService("test-timers", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
        }

        public Watermark preProcessWatermark(Watermark watermark) throws Exception {
            return this.preProcessFunction == null ? watermark : (Watermark)this.preProcessFunction.apply((Object)watermark);
        }

        public Watermark postProcessWatermark(Watermark watermark) throws Exception {
            if (this.postProcessFunction != null) {
                this.postProcessFunction.accept((Object)watermark);
            }
            return watermark;
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            Assertions.assertThat((Object)this.getCurrentKey()).isEqualTo(timer.getKey());
            this.output.collect((Object)new StreamRecord((Object)timer.getTimestamp()));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            Assertions.assertThat((Object)this.getCurrentKey()).isEqualTo(timer.getKey());
        }

        public void processElement1(StreamRecord<Long> element) throws Exception {
            this.timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, ((Long)element.getValue()).longValue());
        }

        public void processElement2(StreamRecord<Long> element) throws Exception {
            this.timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, ((Long)element.getValue()).longValue());
        }
    }

    /**
     * {@link TestOperator} variant whose element processing only counts, and whose timer callbacks
     * run the parent's timer handling inside {@code asyncProcessWithKey} using the timer's key.
     */
    private static class TestOperatorWithAsyncProcessTimer
    extends TestOperator {
        TestOperatorWithAsyncProcessTimer(KeySelector<Tuple2<Integer, String>, ?> keySelector, ElementOrder elementOrder) {
            super(keySelector, elementOrder);
        }

        /** Counts the element; unlike the parent it neither blocks nor issues async work. */
        @Override
        public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
            this.processed.incrementAndGet();
        }

        /** Runs the parent's event-time handling as a keyed async action. */
        @Override
        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            Integer timerKey = timer.getKey();
            this.asyncProcessWithKey(timerKey, () -> super.onEventTime(timer));
        }

        /** Runs the parent's processing-time handling as a keyed async action. */
        @Override
        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            Integer timerKey = timer.getKey();
            this.asyncProcessWithKey(timerKey, () -> super.onProcessingTime(timer));
        }
    }

    /**
     * {@link TestOperator} variant that issues {@code numAsyncProcesses} keyed async actions per
     * element, recording the order in which they were submitted and the order in which they ran.
     */
    private static class TestOperatorWithMultipleDirectAsyncProcess
    extends TestOperator {
        private final int numAsyncProcesses;
        /** Completed by the last action of an element (only once). */
        private final CompletableFuture<Void> lastProcessedFuture = new CompletableFuture<>();
        /** Indices in the order the actions actually ran. */
        private final LinkedList<Integer> processedOrders = new LinkedList<>();
        /** Indices in the order the actions were submitted. */
        private final LinkedList<Integer> expectedProcessedOrders = new LinkedList<>();

        TestOperatorWithMultipleDirectAsyncProcess(KeySelector<Tuple2<Integer, String>, ?> keySelector, ElementOrder elementOrder, int numAsyncProcesses) {
            super(keySelector, elementOrder);
            this.numAsyncProcesses = numAsyncProcesses;
        }

        /**
         * Submits {@code numAsyncProcesses} actions keyed by the element's first field. Each
         * action bumps the counter and records its index; the final one additionally completes
         * {@code lastProcessedFuture} if that has not happened yet.
         */
        @Override
        public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
            int lastIndex = this.numAsyncProcesses - 1;
            for (int index = 0; index < this.numAsyncProcesses; index++) {
                int order = index;
                boolean isLast = order >= lastIndex;
                Integer key = element.getValue().f0;
                this.asyncProcessWithKey(key, () -> {
                    this.processed.incrementAndGet();
                    this.processedOrders.add(order);
                    if (isLast && !this.lastProcessedFuture.isDone()) {
                        this.lastProcessedFuture.complete(null);
                    }
                });
                this.expectedProcessedOrders.add(order);
            }
        }

        CompletableFuture<Void> getLastProcessedFuture() {
            return this.lastProcessedFuture;
        }

        LinkedList<Integer> getProcessedOrders() {
            return this.processedOrders;
        }

        LinkedList<Integer> getExpectedProcessedOrders() {
            return this.expectedProcessedOrders;
        }
    }

    /**
     * {@link TestOperator} variant that issues one {@code asyncProcess} call per element without
     * blocking.
     */
    private static class TestOperatorWithDirectAsyncProcess
    extends TestOperator {
        TestOperatorWithDirectAsyncProcess(KeySelector<Tuple2<Integer, String>, ?> keySelector, ElementOrder elementOrder) {
            super(keySelector, elementOrder);
        }

        /**
         * Decrements the counter in the async action and adds 2 in the follow-up callback, for a
         * net change of +1 per element once both have run.
         */
        @Override
        public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
            this.asyncProcess(() -> this.processed.decrementAndGet())
                    .thenAccept(ignored -> this.processed.addAndGet(2));
        }
    }

    /**
     * {@link TestOperator} variant that submits a keyed async action and then blocks until
     * {@code proceed()} is called.
     */
    private static class TestOperatorWithAsyncProcessWithKey
    extends TestOperator {
        TestOperatorWithAsyncProcessWithKey(KeySelector<Tuple2<Integer, String>, ?> keySelector, ElementOrder elementOrder) {
            super(keySelector, elementOrder);
        }

        /**
         * Submits an action keyed by the element's first field that bumps the counter, waits on
         * the operator's monitor, and then bumps the counter itself.
         */
        @Override
        public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
            Integer key = element.getValue().f0;
            this.asyncProcessWithKey(key, () -> this.processed.incrementAndGet());
            // Park until a test thread calls proceed(), which notifies this monitor.
            synchronized (this.objectToWait) {
                this.objectToWait.wait();
            }
            this.processed.incrementAndGet();
        }
    }

    private static class TestOperator
    extends AbstractAsyncRunnableStreamOperator<String>
    implements OneInputStreamOperator<Tuple2<Integer, String>, String>,
    Triggerable<Integer, VoidNamespace> {
        private static final long serialVersionUID = 1L;
        private final ElementOrder elementOrder;
        final AtomicInteger processed = new AtomicInteger(0);
        final AtomicInteger latencyProcessed = new AtomicInteger(0);
        final Object objectToWait = new Object();
        private WatermarkStatus watermarkStatus = new WatermarkStatus(-1);
        private int watermarkIndex = -1;

        TestOperator(KeySelector<Tuple2<Integer, String>, ?> keySelector, ElementOrder elementOrder) {
            super(keySelector, null, Executors.newSingleThreadExecutor(), 1, 100L, 100);
            this.elementOrder = elementOrder;
        }

        public void open() throws Exception {
            super.open();
        }

        public ElementOrder getElementOrder() {
            return this.elementOrder;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
            this.processed.incrementAndGet();
            Object object = this.objectToWait;
            synchronized (object) {
                this.objectToWait.wait();
            }
            this.asyncProcess(() -> this.processed.decrementAndGet()).thenAccept(a -> this.processed.incrementAndGet());
        }

        public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            super.processLatencyMarker(latencyMarker);
            this.latencyProcessed.incrementAndGet();
        }

        protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index) throws Exception {
            super.processWatermarkStatus(watermarkStatus, index);
            this.watermarkStatus = watermarkStatus;
            this.watermarkIndex = index;
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            Assertions.assertThat((Object)this.getCurrentKey()).isEqualTo(timer.getKey());
            this.output.collect((Object)new StreamRecord((Object)("EventTimer-" + timer.getKey() + "-" + timer.getTimestamp())));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            Assertions.assertThat((Object)this.getCurrentKey()).isEqualTo(timer.getKey());
            this.output.collect((Object)new StreamRecord((Object)("ProcessingTimer-" + timer.getKey() + "-" + timer.getTimestamp())));
        }

        public int getProcessed() {
            return this.processed.get();
        }

        public int getLatencyProcessed() {
            return this.latencyProcessed.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void proceed() {
            Object object = this.objectToWait;
            synchronized (object) {
                this.objectToWait.notify();
            }
        }
    }
}

