package org.apache.flink.streaming.api.operators;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorTest;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorV2;
import org.apache.flink.runtime.state.StateBackendTestUtils;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedMultiInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.class */
class AbstractAsyncStateStreamOperatorV2Test {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test$KeyedOneInputStreamOperatorV2TestHarness.class */
    public static class KeyedOneInputStreamOperatorV2TestHarness<K, IN, OUT> extends AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> {

        /**
         * Builds a harness whose constructor runs on a freshly created single-thread executor; the
         * same executor is handed to the harness.
         *
         * <p>The three {@code int} arguments are forwarded unchanged to the super constructor.
         * NOTE(review): they presumably are max parallelism, number of subtasks and subtask index
         * - confirm against {@code AsyncKeyedOneInputStreamOperatorTestHarness}.
         *
         * @return the fully constructed harness
         * @throws Exception if construction fails (surfaced through the future, so the caller no
         *     longer blocks forever) or if the calling thread is interrupted while waiting
         */
        public static <K, IN, OUT> KeyedOneInputStreamOperatorV2TestHarness<K, IN, OUT> create(StreamOperatorFactory<OUT> streamOperatorFactory, KeySelector<IN, K> keySelector, TypeInformation<K> typeInformation, int i, int i2, int i3) throws Exception {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            CompletableFuture<KeyedOneInputStreamOperatorV2TestHarness<K, IN, OUT>> harnessFuture = new CompletableFuture<>();
            executor.execute(() -> {
                try {
                    harnessFuture.complete(new KeyedOneInputStreamOperatorV2TestHarness<>(executor, streamOperatorFactory, keySelector, typeInformation, i, i2, i3));
                } catch (Throwable failure) {
                    // Rethrowing here would only kill the worker thread and leave the future
                    // incomplete, so the caller's get() would hang. Propagate through the future.
                    harnessFuture.completeExceptionally(failure);
                    // No harness exists to own the executor, so release its thread.
                    executor.shutdown();
                }
            });
            return harnessFuture.get();
        }

        /** Forwards all arguments unchanged to the super constructor. */
        public KeyedOneInputStreamOperatorV2TestHarness(ExecutorService executorService, StreamOperatorFactory<OUT> streamOperatorFactory, KeySelector<IN, K> keySelector, TypeInformation<K> typeInformation, int i, int i2, int i3) throws Exception {
            super(executorService, streamOperatorFactory, keySelector, typeInformation, i, i2, i3);
        }

