/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.generated.GeneratedCollectorWrapper;
import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.LookupJoinHarnessTest;
import org.apache.flink.table.runtime.operators.join.lookup.KeyedLookupJoinWrapper;
import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Test;

class KeyedLookupJoinHarnessTest {
    private final InternalTypeInfo<RowData> inputRowType = InternalTypeInfo.ofFields((LogicalType[])new LogicalType[]{new IntType(), VarCharType.STRING_TYPE});
    private final RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()});

    KeyedLookupJoinHarnessTest() {
    }

    @Test
    void testTemporalInnerJoin() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(JoinType.INNER_JOIN, FilterOnTable.WITHOUT_FILTER, false);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
        testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord(1, "a"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(1, "a2"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(1, "a2"));
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a3"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c2"));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a2", 2, "Julian-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(1, "a2", 2, "Julian-2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a3", 3, "Julian-3"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c", 3, "Jark"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jackson-2"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testTemporalInnerJoinLookupKeyContainsPk() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(JoinType.INNER_JOIN, FilterOnTable.WITHOUT_FILTER, true);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
        testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord(1, "a"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(1, "a2"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(1, "a2"));
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a3"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c2"));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a2", 2, "Julian-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(1, "a2", 2, "Julian-2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a3", 3, "Julian-3"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c", 3, "Jark"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testTemporalInnerJoinWithFilter() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, false);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
        testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(3, "c2"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(3, "c2"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c3"));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jackson-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c2", 6, "Jark-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c2", 6, "Jackson-2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c3", 9, "Jark-3"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c3", 9, "Jackson-3"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testTemporalInnerJoinWithFilterLookupKeyContainsPk() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, true);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
        testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(3, "c2"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(3, "c2"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c3"));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c2", 6, "Jark-2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c3", 9, "Jark-3"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testTemporalLeftJoin() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITHOUT_FILTER, false);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
        testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(2, "b2"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(2, "b2"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b3"));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(5, "e", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b2", 2, "default-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(2, "b2", 2, "default-2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b3", 3, "default-3"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testTemporalLeftJoinLookupKeyContainsPk() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITHOUT_FILTER, true);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
        testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(2, "b2"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(2, "b2"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b3"));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(5, "e", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b2", 2, "default-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(2, "b2", 2, "default-2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b3", 3, "default-3"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testTemporalLeftJoinWithFilter() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER, false);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
        testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b2"));
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(3, "c2"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(3, "c2"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c3"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(4, "d"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4, null));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(5, "e", null, null));
        expectedOutput.add(StreamRecordUtils.deleteRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b2", 2, "default-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jackson-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c2", 6, "Jark-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c2", 6, "Jackson-2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c3", 9, "Jark-3"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c3", 9, "Jackson-3"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, null, null, null));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testTemporalLeftJoinWithFilterLookupKeyContainsPk() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER, true);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
        testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b2"));
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(3, "c2"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(3, "c2"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c3"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(4, "d"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4, null));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(5, "e", null, null));
        expectedOutput.add(StreamRecordUtils.deleteRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b2", 2, "default-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c2", 6, "Jark-2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c3", 9, "Jark-3"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, null, null, null));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testTemporalLeftJoinWithTtlLookupKeyContainsPk() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER, true, 1000L);
        testHarness.open();
        testHarness.setStateTtlProcessingTime(1L);
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        testHarness.setStateTtlProcessingTime(1002L);
        testHarness.processElement(StreamRecordUtils.deleteRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b2"));
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(3, "c2"));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", null, null));
        expectedOutput.add(StreamRecordUtils.deleteRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b2", 2, "default-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testTemporalLeftJoinWithTtlWithoutPk() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER, false, 1000L);
        testHarness.open();
        testHarness.setStateTtlProcessingTime(1L);
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        testHarness.setStateTtlProcessingTime(1002L);
        testHarness.processElement(StreamRecordUtils.deleteRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b2"));
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(3, "c2"));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b2", 2, "default-2"));
        expectedOutput.add(StreamRecordUtils.deleteRecord(3, "c", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jackson-2"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testTemporalInnerJoinWithTtlLookupKeyContainsPk() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, true, 1000L);
        testHarness.open();
        testHarness.setStateTtlProcessingTime(1L);
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        testHarness.setStateTtlProcessingTime(1002L);
        testHarness.processElement(StreamRecordUtils.deleteRecord(2, "b"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, "b2"));
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord(3, "c"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(3, "c2"));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b2", 2, "default-2"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
        this.assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(JoinType joinType, FilterOnTable filterOnTable, boolean lookupKeyContainsPrimaryKey) throws Exception {
        return this.createHarness(joinType, filterOnTable, lookupKeyContainsPrimaryKey, -1L);
    }

    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(JoinType joinType, FilterOnTable filterOnTable, boolean lookupKeyContainsPrimaryKey, long stateTtl) throws Exception {
        StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig((long)(stateTtl < 1L ? 1000000L : stateTtl));
        boolean isLeftJoin = joinType == JoinType.LEFT_JOIN;
        TestingEvolvingOutputFetcherFunction fetcher = lookupKeyContainsPrimaryKey ? new TestingEvolvingOutputFetcherFunctionWithPk() : new TestingEvolvingOutputFetcherFunction();
        Object joinRunner = filterOnTable == FilterOnTable.WITHOUT_FILTER ? new LookupJoinRunner(new GeneratedFunctionWrapper<TestingEvolvingOutputFetcherFunctionWithPk>((TestingEvolvingOutputFetcherFunctionWithPk)fetcher), new GeneratedCollectorWrapper<LookupJoinHarnessTest.TestingFetcherCollector>(new LookupJoinHarnessTest.TestingFetcherCollector()), new GeneratedFunctionWrapper<LookupJoinHarnessTest.TestingPreFilterCondition>(new LookupJoinHarnessTest.TestingPreFilterCondition()), isLeftJoin, 2) : new LookupJoinWithCalcRunner(new GeneratedFunctionWrapper<TestingEvolvingOutputFetcherFunctionWithPk>((TestingEvolvingOutputFetcherFunctionWithPk)fetcher), new GeneratedFunctionWrapper<LookupJoinHarnessTest.CalculateOnTemporalTable>(new LookupJoinHarnessTest.CalculateOnTemporalTable()), new GeneratedCollectorWrapper<LookupJoinHarnessTest.TestingFetcherCollector>(new LookupJoinHarnessTest.TestingFetcherCollector()), new GeneratedFunctionWrapper<LookupJoinHarnessTest.TestingPreFilterCondition>(new LookupJoinHarnessTest.TestingPreFilterCondition()), isLeftJoin, 2);
        RowDataSerializer temporalSerializer = new RowDataSerializer(new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()});
        KeyedLookupJoinWrapper keyedLookupJoinWrapper = new KeyedLookupJoinWrapper(joinRunner, ttlConfig, (TypeSerializer)temporalSerializer, lookupKeyContainsPrimaryKey);
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)keyedLookupJoinWrapper);
        RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[]{0}, this.inputRowType.toRowFieldTypes());
        return new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (KeySelector)keySelector, (TypeInformation)keySelector.getProducedType());
    }

    public static class TestingEvolvingOutputFetcherFunction
    extends RichFlatMapFunction<RowData, RowData> {
        private static final long serialVersionUID = 1L;
        private static final Map<Integer, List<GenericRowData>> baseData = new HashMap<Integer, List<GenericRowData>>();
        private transient Map<Integer, Integer> accessCounter;

        public void open(OpenContext openContext) throws Exception {
            baseData.clear();
            baseData.put(1, Collections.singletonList(GenericRowData.of((Object[])new Object[]{1, StringData.fromString((String)"Julian")})));
            baseData.put(3, Arrays.asList(GenericRowData.of((Object[])new Object[]{3, StringData.fromString((String)"Jark")}), GenericRowData.of((Object[])new Object[]{3, StringData.fromString((String)"Jackson")})));
            baseData.put(4, Collections.singletonList(GenericRowData.of((Object[])new Object[]{4, StringData.fromString((String)"Fabian")})));
            this.accessCounter = new HashMap<Integer, Integer>();
        }

        protected int counter(int id) {
            int currentCnt = this.accessCounter.computeIfAbsent(id, key -> 0) + 1;
            this.accessCounter.put(id, currentCnt);
            return currentCnt;
        }

        protected void collectUpdatedRow(RowData originalRow, int currentCnt, Collector<RowData> out) {
            if (currentCnt > 1) {
                out.collect((Object)GenericRowData.of((Object[])new Object[]{originalRow.getInt(0) * currentCnt, StringData.fromString((String)(originalRow.getString(1) + "-" + currentCnt))}));
            } else {
                out.collect((Object)originalRow);
            }
        }

        protected List<GenericRowData> lookup(int id) {
            return baseData.get(id);
        }

        public void flatMap(RowData value, Collector<RowData> out) throws Exception {
            int id = value.getInt(0);
            int currentCnt = this.counter(id);
            List<GenericRowData> rows = this.lookup(id);
            if (rows != null) {
                for (GenericRowData row : rows) {
                    this.collectUpdatedRow((RowData)row, currentCnt, out);
                }
            } else if (currentCnt > 1) {
                out.collect((Object)GenericRowData.of((Object[])new Object[]{currentCnt, StringData.fromString((String)("default-" + currentCnt))}));
            }
        }
    }

    public static class TestingEvolvingOutputFetcherFunctionWithPk
    extends TestingEvolvingOutputFetcherFunction {
        @Override
        public void flatMap(RowData value, Collector<RowData> out) throws Exception {
            int id = value.getInt(0);
            int currentCnt = this.counter(id);
            List<GenericRowData> rows = this.lookup(id);
            if (rows != null) {
                this.collectUpdatedRow((RowData)rows.get(0), currentCnt, out);
            } else if (currentCnt > 1) {
                out.collect((Object)GenericRowData.of((Object[])new Object[]{currentCnt, StringData.fromString((String)("default-" + currentCnt))}));
            }
        }
    }

    private static enum FilterOnTable {
        WITH_FILTER,
        WITHOUT_FILTER;

    }

    private static enum JoinType {
        INNER_JOIN,
        LEFT_JOIN;

    }
}

