/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.deltajoin;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
import org.apache.flink.table.runtime.generated.GeneratedResultFutureWrapper;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.deltajoin.AsyncDeltaJoinRunner;
import org.apache.flink.table.runtime.operators.join.deltajoin.StreamingDeltaJoinOperator;
import org.apache.flink.table.runtime.operators.join.lookup.keyordered.AecRecord;
import org.apache.flink.table.runtime.operators.join.lookup.keyordered.RecordsBuffer;
import org.apache.flink.table.runtime.operators.join.lookup.keyordered.TableAsyncExecutionController;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class StreamingDeltaJoinOperatorTest {
    /** Harness wrapping the operator under test; (re)created by the setup code of each test. */
    private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness;

    /** Capacity value (100) handed to the lookup runners and the operator built for the harness. */
    private static final int AEC_CAPACITY = 100;

    /**
     * In-memory stand-ins for the left and right tables that the lookups read. They are static and
     * shared with the lookup threads, so every access to their content is expected to happen while
     * holding the list's own monitor.
     */
    private static final LinkedList<RowData> leftTableCurrentData = new LinkedList<>();
    private static final LinkedList<RowData> rightTableCurrentData = new LinkedList<>();

    /** Left input schema: (left_value INT, left_jk1 BOOLEAN, left_jk2_lk STRING). */
    private static final InternalTypeInfo<RowData> leftTypeInfo =
            InternalTypeInfo.of(
                    RowType.of(
                            new LogicalType[] {new IntType(), new BooleanType(), VarCharType.STRING_TYPE},
                            new String[] {"left_value", "left_jk1", "left_jk2_lk"}));

    /** Positions of the join key columns inside a left row: (left_jk1, left_jk2_lk). */
    private static final int[] leftJoinKeyIndices = new int[] {1, 2};

    /** Right input schema: (right_jk2 STRING, right_value INT, right_jk1_lk BOOLEAN). */
    private static final InternalTypeInfo<RowData> rightTypeInfo =
            InternalTypeInfo.of(
                    RowType.of(
                            new LogicalType[] {VarCharType.STRING_TYPE, new IntType(), new BooleanType()},
                            new String[] {"right_jk2", "right_value", "right_jk1_lk"}));

    /**
     * Positions of the join key columns inside a right row, listed as (right_jk1_lk, right_jk2) so
     * that the extracted key has the same (BOOLEAN, STRING) layout as the left join key.
     */
    private static final int[] rightJoinKeyIndices = new int[] {2, 0};

    /** Extracts the join key from a left row. */
    private static final RowDataKeySelector leftJoinKeySelector =
            HandwrittenSelectorUtil.getRowDataSelector(
                    leftJoinKeyIndices,
                    leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));

    /** Extracts the join key from a right row. */
    private static final RowDataKeySelector rightJoinKeySelector =
            HandwrittenSelectorUtil.getRowDataSelector(
                    rightJoinKeyIndices,
                    rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));

    /** Output columns used as the primary sort key when comparing results (same array as the left join key). */
    private static final int[] outputUpsertKeyIndices = leftJoinKeyIndices;

    /** Compares expected and actual harness output after sorting; built in the per-test setup. */
    private RowDataHarnessAssertor assertor;

    /** Last failure reported through the environment's external failure consumer, if any. */
    private Optional<Throwable> latestException = Optional.empty();

    /**
     * Per-test setup: builds and opens a fresh harness, swaps the operator's execution controller
     * for the test delegate, tags both lookup functions with their side, builds the output
     * assertor and resets the static counters and flags used by the helpers.
     *
     * @throws Exception if the harness cannot be created, set up or opened
     */
    @BeforeEach
    public void beforeEach() throws Exception {
        this.testHarness = this.createDeltaJoinOperatorTestHarness();
        this.testHarness.setup();
        this.testHarness.open();
        StreamingDeltaJoinOperator operator = this.unwrapOperator(this.testHarness);
        // Only remember the failure here; waitAllDataProcessed() rethrows it on the test thread.
        this.testHarness.getEnvironment().setExternalFailureCauseConsumer(error -> {
            this.latestException = Optional.of(error);
        });
        // Wrap the controller created by the operator so that emitting a record can also
        // append it to the in-memory tables (see MyAsyncExecutionControllerDelegate).
        operator.setAsyncExecutionController((TableAsyncExecutionController)new MyAsyncExecutionControllerDelegate((TableAsyncExecutionController<RowData, RowData, RowData>)operator.getAsyncExecutionController()));
        this.prepareOperatorRuntimeInfo(operator);
        // Sort order for comparing outputs: first by the string form of every upsert-key
        // column, then by the string form of the whole row as a tie-breaker.
        this.assertor = new RowDataHarnessAssertor(this.getOutputType().getChildren().toArray(new LogicalType[0]), (o1, o2) -> {
            for (int keyIndex : outputUpsertKeyIndices) {
                LogicalType type = (LogicalType)this.getOutputType().getChildren().get(keyIndex);
                RowData.FieldGetter getter = RowData.createFieldGetter((LogicalType)type, (int)keyIndex);
                // Key columns are required to be non-null; a null key fails fast with an NPE.
                int compareResult = Objects.requireNonNull(getter.getFieldOrNull((RowData)o1)).toString().compareTo(Objects.requireNonNull(getter.getFieldOrNull((RowData)o2)).toString());
                if (compareResult == 0) continue;
                return compareResult;
            }
            return o1.toString().compareTo(o2.toString());
        });
        // Reset static state shared across tests.
        MyAsyncFunction.leftInvokeCount.set(0);
        MyAsyncFunction.rightInvokeCount.set(0);
        MyAsyncExecutionControllerDelegate.insertTableDataAfterEmit = true;
    }

    /**
     * Per-test teardown: closes the harness and resets every piece of state that outlives a single
     * test (the static in-memory tables, the recorded failure and the injected lookup exception).
     *
     * <p>The reset runs in a {@code finally} block so that a failure while closing the harness
     * cannot leak table rows or an injected exception into the following tests.
     *
     * @throws Exception if closing the harness fails
     */
    @AfterEach
    public void afterEach() throws Exception {
        try {
            // The harness is assigned as the first step of setup, which itself may throw.
            if (this.testHarness != null) {
                this.testHarness.close();
            }
        } finally {
            leftTableCurrentData.clear();
            rightTableCurrentData.clear();
            this.latestException = Optional.empty();
            MyAsyncFunction.clearExpectedThrownException();
        }
    }

    /**
     * Feeds interleaved insert-only records into both inputs and verifies the joined output and
     * that the execution controller holds no records afterwards.
     *
     * <p>With the default delegate setting, a record is appended to its in-memory table when it
     * is emitted, so each record only joins with rows of the other side emitted before it.
     */
    @Test
    void testJoinBothAppendOnlyTables() throws Exception {
        // Two left rows, one per join key; the right table is still empty at this point.
        StreamRecord<RowData> leftRecord1 = StreamRecordUtils.insertRecord(100, true, "jklk1");
        StreamRecord<RowData> leftRecord2 = StreamRecordUtils.insertRecord(100, false, "jklk2");
        this.testHarness.processElement1(leftRecord1);
        this.testHarness.processElement1(leftRecord2);
        StreamRecord<RowData> leftRecord3 = StreamRecordUtils.insertRecord(200, true, "jklk1");
        StreamRecord<RowData> leftRecord4 = StreamRecordUtils.insertRecord(200, false, "jklk2");
        this.testHarness.processElement1(leftRecord3);
        this.testHarness.processElement1(leftRecord4);
        // First right rows for both join keys.
        StreamRecord<RowData> rightRecord1 = StreamRecordUtils.insertRecord("jklk1", 300, true);
        StreamRecord<RowData> rightRecord2 = StreamRecordUtils.insertRecord("jklk2", 300, false);
        this.testHarness.processElement2(rightRecord1);
        this.testHarness.processElement2(rightRecord2);
        // A right row whose join key matches no left row; it does not appear in the expected output.
        StreamRecord<RowData> rightRecord3 = StreamRecordUtils.insertRecord("unknown", 500, false);
        this.testHarness.processElement2(rightRecord3);
        StreamRecord<RowData> leftRecord5 = StreamRecordUtils.insertRecord(800, true, "jklk1");
        StreamRecord<RowData> leftRecord6 = StreamRecordUtils.insertRecord(800, false, "jklk2");
        this.testHarness.processElement1(leftRecord5);
        this.testHarness.processElement1(leftRecord6);
        StreamRecord<RowData> rightRecord4 = StreamRecordUtils.insertRecord("jklk1", 1000, true);
        StreamRecord<RowData> rightRecord5 = StreamRecordUtils.insertRecord("jklk2", 1000, false);
        this.testHarness.processElement2(rightRecord4);
        this.testHarness.processElement2(rightRecord5);
        this.waitAllDataProcessed();
        // Expected rows are (left columns, right columns); order is irrelevant because the
        // assertor sorts both sides before comparing.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(100, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(200, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(100, false, "jklk2", "jklk2", 300, false));
        expectedOutput.add(StreamRecordUtils.insertRecord(200, false, "jklk2", "jklk2", 300, false));
        expectedOutput.add(StreamRecordUtils.insertRecord(800, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(800, false, "jklk2", "jklk2", 300, false));
        expectedOutput.add(StreamRecordUtils.insertRecord(100, true, "jklk1", "jklk1", 1000, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(200, true, "jklk1", "jklk1", 1000, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(800, true, "jklk1", "jklk1", 1000, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(100, false, "jklk2", "jklk2", 1000, false));
        expectedOutput.add(StreamRecordUtils.insertRecord(200, false, "jklk2", "jklk2", 1000, false));
        expectedOutput.add(StreamRecordUtils.insertRecord(800, false, "jklk2", "jklk2", 1000, false));
        this.assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, this.testHarness.getOutput());
        // Nothing may remain blocked, in flight or finished-but-unemitted.
        TableAsyncExecutionController<RowData, RowData, RowData> aec = this.unwrapAEC(this.testHarness);
        Assertions.assertThat((int)aec.getBlockingSize()).isEqualTo(0);
        Assertions.assertThat((int)aec.getInFlightSize()).isEqualTo(0);
        Assertions.assertThat((int)aec.getFinishSize()).isEqualTo(0);
    }

    /**
     * Holds all lookups at the latch and verifies how the controller buffers records that share a
     * join key: one active record per key, the rest queued as blocking. After the latch is
     * released, checks the output and that every buffer is drained.
     */
    @Test
    void testBlockingWithSameJoinKey() throws Exception {
        // Every lookup task waits on the latch until release() is called.
        MyAsyncFunction.block();
        StreamRecord<RowData> leftRecord1 = StreamRecordUtils.insertRecord(100, true, "jklk1");
        StreamRecord<RowData> leftRecord2 = StreamRecordUtils.insertRecord(100, false, "jklk2");
        this.testHarness.processElement1(leftRecord1);
        this.testHarness.processElement1(leftRecord2);
        StreamRecord<RowData> rightRecord1 = StreamRecordUtils.insertRecord("jklk1", 300, true);
        StreamRecord<RowData> rightRecord2 = StreamRecordUtils.insertRecord("jklk2", 300, false);
        this.testHarness.processElement2(rightRecord1);
        this.testHarness.processElement2(rightRecord2);
        StreamRecord<RowData> leftRecord3 = StreamRecordUtils.insertRecord(200, true, "jklk1");
        StreamRecord<RowData> leftRecord4 = StreamRecordUtils.insertRecord(200, false, "jklk2");
        StreamRecord<RowData> leftRecord5 = StreamRecordUtils.insertRecord(201, false, "jklk2");
        this.testHarness.processElement1(leftRecord3);
        this.testHarness.processElement1(leftRecord4);
        this.testHarness.processElement1(leftRecord5);
        StreamRecord<RowData> rightRecord3 = StreamRecordUtils.insertRecord("unknown", 500, false);
        this.testHarness.processElement2(rightRecord3);
        // Eight records over three distinct join keys: the first record of each key is in
        // flight (3), the remaining five wait behind them.
        TableAsyncExecutionController<RowData, RowData, RowData> aec = this.unwrapAEC(this.testHarness);
        Assertions.assertThat((int)aec.getBlockingSize()).isEqualTo(5);
        Assertions.assertThat((int)aec.getInFlightSize()).isEqualTo(3);
        Assertions.assertThat((int)aec.getFinishSize()).isEqualTo(0);
        RecordsBuffer recordsBuffer = aec.getRecordsBuffer();
        // Buffer sizes count keys: three keys are active, two of them also have a blocking queue.
        Assertions.assertThat((int)recordsBuffer.getActiveBuffer().size()).isEqualTo(3);
        Assertions.assertThat((int)recordsBuffer.getBlockingBuffer().size()).isEqualTo(2);
        RowData joinKey1 = (RowData)leftJoinKeySelector.getKey((Object)((RowData)StreamRecordUtils.insertRecord(100, true, "jklk1").getValue()));
        RowData joinKey2 = (RowData)leftJoinKeySelector.getKey((Object)((RowData)StreamRecordUtils.insertRecord(100, false, "jklk2").getValue()));
        RowData joinKey3 = (RowData)rightJoinKeySelector.getKey((Object)((RowData)StreamRecordUtils.insertRecord("unknown", 500, false).getValue()));
        Assertions.assertThat((Object)((AecRecord)recordsBuffer.getActiveBuffer().get(joinKey1))).isNotNull();
        Assertions.assertThat((Object)((AecRecord)recordsBuffer.getActiveBuffer().get(joinKey2))).isNotNull();
        Assertions.assertThat((Object)((AecRecord)recordsBuffer.getActiveBuffer().get(joinKey3))).isNotNull();
        // joinKey1 queues rightRecord1 and leftRecord3; joinKey2 queues rightRecord2,
        // leftRecord4 and leftRecord5; joinKey3 has only its single active record.
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)((Collection)recordsBuffer.getBlockingBuffer().get(joinKey1))).isNotNull()).hasSize(2);
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)((Collection)recordsBuffer.getBlockingBuffer().get(joinKey2))).isNotNull()).hasSize(3);
        Assertions.assertThat((Collection)((Collection)recordsBuffer.getBlockingBuffer().get(joinKey3))).isNull();
        // Let the lookups proceed and drain the operator.
        MyAsyncFunction.release();
        this.waitAllDataProcessed();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(100, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(100, false, "jklk2", "jklk2", 300, false));
        expectedOutput.add(StreamRecordUtils.insertRecord(200, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(200, false, "jklk2", "jklk2", 300, false));
        expectedOutput.add(StreamRecordUtils.insertRecord(201, false, "jklk2", "jklk2", 300, false));
        this.assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, this.testHarness.getOutput());
        // All counters and all three buffers must be empty once processing has finished.
        Assertions.assertThat((int)aec.getBlockingSize()).isEqualTo(0);
        Assertions.assertThat((int)aec.getInFlightSize()).isEqualTo(0);
        Assertions.assertThat((int)aec.getFinishSize()).isEqualTo(0);
        Assertions.assertThat((Map)recordsBuffer.getActiveBuffer()).isEmpty();
        Assertions.assertThat((Map)recordsBuffer.getBlockingBuffer()).isEmpty();
        Assertions.assertThat((Map)recordsBuffer.getFinishedBuffer()).isEmpty();
    }

    /**
     * Pre-loads both in-memory tables with every record before any of them reaches the operator,
     * so each lookup already sees the complete other side, then verifies the joined output.
     */
    @Test
    void testTableDataVisibleBeforeJoin() throws Exception {
        // The test fills the tables itself; stop the delegate from inserting again on emit.
        MyAsyncExecutionControllerDelegate.insertTableDataAfterEmit = false;
        StreamRecord<RowData> leftRecord1 = StreamRecordUtils.insertRecord(100, true, "jklk1");
        this.insertLeftTable(leftRecord1);
        StreamRecord<RowData> leftRecord2 = StreamRecordUtils.insertRecord(200, true, "jklk1");
        this.insertLeftTable(leftRecord2);
        StreamRecord<RowData> rightRecord1 = StreamRecordUtils.insertRecord("jklk1", 300, true);
        this.insertRightTable(rightRecord1);
        // Right row with a join key that no left row carries.
        StreamRecord<RowData> rightRecord2 = StreamRecordUtils.insertRecord("jklk2", 500, false);
        this.insertRightTable(rightRecord2);
        StreamRecord<RowData> leftRecord3 = StreamRecordUtils.insertRecord(800, true, "jklk1");
        this.insertLeftTable(leftRecord3);
        StreamRecord<RowData> rightRecord3 = StreamRecordUtils.insertRecord("jklk1", 1000, true);
        this.insertRightTable(rightRecord3);
        // Only now feed the records to the operator.
        this.testHarness.processElement1(leftRecord1);
        this.testHarness.processElement1(leftRecord2);
        this.testHarness.processElement2(rightRecord1);
        this.testHarness.processElement2(rightRecord2);
        this.testHarness.processElement1(leftRecord3);
        this.testHarness.processElement2(rightRecord3);
        this.waitAllDataProcessed();
        // Each matching (left, right) pair is expected twice: once triggered by the left
        // record and once triggered by the right record.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(100, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(100, true, "jklk1", "jklk1", 1000, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(200, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(200, true, "jklk1", "jklk1", 1000, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(100, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(200, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(800, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(800, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(800, true, "jklk1", "jklk1", 1000, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(100, true, "jklk1", "jklk1", 1000, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(200, true, "jklk1", "jklk1", 1000, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(800, true, "jklk1", "jklk1", 1000, true));
        this.assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, this.testHarness.getOutput());
        TableAsyncExecutionController<RowData, RowData, RowData> aec = this.unwrapAEC(this.testHarness);
        Assertions.assertThat((int)aec.getBlockingSize()).isEqualTo(0);
        Assertions.assertThat((int)aec.getInFlightSize()).isEqualTo(0);
        Assertions.assertThat((int)aec.getFinishSize()).isEqualTo(0);
    }

    /**
     * Snapshots the operator while lookups are held at the latch, restores a new harness from that
     * snapshot and verifies that the restored controller holds the same pending records and
     * produces the expected output once released.
     */
    @Test
    void testCheckpointAndRestore() throws Exception {
        // Hold all lookups so that the snapshot is taken with records still pending.
        MyAsyncFunction.block();
        StreamRecord<RowData> leftRecord1 = StreamRecordUtils.insertRecord(100, true, "jklk1");
        this.testHarness.processElement1(leftRecord1);
        StreamRecord<RowData> rightRecord1 = StreamRecordUtils.insertRecord("jklk1", 300, true);
        this.testHarness.processElement2(rightRecord1);
        StreamRecord<RowData> leftRecord2 = StreamRecordUtils.insertRecord(200, true, "jklk1");
        this.testHarness.processElement1(leftRecord2);
        StreamRecord<RowData> rightRecord2 = StreamRecordUtils.insertRecord("unknown", 500, false);
        this.testHarness.processElement2(rightRecord2);
        // Two join keys are active (leftRecord1, rightRecord2); rightRecord1 and leftRecord2
        // wait behind leftRecord1 under the same key.
        TableAsyncExecutionController<RowData, RowData, RowData> aec = this.unwrapAEC(this.testHarness);
        Assertions.assertThat((int)aec.getBlockingSize()).isEqualTo(2);
        Assertions.assertThat((int)aec.getInFlightSize()).isEqualTo(2);
        Assertions.assertThat((int)aec.getFinishSize()).isEqualTo(0);
        RecordsBuffer recordsBuffer = aec.getRecordsBuffer();
        Assertions.assertThat((int)recordsBuffer.getActiveBuffer().size()).isEqualTo(2);
        Assertions.assertThat((int)recordsBuffer.getBlockingBuffer().size()).isEqualTo(1);
        OperatorSubtaskState snapshot = this.testHarness.snapshot(0L, 0L);
        // Unblock the old harness's lookup tasks before closing it, then install a new latch
        // for the restored harness.
        MyAsyncFunction.release();
        this.testHarness.close();
        MyAsyncFunction.block();
        // Restore: new harness, delegate installed after setup(), state initialized from the
        // snapshot, then opened.
        this.testHarness = this.createDeltaJoinOperatorTestHarness();
        this.testHarness.setup();
        StreamingDeltaJoinOperator operator = this.unwrapOperator(this.testHarness);
        operator.setAsyncExecutionController((TableAsyncExecutionController)new MyAsyncExecutionControllerDelegate((TableAsyncExecutionController<RowData, RowData, RowData>)operator.getAsyncExecutionController()));
        this.latestException = Optional.empty();
        this.testHarness.initializeState(snapshot);
        this.testHarness.open();
        this.prepareOperatorRuntimeInfo(operator);
        // The restored controller must report the same pending work as before the snapshot.
        aec = this.unwrapAEC(this.testHarness);
        Assertions.assertThat((int)aec.getBlockingSize()).isEqualTo(2);
        Assertions.assertThat((int)aec.getInFlightSize()).isEqualTo(2);
        Assertions.assertThat((int)aec.getFinishSize()).isEqualTo(0);
        recordsBuffer = aec.getRecordsBuffer();
        Assertions.assertThat((int)recordsBuffer.getActiveBuffer().size()).isEqualTo(2);
        Assertions.assertThat((int)recordsBuffer.getBlockingBuffer().size()).isEqualTo(1);
        MyAsyncFunction.release();
        this.waitAllDataProcessed();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(100, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(200, true, "jklk1", "jklk1", 300, true));
        this.assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, this.testHarness.getOutput());
        Assertions.assertThat((int)aec.getBlockingSize()).isEqualTo(0);
        Assertions.assertThat((int)aec.getInFlightSize()).isEqualTo(0);
        Assertions.assertThat((int)aec.getFinishSize()).isEqualTo(0);
        Assertions.assertThat((Map)recordsBuffer.getActiveBuffer()).isEmpty();
        Assertions.assertThat((Map)recordsBuffer.getBlockingBuffer()).isEmpty();
        Assertions.assertThat((Map)recordsBuffer.getFinishedBuffer()).isEmpty();
    }

    /**
     * Takes three consecutive snapshots and verifies that the number of keyed state entries
     * reported by the harness follows the records still pending at each snapshot (2, then 1,
     * then 0), i.e. entries left over from an earlier snapshot do not linger.
     */
    @Test
    void testClearLegacyStateWhenCheckpointing() throws Exception {
        // Snapshot 0: four records over two join keys, all held at the latch.
        MyAsyncFunction.block();
        StreamRecord<RowData> leftRecord1 = StreamRecordUtils.insertRecord(100, true, "jklk1");
        this.testHarness.processElement1(leftRecord1);
        StreamRecord<RowData> rightRecord1 = StreamRecordUtils.insertRecord("jklk1", 300, true);
        this.testHarness.processElement2(rightRecord1);
        StreamRecord<RowData> leftRecord2 = StreamRecordUtils.insertRecord(200, true, "jklk1");
        this.testHarness.processElement1(leftRecord2);
        StreamRecord<RowData> rightRecord2 = StreamRecordUtils.insertRecord("unknown", 500, false);
        this.testHarness.processElement2(rightRecord2);
        this.testHarness.snapshot(0L, 0L);
        Assertions.assertThat((int)this.testHarness.numKeyedStateEntries()).isEqualTo(2);
        MyAsyncFunction.release();
        this.waitAllDataProcessed();
        // Snapshot 1: only one new record is pending.
        MyAsyncFunction.block();
        StreamRecord<RowData> leftRecord3 = StreamRecordUtils.insertRecord(700, true, "jklk1");
        this.testHarness.processElement1(leftRecord3);
        this.testHarness.snapshot(1L, 0L);
        Assertions.assertThat((int)this.testHarness.numKeyedStateEntries()).isEqualTo(1);
        MyAsyncFunction.release();
        this.waitAllDataProcessed();
        // Snapshot 2: nothing is pending any more.
        this.testHarness.snapshot(2L, 0L);
        Assertions.assertThat((int)this.testHarness.numKeyedStateEntries()).isEqualTo(0);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(100, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(200, true, "jklk1", "jklk1", 300, true));
        expectedOutput.add(StreamRecordUtils.insertRecord(700, true, "jklk1", "jklk1", 300, true));
        this.assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, this.testHarness.getOutput());
    }

    /**
     * Makes the lookup function fail with a known exception and verifies that draining the
     * operator surfaces exactly that exception three causes deep.
     */
    @Test
    void testMeetExceptionWhenLookup() throws Exception {
        Throwable mockedFailure = new IllegalStateException("Mock to fail");
        MyAsyncFunction.setExpectedThrownException(mockedFailure);

        // A single left record is enough to trigger one failing lookup.
        testHarness.processElement1(StreamRecordUtils.insertRecord(100, true, "jklk1"));

        Assertions.assertThatThrownBy(() -> waitAllDataProcessed())
                .cause()
                .cause()
                .cause()
                .isEqualTo(mockedFailure);
    }

    /**
     * Ends all inputs of the harness and then rethrows, wrapped in an
     * {@link IllegalStateException}, any failure recorded in {@code latestException}.
     *
     * @throws Exception if ending the inputs fails or a failure was recorded
     */
    private void waitAllDataProcessed() throws Exception {
        testHarness.endAllInputs();
        Throwable recordedFailure = latestException.orElse(null);
        if (recordedFailure == null) {
            return;
        }
        throw new IllegalStateException("Failed to wait all data processed", recordedFailure);
    }

    /**
     * Builds a keyed two-input harness around a new {@link StreamingDeltaJoinOperator}.
     *
     * <p>Two {@link AsyncDeltaJoinRunner}s are created, one from the left and one from the right
     * schema, each wrapping its own {@link MyAsyncFunction} and {@link TestingFetcherResultFuture}.
     * The operator receives the right-schema runner first and the left-schema runner second, along
     * with a mailbox executor and a {@link TestProcessingTimeService}. Both the runners and the
     * operator are configured with {@link #AEC_CAPACITY}.
     *
     * @return the harness, not yet set up or opened
     * @throws Exception if the harness cannot be constructed
     */
    private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createDeltaJoinOperatorTestHarness() throws Exception {
        TaskMailboxImpl mailbox = new TaskMailboxImpl();
        // The default action is a no-op: the processor only has to run mails from the mailbox.
        MailboxProcessor mailboxProcessor =
                new MailboxProcessor(controller -> {}, (TaskMailbox)mailbox, StreamTaskActionExecutor.IMMEDIATE);

        DataStructureConverter leftFetcherConverter =
                DataStructureConverters.getConverter((DataType)leftTypeInfo.getDataType());
        // NOTE(review): the trailing boolean differs per side (false here, true below); its
        // meaning is defined by AsyncDeltaJoinRunner - confirm against that class.
        AsyncDeltaJoinRunner leftAsyncFunction =
                new AsyncDeltaJoinRunner(
                        new GeneratedFunctionWrapper<MyAsyncFunction>(new MyAsyncFunction()),
                        leftFetcherConverter,
                        new GeneratedResultFutureWrapper<TestingFetcherResultFuture>(new TestingFetcherResultFuture()),
                        leftTypeInfo.toRowSerializer(),
                        AEC_CAPACITY,
                        false);

        DataStructureConverter rightFetcherConverter =
                DataStructureConverters.getConverter((DataType)rightTypeInfo.getDataType());
        AsyncDeltaJoinRunner rightAsyncFunction =
                new AsyncDeltaJoinRunner(
                        new GeneratedFunctionWrapper<MyAsyncFunction>(new MyAsyncFunction()),
                        rightFetcherConverter,
                        new GeneratedResultFutureWrapper<TestingFetcherResultFuture>(new TestingFetcherResultFuture()),
                        rightTypeInfo.toRowSerializer(),
                        AEC_CAPACITY,
                        true);

        InternalTypeInfo joinKeyTypeInfo = leftJoinKeySelector.getProducedType();
        StreamingDeltaJoinOperator operator =
                new StreamingDeltaJoinOperator(
                        rightAsyncFunction,
                        leftAsyncFunction,
                        leftJoinKeySelector,
                        rightJoinKeySelector,
                        -1L,
                        AEC_CAPACITY,
                        (ProcessingTimeService)new TestProcessingTimeService(),
                        (MailboxExecutor)new MailboxExecutorImpl((TaskMailbox)mailbox, 0, StreamTaskActionExecutor.IMMEDIATE, mailboxProcessor),
                        (RowType)leftTypeInfo.toLogicalType(),
                        (RowType)rightTypeInfo.toLogicalType());

        return new KeyedTwoInputStreamOperatorTestHarness(
                (TwoInputStreamOperator)operator,
                (KeySelector)leftJoinKeySelector,
                (KeySelector)rightJoinKeySelector,
                (TypeInformation)joinKeyTypeInfo,
                1,
                1,
                0,
                leftTypeInfo.toSerializer(),
                rightTypeInfo.toSerializer());
    }

    /**
     * Tells each of the operator's two lookup functions which side it is invoked for: the
     * left-triggered function is tagged {@code true}, the right-triggered one {@code false}.
     *
     * @param operator the operator whose lookup functions are tagged
     */
    private void prepareOperatorRuntimeInfo(StreamingDeltaJoinOperator operator) {
        // Left side first, then right side.
        for (boolean leftSide : new boolean[] {true, false}) {
            unwrapAsyncFunction(operator, leftSide).tagInvokingSideDuringRuntime(leftSide);
        }
    }

    /**
     * Returns the {@link MyAsyncFunction} used as fetcher by one of the operator's runners.
     *
     * @param operator the operator to inspect
     * @param unwrapLeft {@code true} for the left-triggered user function, {@code false} for the
     *     right-triggered one
     * @return the fetcher of the selected runner
     */
    private MyAsyncFunction unwrapAsyncFunction(StreamingDeltaJoinOperator operator, boolean unwrapLeft) {
        AsyncDeltaJoinRunner runner;
        if (unwrapLeft) {
            runner = (AsyncDeltaJoinRunner) operator.getLeftTriggeredUserFunction();
        } else {
            runner = (AsyncDeltaJoinRunner) operator.getRightTriggeredUserFunction();
        }
        return (MyAsyncFunction) runner.getFetcher();
    }

    /**
     * Returns the execution controller currently installed on the harness's operator.
     *
     * @param testHarness the harness wrapping a {@link StreamingDeltaJoinOperator}
     * @return the operator's asynchronous execution controller
     */
    private TableAsyncExecutionController<RowData, RowData, RowData> unwrapAEC(KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness) {
        StreamingDeltaJoinOperator deltaJoinOperator = unwrapOperator(testHarness);
        return deltaJoinOperator.getAsyncExecutionController();
    }

    /**
     * Returns the operator wrapped by the harness, cast to its concrete type.
     *
     * @param testHarness the harness to unwrap
     * @return the wrapped {@link StreamingDeltaJoinOperator}
     * @throws ClassCastException if the harness wraps a different operator type
     */
    private StreamingDeltaJoinOperator unwrapOperator(KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness) {
        Object wrappedOperator = testHarness.getOperator();
        return (StreamingDeltaJoinOperator) wrappedOperator;
    }

    /**
     * Builds the row type of the join output: all left columns followed by all right columns,
     * keeping the field names of both inputs.
     *
     * @return the concatenated output row type
     */
    private RowType getOutputType() {
        RowType leftRowType = leftTypeInfo.toRowType();
        RowType rightRowType = rightTypeInfo.toRowType();

        ArrayList<LogicalType> fieldTypes = new ArrayList<>(leftRowType.getChildren());
        fieldTypes.addAll(rightRowType.getChildren());

        ArrayList<String> fieldNames = new ArrayList<>(leftRowType.getFieldNames());
        fieldNames.addAll(rightRowType.getFieldNames());

        return RowType.of(
                fieldTypes.toArray(new LogicalType[0]), fieldNames.toArray(new String[0]));
    }

    /**
     * Appends the record's row to the in-memory left table.
     *
     * @param record the record whose value is stored
     */
    private void insertLeftTable(StreamRecord<RowData> record) {
        final boolean intoLeftTable = true;
        insertTableData(record, intoLeftTable);
    }

    /**
     * Appends the record's row to the in-memory right table.
     *
     * @param record the record whose value is stored
     */
    private void insertRightTable(StreamRecord<RowData> record) {
        final boolean intoLeftTable = false;
        insertTableData(record, intoLeftTable);
    }

    /**
     * Appends the record's row to one of the two static in-memory tables while holding that
     * table's monitor.
     *
     * @param record the record whose value is stored
     * @param insertLeftTable {@code true} to append to the left table, {@code false} for the right
     * @throws IllegalStateException if appending fails with an exception
     */
    private static void insertTableData(StreamRecord<RowData> record, boolean insertLeftTable) {
        RowData row = record.getValue();
        LinkedList<RowData> targetTable =
                insertLeftTable ? leftTableCurrentData : rightTableCurrentData;
        try {
            synchronized (targetTable) {
                targetTable.add(row);
            }
        } catch (Exception e) {
            throw new IllegalStateException("Failed to insert table data", e);
        }
    }

    /**
     * Result future used by the runners in this test: it forwards the collected rows unchanged to
     * the result future returned by {@code getResultFuture()}.
     */
    public static final class TestingFetcherResultFuture
    extends TableFunctionResultFuture<RowData> {
        private static final long serialVersionUID = -312754413938303160L;

        /**
         * Passes {@code result} straight through to the delegate result future.
         *
         * @param result the rows produced by the lookup
         */
        public void complete(Collection<RowData> result) {
            this.getResultFuture().complete(result);
        }
    }

    /**
     * Execution controller that reuses every callback of an existing controller but wraps its
     * emit callback so that, when enabled, an emitted record is first appended to the matching
     * in-memory table.
     */
    private static class MyAsyncExecutionControllerDelegate
    extends TableAsyncExecutionController<RowData, RowData, RowData> {
        // When true (the default, restored before every test), each emitted input record is
        // added to the left or right in-memory table before the result is emitted.
        private static boolean insertTableDataAfterEmit = true;

        /**
         * Creates a controller from the callbacks of {@code innerAec}, replacing only the emit
         * callback with the wrapping one.
         *
         * @param innerAec the controller whose callbacks are reused
         */
        public MyAsyncExecutionControllerDelegate(TableAsyncExecutionController<RowData, RowData, RowData> innerAec) {
            super(innerAec.getAsyncInvoke(), innerAec.getEmitWatermark(), entry -> {
                if (insertTableDataAfterEmit) {
                    StreamingDeltaJoinOperator.InputIndexAwareStreamRecordQueueEntry inputIndexAwareEntry = (StreamingDeltaJoinOperator.InputIndexAwareStreamRecordQueueEntry)entry;
                    int inputIndex = inputIndexAwareEntry.getInputIndex();
                    // Input index 0 goes to the left table, any other index to the right table.
                    StreamingDeltaJoinOperatorTest.insertTableData((StreamRecord<RowData>)inputIndexAwareEntry.getInputElement(), inputIndex == 0);
                }
                // Always hand the entry on to the original emit callback.
                innerAec.getEmitResult().accept(entry);
            }, innerAec.getInferDrivenInputIndex(), innerAec.getInferBlockingKey());
        }
    }

    public static class MyAsyncFunction
    extends RichAsyncFunction<RowData, Object> {
        private static final long serialVersionUID = 1L;
        private static final long TERMINATION_TIMEOUT = 5000L;
        private static final int THREAD_POOL_SIZE = 10;
        private static ExecutorService executorService;
        @Nullable
        private static CountDownLatch lock;
        private static final AtomicInteger leftInvokeCount;
        private static final AtomicInteger rightInvokeCount;
        private static Optional<Throwable> expectedThrownException;
        private Boolean treatRightAsLookupTable;

        public void tagInvokingSideDuringRuntime(boolean isLeftInvoking) {
            this.treatRightAsLookupTable = isLeftInvoking;
        }

        public static void block() throws Exception {
            lock = new CountDownLatch(1);
        }

        public static void release() {
            Objects.requireNonNull(lock).countDown();
        }

        public static void setExpectedThrownException(Throwable t) {
            expectedThrownException = Optional.of(t);
        }

        public static void clearExpectedThrownException() {
            expectedThrownException = Optional.empty();
        }

        public void asyncInvoke(RowData input, ResultFuture<Object> resultFuture) {
            executorService.submit(() -> {
                try {
                    RowDataKeySelector lookupSideJoinKeySelector;
                    RowDataKeySelector streamSideJoinKeySelector;
                    LinkedList<RowData> lookupTableData;
                    LinkedList<RowData> linkedList;
                    if (expectedThrownException.isPresent()) {
                        throw expectedThrownException.get();
                    }
                    if (lock != null) {
                        lock.await();
                    }
                    if (Objects.requireNonNull(this.treatRightAsLookupTable).booleanValue()) {
                        linkedList = rightTableCurrentData;
                        synchronized (linkedList) {
                            lookupTableData = new LinkedList<RowData>(rightTableCurrentData);
                        }
                        streamSideJoinKeySelector = leftJoinKeySelector.copy();
                        lookupSideJoinKeySelector = rightJoinKeySelector.copy();
                        leftInvokeCount.incrementAndGet();
                    } else {
                        linkedList = leftTableCurrentData;
                        synchronized (linkedList) {
                            lookupTableData = new LinkedList<RowData>(leftTableCurrentData);
                        }
                        streamSideJoinKeySelector = rightJoinKeySelector.copy();
                        lookupSideJoinKeySelector = leftJoinKeySelector.copy();
                        rightInvokeCount.incrementAndGet();
                    }
                    ArrayList<RowData> results = new ArrayList<RowData>();
                    for (RowData row : lookupTableData) {
                        if (!((RowData)streamSideJoinKeySelector.getKey((Object)input)).equals(lookupSideJoinKeySelector.getKey((Object)row))) continue;
                        results.add(row);
                    }
                    resultFuture.complete(results);
                }
                catch (Throwable e) {
                    resultFuture.completeExceptionally((Throwable)new RuntimeException("Failed to look up table", e));
                }
            });
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            Class<MyAsyncFunction> clazz = MyAsyncFunction.class;
            synchronized (MyAsyncFunction.class) {
                if (executorService == null) {
                    executorService = Executors.newFixedThreadPool(10);
                }
                // ** MonitorExit[var2_2] (shouldn't be in output)
                return;
            }
        }

        public void close() throws Exception {
            super.close();
            this.freeExecutor();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void freeExecutor() {
            Class<MyAsyncFunction> clazz = MyAsyncFunction.class;
            synchronized (MyAsyncFunction.class) {
                if (executorService == null) {
                    // ** MonitorExit[var1_1] (shouldn't be in output)
                    return;
                }
                executorService.shutdown();
                try {
                    if (!executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                        executorService.shutdownNow();
                    }
                }
                catch (InterruptedException interrupted) {
                    executorService.shutdownNow();
                    Thread.currentThread().interrupt();
                }
                executorService = null;
                // ** MonitorExit[var1_1] (shouldn't be in output)
                return;
            }
        }

        static {
            leftInvokeCount = new AtomicInteger(0);
            rightInvokeCount = new AtomicInteger(0);
            expectedThrownException = Optional.empty();
        }
    }
}