        /** Exposes the inherited {@code operator} field to the tests. */
        public StreamOperator<OUT> getBaseOperator() {
            return this.operator;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test$SingleInputTestOperator.class */
    private static class SingleInputTestOperator extends AbstractAsyncStateStreamOperatorV2<String> implements MultipleInputStreamOperator<String>, Triggerable<Integer, VoidNamespace> {
        private static final long serialVersionUID = 1;
        final AtomicInteger processed;
        final AtomicInteger attributeProcessed;
        private final ElementOrder elementOrder;
        final Object objectToWait;
        Input input;
        private WatermarkStatus watermarkStatus;
        private int watermarkIndex;
        InternalTimerService<VoidNamespace> timerService;

        public SingleInputTestOperator(StreamOperatorParameters<String> streamOperatorParameters, ElementOrder elementOrder) {
            super(streamOperatorParameters, 1);
            this.processed = new AtomicInteger(0);
            this.attributeProcessed = new AtomicInteger(0);
            this.objectToWait = new Object();
            this.watermarkStatus = new WatermarkStatus(-1);
            this.watermarkIndex = -1;
            this.elementOrder = elementOrder;
            this.input = new AbstractInput<Tuple2<Integer, String>, String>(this, 1) { // from class: org.apache.flink.streaming.api.operators.AbstractAsyncStateStreamOperatorV2Test.SingleInputTestOperator.1
                public void processElement(StreamRecord<Tuple2<Integer, String>> streamRecord) throws Exception {
                    SingleInputTestOperator.this.processed.incrementAndGet();
                    synchronized (SingleInputTestOperator.this.objectToWait) {
                        SingleInputTestOperator.this.objectToWait.wait();
                    }
                    SingleInputTestOperator.this.timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, ((Integer) ((Tuple2) streamRecord.getValue()).f0).intValue() + 100);
                }
            };
        }

        public void open() throws Exception {
            super.open();
            this.timerService = getInternalTimerService("processing timer", VoidNamespaceSerializer.INSTANCE, this);
        }

        public List<Input> getInputs() {
            return Collections.singletonList(this.input);
        }

        public ElementOrder getElementOrder() {
            return this.elementOrder;
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            Assertions.assertThat(getCurrentKey()).isEqualTo(internalTimer.getKey());
            this.output.collect(new StreamRecord("EventTimer-" + internalTimer.getKey() + "-" + internalTimer.getTimestamp()));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            Assertions.assertThat(getCurrentKey()).isEqualTo(internalTimer.getKey());
            this.output.collect(new StreamRecord("ProcessingTimer-" + internalTimer.getKey() + "-" + internalTimer.getTimestamp()));
        }

        public void processRecordAttributes(RecordAttributes recordAttributes, int i) throws Exception {
            super.processRecordAttributes(recordAttributes, i);
            this.attributeProcessed.incrementAndGet();
        }

        public void processWatermarkStatus(WatermarkStatus watermarkStatus, int i) throws Exception {
            super.processWatermarkStatus(watermarkStatus, i);
            this.watermarkStatus = watermarkStatus;
            this.watermarkIndex = i;
        }

        public int getProcessed() {
            return this.processed.get();
        }

        public int getAttributeProcessed() {
            return this.attributeProcessed.get();
        }

        public void proceed() {
            synchronized (this.objectToWait) {
                this.objectToWait.notify();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test$SingleInputTestOperatorDirectAsyncProcess.class */
    private static class SingleInputTestOperatorDirectAsyncProcess extends SingleInputTestOperator {

        /**
         * Replaces the parent's input with one that submits an increment of {@code processed}
         * through {@code asyncProcessWithKey}, blocks until {@code proceed()} is called and then
         * increments {@code processed} a second time on the calling thread.
         */
        SingleInputTestOperatorDirectAsyncProcess(StreamOperatorParameters<String> streamOperatorParameters, ElementOrder elementOrder) {
            super(streamOperatorParameters, elementOrder);
            this.input = new AbstractInput<Tuple2<Integer, String>, String>(this, 1) {
                public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
                    final SingleInputTestOperatorDirectAsyncProcess op = SingleInputTestOperatorDirectAsyncProcess.this;
                    Integer key = element.getValue().f0;
                    op.asyncProcessWithKey(key, () -> {
                        op.processed.incrementAndGet();
                    });
                    // Park until the test releases this element.
                    synchronized (op.objectToWait) {
                        op.objectToWait.wait();
                    }
                    op.processed.incrementAndGet();
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test$SingleInputTestOperatorWithAsyncProcessTimer.class */
    private static class SingleInputTestOperatorWithAsyncProcessTimer extends SingleInputTestOperator {

        /**
         * Replaces the parent's input with a non-blocking one: it counts the element and registers
         * an event-time timer at the timestamp parsed from the record's {@code f1} string.
         */
        SingleInputTestOperatorWithAsyncProcessTimer(StreamOperatorParameters<String> streamOperatorParameters, ElementOrder elementOrder) {
            super(streamOperatorParameters, elementOrder);
            this.input = new AbstractInput<Tuple2<Integer, String>, String>(this, 1) {
                public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
                    final SingleInputTestOperatorWithAsyncProcessTimer op = SingleInputTestOperatorWithAsyncProcessTimer.this;
                    op.processed.incrementAndGet();
                    long timerTimestamp = Long.parseLong(element.getValue().f1);
                    op.timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timerTimestamp);
                }
            };
        }

        /** Runs the parent's event-time callback through {@code asyncProcessWithKey} under the timer's key. */
        @Override // org.apache.flink.streaming.api.operators.AbstractAsyncStateStreamOperatorV2Test.SingleInputTestOperator
        public void onEventTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            Integer timerKey = internalTimer.getKey();
            asyncProcessWithKey(timerKey, () -> super.onEventTime(internalTimer));
        }

        /** Runs the parent's processing-time callback through {@code asyncProcessWithKey} under the timer's key. */
        @Override // org.apache.flink.streaming.api.operators.AbstractAsyncStateStreamOperatorV2Test.SingleInputTestOperator
        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            Integer timerKey = internalTimer.getKey();
            asyncProcessWithKey(timerKey, () -> super.onProcessingTime(internalTimer));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test$TestDirectAsyncProcessOperatorFactory.class */
    /** Factory producing {@link SingleInputTestOperatorDirectAsyncProcess} with a fixed element order. */
    private static class TestDirectAsyncProcessOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private final ElementOrder elementOrder;

        TestDirectAsyncProcessOperatorFactory(ElementOrder elementOrder) {
            this.elementOrder = elementOrder;
        }

        /**
         * Creates a new operator instance for the given parameters.
         *
         * <p>The cast to {@code T} is unchecked but required: the method promises whatever
         * {@code T} the caller infers, while this factory always builds one concrete type.
         */
        @SuppressWarnings("unchecked")
        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            return (T) new SingleInputTestOperatorDirectAsyncProcess(streamOperatorParameters, this.elementOrder);
        }

        /** Returns the concrete operator class; the class loader argument is not used. */
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SingleInputTestOperatorDirectAsyncProcess.class;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test$TestOperatorFactory.class */
    /** Factory producing {@link SingleInputTestOperator} with a fixed element order. */
    public static class TestOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private final ElementOrder elementOrder;

        TestOperatorFactory(ElementOrder elementOrder) {
            this.elementOrder = elementOrder;
        }

        /**
         * Creates a new operator instance for the given parameters.
         *
         * <p>The cast to {@code T} is unchecked but required: the method promises whatever
         * {@code T} the caller infers, while this factory always builds one concrete type.
         */
        @SuppressWarnings("unchecked")
        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            return (T) new SingleInputTestOperator(streamOperatorParameters, this.elementOrder);
        }

        /** Returns the concrete operator class; the class loader argument is not used. */
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SingleInputTestOperator.class;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test$TestWithAsyncProcessTimerOperatorFactory.class */
    /** Factory producing {@link SingleInputTestOperatorWithAsyncProcessTimer} with a fixed element order. */
    private static class TestWithAsyncProcessTimerOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private final ElementOrder elementOrder;

        TestWithAsyncProcessTimerOperatorFactory(ElementOrder elementOrder) {
            this.elementOrder = elementOrder;
        }

        /**
         * Creates a new operator instance for the given parameters.
         *
         * <p>The cast to {@code T} is unchecked but required: the method promises whatever
         * {@code T} the caller infers, while this factory always builds one concrete type.
         */
        @SuppressWarnings("unchecked")
        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            return (T) new SingleInputTestOperatorWithAsyncProcessTimer(streamOperatorParameters, this.elementOrder);
        }

        /** Returns the concrete operator class; the class loader argument is not used. */
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SingleInputTestOperatorWithAsyncProcessTimer.class;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test$WatermarkTestingOperator.class */
    public static class WatermarkTestingOperator extends AbstractAsyncStateStreamOperatorV2<Long> implements MultipleInputStreamOperator<Long>, Triggerable<Integer, VoidNamespace> {
        private transient InternalTimerService<VoidNamespace> timerService;
        private BiFunctionWithException<WatermarkTestingOperator, Watermark, Watermark, Exception> preProcessFunction;
        private BiConsumerWithException<WatermarkTestingOperator, Watermark, Exception> postProcessFunction;

        public WatermarkTestingOperator(StreamOperatorParameters<Long> streamOperatorParameters, BiFunctionWithException<WatermarkTestingOperator, Watermark, Watermark, Exception> biFunctionWithException, BiConsumerWithException<WatermarkTestingOperator, Watermark, Exception> biConsumerWithException) {
            super(streamOperatorParameters, 3);
            this.preProcessFunction = biFunctionWithException;
            this.postProcessFunction = biConsumerWithException;
        }

        public void output(Long l) {
            this.output.collect(new StreamRecord(l));
        }

        public void open() throws Exception {
            super.open();
            this.timerService = getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this);
        }

        public Watermark preProcessWatermark(Watermark watermark) throws Exception {
            return this.preProcessFunction == null ? watermark : (Watermark) this.preProcessFunction.apply(this, watermark);
        }

        public void postProcessWatermark(Watermark watermark) throws Exception {
            if (this.postProcessFunction != null) {
                this.postProcessFunction.accept(this, watermark);
            }
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            Assertions.assertThat(getCurrentKey()).isEqualTo(internalTimer.getKey());
            this.output.collect(new StreamRecord(Long.valueOf(internalTimer.getTimestamp())));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
            Assertions.assertThat(getCurrentKey()).isEqualTo(internalTimer.getKey());
        }

        private Input<Long> createInput(int i) {
            return new AbstractInput<Long, Long>(this, i) { // from class: org.apache.flink.streaming.api.operators.AbstractAsyncStateStreamOperatorV2Test.WatermarkTestingOperator.1
                public void processElement(StreamRecord<Long> streamRecord) throws Exception {
                    WatermarkTestingOperator.this.timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, ((Long) streamRecord.getValue()).longValue());
                }
            };
        }

