package org.apache.flink.table.runtime.operators.join;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.generated.GeneratedCollectorWrapper;
import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.LookupJoinHarnessTest;
import org.apache.flink.table.runtime.operators.join.lookup.KeyedLookupJoinWrapper;
import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.util.Collector;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.class */
public class KeyedLookupJoinHarnessTest {
    /**
     * Type information for the two-field (INT, STRING) input rows; {@code createHarness} uses its
     * field types to build the key selector.
     */
    private final InternalTypeInfo<RowData> inputRowType = InternalTypeInfo.ofFields(new LogicalType[]{new IntType(), VarCharType.STRING_TYPE});
    /**
     * Output assertor configured with four field types (INT, STRING, INT, STRING), matching the
     * four-field records that the tests list as expected output.
     */
    private final RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()});
    /**
     * State TTL configuration handed to {@code KeyedLookupJoinWrapper} in {@code createHarness}.
     * NOTE(review): the unit of the argument is not visible here (presumably milliseconds) - confirm
     * against {@code StateConfigUtil.createTtlConfig}.
     */
    StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(10000000);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest$FilterOnTable.class */
    /**
     * Selects which join runner {@code createHarness} builds.
     */
    public enum FilterOnTable {
        /** {@code createHarness} builds a {@code LookupJoinWithCalcRunner} (with {@code CalculateOnTemporalTable}). */
        WITH_FILTER,
        /** {@code createHarness} builds a plain {@code LookupJoinRunner}. */
        WITHOUT_FILTER
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest$JoinType.class */
    /**
     * Join flavour requested from {@code createHarness}; it is turned into the boolean flag passed
     * to the join runner's constructor.
     */
    public enum JoinType {
        /** The runner is constructed with the left-join flag set to {@code false}. */
        INNER_JOIN,
        /** The runner is constructed with the left-join flag set to {@code true}. */
        LEFT_JOIN
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest$TestingEvolvingOutputFetcherFunction.class */
    public static class TestingEvolvingOutputFetcherFunction extends RichFlatMapFunction<RowData, RowData> {
        private static final long serialVersionUID = 1;
        private static final Map<Integer, List<GenericRowData>> baseData = new HashMap();
        private transient Map<Integer, Integer> accessCounter;

        public void open(Configuration configuration) throws Exception {
            baseData.clear();
            baseData.put(1, Collections.singletonList(GenericRowData.of(new Object[]{1, StringData.fromString("Julian")})));
            baseData.put(3, Arrays.asList(GenericRowData.of(new Object[]{3, StringData.fromString("Jark")}), GenericRowData.of(new Object[]{3, StringData.fromString("Jackson")})));
            baseData.put(4, Collections.singletonList(GenericRowData.of(new Object[]{4, StringData.fromString("Fabian")})));
            this.accessCounter = new HashMap();
        }

        protected int counter(int i) {
            int intValue = this.accessCounter.computeIfAbsent(Integer.valueOf(i), num -> {
                return 0;
            }).intValue() + 1;
            this.accessCounter.put(Integer.valueOf(i), Integer.valueOf(intValue));
            return intValue;
        }

        protected void collectUpdatedRow(RowData rowData, int i, Collector<RowData> collector) {
            if (i > 1) {
                collector.collect(GenericRowData.of(new Object[]{Integer.valueOf(rowData.getInt(0) * i), StringData.fromString(rowData.getString(1) + "-" + i)}));
            } else {
                collector.collect(rowData);
            }
        }

        protected List<GenericRowData> lookup(int i) {
            return baseData.get(Integer.valueOf(i));
        }

        public void flatMap(RowData rowData, Collector<RowData> collector) throws Exception {
            int i = rowData.getInt(0);
            int counter = counter(i);
            List<GenericRowData> lookup = lookup(i);
            if (lookup == null) {
                if (counter > 1) {
                    collector.collect(GenericRowData.of(new Object[]{Integer.valueOf(counter), StringData.fromString("default-" + counter)}));
                }
            } else {
                for (int i2 = 0; i2 < lookup.size(); i2++) {
                    collectUpdatedRow((RowData) lookup.get(i2), counter, collector);
                }
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((RowData) obj, (Collector<RowData>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest$TestingEvolvingOutputFetcherFunctionWithPk.class */
    /**
     * Variant of {@link TestingEvolvingOutputFetcherFunction} that emits at most one row per
     * lookup: only the first base row of a key is used.
     */
    public static class TestingEvolvingOutputFetcherFunctionWithPk extends TestingEvolvingOutputFetcherFunction {
        /**
         * Looks up the key found in field 0 of {@code rowData} and emits a single row.
         *
         * @param rowData the lookup key row
         * @param collector receives the looked-up row, if any
         */
        @Override
        public void flatMap(RowData rowData, Collector<RowData> collector) throws Exception {
            int key = rowData.getInt(0);
            int accessCount = counter(key);
            List<GenericRowData> rows = lookup(key);
            if (rows != null) {
                // Only the first base row is considered, even when the key has several.
                collectUpdatedRow(rows.get(0), accessCount, collector);
            } else if (accessCount > 1) {
                // Unknown key: nothing on the first access, a synthetic default row afterwards.
                collector.collect(
                        GenericRowData.of(accessCount, StringData.fromString("default-" + accessCount)));
            }
        }
    }

    /**
     * Inner join without a table-side filter, using the fetcher that may return several rows per
     * key.
     *
     * <p>Per the expected output, keys without base data (2 and 5) produce nothing, retractions
     * repeat the right-side values of the preceding insert for that key, and each later
     * insert/update-after sees the evolved values of a fresh lookup.
     */
    @Test
    public void testTemporalInnerJoin() throws Exception {
        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarness(JoinType.INNER_JOIN, FilterOnTable.WITHOUT_FILTER, false)) {
            harness.open();

            harness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            harness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            harness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            harness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            harness.processElement(StreamRecordUtils.updateBeforeRecord(1, "a"));
            harness.processElement(StreamRecordUtils.updateAfterRecord(1, "a2"));
            harness.processElement(StreamRecordUtils.deleteRecord(1, "a2"));
            harness.processElement(StreamRecordUtils.insertRecord(1, "a3"));
            harness.processElement(StreamRecordUtils.deleteRecord(3, "c"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c2"));

            List<Object> expected = new ArrayList<>();
            expected.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
            expected.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
            expected.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
            expected.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
            expected.add(StreamRecordUtils.deleteRecord(1, "a", 1, "Julian"));
            expected.add(StreamRecordUtils.insertRecord(1, "a2", 2, "Julian-2"));
            expected.add(StreamRecordUtils.deleteRecord(1, "a2", 2, "Julian-2"));
            expected.add(StreamRecordUtils.insertRecord(1, "a3", 3, "Julian-3"));
            expected.add(StreamRecordUtils.deleteRecord(3, "c", 3, "Jark"));
            expected.add(StreamRecordUtils.deleteRecord(3, "c", 3, "Jackson"));
            expected.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
            expected.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jackson-2"));

            this.assertor.assertOutputEquals("output wrong.", expected, harness.getOutput());
        }
    }

    /**
     * Inner join without a table-side filter, using the single-row ("with PK") fetcher and the
     * wrapper's PK flag set.
     *
     * <p>Same input as {@code testTemporalInnerJoin}; the expected output contains only the first
     * base row for key 3 ("Jark"), never "Jackson".
     */
    @Test
    public void testTemporalInnerJoinLookupKeyContainsPk() throws Exception {
        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarness(JoinType.INNER_JOIN, FilterOnTable.WITHOUT_FILTER, true)) {
            harness.open();

            harness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            harness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            harness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            harness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            harness.processElement(StreamRecordUtils.updateBeforeRecord(1, "a"));
            harness.processElement(StreamRecordUtils.updateAfterRecord(1, "a2"));
            harness.processElement(StreamRecordUtils.deleteRecord(1, "a2"));
            harness.processElement(StreamRecordUtils.insertRecord(1, "a3"));
            harness.processElement(StreamRecordUtils.deleteRecord(3, "c"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c2"));

            List<Object> expected = new ArrayList<>();
            expected.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
            expected.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
            expected.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
            expected.add(StreamRecordUtils.deleteRecord(1, "a", 1, "Julian"));
            expected.add(StreamRecordUtils.insertRecord(1, "a2", 2, "Julian-2"));
            expected.add(StreamRecordUtils.deleteRecord(1, "a2", 2, "Julian-2"));
            expected.add(StreamRecordUtils.insertRecord(1, "a3", 3, "Julian-3"));
            expected.add(StreamRecordUtils.deleteRecord(3, "c", 3, "Jark"));
            expected.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));

            this.assertor.assertOutputEquals("output wrong.", expected, harness.getOutput());
        }
    }

    /**
     * Inner join with the table-side calc/filter, using the fetcher that may return several rows
     * per key.
     *
     * <p>The filter is {@code LookupJoinHarnessTest.CalculateOnTemporalTable}, which is defined
     * outside this file. Per the expected output it drops "Jark" on the first lookup of key 3 but
     * lets "Jackson" and the evolved "Jark-2"/"Jark-3" rows through.
     */
    @Test
    public void testTemporalInnerJoinWithFilter() throws Exception {
        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, false)) {
            harness.open();

            harness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            harness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            harness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            harness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            harness.processElement(StreamRecordUtils.updateBeforeRecord(3, "c"));
            harness.processElement(StreamRecordUtils.updateAfterRecord(3, "c2"));
            harness.processElement(StreamRecordUtils.deleteRecord(3, "c2"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c3"));

            List<Object> expected = new ArrayList<>();
            expected.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
            expected.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
            expected.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
            expected.add(StreamRecordUtils.deleteRecord(3, "c", 3, "Jackson"));
            expected.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
            expected.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jackson-2"));
            expected.add(StreamRecordUtils.deleteRecord(3, "c2", 6, "Jark-2"));
            expected.add(StreamRecordUtils.deleteRecord(3, "c2", 6, "Jackson-2"));
            expected.add(StreamRecordUtils.insertRecord(3, "c3", 9, "Jark-3"));
            expected.add(StreamRecordUtils.insertRecord(3, "c3", 9, "Jackson-3"));

            this.assertor.assertOutputEquals("output wrong.", expected, harness.getOutput());
        }
    }

    /**
     * Inner join with the table-side calc/filter, using the single-row ("with PK") fetcher and
     * the wrapper's PK flag set.
     *
     * <p>Per the expected output, the first insert for key 3 emits nothing (its only candidate
     * row, "Jark", does not appear), and the following update-before for that key is expected as
     * a delete record with null right-side fields.
     */
    @Test
    public void testTemporalInnerJoinWithFilterLookupKeyContainsPk() throws Exception {
        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, true)) {
            harness.open();

            harness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            harness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            harness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            harness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            harness.processElement(StreamRecordUtils.updateBeforeRecord(3, "c"));
            harness.processElement(StreamRecordUtils.updateAfterRecord(3, "c2"));
            harness.processElement(StreamRecordUtils.deleteRecord(3, "c2"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c3"));

            List<Object> expected = new ArrayList<>();
            expected.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
            expected.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
            expected.add(StreamRecordUtils.deleteRecord(3, "c", null, null));
            expected.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
            expected.add(StreamRecordUtils.deleteRecord(3, "c2", 6, "Jark-2"));
            expected.add(StreamRecordUtils.insertRecord(3, "c3", 9, "Jark-3"));

            this.assertor.assertOutputEquals("output wrong.", expected, harness.getOutput());
        }
    }

    /**
     * Left join without a table-side filter, using the fetcher that may return several rows per
     * key.
     *
     * <p>Per the expected output, keys without a lookup result (2 and 5 on first access) are
     * emitted with null right-side fields. Later accesses to key 2 pick up the fetcher's
     * synthetic "default-n" rows.
     */
    @Test
    public void testTemporalLeftJoin() throws Exception {
        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITHOUT_FILTER, false)) {
            harness.open();

            harness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            harness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            harness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            harness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            harness.processElement(StreamRecordUtils.updateAfterRecord(2, "b2"));
            harness.processElement(StreamRecordUtils.deleteRecord(2, "b2"));
            harness.processElement(StreamRecordUtils.insertRecord(2, "b3"));

            List<Object> expected = new ArrayList<>();
            expected.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
            expected.add(StreamRecordUtils.insertRecord(2, "b", null, null));
            expected.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
            expected.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
            expected.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
            expected.add(StreamRecordUtils.insertRecord(5, "e", null, null));
            expected.add(StreamRecordUtils.insertRecord(2, "b2", 2, "default-2"));
            expected.add(StreamRecordUtils.deleteRecord(2, "b2", 2, "default-2"));
            expected.add(StreamRecordUtils.insertRecord(2, "b3", 3, "default-3"));

            this.assertor.assertOutputEquals("output wrong.", expected, harness.getOutput());
        }
    }

    /**
     * Left join without a table-side filter, using the single-row ("with PK") fetcher and the
     * wrapper's PK flag set.
     *
     * <p>Same input as {@code testTemporalLeftJoin}; the expected output contains only the first
     * base row for key 3 ("Jark").
     */
    @Test
    public void testTemporalLeftJoinLookupKeyContainsPk() throws Exception {
        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITHOUT_FILTER, true)) {
            harness.open();

            harness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            harness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            harness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            harness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            harness.processElement(StreamRecordUtils.updateAfterRecord(2, "b2"));
            harness.processElement(StreamRecordUtils.deleteRecord(2, "b2"));
            harness.processElement(StreamRecordUtils.insertRecord(2, "b3"));

            List<Object> expected = new ArrayList<>();
            expected.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
            expected.add(StreamRecordUtils.insertRecord(2, "b", null, null));
            expected.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
            expected.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
            expected.add(StreamRecordUtils.insertRecord(5, "e", null, null));
            expected.add(StreamRecordUtils.insertRecord(2, "b2", 2, "default-2"));
            expected.add(StreamRecordUtils.deleteRecord(2, "b2", 2, "default-2"));
            expected.add(StreamRecordUtils.insertRecord(2, "b3", 3, "default-3"));

            this.assertor.assertOutputEquals("output wrong.", expected, harness.getOutput());
        }
    }

    /**
     * Left join with the table-side calc/filter, using the fetcher that may return several rows
     * per key.
     *
     * <p>The filter is {@code LookupJoinHarnessTest.CalculateOnTemporalTable}, which is defined
     * outside this file. Per the expected output it drops "Jark" on the first lookup of key 3;
     * keys with no lookup result are emitted with null right-side fields.
     */
    @Test
    public void testTemporalLeftJoinWithFilter() throws Exception {
        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER, false)) {
            harness.open();

            harness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            harness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            harness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            harness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            harness.processElement(StreamRecordUtils.deleteRecord(2, "b"));
            harness.processElement(StreamRecordUtils.insertRecord(2, "b2"));
            harness.processElement(StreamRecordUtils.updateBeforeRecord(3, "c"));
            harness.processElement(StreamRecordUtils.updateAfterRecord(3, "c2"));
            harness.processElement(StreamRecordUtils.deleteRecord(3, "c2"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c3"));

            List<Object> expected = new ArrayList<>();
            expected.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
            expected.add(StreamRecordUtils.insertRecord(2, "b", null, null));
            expected.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
            expected.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
            expected.add(StreamRecordUtils.insertRecord(5, "e", null, null));
            expected.add(StreamRecordUtils.deleteRecord(2, "b", null, null));
            expected.add(StreamRecordUtils.insertRecord(2, "b2", 2, "default-2"));
            expected.add(StreamRecordUtils.deleteRecord(3, "c", 3, "Jackson"));
            expected.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
            expected.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jackson-2"));
            expected.add(StreamRecordUtils.deleteRecord(3, "c2", 6, "Jark-2"));
            expected.add(StreamRecordUtils.deleteRecord(3, "c2", 6, "Jackson-2"));
            expected.add(StreamRecordUtils.insertRecord(3, "c3", 9, "Jark-3"));
            expected.add(StreamRecordUtils.insertRecord(3, "c3", 9, "Jackson-3"));

            this.assertor.assertOutputEquals("output wrong.", expected, harness.getOutput());
        }
    }

    /**
     * Left join with the table-side calc/filter, using the single-row ("with PK") fetcher and
     * the wrapper's PK flag set.
     *
     * <p>Per the expected output, the first insert for key 3 is emitted with null right-side
     * fields (its only candidate row, "Jark", does not appear), and the matching update-before is
     * expected as a delete with the same null fields.
     */
    @Test
    public void testTemporalLeftJoinWithFilterLookupKeyContainsPk() throws Exception {
        // try-with-resources closes the harness even when a step or the assertion throws.
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
                createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER, true)) {
            harness.open();

            harness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            harness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            harness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            harness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            harness.processElement(StreamRecordUtils.deleteRecord(2, "b"));
            harness.processElement(StreamRecordUtils.insertRecord(2, "b2"));
            harness.processElement(StreamRecordUtils.updateBeforeRecord(3, "c"));
            harness.processElement(StreamRecordUtils.updateAfterRecord(3, "c2"));
            harness.processElement(StreamRecordUtils.deleteRecord(3, "c2"));
            harness.processElement(StreamRecordUtils.insertRecord(3, "c3"));

            List<Object> expected = new ArrayList<>();
            expected.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
            expected.add(StreamRecordUtils.insertRecord(2, "b", null, null));
            expected.add(StreamRecordUtils.insertRecord(3, "c", null, null));
            expected.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
            expected.add(StreamRecordUtils.insertRecord(5, "e", null, null));
            expected.add(StreamRecordUtils.deleteRecord(2, "b", null, null));
            expected.add(StreamRecordUtils.insertRecord(2, "b2", 2, "default-2"));
            expected.add(StreamRecordUtils.deleteRecord(3, "c", null, null));
            expected.add(StreamRecordUtils.insertRecord(3, "c2", 6, "Jark-2"));
            expected.add(StreamRecordUtils.deleteRecord(3, "c2", 6, "Jark-2"));
            expected.add(StreamRecordUtils.insertRecord(3, "c3", 9, "Jark-3"));

            this.assertor.assertOutputEquals("output wrong.", expected, harness.getOutput());
        }
    }

    /**
     * Assembles a keyed test harness around a {@code KeyedLookupJoinWrapper}.
     *
     * @param joinType {@code LEFT_JOIN} sets the left-join flag passed to the join runner
     * @param filterOnTable {@code WITHOUT_FILTER} builds a {@code LookupJoinRunner}, otherwise a
     *     {@code LookupJoinWithCalcRunner} with {@code CalculateOnTemporalTable}
     * @param z when {@code true}, the single-row ("with PK") fetcher is used; the flag is also
     *     forwarded as the last constructor argument of {@code KeyedLookupJoinWrapper}
     * @return an unopened harness keyed on field 0 of the input row
     */
    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(JoinType joinType, FilterOnTable filterOnTable, boolean z) throws Exception {
        final boolean isLeftJoin = JoinType.LEFT_JOIN == joinType;

        final Function fetcher;
        if (z) {
            fetcher = new TestingEvolvingOutputFetcherFunctionWithPk();
        } else {
            fetcher = new TestingEvolvingOutputFetcherFunction();
        }

        // Raw generic usages are kept as they were; the wrapper types' generic signatures are
        // declared outside this file.
        final LookupJoinRunner joinRunner;
        if (filterOnTable == FilterOnTable.WITHOUT_FILTER) {
            joinRunner =
                    new LookupJoinRunner(
                            new GeneratedFunctionWrapper(fetcher),
                            new GeneratedCollectorWrapper(new LookupJoinHarnessTest.TestingFetcherCollector()),
                            isLeftJoin,
                            2);
        } else {
            joinRunner =
                    new LookupJoinWithCalcRunner(
                            new GeneratedFunctionWrapper(fetcher),
                            new GeneratedFunctionWrapper(new LookupJoinHarnessTest.CalculateOnTemporalTable()),
                            new GeneratedCollectorWrapper(new LookupJoinHarnessTest.TestingFetcherCollector()),
                            isLeftJoin,
                            2);
        }

        // Serializer for two-field (INT, STRING) rows, handed to the wrapper.
        final RowDataSerializer rowSerializer =
                new RowDataSerializer(
                        new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()});
        final KeyedLookupJoinWrapper joinWrapper =
                new KeyedLookupJoinWrapper(joinRunner, this.ttlConfig, rowSerializer, z);
        final KeyedProcessOperator operator = new KeyedProcessOperator(joinWrapper);

        // The stream is keyed on field 0 of the input row.
        final RowDataKeySelector keySelector =
                HandwrittenSelectorUtil.getRowDataSelector(new int[]{0}, this.inputRowType.toRowFieldTypes());
        return new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keySelector.getProducedType());
    }
}
