package org.apache.flink.runtime.asyncprocessing.operators;

import java.lang.invoke.SerializedLambda;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateBackendTestUtils;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.class */
public class AbstractAsyncStateStreamOperatorTest {

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest$TestKeySelector.class */
    public static class TestKeySelector implements KeySelector<Tuple2<Integer, String>, Integer> {
        private static final long serialVersionUID = 1;

        public Integer getKey(Tuple2<Integer, String> tuple2) {
            return (Integer) tuple2.f0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest$TestOperator.class */
    public static class TestOperator extends AbstractAsyncStateStreamOperator<String> implements OneInputStreamOperator<Tuple2<Integer, String>, String>, Triggerable<Integer, VoidNamespace> {
        private static final long serialVersionUID = 1;
        private final ElementOrder elementOrder;
        final AtomicInteger processed = new AtomicInteger(0);
        final AtomicInteger latencyProcessed = new AtomicInteger(0);
        final Object objectToWait = new Object();
        private WatermarkStatus watermarkStatus = new WatermarkStatus(-1);
        private int watermarkIndex = -1;
        InternalTimerService<VoidNamespace> timerService;

        TestOperator(ElementOrder elementOrder) {
            this.elementOrder = elementOrder;
        }

        public void open() throws Exception {
            super.open();
            this.timerService = getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this);
        }

        public ElementOrder getElementOrder() {
            return this.elementOrder;
        }

        public void processElement(StreamRecord<Tuple2<Integer, String>> streamRecord) throws Exception {
            this.processed.incrementAndGet();
            synchronized (this.objectToWait) {
                this.objectToWait.wait();
            }
            this.timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, ((Integer) ((Tuple2) streamRecord.getValue()).f0).intValue() + 100);
        }

        public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            super.processLatencyMarker(latencyMarker);
            this.latencyProcessed.incrementAndGet();
        }

        protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int i) throws Exception {
            super.processWatermarkStatus(watermarkStatus, i);
            this.watermarkStatus = watermarkStatus;
            this.watermarkIndex = i;
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            Assertions.assertThat(getCurrentKey()).isEqualTo(internalTimer.getKey());
            this.output.collect(new StreamRecord("EventTimer-" + internalTimer.getKey() + "-" + internalTimer.getTimestamp()));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            Assertions.assertThat(getCurrentKey()).isEqualTo(internalTimer.getKey());
            this.output.collect(new StreamRecord("ProcessingTimer-" + internalTimer.getKey() + "-" + internalTimer.getTimestamp()));
        }

        public int getProcessed() {
            return this.processed.get();
        }

        public int getLatencyProcessed() {
            return this.latencyProcessed.get();
        }

        public void proceed() {
            synchronized (this.objectToWait) {
                this.objectToWait.notify();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest$TestOperatorWithAsyncProcessTimer.class */
    private static class TestOperatorWithAsyncProcessTimer extends TestOperator {
        TestOperatorWithAsyncProcessTimer(ElementOrder elementOrder) {
            super(elementOrder);
        }

        @Override // org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorTest.TestOperator
        public void processElement(StreamRecord<Tuple2<Integer, String>> streamRecord) throws Exception {
            this.processed.incrementAndGet();
            this.timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.parseLong((String) ((Tuple2) streamRecord.getValue()).f1));
        }

        @Override // org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorTest.TestOperator
        public void onEventTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            asyncProcessWithKey((Integer) internalTimer.getKey(), () -> {
                super.onEventTime(internalTimer);
            });
        }

        @Override // org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorTest.TestOperator
        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            asyncProcessWithKey((Integer) internalTimer.getKey(), () -> {
                super.onProcessingTime(internalTimer);
            });
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest$TestOperatorWithDirectAsyncProcess.class */
    private static class TestOperatorWithDirectAsyncProcess extends TestOperator {
        TestOperatorWithDirectAsyncProcess(ElementOrder elementOrder) {
            super(elementOrder);
        }

        @Override // org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorTest.TestOperator
        public void processElement(StreamRecord<Tuple2<Integer, String>> streamRecord) throws Exception {
            asyncProcessWithKey((Integer) ((Tuple2) streamRecord.getValue()).f0, () -> {
                this.processed.incrementAndGet();
            });
            synchronized (this.objectToWait) {
                this.objectToWait.wait();
            }
            this.processed.incrementAndGet();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest$TestOperatorWithMultipleDirectAsyncProcess.class */
    private static class TestOperatorWithMultipleDirectAsyncProcess extends TestOperator {
        private final int numAsyncProcesses;
        private final CompletableFuture<Void> lastProcessedFuture;
        private final LinkedList<Integer> processedOrders;
        private final LinkedList<Integer> expectedProcessedOrders;

        TestOperatorWithMultipleDirectAsyncProcess(ElementOrder elementOrder, int i) {
            super(elementOrder);
            this.lastProcessedFuture = new CompletableFuture<>();
            this.processedOrders = new LinkedList<>();
            this.expectedProcessedOrders = new LinkedList<>();
            this.numAsyncProcesses = i;
        }

        @Override // org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorTest.TestOperator
        public void processElement(StreamRecord<Tuple2<Integer, String>> streamRecord) throws Exception {
            for (int i = 0; i < this.numAsyncProcesses; i++) {
                int i2 = i;
                if (i < this.numAsyncProcesses - 1) {
                    asyncProcessWithKey((Integer) ((Tuple2) streamRecord.getValue()).f0, () -> {
                        this.processed.incrementAndGet();
                        this.processedOrders.add(Integer.valueOf(i2));
                    });
                } else {
                    asyncProcessWithKey((Integer) ((Tuple2) streamRecord.getValue()).f0, () -> {
                        this.processed.incrementAndGet();
                        this.processedOrders.add(Integer.valueOf(i2));
                        if (this.lastProcessedFuture.isDone()) {
                            return;
                        }
                        this.lastProcessedFuture.complete(null);
                    });
                }
                this.expectedProcessedOrders.add(Integer.valueOf(i2));
            }
        }

        CompletableFuture<Void> getLastProcessedFuture() {
            return this.lastProcessedFuture;
        }

        LinkedList<Integer> getProcessedOrders() {
            return this.processedOrders;
        }

        LinkedList<Integer> getExpectedProcessedOrders() {
            return this.expectedProcessedOrders;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest$WatermarkTestingOperator.class */
    public static class WatermarkTestingOperator extends AbstractAsyncStateStreamOperator<Long> implements TwoInputStreamOperator<Long, Long, Long>, Triggerable<Integer, VoidNamespace> {
        private transient InternalTimerService<VoidNamespace> timerService;
        private FunctionWithException<Watermark, Watermark, Exception> preProcessFunction;
        private ThrowingConsumer<Watermark, Exception> postProcessFunction;

        private WatermarkTestingOperator() {
        }

        public void setPreProcessFunction(FunctionWithException<Watermark, Watermark, Exception> functionWithException) {
            this.preProcessFunction = functionWithException;
        }

        public void setPostProcessFunction(ThrowingConsumer<Watermark, Exception> throwingConsumer) {
            this.postProcessFunction = throwingConsumer;
        }

        public void output(Long l) {
            this.output.collect(new StreamRecord(l));
        }

        public void open() throws Exception {
            super.open();
            this.timerService = getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this);
        }

        public Watermark preProcessWatermark(Watermark watermark) throws Exception {
            return this.preProcessFunction == null ? watermark : (Watermark) this.preProcessFunction.apply(watermark);
        }

        public void postProcessWatermark(Watermark watermark) throws Exception {
            if (this.postProcessFunction != null) {
                this.postProcessFunction.accept(watermark);
            }
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            Assertions.assertThat(getCurrentKey()).isEqualTo(internalTimer.getKey());
            this.output.collect(new StreamRecord(Long.valueOf(internalTimer.getTimestamp())));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            Assertions.assertThat(getCurrentKey()).isEqualTo(internalTimer.getKey());
        }

        public void processElement1(StreamRecord<Long> streamRecord) throws Exception {
            this.timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, ((Long) streamRecord.getValue()).longValue());
        }

        public void processElement2(StreamRecord<Long> streamRecord) throws Exception {
            this.timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, ((Long) streamRecord.getValue()).longValue());
        }
    }

    protected AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness(int i, int i2, int i3, TestOperator testOperator) throws Exception {
        AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> create = AsyncKeyedOneInputStreamOperatorTestHarness.create(testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, i, i2, i3);
        create.setStateBackend(StateBackendTestUtils.buildAsyncStateBackend(new HashMapStateBackend()));
        return create;
    }

    protected AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness(int i, int i2, int i3, ElementOrder elementOrder) throws Exception {
        AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> create = AsyncKeyedOneInputStreamOperatorTestHarness.create(new TestOperator(elementOrder), new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, i, i2, i3);
        create.setStateBackend(StateBackendTestUtils.buildAsyncStateBackend(new HashMapStateBackend()));
        return create;
    }

    @Test
    void testCreateAsyncExecutionController() throws Exception {
        AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);
        try {
            createTestHarness.open();
            Assertions.assertThat(createTestHarness.getOperator()).isInstanceOf(AbstractAsyncStateStreamOperator.class);
            Assertions.assertThat(createTestHarness.getOperator().getAsyncExecutionController()).isNotNull();
            Assertions.assertThat(createTestHarness.getOperator().getAsyncExecutionController().getStateExecutor()).isNotNull();
            if (createTestHarness != null) {
                createTestHarness.close();
            }
        } catch (Throwable th) {
            if (createTestHarness != null) {
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testRecordProcessorWithFirstStateOrder() throws Exception {
        AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.FIRST_STATE_ORDER);
        try {
            createTestHarness.open();
            TestOperator operator = createTestHarness.getOperator();
            CompletableFuture<Void> processElementInternal = createTestHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(1);
            operator.proceed();
            processElementInternal.get();
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            if (createTestHarness != null) {
                createTestHarness.close();
            }
        } catch (Throwable th) {
            if (createTestHarness != null) {
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testRecordProcessorWithRecordOrder() throws Exception {
        AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);
        try {
            createTestHarness.open();
            TestOperator operator = createTestHarness.getOperator();
            CompletableFuture<Void> processElementInternal = createTestHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);
            operator.proceed();
            processElementInternal.get();
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            if (createTestHarness != null) {
                createTestHarness.close();
            }
        } catch (Throwable th) {
            if (createTestHarness != null) {
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testAsyncProcessWithKey() throws Exception {
        TestOperatorWithDirectAsyncProcess testOperatorWithDirectAsyncProcess = new TestOperatorWithDirectAsyncProcess(ElementOrder.RECORD_ORDER);
        AsyncKeyedOneInputStreamOperatorTestHarness create = AsyncKeyedOneInputStreamOperatorTestHarness.create(testOperatorWithDirectAsyncProcess, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, 128, 1, 0);
        create.setStateBackend(StateBackendTestUtils.buildAsyncStateBackend(new HashMapStateBackend()));
        try {
            create.open();
            CompletableFuture<Void> processElementInternal = create.processElementInternal(new StreamRecord(Tuple2.of(5, "5")));
            Thread.sleep(1000L);
            Assertions.assertThat(testOperatorWithDirectAsyncProcess.getProcessed()).isEqualTo(0);
            Assertions.assertThat(testOperatorWithDirectAsyncProcess.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);
            testOperatorWithDirectAsyncProcess.proceed();
            processElementInternal.get();
            Assertions.assertThat(testOperatorWithDirectAsyncProcess.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            Assertions.assertThat(testOperatorWithDirectAsyncProcess.getProcessed()).isEqualTo(1);
            create.close();
        } catch (Throwable th) {
            create.close();
            throw th;
        }
    }

    @Test
    void testManyAsyncProcessWithKey() throws Exception {
        int intValue = ((Integer) ExecutionOptions.ASYNC_INFLIGHT_RECORDS_LIMIT.defaultValue()).intValue() + 1;
        TestOperatorWithMultipleDirectAsyncProcess testOperatorWithMultipleDirectAsyncProcess = new TestOperatorWithMultipleDirectAsyncProcess(ElementOrder.RECORD_ORDER, intValue);
        AsyncKeyedOneInputStreamOperatorTestHarness create = AsyncKeyedOneInputStreamOperatorTestHarness.create(testOperatorWithMultipleDirectAsyncProcess, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, 128, 1, 0);
        create.setStateBackend(StateBackendTestUtils.buildAsyncStateBackend(new HashMapStateBackend()));
        try {
            create.open();
            create.processElementInternal(new StreamRecord(Tuple2.of(5, "5")));
            CompletableFuture<Void> processElementInternal = create.processElementInternal(new StreamRecord(Tuple2.of(5, "5")));
            create.drainStateRequests();
            processElementInternal.get(10000L, TimeUnit.MILLISECONDS);
            testOperatorWithMultipleDirectAsyncProcess.getLastProcessedFuture().get(10000L, TimeUnit.MILLISECONDS);
            Assertions.assertThat(testOperatorWithMultipleDirectAsyncProcess.getProcessed()).isEqualTo(intValue * 2);
            Assertions.assertThat(testOperatorWithMultipleDirectAsyncProcess.getProcessedOrders()).isEqualTo(testOperatorWithMultipleDirectAsyncProcess.getExpectedProcessedOrders());
            create.close();
        } catch (Throwable th) {
            create.close();
            throw th;
        }
    }

    @Test
    void testCheckpointDrain() throws Exception {
        AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);
        try {
            createTestHarness.open();
            AsyncExecutionController asyncExecutionController = createTestHarness.getOperator().getAsyncExecutionController();
            createTestHarness.getOperator().setAsyncKeyedContextElement(new StreamRecord(Tuple2.of(5, "5")), new TestKeySelector());
            asyncExecutionController.handleRequest((State) null, StateRequestType.VALUE_GET, (Object) null);
            createTestHarness.getOperator().postProcessElement();
            Assertions.assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(1);
            createTestHarness.drainStateRequests();
            Assertions.assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(0);
            if (createTestHarness != null) {
                createTestHarness.close();
            }
        } catch (Throwable th) {
            if (createTestHarness != null) {
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testTimerServiceIsAsync() throws Exception {
        AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);
        try {
            createTestHarness.open();
            Assertions.assertThat(createTestHarness.getOperator()).isInstanceOf(AbstractAsyncStateStreamOperator.class);
            Assertions.assertThat(createTestHarness.getOperator().getInternalTimerService("test", VoidNamespaceSerializer.INSTANCE, new Triggerable() { // from class: org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorTest.1
                public void onEventTime(InternalTimer internalTimer) throws Exception {
                }

                public void onProcessingTime(InternalTimer internalTimer) throws Exception {
                }
            })).isInstanceOf(InternalTimerServiceAsyncImpl.class);
            if (createTestHarness != null) {
                createTestHarness.close();
            }
        } catch (Throwable th) {
            if (createTestHarness != null) {
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testNonRecordProcess() throws Exception {
        AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);
        try {
            createTestHarness.open();
            TestOperator operator = createTestHarness.getOperator();
            createTestHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
            CompletableFuture<Void> processLatencyMarkerInternal = createTestHarness.processLatencyMarkerInternal(new LatencyMarker(1234L, new OperatorID(), 0));
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);
            Assertions.assertThat(operator.getLatencyProcessed()).isEqualTo(0);
            operator.proceed();
            processLatencyMarkerInternal.get();
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            Assertions.assertThat(operator.getLatencyProcessed()).isEqualTo(1);
            if (createTestHarness != null) {
                createTestHarness.close();
            }
        } catch (Throwable th) {
            if (createTestHarness != null) {
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testWatermark() throws Exception {
        AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, new TestOperatorWithAsyncProcessTimer(ElementOrder.RECORD_ORDER));
        try {
            createTestHarness.open();
            ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
            createTestHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "1")));
            concurrentLinkedQueue.add(new StreamRecord("EventTimer-1-1"));
            createTestHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "3")));
            concurrentLinkedQueue.add(new StreamRecord("EventTimer-1-3"));
            createTestHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "6")));
            concurrentLinkedQueue.add(new StreamRecord("EventTimer-1-6"));
            createTestHarness.processElementInternal(new StreamRecord<>(Tuple2.of(1, "9")));
            concurrentLinkedQueue.add(new StreamRecord("EventTimer-1-9"));
            createTestHarness.processWatermark(10L);
            concurrentLinkedQueue.add(new Watermark(10L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", concurrentLinkedQueue, createTestHarness.getOutput());
            if (createTestHarness != null) {
                createTestHarness.close();
            }
        } catch (Throwable th) {
            if (createTestHarness != null) {
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testWatermarkHooks() throws Exception {
        WatermarkTestingOperator watermarkTestingOperator = new WatermarkTestingOperator();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        watermarkTestingOperator.setPreProcessFunction(watermark -> {
            watermarkTestingOperator.asyncProcessWithKey(1L, () -> {
                Assertions.assertThat(watermarkTestingOperator.getCurrentKey()).isEqualTo(1L);
                watermarkTestingOperator.output(Long.valueOf(watermark.getTimestamp() + 1000));
            });
            if (atomicInteger.incrementAndGet() % 2 == 0) {
                return null;
            }
            return new Watermark(watermark.getTimestamp() + 1);
        });
        watermarkTestingOperator.setPostProcessFunction(watermark2 -> {
            watermarkTestingOperator.output(Long.valueOf(watermark2.getTimestamp() + 100));
        });
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        KeySelector keySelector = l -> {
            return 0;
        };
        AsyncKeyedTwoInputStreamOperatorTestHarness create = AsyncKeyedTwoInputStreamOperatorTestHarness.create(watermarkTestingOperator, keySelector, keySelector, BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        try {
            create.setup();
            create.open();
            create.processElement1(1L, 1L);
            create.processElement1(3L, 3L);
            create.processElement1(4L, 4L);
            create.processWatermark1(new Watermark(2L));
            create.processWatermark2(new Watermark(2L));
            concurrentLinkedQueue.add(new StreamRecord(1002L));
            concurrentLinkedQueue.add(new StreamRecord(1L));
            concurrentLinkedQueue.add(new StreamRecord(3L));
            concurrentLinkedQueue.add(new Watermark(3L));
            concurrentLinkedQueue.add(new StreamRecord(103L));
            create.processWatermark1(new Watermark(4L));
            create.processWatermark2(new Watermark(4L));
            concurrentLinkedQueue.add(new StreamRecord(1004L));
            create.processWatermark1(new Watermark(5L));
            create.processWatermark2(new Watermark(5L));
            concurrentLinkedQueue.add(new StreamRecord(1005L));
            concurrentLinkedQueue.add(new StreamRecord(4L));
            concurrentLinkedQueue.add(new Watermark(6L));
            concurrentLinkedQueue.add(new StreamRecord(106L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", concurrentLinkedQueue, create.getOutput());
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testWatermarkStatus() throws Exception {
        AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);
        try {
            createTestHarness.open();
            TestOperator operator = createTestHarness.getOperator();
            RecordProcessorUtils.getRecordProcessor(operator);
            createTestHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
            createTestHarness.processWatermarkInternal(new Watermark(205L));
            CompletableFuture<Void> processWatermarkStatusInternal = createTestHarness.processWatermarkStatusInternal(WatermarkStatus.IDLE);
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);
            Assertions.assertThat(operator.watermarkIndex).isEqualTo(-1);
            Assertions.assertThat(operator.watermarkStatus.isIdle()).isTrue();
            operator.proceed();
            processWatermarkStatusInternal.get();
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            Assertions.assertThat(operator.watermarkStatus.isActive()).isFalse();
            Assertions.assertThat(createTestHarness.getOutput()).containsExactly(new Object[]{new StreamRecord("EventTimer-5-105"), new Watermark(205L), WatermarkStatus.IDLE});
            if (createTestHarness != null) {
                createTestHarness.close();
            }
        } catch (Throwable th) {
            if (createTestHarness != null) {
                try {
                    createTestHarness.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testIdleWatermarkHandling() throws Exception {
        WatermarkTestingOperator watermarkTestingOperator = new WatermarkTestingOperator();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        KeySelector keySelector = l -> {
            return 0;
        };
        AsyncKeyedTwoInputStreamOperatorTestHarness create = AsyncKeyedTwoInputStreamOperatorTestHarness.create(watermarkTestingOperator, keySelector, keySelector, BasicTypeInfo.INT_TYPE_INFO, 1, 1, 0);
        try {
            create.setup();
            create.open();
            create.processElement1(1L, 1L);
            create.processElement1(3L, 3L);
            create.processElement1(4L, 4L);
            create.processWatermark1(new Watermark(1L));
            Assertions.assertThat(create.getOutput()).isEmpty();
            create.processWatermarkStatus2(WatermarkStatus.IDLE);
            concurrentLinkedQueue.add(new StreamRecord(1L));
            concurrentLinkedQueue.add(new Watermark(1L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", concurrentLinkedQueue, create.getOutput());
            create.processWatermark1(new Watermark(3L));
            concurrentLinkedQueue.add(new StreamRecord(3L));
            concurrentLinkedQueue.add(new Watermark(3L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", concurrentLinkedQueue, create.getOutput());
            create.processWatermarkStatus2(WatermarkStatus.ACTIVE);
            create.processWatermark1(new Watermark(4L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", concurrentLinkedQueue, create.getOutput());
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1272293197:
                if (implMethodName.equals("lambda$testWatermarkHooks$bf5bd099$1")) {
                    z = true;
                    break;
                }
                break;
            case 718303944:
                if (implMethodName.equals("lambda$testIdleWatermarkHandling$bf5bd099$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Integer;")) {
                    return l -> {
                        return 0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Integer;")) {
                    return l2 -> {
                        return 0;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