        public List<Input> getInputs() {
            return Arrays.asList(createInput(1), createInput(2), createInput(3));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test$WatermarkTestingOperatorFactory.class */
    /** Factory producing {@link WatermarkTestingOperator} with optional watermark hooks. */
    private static class WatermarkTestingOperatorFactory extends AbstractStreamOperatorFactory<Long> {
        private BiFunctionWithException<WatermarkTestingOperator, Watermark, Watermark, Exception> preProcessFunction;
        private BiConsumerWithException<WatermarkTestingOperator, Watermark, Exception> postProcessFunction;

        private WatermarkTestingOperatorFactory() {
        }

        /** Sets the hook handed to created operators as their pre-process function. */
        public void setPreProcessFunction(BiFunctionWithException<WatermarkTestingOperator, Watermark, Watermark, Exception> biFunctionWithException) {
            this.preProcessFunction = biFunctionWithException;
        }

        /** Sets the hook handed to created operators as their post-process function. */
        public void setPostProcessFunction(BiConsumerWithException<WatermarkTestingOperator, Watermark, Exception> biConsumerWithException) {
            this.postProcessFunction = biConsumerWithException;
        }

        /**
         * Creates a new operator with whatever hooks are set at call time (either may be
         * {@code null}).
         *
         * <p>The cast to {@code T} is unchecked but required: the method promises whatever
         * {@code T} the caller infers, while this factory always builds one concrete type.
         */
        @SuppressWarnings("unchecked")
        public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> streamOperatorParameters) {
            return (T) new WatermarkTestingOperator(streamOperatorParameters, this.preProcessFunction, this.postProcessFunction);
        }

        /** Returns the concrete operator class; the class loader argument is not used. */
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return WatermarkTestingOperator.class;
        }
    }

    /** Package-private no-argument constructor; performs no initialization. */
    AbstractAsyncStateStreamOperatorV2Test() {}

    /**
     * Creates a harness around a {@link SingleInputTestOperator} keyed by integer and backed by an
     * async-wrapped {@link HashMapStateBackend}.
     *
     * <p>The three {@code int} arguments are forwarded unchanged to
     * {@code KeyedOneInputStreamOperatorV2TestHarness.create}.
     */
    protected KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness(int i, int i2, int i3, ElementOrder elementOrder) throws Exception {
        TestOperatorFactory operatorFactory = new TestOperatorFactory(elementOrder);
        KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> harness =
                KeyedOneInputStreamOperatorV2TestHarness.create(
                        operatorFactory,
                        new AbstractAsyncStateStreamOperatorTest.TestKeySelector(),
                        BasicTypeInfo.INT_TYPE_INFO,
                        i,
                        i2,
                        i3);
        harness.setStateBackend(StateBackendTestUtils.buildAsyncStateBackend(new HashMapStateBackend()));
        return harness;
    }

    /** Verifies that an opened async operator exposes a controller with a state executor. */
    @Test
    void testCreateAsyncExecutionController() throws Exception {
        // try-with-resources closes the harness exactly once, even if close() itself throws.
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> harness =
                createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
            harness.open();
            StreamOperator<String> operator = harness.getBaseOperator();
            Assertions.assertThat(operator).isInstanceOf(AbstractAsyncStateStreamOperatorV2.class);
            // getBaseOperator() is typed as StreamOperator, so the async API needs a downcast.
            AbstractAsyncStateStreamOperatorV2<?> asyncOperator = (AbstractAsyncStateStreamOperatorV2<?>) operator;
            Assertions.assertThat(asyncOperator.getAsyncExecutionController()).isNotNull();
            Assertions.assertThat(asyncOperator.getAsyncExecutionController().getStateExecutor()).isNotNull();
        }
    }

    /**
     * With {@code FIRST_STATE_ORDER}, asserts a reference count of exactly 1 while the element is
     * blocked in the operator and 0 once processing has finished.
     */
    @Test
    void testRecordProcessorWithFirstStateOrder() throws Exception {
        // try-with-resources closes the harness exactly once, even if close() itself throws.
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> harness =
                createTestHarness(128, 1, 0, ElementOrder.FIRST_STATE_ORDER)) {
            harness.open();
            SingleInputTestOperator operator = (SingleInputTestOperator) harness.getBaseOperator();
            CompletableFuture<?> pending = harness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));

            // Give the operator thread time to reach its wait() before inspecting or releasing it.
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(1);

            operator.proceed();
            pending.get();
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
        }
    }

    /**
     * With {@code RECORD_ORDER}, asserts a reference count greater than 1 while the element is
     * blocked in the operator and 0 once processing has finished.
     */
    @Test
    void testRecordProcessorWithRecordOrder() throws Exception {
        // try-with-resources closes the harness exactly once, even if close() itself throws.
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> harness =
                createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
            harness.open();
            SingleInputTestOperator operator = (SingleInputTestOperator) harness.getBaseOperator();
            CompletableFuture<?> pending = harness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));

            // Give the operator thread time to reach its wait() before inspecting or releasing it.
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);

            operator.proceed();
            pending.get();
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
        }
    }

    /**
     * Exercises {@code asyncProcessWithKey} issued from inside element processing: the counter is
     * expected to be 0 while the element is blocked and 1 after it has been released.
     */
    @Test
    void testAsyncProcessWithKey() throws Exception {
        KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> harness =
                KeyedOneInputStreamOperatorV2TestHarness.create(
                        new TestDirectAsyncProcessOperatorFactory(ElementOrder.RECORD_ORDER),
                        new AbstractAsyncStateStreamOperatorTest.TestKeySelector(),
                        BasicTypeInfo.INT_TYPE_INFO,
                        128,
                        1,
                        0);
        harness.setStateBackend(StateBackendTestUtils.buildAsyncStateBackend(new HashMapStateBackend()));
        try {
            harness.open();
            SingleInputTestOperatorDirectAsyncProcess operator =
                    (SingleInputTestOperatorDirectAsyncProcess) harness.getBaseOperator();
            CompletableFuture<?> pending = harness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));

            // Give the operator thread time to reach its wait() before inspecting or releasing it.
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(0);
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);

            operator.proceed();
            pending.get();
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
        } finally {
            // A single close in finally; closing in both the try body and a catch block would
            // close twice whenever the first close() throws.
            harness.close();
        }
    }

    /** Asserts that draining state requests brings the in-flight record count from 1 down to 0. */
    @Test
    void testCheckpointDrain() throws Exception {
        // try-with-resources closes the harness exactly once, even if close() itself throws.
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> harness =
                createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
            harness.open();
            SingleInputTestOperator operator = (SingleInputTestOperator) harness.getBaseOperator();
            // Raw type kept on purpose: the controller's type parameters are not visible here.
            AsyncExecutionController controller = operator.getAsyncExecutionController();

            operator.setAsyncKeyedContextElement(
                    new StreamRecord<>(Tuple2.of(5, "5")),
                    new AbstractAsyncStateStreamOperatorTest.TestKeySelector());
            // The explicit casts on null pin the argument types of the handleRequest call.
            controller.handleRequest((State) null, StateRequestType.VALUE_GET, (Object) null);
            operator.postProcessElement();
            Assertions.assertThat(controller.getInFlightRecordNum()).isEqualTo(1);

            harness.drainStateRequests();
            Assertions.assertThat(controller.getInFlightRecordNum()).isEqualTo(0);
        }
    }

    /** Asserts that the operator hands out an {@link InternalTimerServiceAsyncImpl} timer service. */
    @Test
    void testTimerServiceIsAsync() throws Exception {
        // try-with-resources closes the harness exactly once, even if close() itself throws.
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> harness =
                createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
            harness.open();
            StreamOperator<String> operator = harness.getBaseOperator();
            Assertions.assertThat(operator).isInstanceOf(AbstractAsyncStateStreamOperatorV2.class);

            // The callbacks are never expected to do anything; only the service type matters.
            Triggerable<Integer, VoidNamespace> noOpTriggerable = new Triggerable<Integer, VoidNamespace>() {
                public void onEventTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
                }

                public void onProcessingTime(InternalTimer<Integer, VoidNamespace> internalTimer) throws Exception {
                }
            };
            // getBaseOperator() is typed as StreamOperator, so the timer API needs a downcast.
            AbstractAsyncStateStreamOperatorV2<?> asyncOperator = (AbstractAsyncStateStreamOperatorV2<?>) operator;
            Assertions.assertThat(asyncOperator.getInternalTimerService("test", VoidNamespaceSerializer.INSTANCE, noOpTriggerable))
                    .isInstanceOf(InternalTimerServiceAsyncImpl.class);
        }
    }

    /**
     * Asserts that a record-attributes event sent after a blocked element is not counted until
     * that element has been released.
     */
    @Test
    void testNonRecordProcess() throws Exception {
        // try-with-resources closes the harness exactly once, even if close() itself throws.
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> harness =
                createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
            harness.open();
            SingleInputTestOperator operator = (SingleInputTestOperator) harness.getBaseOperator();
            harness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
            CompletableFuture<?> attributesDone = harness.processRecordAttributesInternal(new RecordAttributes(false));

            // Give the operator thread time to reach its wait() before inspecting or releasing it.
            Thread.sleep(1000L);
            Assertions.assertThat(operator.getProcessed()).isEqualTo(1);
            Assertions.assertThat(operator.getAttributeProcessed()).isEqualTo(0);

            operator.proceed();
            attributesDone.get();
            Assertions.assertThat(operator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            Assertions.assertThat(operator.getAttributeProcessed()).isEqualTo(1);
        }
    }

    /**
     * Registers event-time timers at 1, 3, 6 and 9 for key 1 and expects one "EventTimer-1-ts"
     * record per timer, followed by the watermark itself, once watermark 10 is processed.
     */
    @Test
    void testWatermark() throws Exception {
        KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> harness =
                KeyedOneInputStreamOperatorV2TestHarness.create(
                        new TestWithAsyncProcessTimerOperatorFactory(ElementOrder.RECORD_ORDER),
                        new AbstractAsyncStateStreamOperatorTest.TestKeySelector(),
                        BasicTypeInfo.INT_TYPE_INFO,
                        128,
                        1,
                        0);
        harness.setStateBackend(StateBackendTestUtils.buildAsyncStateBackend(new HashMapStateBackend()));
        try {
            harness.open();
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            // Each record's f1 is the timer timestamp the operator parses and registers.
            for (String timestamp : new String[] {"1", "3", "6", "9"}) {
                harness.processElementInternal(new StreamRecord<>(Tuple2.of(1, timestamp)));
                expectedOutput.add(new StreamRecord<>("EventTimer-1-" + timestamp));
            }
            harness.processWatermark(10L);
            expectedOutput.add(new Watermark(10L));
            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, harness.getOutput());
        } finally {
            harness.close();
        }
    }

    /**
     * Drives a three-input {@link WatermarkTestingOperator} with pre/post watermark hooks and
     * checks the exact interleaving of hook output, fired timers and forwarded watermarks.
     *
     * <p>The pre-process hook emits {@code timestamp + 1000} under key 1 and returns a watermark
     * advanced by 1, except on every second invocation where it returns {@code null}. The
     * post-process hook emits {@code timestamp + 100}.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    void testWatermarkHooks() throws Exception {
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        // Every element maps to key 0, on all three inputs.
        KeySelector constantKey = ignored -> {
            return 0;
        };
        List keySelectors = Arrays.asList(constantKey, constantKey, constantKey);

        WatermarkTestingOperatorFactory factory = new WatermarkTestingOperatorFactory();
        AtomicInteger preProcessCalls = new AtomicInteger(0);
        factory.setPreProcessFunction((operator, watermark) -> {
            operator.asyncProcessWithKey(1L, () -> {
                Assertions.assertThat(operator.getCurrentKey()).isEqualTo(1L);
                operator.output(Long.valueOf(watermark.getTimestamp() + 1000));
            });
            if (preProcessCalls.incrementAndGet() % 2 == 0) {
                return null;
            }
            return new Watermark(watermark.getTimestamp() + 1);
        });
        factory.setPostProcessFunction((operator, watermark) -> {
            operator.output(Long.valueOf(watermark.getTimestamp() + 100));
        });

        // try-with-resources closes the harness exactly once, even if close() itself throws.
        try (AsyncKeyedMultiInputStreamOperatorTestHarness harness =
                AsyncKeyedMultiInputStreamOperatorTestHarness.create(factory, BasicTypeInfo.INT_TYPE_INFO, keySelectors, 1, 1, 0)) {
            harness.setup();
            harness.open();
            // Each element registers an event-time timer at its own value.
            harness.processElement(0, new StreamRecord<>(1L, 1L));
            harness.processElement(0, new StreamRecord<>(3L, 3L));
            harness.processElement(0, new StreamRecord<>(4L, 4L));

            // Watermark 2 on all inputs: expected hook output, timers 1 and 3, watermark 3,
            // then the post-process output.
            for (int inputIndex = 0; inputIndex < 3; inputIndex++) {
                harness.processWatermark(inputIndex, new Watermark(2L));
            }
            expectedOutput.add(new StreamRecord<>(1002L));
            expectedOutput.add(new StreamRecord<>(1L));
            expectedOutput.add(new StreamRecord<>(3L));
            expectedOutput.add(new Watermark(3L));
            expectedOutput.add(new StreamRecord<>(103L));

            // Watermark 4: second hook invocation returns null, so only the hook output is expected.
            for (int inputIndex = 0; inputIndex < 3; inputIndex++) {
                harness.processWatermark(inputIndex, new Watermark(4L));
            }
            expectedOutput.add(new StreamRecord<>(1004L));

            // Watermark 5: expected hook output, timer 4, watermark 6, then the post-process output.
            for (int inputIndex = 0; inputIndex < 3; inputIndex++) {
                harness.processWatermark(inputIndex, new Watermark(5L));
            }
            expectedOutput.add(new StreamRecord<>(1005L));
            expectedOutput.add(new StreamRecord<>(4L));
            expectedOutput.add(new Watermark(6L));
            expectedOutput.add(new StreamRecord<>(106L));

            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, harness.getOutput());
        }
    }

    /**
     * Sends one element, one watermark and an {@code IDLE} watermark status through a
     * single-input harness and checks the operator state both before and after
     * {@code proceed()} is called on the test operator, followed by the exact output sequence.
     */
    @Test
    void testWatermarkStatus() throws Exception {
        // try-with-resources closes the harness on every path; a failure in close() is attached
        // to the primary exception as suppressed.
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
            testHarness.open();
            SingleInputTestOperator testOperator = testHarness.getBaseOperator();

            testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
            testHarness.processWatermarkInternal(new Watermark(205L));
            CompletableFuture<?> statusFuture =
                    testHarness.processWatermarkStatusInternal(WatermarkStatus.IDLE);

            // Fixed pause before inspecting the operator; the status future is still pending.
            Thread.sleep(1000L);
            Assertions.assertThat(testOperator.getProcessed()).isEqualTo(1);
            Assertions.assertThat(testOperator.watermarkIndex).isEqualTo(-1);
            Assertions.assertThat(testOperator.watermarkStatus.isIdle()).isTrue();

            // Release the operator, then wait for the watermark-status processing to complete.
            testOperator.proceed();
            statusFuture.get();

            Assertions.assertThat(testOperator.getCurrentProcessingContext().getReferenceCount())
                    .isEqualTo(0);
            Assertions.assertThat(testOperator.watermarkStatus.isActive()).isFalse();
            Assertions.assertThat(testHarness.getOutput())
                    .containsExactly(
                            new StreamRecord<>("EventTimer-5-105"),
                            new Watermark(205L),
                            WatermarkStatus.IDLE);
        }
    }

    /**
     * Checks the output of a three-input harness when inputs 1 and 2 are marked idle while only
     * input 0 advances its watermark, and again after input 1 is switched back to active.
     */
    @Test
    void testIdleWatermarkHandling() throws Exception {
        KeySelector<Long, Integer> dummyKeySelector = l -> 0;
        List<KeySelector<?, Integer>> keySelectors =
                Arrays.asList(dummyKeySelector, dummyKeySelector, dummyKeySelector);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        // try-with-resources closes the harness on every path; a failure in close() is attached
        // to the primary exception as suppressed.
        try (AsyncKeyedMultiInputStreamOperatorTestHarness<Integer, Long> testHarness =
                AsyncKeyedMultiInputStreamOperatorTestHarness.create(
                        new WatermarkTestingOperatorFactory(),
                        BasicTypeInfo.INT_TYPE_INFO,
                        keySelectors,
                        1,
                        1,
                        0)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processElement(0, new StreamRecord<>(1L, 1L));
            testHarness.processElement(0, new StreamRecord<>(3L, 3L));
            testHarness.processElement(0, new StreamRecord<>(4L, 4L));

            // A watermark on input 0 alone must not produce any output yet.
            testHarness.processWatermark(0, new Watermark(1L));
            Assertions.assertThat(testHarness.getOutput()).isEmpty();

            // Input 1 idle, input 2 still active: output is expected to stay empty.
            testHarness.processWatermarkStatus(1, WatermarkStatus.IDLE);
            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct", expectedOutput, testHarness.getOutput());

            // Once input 2 is idle too, record 1 and watermark 1 are expected.
            testHarness.processWatermarkStatus(2, WatermarkStatus.IDLE);
            expectedOutput.add(new StreamRecord<>(1L));
            expectedOutput.add(new Watermark(1L));
            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct", expectedOutput, testHarness.getOutput());

            // With inputs 1 and 2 idle, a new watermark on input 0 is expected to go through.
            testHarness.processWatermark(0, new Watermark(3L));
            expectedOutput.add(new StreamRecord<>(3L));
            expectedOutput.add(new Watermark(3L));
            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct", expectedOutput, testHarness.getOutput());

            // After input 1 becomes active again, watermark 4 on input 0 must add no output.
            testHarness.processWatermarkStatus(1, WatermarkStatus.ACTIVE);
            testHarness.processWatermark(0, new Watermark(4L));
            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Checks that a three-input harness emits {@link WatermarkStatus#IDLE} only after the third
     * (last) input has been marked idle; with two of three inputs idle the output stays empty.
     */
    @Test
    void testIdlenessForwarding() throws Exception {
        KeySelector<Long, Integer> dummyKeySelector = l -> 0;
        List<KeySelector<?, Integer>> keySelectors =
                Arrays.asList(dummyKeySelector, dummyKeySelector, dummyKeySelector);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        // try-with-resources closes the harness on every path; a failure in close() is attached
        // to the primary exception as suppressed.
        try (AsyncKeyedMultiInputStreamOperatorTestHarness<Integer, Long> testHarness =
                AsyncKeyedMultiInputStreamOperatorTestHarness.create(
                        new WatermarkTestingOperatorFactory(),
                        BasicTypeInfo.INT_TYPE_INFO,
                        keySelectors,
                        1,
                        1,
                        0)) {
            testHarness.setup();
            testHarness.open();

            // Two of the three inputs idle: nothing is expected downstream.
            testHarness.processWatermarkStatus(0, WatermarkStatus.IDLE);
            testHarness.processWatermarkStatus(1, WatermarkStatus.IDLE);
            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct", expectedOutput, testHarness.getOutput());

            // The last input going idle is expected to forward a single IDLE status.
            testHarness.processWatermarkStatus(2, WatermarkStatus.IDLE);
            expectedOutput.add(WatermarkStatus.IDLE);
            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Feeds backlog and non-backlog {@link RecordAttributes} into each of three inputs and checks
     * the attributes emitted downstream: one output per input event, where the non-backlog
     * attributes appear in the output only for the last of the three non-backlog events.
     */
    @Test
    void testRecordAttributesForwarding() throws Exception {
        KeySelector<Long, Integer> dummyKeySelector = l -> 0;
        List<KeySelector<?, Integer>> keySelectors =
                Arrays.asList(dummyKeySelector, dummyKeySelector, dummyKeySelector);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        // try-with-resources closes the harness on every path; a failure in close() is attached
        // to the primary exception as suppressed.
        try (AsyncKeyedMultiInputStreamOperatorTestHarness<Integer, Long> testHarness =
                AsyncKeyedMultiInputStreamOperatorTestHarness.create(
                        new WatermarkTestingOperatorFactory(),
                        BasicTypeInfo.INT_TYPE_INFO,
                        keySelectors,
                        1,
                        1,
                        0)) {
            testHarness.setup();
            testHarness.open();

            RecordAttributes backlogAttributes =
                    new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
            RecordAttributes nonBacklogAttributes =
                    new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();

            // Backlog attributes on each input: each event is expected as backlog downstream.
            testHarness.processRecordAttributes(0, backlogAttributes);
            testHarness.processRecordAttributes(1, backlogAttributes);
            testHarness.processRecordAttributes(2, backlogAttributes);
            expectedOutput.add(backlogAttributes);
            expectedOutput.add(backlogAttributes);
            expectedOutput.add(backlogAttributes);

            // Non-backlog attributes on each input: the output is expected to stay backlog for
            // the first two events and to switch to non-backlog only with the third.
            testHarness.processRecordAttributes(0, nonBacklogAttributes);
            testHarness.processRecordAttributes(1, nonBacklogAttributes);
            testHarness.processRecordAttributes(2, nonBacklogAttributes);
            expectedOutput.add(backlogAttributes);
            expectedOutput.add(backlogAttributes);
            expectedOutput.add(nonBacklogAttributes);

            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Re-creates one of this class's serializable {@link KeySelector} lambdas from its
     * {@link SerializedLambda} form.
     *
     * <p>The four known lambdas (one per test method named in the {@code case} labels below)
     * share the same functional interface, implementation class and signatures, and each of them
     * maps every input to key {@code 0}. They are therefore validated by one common check and
     * restored as an equivalent constant-zero selector.
     *
     * <p>NOTE(review): this has the shape of the compiler-generated lambda deserialization hook
     * and is presumably present only because the file was reconstructed from bytecode — confirm
     * whether it should exist in hand-maintained source at all.
     *
     * @param serializedLambda the serialized form describing the lambda to restore
     * @return a {@link KeySelector} that returns {@code 0} for every input
     * @throws IllegalArgumentException if the implementation method name is not one of the known
     *     lambdas, or if the descriptor does not match the expected {@code KeySelector} shape
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        switch (serializedLambda.getImplMethodName()) {
            case "lambda$testIdleWatermarkHandling$bf5bd099$1":
            case "lambda$testWatermarkHooks$bf5bd099$1":
            case "lambda$testRecordAttributesForwarding$bf5bd099$1":
            case "lambda$testIdlenessForwarding$bf5bd099$1":
                // REF_invokeStatic == 6: the lambda body is a static method of the test class.
                boolean matchesKeySelectorLambda =
                        serializedLambda.getImplMethodKind()
                                        == java.lang.invoke.MethodHandleInfo.REF_invokeStatic
                                && serializedLambda
                                        .getFunctionalInterfaceClass()
                                        .equals("org/apache/flink/api/java/functions/KeySelector")
                                && serializedLambda
                                        .getFunctionalInterfaceMethodName()
                                        .equals("getKey")
                                && serializedLambda
                                        .getFunctionalInterfaceMethodSignature()
                                        .equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                                && serializedLambda
                                        .getImplClass()
                                        .equals(
                                                "org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test")
                                && serializedLambda
                                        .getImplMethodSignature()
                                        .equals("(Ljava/lang/Long;)Ljava/lang/Integer;");
                if (matchesKeySelectorLambda) {
                    // A lambda needs a functional-interface target type; Object is not one.
                    KeySelector<Long, Integer> selector = l -> 0;
                    return selector;
                }
                break;
            default:
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
