/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.stream;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.trigger.CoBundleTrigger;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountCoBundleTrigger;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.stream.MiniBatchStreamingJoinOperator;
import org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperatorTestBase;
import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

final class StreamingMiniBatchJoinOperatorTest
extends StreamingJoinOperatorTestBase {
    // Unique-key selectors for the left and right join inputs. They are read by
    // inputSpecExtractor when it builds the JoinInputSideSpec pair. Nothing in the
    // visible part of this file assigns them, so they are presumably initialised in
    // a setup hook elsewhere in the class - TODO confirm.
    private RowDataKeySelector leftUniqueKeySelector;
    private RowDataKeySelector rightUniqueKeySelector;
    /**
     * Chooses the {@link JoinInputSideSpec} pair (left, right) from the test display name.
     *
     * <p>A name containing "JoinKeyContainsUniqueKey" yields specs whose unique key is contained
     * by the join key; otherwise a name containing "HasUniqueKey" yields plain unique-key specs;
     * any other name yields specs without a unique key.
     */
    private final Function<String, JoinInputSideSpec[]> inputSpecExtractor = testDisplayName -> {
        final JoinInputSideSpec leftSpec;
        final JoinInputSideSpec rightSpec;
        if (testDisplayName.contains("JoinKeyContainsUniqueKey")) {
            leftSpec = JoinInputSideSpec.withUniqueKeyContainedByJoinKey(
                    (InternalTypeInfo)this.leftTypeInfo, (KeySelector)this.leftUniqueKeySelector);
            rightSpec = JoinInputSideSpec.withUniqueKeyContainedByJoinKey(
                    (InternalTypeInfo)this.rightTypeInfo, (KeySelector)this.rightUniqueKeySelector);
        } else if (testDisplayName.contains("HasUniqueKey")) {
            leftSpec = JoinInputSideSpec.withUniqueKey(
                    (InternalTypeInfo)this.leftTypeInfo, (KeySelector)this.leftUniqueKeySelector);
            rightSpec = JoinInputSideSpec.withUniqueKey(
                    (InternalTypeInfo)this.rightTypeInfo, (KeySelector)this.rightUniqueKeySelector);
        } else {
            leftSpec = JoinInputSideSpec.withoutUniqueKey();
            rightSpec = JoinInputSideSpec.withoutUniqueKey();
        }
        return new JoinInputSideSpec[]{leftSpec, rightSpec};
    };
    /**
     * Builds the unique-key selectors (left, right) from the test display name.
     *
     * <p>"JoinKeyContainsUniqueKey" selects field 1 on both sides, "HasUniqueKey" selects field 0
     * on both sides, and any other name yields {@code {null, null}}.
     */
    private final Function<String, RowDataKeySelector[]> ukSelectorExtractor = testDisplayName -> {
        final int uniqueKeyField;
        if (testDisplayName.contains("JoinKeyContainsUniqueKey")) {
            uniqueKeyField = 1;
        } else if (testDisplayName.contains("HasUniqueKey")) {
            uniqueKeyField = 0;
        } else {
            // No unique key requested: both slots stay empty.
            return new RowDataKeySelector[]{null, null};
        }
        LogicalType[] leftFieldTypes = this.leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]);
        RowDataKeySelector leftSelector =
                HandwrittenSelectorUtil.getRowDataSelector(new int[]{uniqueKeyField}, leftFieldTypes);
        LogicalType[] rightFieldTypes = this.rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]);
        RowDataKeySelector rightSelector =
                HandwrittenSelectorUtil.getRowDataSelector(new int[]{uniqueKeyField}, rightFieldTypes);
        return new RowDataKeySelector[]{leftSelector, rightSelector};
    };
    /**
     * Resolves the mini-batch size of a test from its set of tag strings.
     *
     * <p>The first tag (in the set's iteration order) of the form {@code miniBatchSize=<int>}
     * wins and its integer value is returned. When the set is empty or no tag matches, the
     * default of 5 is returned.
     *
     * <p>Only tags whose key starts with "miniBatchSize" are parsed. Unrelated tags, including
     * tags without a {@code =value} part or with a non-numeric value, are skipped instead of
     * failing the lookup. A matching tag whose value is not a valid int still raises
     * {@link NumberFormatException}.
     */
    private final Function<Set<String>, Integer> miniBatchSizeExtractor = tags -> {
        final int defaultSize = 5;
        for (String tag : tags) {
            String[] splits = tag.split("=");
            // Check the key before touching the value so foreign tags cannot break the parse.
            if (splits.length < 2 || !splits[0].trim().startsWith("miniBatchSize")) {
                continue;
            }
            return Integer.parseInt(splits[1].trim());
        }
        return defaultSize;
    };
    /**
     * Maps the test display name to a pair of boolean flags.
     *
     * <p>"InnerJoin" gives {false, false}, "LeftJoin" gives {true, false}, "RightJoin" gives
     * {false, true}, and anything else gives {true, true}. The markers are checked in that order.
     * NOTE(review): the flags presumably mean "left side is outer" / "right side is outer" -
     * confirm against the consumer of this extractor.
     */
    private final Function<String, Boolean[]> joinTypeExtractor = testDisplayName -> {
        boolean firstFlag = true;
        boolean secondFlag = true;
        if (testDisplayName.contains("InnerJoin")) {
            firstFlag = false;
            secondFlag = false;
        } else if (testDisplayName.contains("LeftJoin")) {
            secondFlag = false;
        } else if (testDisplayName.contains("RightJoin")) {
            firstFlag = false;
        }
        return new Boolean[]{firstFlag, secondFlag};
    };
    /**
     * Maps the test display name to a {@link FlinkJoinType}.
     *
     * <p>The markers "InnerJoin", "LeftJoin" and "RightJoin" are checked in that order; a name
     * containing none of them maps to {@link FlinkJoinType#FULL}.
     */
    private final Function<String, FlinkJoinType> flinkJoinTypeExtractor = testDisplayName ->
            testDisplayName.contains("InnerJoin") ? FlinkJoinType.INNER
                    : testDisplayName.contains("LeftJoin") ? FlinkJoinType.LEFT
                    : testDisplayName.contains("RightJoin") ? FlinkJoinType.RIGHT
                    : FlinkJoinType.FULL;
    /**
     * Supplies the left input's type info for a test.
     *
     * <p>Tests whose display name contains "SimpleSchema" get a two-column row type
     * ({@code id1 CHAR(1) NOT NULL, val1 BIGINT}); every other test uses the inherited
     * {@code leftTypeInfo}.
     */
    private final Function<String, InternalTypeInfo<RowData>> leftTypeInfoExtractor = testDisplayName -> {
        if (!testDisplayName.contains("SimpleSchema")) {
            return this.leftTypeInfo;
        }
        LogicalType[] fieldTypes = {new CharType(false, 1), new BigIntType()};
        String[] fieldNames = {"id1", "val1"};
        RowType simpleRowType = RowType.of(fieldTypes, fieldNames);
        return InternalTypeInfo.of(simpleRowType);
    };
    /**
     * Supplies the right input's type info for a test.
     *
     * <p>Tests whose display name contains "SimpleSchema" get a two-column row type
     * ({@code id2}, {@code val2}); every other test gets the three-column row type
     * ({@code order_id#}, {@code line_order_id0}, {@code line_order_ship_mode}).
     */
    private final Function<String, InternalTypeInfo<RowData>> rightTypeInfoExtractor = testDisplayName -> {
        final LogicalType[] fieldTypes;
        final String[] fieldNames;
        if (testDisplayName.contains("SimpleSchema")) {
            fieldTypes = new LogicalType[]{new CharType(false, 1), new BigIntType()};
            fieldNames = new String[]{"id2", "val2"};
        } else {
            fieldTypes = new LogicalType[]{new CharType(false, 20), new CharType(false, 20), new CharType(true, 10)};
            fieldNames = new String[]{"order_id#", "line_order_id0", "line_order_ship_mode"};
        }
        RowType rightRowType = RowType.of(fieldTypes, fieldNames);
        return InternalTypeInfo.of(rightRowType);
    };
    /**
     * Supplies the left join-key selector for a test.
     *
     * <p>"SimpleSchema" tests key on field 0 of the current {@code leftTypeInfo}; all other tests
     * use the inherited {@code leftKeySelector}.
     */
    private final Function<String, RowDataKeySelector> leftKeySelectorExtractor = testDisplayName -> {
        if (!testDisplayName.contains("SimpleSchema")) {
            return this.leftKeySelector;
        }
        LogicalType[] leftFieldTypes = this.leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]);
        return HandwrittenSelectorUtil.getRowDataSelector(new int[]{0}, leftFieldTypes);
    };
    /**
     * Supplies the right join-key selector for a test.
     *
     * <p>"SimpleSchema" tests key on field 0 of the current {@code rightTypeInfo}; all other
     * tests key on field 1.
     */
    private final Function<String, RowDataKeySelector> rightKeySelectorExtractor = testDisplayName -> {
        int joinKeyField = testDisplayName.contains("SimpleSchema") ? 0 : 1;
        LogicalType[] rightFieldTypes = this.rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]);
        return HandwrittenSelectorUtil.getRowDataSelector(new int[]{joinKeyField}, rightFieldTypes);
    };

    /** Package-private no-arg constructor; it does nothing beyond invoking the superclass constructor. */
    StreamingMiniBatchJoinOperatorTest() {
        super();
    }

    /**
     * Left join where both left rows are processed before the right row (tagged
     * {@code miniBatchSize=3}).
     *
     * <p>No output is expected after the two left rows. After the right row the expected rows are
     * the null-padded "Ord#i" row and the "Ord#1" row joined with the right row.
     */
    @Tag("miniBatchSize=3")
    @Test
    void testLeftJoinWithLeftArriveFirst() throws Exception {
        final String ord1Address = "3 Bellevue Drive, Pottstown, PA 19464";
        final String ordIAddress = "i6 Bellevue Drive, Pottstown, Pi 19464";
        final AbstractStreamOperatorTestHarness<RowData> output =
                (AbstractStreamOperatorTestHarness<RowData>) testHarness;

        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", ord1Address));
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#i", "LineOrd#6", ordIAddress));
        assertor.shouldEmitNothing(output);

        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#2", "AIR"));
        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#i", "LineOrd#6", ordIAddress, null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", ord1Address, "Ord#X", "LineOrd#2", "AIR"));
    }

    /**
     * Left join with the left rows first, tagged {@code miniBatchSize=1}.
     *
     * <p>The expected rows include the intermediate null-padded INSERT and its DELETE for
     * "Ord#1" before the joined INSERT.
     */
    @Tag("miniBatchSize=1")
    @Test
    void testLeftJoinWithLeftArriveFirstNoMiniBatch() throws Exception {
        final String ord1Address = "3 Bellevue Drive, Pottstown, PA 19464";
        final String ordIAddress = "i6 Bellevue Drive, Pottstown, Pi 19464";
        final AbstractStreamOperatorTestHarness<RowData> output =
                (AbstractStreamOperatorTestHarness<RowData>) testHarness;

        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", ord1Address));
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#i", "LineOrd#6", ordIAddress));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#2", "AIR"));

        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", ord1Address, null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#i", "LineOrd#6", ordIAddress, null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#1", "LineOrd#2", ord1Address, null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", ord1Address, "Ord#X", "LineOrd#2", "AIR"));
    }

    /**
     * Right join where the right row is processed first (tagged {@code miniBatchSize=3}).
     *
     * <p>No output is expected after the first two records. After the third record the only
     * expected row is "Ord#1" joined with the right row.
     */
    @Tag("miniBatchSize=3")
    @Test
    void testRightJoinWithRightArriveFirst() throws Exception {
        final String ord1Address = "3 Bellevue Drive, Pottstown, PA 19464";
        final String ordIAddress = "i6 Bellevue Drive, Pottstown, Pi 19464";
        final AbstractStreamOperatorTestHarness<RowData> output =
                (AbstractStreamOperatorTestHarness<RowData>) testHarness;

        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#2", "AIR"));
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", ord1Address));
        assertor.shouldEmitNothing(output);

        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#i", "LineOrd#6", ordIAddress));
        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", ord1Address, "Ord#X", "LineOrd#2", "AIR"));
    }

    /**
     * Right join with the right row first, tagged {@code miniBatchSize=1}.
     *
     * <p>The expected rows are the null-padded INSERT for the right row, its DELETE, and then
     * the joined INSERT.
     */
    @Tag("miniBatchSize=1")
    @Test
    void testRightJoinWithRightArriveFirstWithNoMiniBatch() throws Exception {
        final String ord1Address = "3 Bellevue Drive, Pottstown, PA 19464";
        final String ordIAddress = "i6 Bellevue Drive, Pottstown, Pi 19464";
        final AbstractStreamOperatorTestHarness<RowData> output =
                (AbstractStreamOperatorTestHarness<RowData>) testHarness;

        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#2", "AIR"));
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", ord1Address));
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#i", "LineOrd#6", ordIAddress));

        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, null, null, null, "Ord#X", "LineOrd#2", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, null, null, null, "Ord#X", "LineOrd#2", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", ord1Address, "Ord#X", "LineOrd#2", "AIR"));
    }

    /**
     * Full join where the right row is processed first (tagged {@code miniBatchSize=3}).
     *
     * <p>No output is expected after the first two records. After the third record four rows are
     * expected: the null-padded right row, the null-padded "Ord#i" row, the retraction of the
     * null-padded right row, and the joined row.
     */
    @Tag("miniBatchSize=3")
    @Test
    void testFullJoinWithRightArriveFirst() throws Exception {
        final String ord1Address = "3 Bellevue Drive, Pottstown, PA 19464";
        final String ordIAddress = "i6 Bellevue Drive, Pottstown, Pi 19464";
        final AbstractStreamOperatorTestHarness<RowData> output =
                (AbstractStreamOperatorTestHarness<RowData>) testHarness;

        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#2", "AIR"));
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", ord1Address));
        assertor.shouldEmitNothing(output);

        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#i", "LineOrd#6", ordIAddress));
        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, null, null, null, "Ord#X", "LineOrd#2", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#i", "LineOrd#6", ordIAddress, null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, null, null, null, "Ord#X", "LineOrd#2", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", ord1Address, "Ord#X", "LineOrd#2", "AIR"));
    }

    /**
     * Full join with the right row first, tagged {@code miniBatchSize=1}.
     *
     * <p>The expected rows are the null-padded right row, its retraction, the joined row, and
     * the null-padded "Ord#i" row.
     */
    @Tag("miniBatchSize=1")
    @Test
    void testFullJoinWithRightArriveFirstWithNoMiniBatch() throws Exception {
        final String ord1Address = "3 Bellevue Drive, Pottstown, PA 19464";
        final String ordIAddress = "i6 Bellevue Drive, Pottstown, Pi 19464";
        final AbstractStreamOperatorTestHarness<RowData> output =
                (AbstractStreamOperatorTestHarness<RowData>) testHarness;

        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#2", "AIR"));
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", ord1Address));
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#i", "LineOrd#6", ordIAddress));

        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, null, null, null, "Ord#X", "LineOrd#2", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, null, null, null, "Ord#X", "LineOrd#2", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", ord1Address, "Ord#X", "LineOrd#2", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#i", "LineOrd#6", ordIAddress, null, null, null));
    }

    /**
     * Inner join, tagged {@code miniBatchSize=4}, with two left rows followed by two right rows.
     *
     * <p>Nothing may be emitted after each of the first three records. After the fourth record
     * two joined INSERT rows are expected, one per "LineOrd#" value.
     */
    @Tag("miniBatchSize=4")
    @Test
    void testInnerJoinJoinKeyContainsUniqueKeyWithoutFold() throws Exception {
        final String lineOrd1Address = "1 Bellevue Drive, Pottstown, PA 19464";
        final String lineOrd2Address = "2 Bellevue Drive, Pottstown, PA 19464";
        final AbstractStreamOperatorTestHarness<RowData> output =
                (AbstractStreamOperatorTestHarness<RowData>) testHarness;

        testHarness.setStateTtlProcessingTime(1L);

        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", lineOrd1Address));
        assertor.shouldEmitNothing(output);

        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", lineOrd2Address));
        assertor.shouldEmitNothing(output);

        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#Y", "LineOrd#1", "TRUCK"));
        assertor.shouldEmitNothing(output);

        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#2", "AIR"));
        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#1", lineOrd1Address, "Ord#Y", "LineOrd#1", "TRUCK"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", lineOrd2Address, "Ord#X", "LineOrd#2", "AIR"));
    }

    /**
     * Inner join tagged {@code miniBatchSize=18}: eleven interleaved left/right change records,
     * a no-output check, then seven more right records.
     *
     * <p>After the last record only two joined rows are expected: an INSERT for "LineOrd#6" and
     * an UPDATE_AFTER for "LineOrd#1".
     */
    @Tag("miniBatchSize=18")
    @Test
    void testInnerJoinWithJoinKeyContainsUniqueKeyWithinBatch() throws Exception {
        final AbstractStreamOperatorTestHarness<RowData> output =
                (AbstractStreamOperatorTestHarness<RowData>) testHarness;

        // First eleven records: no output expected afterwards.
        testHarness.processElement1(
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
        testHarness.processElement1(
                StreamRecordUtils.updateAfterRecord("Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB 19464"));
        testHarness.processElement2(
                StreamRecordUtils.insertRecord("Ord#X", "LineOrd#7", "RAILWAY"));
        testHarness.processElement1(
                StreamRecordUtils.updateAfterRecord("Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"));
        testHarness.processElement1(
                StreamRecordUtils.updateAfterRecord("Ord#x5", "LineOrd#5", "x3 Bellevue Drive, Pottstown, PAxx 19464"));
        testHarness.processElement1(
                StreamRecordUtils.insertRecord("Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown, Pi 19464"));
        testHarness.processElement2(
                StreamRecordUtils.deleteRecord("Ord#X", "LineOrd#7", "RAILWAY"));
        testHarness.processElement1(
                StreamRecordUtils.updateAfterRecord("Ord#3", "LineOrd#x3", "x5 Bellevue Drive, Pottstown, PCxx 19464"));
        testHarness.processElement1(
                StreamRecordUtils.deleteRecord("Ord#3", "LineOrd#x3", "14y0 Bellevue Drive, Pottstown, PJyy 19464"));
        testHarness.processElement2(
                StreamRecordUtils.updateAfterRecord("Ord#X", "LineOrd#7", "AIR"));
        testHarness.processElement1(
                StreamRecordUtils.insertRecord("Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464"));
        assertor.shouldEmitNothing(output);

        // Remaining seven records, all on the second input.
        testHarness.processElement2(StreamRecordUtils.updateBeforeRecord("Ord#X", "LineOrd#7", "AIR"));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#2", "AIR"));
        testHarness.processElement2(StreamRecordUtils.updateBeforeRecord("Ord#X", "LineOrd#2", "AIR"));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#1", "AIR"));
        testHarness.processElement2(StreamRecordUtils.updateBeforeRecord("Ord#Y", "LineOrd#5", "TRUCK"));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#Y", "LineOrd#6", "RAILWAY"));
        testHarness.processElement2(StreamRecordUtils.updateBeforeRecord("Ord#Z", "LineOrd#4", "RAILWAY"));

        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(
                        RowKind.INSERT,
                        "Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown, Pi 19464",
                        "Ord#Y", "LineOrd#6", "RAILWAY"),
                StreamRecordUtils.rowOfKind(
                        RowKind.UPDATE_AFTER,
                        "Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB 19464",
                        "Ord#X", "LineOrd#1", "AIR"));
    }

    /**
     * Inner join tagged {@code miniBatchSize=10}, driven through two rounds of ten records each.
     *
     * <p>Each round has no-output checks before its tenth record and an output assertion after
     * it. The second round's expected rows involve rows processed in the first round.
     */
    @Tag("miniBatchSize=10")
    @Test
    void testInnerJoinWithJoinKeyContainsUniqueKeyCrossBatches() throws Exception {
        final AbstractStreamOperatorTestHarness<RowData> output =
                (AbstractStreamOperatorTestHarness<RowData>) testHarness;

        // ---- first round of ten records ----
        testHarness.processElement1(
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
        testHarness.processElement1(
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
        testHarness.processElement2(
                StreamRecordUtils.insertRecord("Ord#Y", "LineOrd#4", "TRUCK"));
        assertor.shouldEmitNothing(output);

        testHarness.processElement1(
                StreamRecordUtils.insertRecord("Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464"));
        testHarness.processElement1(
                StreamRecordUtils.insertRecord("Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"));
        testHarness.processElement1(
                StreamRecordUtils.updateBeforeRecord("Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"));
        testHarness.processElement1(
                StreamRecordUtils.updateBeforeRecord("Ord#9", "LineOrd#9", "11 Bellevue Drive, Pottstown, PI 19464"));
        assertor.shouldEmitNothing(output);

        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#9", "AIR"));
        testHarness.processElement2(StreamRecordUtils.updateAfterRecord("Ord#xyz", "LineOrd#1", "SHIP"));
        testHarness.processElement2(StreamRecordUtils.deleteRecord("Ord#Y", "LineOrd#4", "TRUCK"));
        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(
                        RowKind.UPDATE_BEFORE,
                        "Ord#9", "LineOrd#9", "11 Bellevue Drive, Pottstown, PI 19464",
                        "Ord#X", "LineOrd#9", "AIR"),
                StreamRecordUtils.rowOfKind(
                        RowKind.INSERT,
                        "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464",
                        "Ord#xyz", "LineOrd#1", "SHIP"));

        // ---- second round of ten records ----
        testHarness.processElement1(
                StreamRecordUtils.updateAfterRecord("Ord#adjust", "LineOrd#4", "14 Bellevue Drive, Pottstown, PJ 19464"));
        testHarness.processElement1(
                StreamRecordUtils.updateAfterRecord("Ord#18", "LineOrd#9", "22 Bellevue Drive, Pottstown, PK 19464"));
        testHarness.processElement2(
                StreamRecordUtils.deleteRecord("Ord#X", "LineOrd#x3", "AIR"));
        testHarness.processElement2(
                StreamRecordUtils.updateBeforeRecord("Ord#xyz", "LineOrd#1", "SHIP"));
        testHarness.processElement2(
                StreamRecordUtils.insertRecord("Ord#Y", "LineOrd#4", "TRUCK"));
        testHarness.processElement1(
                StreamRecordUtils.updateAfterRecord("Ord#14", "LineOrd#4", "18 Bellevue Drive, Pottstown, PL 19464"));
        testHarness.processElement1(
                StreamRecordUtils.deleteRecord("Ord#3", "LineOrd#x3", "x5 Bellevue Drive, Pottstown, PCxx 19464"));
        testHarness.processElement1(
                StreamRecordUtils.insertRecord("Ord#3", "LineOrd#x3", "x5 Bellevue Drive, Pottstown, PCxx 19464"));
        testHarness.processElement1(
                StreamRecordUtils.insertRecord("Ord#10", "LineOrd#100y", "14y0 Bellevue Drive, Pottstown, PJyy 19464"));
        assertor.shouldEmitNothing(output);

        testHarness.processElement2(StreamRecordUtils.updateAfterRecord("Ord#101", "LineOrd#x3", "AIR"));
        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(
                        RowKind.INSERT,
                        "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464",
                        "Ord#Y", "LineOrd#4", "TRUCK"),
                StreamRecordUtils.rowOfKind(
                        RowKind.UPDATE_BEFORE,
                        "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464",
                        "Ord#xyz", "LineOrd#1", "SHIP"),
                StreamRecordUtils.rowOfKind(
                        RowKind.DELETE,
                        "Ord#3", "LineOrd#x3", "x5 Bellevue Drive, Pottstown, PCxx 19464",
                        "Ord#101", "LineOrd#x3", "AIR"),
                StreamRecordUtils.rowOfKind(
                        RowKind.INSERT,
                        "Ord#3", "LineOrd#x3", "x5 Bellevue Drive, Pottstown, PCxx 19464",
                        "Ord#101", "LineOrd#x3", "AIR"),
                StreamRecordUtils.rowOfKind(
                        RowKind.UPDATE_AFTER,
                        "Ord#18", "LineOrd#9", "22 Bellevue Drive, Pottstown, PK 19464",
                        "Ord#X", "LineOrd#9", "AIR"),
                StreamRecordUtils.rowOfKind(
                        RowKind.UPDATE_AFTER,
                        "Ord#14", "LineOrd#4", "18 Bellevue Drive, Pottstown, PL 19464",
                        "Ord#Y", "LineOrd#4", "TRUCK"));
    }

    /**
     * Inner join tagged {@code miniBatchSize=13}: eleven change records on the first input, a
     * no-output check, then two records on the second input.
     *
     * <p>After the last record two joined rows are expected: an UPDATE_AFTER for "LineOrd#5" and
     * an INSERT for "LineOrd#2".
     *
     * <p>The record lists are typed as {@code StreamRecord<RowData>} rather than raw
     * {@code StreamRecord}, so the calls into the harness are checked by the compiler.
     */
    @Tag(value="miniBatchSize=13")
    @Test
    void testInnerJoinWithHasUniqueKeyWithinBatch() throws Exception {
        List<StreamRecord<RowData>> leftRecords = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.insertRecord("Ord#3", "LineOrd#10", "5 Bellevue Drive, Pottstown, PC 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#3", "LineOrd#10", "xxx Bellevue Drive, Pottstown, PJ 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"),
                StreamRecordUtils.insertRecord("Ord#6", "LineOrd#5", "8 Bellevue Drive, Pottstown, PF 19464"),
                StreamRecordUtils.updateBeforeRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"),
                StreamRecordUtils.updateBeforeRecord("Ord#12", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#9", "LineOrd#3", "5 Bellevue Drive, Pottstown, PC 19464"),
                StreamRecordUtils.deleteRecord("Ord#9", "LineOrd#3", "5 Bellevue Drive, Pottstown, PC 19464"));
        for (StreamRecord<RowData> row : leftRecords) {
            this.testHarness.processElement1(row);
        }
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);

        List<StreamRecord<RowData>> rightRecords = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateAfterRecord("Ord#6", "LineOrd#5", "AIR"));
        for (StreamRecord<RowData> row : rightRecords) {
            this.testHarness.processElement2(row);
        }
        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464", "Ord#6", "LineOrd#5", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#5", "LineOrd#2", "SHIP"));
    }

    /**
     * Inner join tagged {@code miniBatchSize=8}, driven through two rounds of eight records each.
     *
     * <p>Each round checks that nothing is emitted after its first seven records and asserts the
     * expected rows after the eighth.
     */
    @Tag("miniBatchSize=8")
    @Test
    void testInnerJoinWithHasUniqueKeyCrossBatches() throws Exception {
        final AbstractStreamOperatorTestHarness<RowData> output =
                (AbstractStreamOperatorTestHarness<RowData>) testHarness;

        // ---- first round of eight records ----
        testHarness.processElement1(
                StreamRecordUtils.updateBeforeRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
        testHarness.processElement1(
                StreamRecordUtils.updateAfterRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"));
        testHarness.processElement2(
                StreamRecordUtils.insertRecord("Ord#5", "LineOrd#5", "SHIP"));
        testHarness.processElement1(
                StreamRecordUtils.insertRecord("Ord#4", "LineOrd#4", "5 Bellevue Drive, Pottstown, PC 19464"));
        testHarness.processElement1(
                StreamRecordUtils.updateAfterRecord("Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464"));
        testHarness.processElement2(
                StreamRecordUtils.updateAfterRecord("Ord#22", "LineOrd#4", "SHIP"));
        testHarness.processElement2(
                StreamRecordUtils.insertRecord("Ord#23", "LineOrd#10", "AIR"));
        assertor.shouldEmitNothing(output);

        testHarness.processElement1(
                StreamRecordUtils.updateAfterRecord("Ord#4", "LineOrd#4", "xxx Bellevue Drive, Pottstown, PJ 19464"));
        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(
                        RowKind.UPDATE_AFTER,
                        "Ord#4", "LineOrd#4", "xxx Bellevue Drive, Pottstown, PJ 19464",
                        "Ord#22", "LineOrd#4", "SHIP"));

        // ---- second round of eight records ----
        testHarness.processElement1(
                StreamRecordUtils.insertRecord("Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"));
        testHarness.processElement1(
                StreamRecordUtils.updateBeforeRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF 19464"));
        testHarness.processElement2(
                StreamRecordUtils.insertRecord("Ord#21", "LineOrd#5", "RAILWAY"));
        testHarness.processElement2(
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#5", "TRUCK"));
        testHarness.processElement1(
                StreamRecordUtils.updateAfterRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF 19464"));
        testHarness.processElement1(
                StreamRecordUtils.deleteRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"));
        testHarness.processElement2(
                StreamRecordUtils.updateBeforeRecord("Ord#5", "LineOrd#5", "SHIP"));
        assertor.shouldEmitNothing(output);

        testHarness.processElement2(StreamRecordUtils.updateBeforeRecord("Ord#22", "LineOrd#6", "AIR"));
        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(
                        RowKind.INSERT,
                        "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464",
                        "Ord#21", "LineOrd#5", "RAILWAY"),
                StreamRecordUtils.rowOfKind(
                        RowKind.INSERT,
                        "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464",
                        "Ord#1", "LineOrd#5", "TRUCK"));
    }

    /**
     * Inner join tagged {@code miniBatchSize=20}: fourteen change records are fed to the second
     * input, then six records to the first input.
     *
     * <p>After the last record three joined rows are expected: two identical INSERT rows for
     * "LineOrd#1" and one UPDATE_AFTER row for "LineOrd#2".
     *
     * <p>The record lists are typed as {@code StreamRecord<RowData>} rather than raw
     * {@code StreamRecord}, so the calls into the harness are checked by the compiler.
     */
    @Tag(value="miniBatchSize=20")
    @Test
    void testInnerJoinWithNoUniqueKeyWithinBatch() throws Exception {
        List<StreamRecord<RowData>> secondInputRecords = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF 19464"),
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                StreamRecordUtils.insertRecord("Ord#3", "LineOrd#3", "5 Bellevue Drive, Pottstown, PD 19464"),
                StreamRecordUtils.updateBeforeRecord("Ord#3", "LineOrd#3", "5 Bellevue Drive, Pottstown, PD 19464"),
                StreamRecordUtils.updateBeforeRecord("Ord#9", "LineOrd#9", "11 Bellevue Drive, Pottstown, PI 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#10", "LineOrd#10", "14 Bellevue Drive, Pottstown, PJ 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#18", "LineOrd#18", "22 Bellevue Drive, Pottstown, PK 19464"),
                StreamRecordUtils.deleteRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#9", "LineOrd#9", "11 Bellevue Drive, Pottstown, PI 19464"),
                StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF 19464"),
                StreamRecordUtils.insertRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF 19464"));
        for (StreamRecord<RowData> row : secondInputRecords) {
            this.testHarness.processElement2(row);
        }

        List<StreamRecord<RowData>> firstInputRecords = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "AIR"),
                StreamRecordUtils.updateAfterRecord("Ord#1", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateBeforeRecord("Ord#1", "LineOrd#2", "RAILWAY"),
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", "RAILWAY"),
                StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#6", "RAILWAY"),
                StreamRecordUtils.insertRecord("Ord#6", "LineOrd#6", "RAILWAY"));
        for (StreamRecord<RowData> row : firstInputRecords) {
            this.testHarness.processElement1(row);
        }

        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#1", "AIR", "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#1", "AIR", "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, "Ord#1", "LineOrd#2", "SHIP", "Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"));
    }

    /**
     * Inner join on the two-column "SimpleSchema", tagged {@code miniBatchSize=4}.
     *
     * <p>The two long values used, {@code 1L} and {@code 0x100000000L}, have the same
     * {@code Long.hashCode} (both 1). The second input inserts and then deletes the {@code 1L}
     * row, so the only expected row joins the two {@code 0x100000000L} rows.
     */
    @Tag("miniBatchSize=4")
    @Test
    void testInnerJoinWithNoUniqueKeyHashCollisionSimpleSchema() throws Exception {
        final long smallValue = 1L;
        final long collidingValue = 0x100000000L;
        final AbstractStreamOperatorTestHarness<RowData> output =
                (AbstractStreamOperatorTestHarness<RowData>) testHarness;

        testHarness.processElement2(StreamRecordUtils.insertRecord("1", smallValue));
        testHarness.processElement1(StreamRecordUtils.insertRecord("1", collidingValue));
        testHarness.processElement2(StreamRecordUtils.insertRecord("1", collidingValue));
        testHarness.processElement2(StreamRecordUtils.deleteRecord("1", smallValue));

        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "1", collidingValue, "1", collidingValue));
    }

    /**
     * Inner join tagged {@code miniBatchSize=6}, driven through two rounds of six records each.
     *
     * <p>The first round expects two identical joined INSERT rows for "LineOrd#2". The second
     * round expects one INSERT for "LineOrd#3" and three DELETE rows, two of which retract the
     * first round's rows.
     */
    @Tag("miniBatchSize=6")
    @Test
    void testInnerJoinWithNoUniqueKeyCrossBatches() throws Exception {
        final String addressPb = "4 Bellevue Drive, Pottstown, PB 19464";
        final String addressPd = "5 Bellevue Drive, Pottstown, PD 19464";
        final String addressPf = "8 Bellevue Drive, Pottstown, PF 19464";
        final AbstractStreamOperatorTestHarness<RowData> output =
                (AbstractStreamOperatorTestHarness<RowData>) testHarness;

        // ---- first round of six records ----
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", addressPb));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "AIR"));
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", addressPb));
        testHarness.processElement1(StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#6", addressPf));
        testHarness.processElement2(StreamRecordUtils.updateAfterRecord("Ord#1", "LineOrd#2", "SHIP"));
        assertor.shouldEmitNothing(output);

        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#3", "LineOrd#3", addressPd));
        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", addressPb, "Ord#1", "LineOrd#2", "SHIP"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#2", addressPb, "Ord#1", "LineOrd#2", "SHIP"));

        // ---- second round of six records ----
        testHarness.processElement1(StreamRecordUtils.updateBeforeRecord("Ord#1", "LineOrd#2", addressPb));
        testHarness.processElement1(StreamRecordUtils.updateAfterRecord("Ord#1", "LineOrd#2", addressPb));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#3", "AIR"));
        testHarness.processElement1(StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#1", addressPf));
        testHarness.processElement2(StreamRecordUtils.deleteRecord("Ord#1", "LineOrd#2", "SHIP"));
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", addressPb));
        assertor.shouldEmit(
                output,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#3", "LineOrd#3", addressPd, "Ord#1", "LineOrd#3", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#1", "LineOrd#2", addressPb, "Ord#1", "LineOrd#2", "SHIP"),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#1", "LineOrd#2", addressPb, "Ord#1", "LineOrd#2", "SHIP"),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#6", "LineOrd#1", addressPf, "Ord#1", "LineOrd#1", "AIR"));
    }

    /**
     * Feeds six changelog records into the first (left) input, then two rounds of records into the
     * second (right) input, and checks the rows emitted after each round.
     *
     * <p>The {@code miniBatchSize=10} tag is consumed by {@code createJoinOperator} to size the
     * count-based bundle trigger. The assertion points line up with 10 and 20 processed records,
     * presumably the moments the bundle is flushed.
     */
    @Tag("miniBatchSize=10")
    @Test
    void testLeftJoinWithJoinKeyContainsUniqueKey() throws Exception {
        // Same cast the assertions need, performed once instead of at every call site.
        final AbstractStreamOperatorTestHarness<RowData> harness =
                (AbstractStreamOperatorTestHarness<RowData>) this.testHarness;

        // Round 1: six records on the first input only; nothing is expected downstream yet.
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
        testHarness.processElement1(StreamRecordUtils.updateAfterRecord("Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB 19464"));
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown, Pi 19464"));
        testHarness.processElement1(StreamRecordUtils.updateAfterRecord("Ord#3", "LineOrd#x3", "x5 Bellevue Drive, Pottstown, PCxx 19464"));
        testHarness.processElement1(StreamRecordUtils.deleteRecord("Ord#3", "LineOrd#x3", "14y0 Bellevue Drive, Pottstown, PJyy 19464"));
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464"));
        assertor.shouldEmitNothing(harness);

        // Round 2: four records on the second input bring the total to ten.
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#2", "AIR"));
        testHarness.processElement2(StreamRecordUtils.updateBeforeRecord("Ord#X", "LineOrd#2", "AIR"));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#1", "AIR"));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#Y", "LineOrd#6", "RAILWAY"));
        assertor.shouldEmit(
                harness,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown, Pi 19464", "Ord#Y", "LineOrd#6", "RAILWAY"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#X", "LineOrd#1", "AIR"));

        // Round 3: ten more records on the second input.
        testHarness.processElement2(StreamRecordUtils.updateBeforeRecord("Ord#Y", "LineOrd#6", "RAILWAY"));
        testHarness.processElement2(StreamRecordUtils.updateAfterRecord("Ord#UU", "LineOrd#6", "SHIP"));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#3", "AIR"));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#2", "AIR"));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#7", "AIR"));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#8", "AIR"));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#9", "AIR"));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#10", "AIR"));
        testHarness.processElement2(StreamRecordUtils.insertRecord("Ord#X", "LineOrd#11", "AIR"));
        testHarness.processElement2(StreamRecordUtils.updateBeforeRecord("Ord#X", "LineOrd#1", "AIR"));
        assertor.shouldEmit(
                harness,
                StreamRecordUtils.rowOfKind(RowKind.UPDATE_BEFORE, "Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown, Pi 19464", "Ord#Y", "LineOrd#6", "RAILWAY"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown, Pi 19464", "Ord#UU", "LineOrd#6", "SHIP"),
                StreamRecordUtils.rowOfKind(RowKind.UPDATE_BEFORE, "Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#X", "LineOrd#1", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB 19464", null, null, null));
    }

    /**
     * Sends ten changelog records to the first (left) input and asserts the emitted rows, then
     * sends six records to the second (right) input and asserts the emitted rows again.
     *
     * <p>The {@code miniBatchSize=4} tag is read by {@code createJoinOperator} to size the
     * count-based bundle trigger.
     */
    @Tag(value="miniBatchSize=4")
    @Test
    void testLeftJoinWithHasUniqueKey() throws Exception {
        // Parameterized element type (rather than the raw StreamRecord) avoids unchecked
        // conversions when the records are handed to the harness.
        List<StreamRecord<RowData>> records = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.insertRecord("Ord#3", "LineOrd#10", "5 Bellevue Drive, Pottstown, PC 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#3", "LineOrd#10", "xxx Bellevue Drive, Pottstown, PJ 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"),
                StreamRecordUtils.insertRecord("Ord#6", "LineOrd#5", "8 Bellevue Drive, Pottstown, PF 19464"),
                StreamRecordUtils.updateBeforeRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#6", "LineOrd#7", "8 Bellevue Drive, Pottstown, PF 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#6", "LineOrd#7", "9 Bellevue Drive, Pottstown, PF 19464"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement1(row);
        }
        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#3", "LineOrd#10", "xxx Bellevue Drive, Pottstown, PJ 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464", null, null, null));

        // Second input: six records.
        records = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateAfterRecord("Ord#6", "LineOrd#4", "AIR"),
                StreamRecordUtils.updateBeforeRecord("Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateAfterRecord("Ord#5", "LineOrd#2", "TRUCK"),
                StreamRecordUtils.updateAfterRecord("Ord#7", "LineOrd#0", "AIR"),
                StreamRecordUtils.updateAfterRecord("Ord#8", "LineOrd#11", "AIR"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement2(row);
        }
        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#6", "LineOrd#7", "9 Bellevue Drive, Pottstown, PF 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.UPDATE_BEFORE, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#5", "LineOrd#2", "TRUCK"));
    }

    /**
     * Runs the shared {@code testLeftJoinWithUpdate} scenario with the {@code miniBatchSize=2} tag.
     * The operator variant is built in {@code createJoinOperator} from this test's display name
     * and tags.
     */
    @Tag("miniBatchSize=2")
    @Test
    void testLeftJoinHasUniqueKeyRetAndAcc() throws Exception {
        testLeftJoinWithUpdate();
    }

    /**
     * Runs the shared {@code testLeftJoinWithUpdate} scenario with the {@code miniBatchSize=2} tag.
     * The operator variant is built in {@code createJoinOperator} from this test's display name
     * and tags.
     */
    @Tag("miniBatchSize=2")
    @Test
    void testLeftJoinJoinKeyContainsUniqueKeyRetAndAcc() throws Exception {
        testLeftJoinWithUpdate();
    }

    /**
     * Runs the shared {@code testLeftJoinWithoutRetract} scenario with the {@code miniBatchSize=2}
     * tag. The operator variant is built in {@code createJoinOperator} from this test's display
     * name and tags.
     */
    @Tag("miniBatchSize=2")
    @Test
    void testLeftJoinHasUniqueKeyWithoutRetract() throws Exception {
        testLeftJoinWithoutRetract();
    }

    /**
     * Runs the shared {@code testLeftJoinWithoutRetract} scenario with the {@code miniBatchSize=2}
     * tag. The operator variant is built in {@code createJoinOperator} from this test's display
     * name and tags.
     */
    @Tag("miniBatchSize=2")
    @Test
    void testLeftJoinJoinKeyContainsUniqueKeyWithoutRetract() throws Exception {
        testLeftJoinWithoutRetract();
    }

    /**
     * Runs the shared {@code testLeftJoinWithoutAcc} scenario with the {@code miniBatchSize=2} tag.
     * The operator variant is built in {@code createJoinOperator} from this test's display name
     * and tags.
     */
    @Tag("miniBatchSize=2")
    @Test
    void testLeftJoinJoinKeyContainsUniqueKeyWithoutAcc() throws Exception {
        testLeftJoinWithoutAcc();
    }

    /**
     * Runs the shared {@code testLeftJoinWithoutAcc} scenario with the {@code miniBatchSize=2} tag.
     * The operator variant is built in {@code createJoinOperator} from this test's display name
     * and tags.
     */
    @Tag("miniBatchSize=2")
    @Test
    void testLeftJoinHasUniqueKeyWithoutAcc() throws Exception {
        testLeftJoinWithoutAcc();
    }

    /**
     * Runs the shared {@code testLeftJoinWithUpdateRecordsMultipleCases} scenario with the
     * {@code miniBatchSize=4} tag. The operator variant is built in {@code createJoinOperator}
     * from this test's display name and tags.
     */
    @Tag("miniBatchSize=4")
    @Test
    void testLeftJoinHasUniqueKeyWithUpdateMultipleCases() throws Exception {
        testLeftJoinWithUpdateRecordsMultipleCases();
    }

    /**
     * Runs the shared {@code testLeftJoinWithUpdateRecordsMultipleCases} scenario with the
     * {@code miniBatchSize=4} tag. The operator variant is built in {@code createJoinOperator}
     * from this test's display name and tags.
     */
    @Tag("miniBatchSize=4")
    @Test
    void testLeftJoinJoinKeyContainsUniqueKeyWithUpdateMultipleCases() throws Exception {
        testLeftJoinWithUpdateRecordsMultipleCases();
    }

    /**
     * Sends six changelog records to the first (left) input and asserts that nothing is emitted,
     * then sends six records to the second (right) input and asserts the emitted rows.
     *
     * <p>The {@code miniBatchSize=4} tag is read by {@code createJoinOperator} to size the
     * count-based bundle trigger.
     */
    @Tag(value="miniBatchSize=4")
    @Test
    void testRightJoinWithHasUniqueKey() throws Exception {
        // Parameterized element type (rather than the raw StreamRecord) avoids unchecked
        // conversions when the records are handed to the harness.
        List<StreamRecord<RowData>> records = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"),
                StreamRecordUtils.insertRecord("Ord#6", "LineOrd#5", "8 Bellevue Drive, Pottstown, PF 19464"),
                StreamRecordUtils.updateBeforeRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement1(row);
        }
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);

        // Second input: six records.
        records = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateAfterRecord("Ord#6", "LineOrd#4", "AIR"),
                StreamRecordUtils.updateBeforeRecord("Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateAfterRecord("Ord#5", "LineOrd#2", "TRUCK"),
                StreamRecordUtils.updateAfterRecord("Ord#7", "LineOrd#0", "AIR"),
                StreamRecordUtils.updateAfterRecord("Ord#8", "LineOrd#11", "AIR"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement2(row);
        }
        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, null, null, null, "Ord#6", "LineOrd#4", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, null, null, null, "Ord#7", "LineOrd#0", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, null, null, null, "Ord#8", "LineOrd#11", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#5", "LineOrd#2", "TRUCK"));
    }

    /**
     * Sends six changelog records to the first (left) input and asserts the emitted rows, then
     * sends six records to the second (right) input and asserts the emitted rows again.
     *
     * <p>The {@code miniBatchSize=4} tag is read by {@code createJoinOperator} to size the
     * count-based bundle trigger.
     */
    @Tag(value="miniBatchSize=4")
    @Test
    void testFullJoinWithHasUniqueKey() throws Exception {
        // Parameterized element type (rather than the raw StreamRecord) avoids unchecked
        // conversions when the records are handed to the harness.
        List<StreamRecord<RowData>> records = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"),
                StreamRecordUtils.insertRecord("Ord#6", "LineOrd#5", "8 Bellevue Drive, Pottstown, PF 19464"),
                StreamRecordUtils.updateBeforeRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement1(row);
        }
        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#6", "LineOrd#5", "8 Bellevue Drive, Pottstown, PF 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", null, null, null));

        // Second input: six records.
        records = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateAfterRecord("Ord#6", "LineOrd#4", "AIR"),
                StreamRecordUtils.updateBeforeRecord("Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateAfterRecord("Ord#5", "LineOrd#2", "TRUCK"),
                StreamRecordUtils.updateAfterRecord("Ord#7", "LineOrd#0", "AIR"),
                StreamRecordUtils.updateAfterRecord("Ord#8", "LineOrd#11", "AIR"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement2(row);
        }
        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, null, null, null, "Ord#6", "LineOrd#4", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#6", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, null, null, null, "Ord#7", "LineOrd#0", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, null, null, null, "Ord#8", "LineOrd#11", "AIR"),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#5", "LineOrd#2", "TRUCK"));
    }

    /**
     * Sends twelve changelog records to the second (right) input first, then three records to the
     * first (left) input, and asserts the emitted rows once at the end.
     *
     * <p>The {@code miniBatchSize=15} tag is read by {@code createJoinOperator} to size the
     * count-based bundle trigger; the single assertion comes after exactly fifteen records.
     */
    @Tag(value="miniBatchSize=15")
    @Test
    void testLeftJoinWithNoUniqueKey() throws Exception {
        // Parameterized element type (rather than the raw StreamRecord) avoids unchecked
        // conversions when the records are handed to the harness.
        List<StreamRecord<RowData>> records = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF 19464"),
                StreamRecordUtils.insertRecord("Ord#3", "LineOrd#3", "5 Bellevue Drive, Pottstown, PD 19464"),
                StreamRecordUtils.updateBeforeRecord("Ord#3", "LineOrd#3", "5 Bellevue Drive, Pottstown, PD 19464"),
                StreamRecordUtils.updateBeforeRecord("Ord#9", "LineOrd#9", "11 Bellevue Drive, Pottstown, PI 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#10", "LineOrd#10", "14 Bellevue Drive, Pottstown, PJ 19464"),
                StreamRecordUtils.deleteRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#9", "LineOrd#9", "11 Bellevue Drive, Pottstown, PI 19464"),
                StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF 19464"),
                StreamRecordUtils.insertRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF 19464"));
        // Note the order: this batch goes to the second input.
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement2(row);
        }

        records = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "AIR"),
                StreamRecordUtils.updateAfterRecord("Ord#1", "LineOrd#3", "SHIP"),
                StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#6", "RAILWAY"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement1(row);
        }
        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#6", "LineOrd#6", "RAILWAY", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#3", "SHIP", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#1", "AIR", "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"));
    }

    /**
     * Shared scenario: one insert on the first (left) input, which must emit nothing, followed by
     * an insert / update-before / update-after sequence on the second (right) input, after which
     * three joined rows are expected.
     */
    private void testLeftJoinWithUpdate() throws Exception {
        // Single left-side record; no output is expected at this point.
        testHarness.processElement1(StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"));
        assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>) testHarness);

        // Right side: insert, then retract and replace the same key.
        final List<StreamRecord<RowData>> rightSideChanges = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateBeforeRecord("Ord#2", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateAfterRecord("Ord#2", "LineOrd#2", "AIR"));
        for (StreamRecord<RowData> change : rightSideChanges) {
            testHarness.processElement2(change);
        }
        assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>) testHarness,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#2", "LineOrd#2", "SHIP"),
                StreamRecordUtils.rowOfKind(RowKind.UPDATE_BEFORE, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#2", "LineOrd#2", "SHIP"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", "Ord#2", "LineOrd#2", "AIR"));
    }

    /**
     * Shared scenario without retraction records: an insert plus an update-after on the first
     * (left) input, then two update-after records on the second (right) input, asserting the
     * emitted rows after each pair.
     */
    private void testLeftJoinWithoutRetract() throws Exception {
        // Parameterized element type (rather than the raw StreamRecord) avoids unchecked
        // conversions when the records are handed to the harness.
        List<StreamRecord<RowData>> records = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#2", "LineOrd#2", "5 Bellevue Drive, Pottstown, PC 19464"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement1(row);
        }
        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "5 Bellevue Drive, Pottstown, PC 19464", null, null, null));

        records = Arrays.asList(
                StreamRecordUtils.updateAfterRecord("Ord#2", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateAfterRecord("Ord#2", "LineOrd#2", "AIR"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement2(row);
        }
        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#2", "LineOrd#2", "5 Bellevue Drive, Pottstown, PC 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "5 Bellevue Drive, Pottstown, PC 19464", "Ord#2", "LineOrd#2", "AIR"));
    }

    /**
     * Shared scenario with only retraction-style records: an update-before plus a delete on the
     * first (left) input, which is expected to emit one DELETE row, then a delete plus an
     * update-before on the second (right) input, which is expected to emit nothing.
     */
    private void testLeftJoinWithoutAcc() throws Exception {
        // Parameterized element type (rather than the raw StreamRecord) avoids unchecked
        // conversions when the records are handed to the harness.
        List<StreamRecord<RowData>> records = Arrays.asList(
                StreamRecordUtils.updateBeforeRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.deleteRecord("Ord#2", "LineOrd#2", "5 Bellevue Drive, Pottstown, PC 19464"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement1(row);
        }
        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#2", "LineOrd#2", "5 Bellevue Drive, Pottstown, PC 19464", null, null, null));

        records = Arrays.asList(
                StreamRecordUtils.deleteRecord("Ord#2", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateBeforeRecord("Ord#2", "LineOrd#2", "AIR"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement2(row);
        }
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
    }

    /**
     * Shared scenario made of four batches of four records each: one batch on the first (left)
     * input followed by three batches on the second (right) input. The emitted rows are asserted
     * after every batch; the last batch is expected to emit nothing.
     */
    private void testLeftJoinWithUpdateRecordsMultipleCases() throws Exception {
        // Parameterized element type (rather than the raw StreamRecord) avoids unchecked
        // conversions when the records are handed to the harness.
        List<StreamRecord<RowData>> records = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"),
                StreamRecordUtils.insertRecord("Ord#0", "LineOrd#4", "5 Bellevue Drive, Pottstown, PB 19464"),
                StreamRecordUtils.updateAfterRecord("Ord#4", "LineOrd#0", "6 Bellevue Drive, Pottstown, PB 19464"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement1(row);
        }
        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#4", "LineOrd#0", "6 Bellevue Drive, Pottstown, PB 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#0", "LineOrd#4", "5 Bellevue Drive, Pottstown, PB 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464", null, null, null));

        // Second input, batch 1: insert followed by update-after records.
        records = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#6", "LineOrd#4", "AIR"),
                StreamRecordUtils.updateAfterRecord("Ord#6", "LineOrd#4", "AIR"),
                StreamRecordUtils.updateAfterRecord("Ord#6", "LineOrd#5", "AIR"),
                StreamRecordUtils.updateAfterRecord("Ord#6", "LineOrd#4", "TRUCK"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement2(row);
        }
        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#0", "LineOrd#4", "5 Bellevue Drive, Pottstown, PB 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#0", "LineOrd#4", "5 Bellevue Drive, Pottstown, PB 19464", "Ord#6", "LineOrd#4", "TRUCK"),
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464", null, null, null),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464", "Ord#6", "LineOrd#5", "AIR"));

        // Second input, batch 2: alternating delete / insert records.
        records = Arrays.asList(
                StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#4", "TRUCK"),
                StreamRecordUtils.insertRecord("Ord#6", "LineOrd#4", "TRUCK2"),
                StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#4", "TRUCK3"),
                StreamRecordUtils.insertRecord("Ord#6", "LineOrd#4", "AIR"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement2(row);
        }
        this.assertor.shouldEmit(
                (AbstractStreamOperatorTestHarness<RowData>)this.testHarness,
                StreamRecordUtils.rowOfKind(RowKind.DELETE, "Ord#0", "LineOrd#4", "5 Bellevue Drive, Pottstown, PB 19464", "Ord#6", "LineOrd#4", "TRUCK"),
                StreamRecordUtils.rowOfKind(RowKind.INSERT, "Ord#0", "LineOrd#4", "5 Bellevue Drive, Pottstown, PB 19464", "Ord#6", "LineOrd#4", "AIR"));

        // Second input, batch 3: expected to produce no output.
        records = Arrays.asList(
                StreamRecordUtils.insertRecord("Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateAfterRecord("Ord#5", "LineOrd#2", "TRUCK"),
                StreamRecordUtils.updateBeforeRecord("Ord#5", "LineOrd#2", "SHIP"),
                StreamRecordUtils.updateAfterRecord("Ord#9", "LineOrd#7", "TRUCK"));
        for (StreamRecord<RowData> row : records) {
            this.testHarness.processElement2(row);
        }
        this.assertor.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
    }

    /**
     * Builds the mini-batch join operator under test from the JUnit {@link TestInfo}.
     *
     * <p>The display name is handed to the per-test extractors (type infos, key selectors,
     * unique-key selectors, input-side specs, outer flags, join type); the tags supply the
     * mini-batch size and the state retention times. As a side effect, the type-info, key-selector
     * and unique-key-selector fields of this test instance are reassigned.
     *
     * @param testInfo JUnit metadata (display name and tags) of the test being set up
     * @return the operator, wired with a {@link CountCoBundleTrigger} of the extracted batch size
     */
    public MiniBatchStreamingJoinOperator createJoinOperator(TestInfo testInfo) {
        final String displayName = testInfo.getDisplayName();
        this.leftTypeInfo = this.leftTypeInfoExtractor.apply(displayName);
        this.rightTypeInfo = this.rightTypeInfoExtractor.apply(displayName);
        this.leftKeySelector = this.leftKeySelectorExtractor.apply(displayName);
        this.rightKeySelector = this.rightKeySelectorExtractor.apply(displayName);
        RowDataKeySelector[] keySelectors = this.ukSelectorExtractor.apply(displayName);
        this.leftUniqueKeySelector = keySelectors[0];
        this.rightUniqueKeySelector = keySelectors[1];
        JoinInputSideSpec[] inputSideSpecs = this.inputSpecExtractor.apply(displayName);
        Boolean[] isOuter = this.joinTypeExtractor.apply(displayName);
        FlinkJoinType joinType = this.flinkJoinTypeExtractor.apply(displayName);
        int batchSize = this.miniBatchSizeExtractor.apply(testInfo.getTags());
        Long[] ttl = (Long[])STATE_RETENTION_TIME_EXTRACTOR.apply(testInfo.getTags());

        // The two retention-time arguments follow the same left/right pairing as the specs and
        // outer flags. Previously ttl[0] was passed for both, so a distinct second value could
        // never reach the operator.
        // NOTE(review): assumes the extractor yields {left, right}; if it only ever returns one
        // value the fallback keeps the old behaviour - confirm against STATE_RETENTION_TIME_EXTRACTOR.
        long leftStateRetentionTime = ttl[0];
        long rightStateRetentionTime = ttl.length > 1 ? ttl[1] : ttl[0];

        return MiniBatchStreamingJoinOperator.newMiniBatchStreamJoinOperator(
                joinType,
                (InternalTypeInfo)this.leftTypeInfo,
                (InternalTypeInfo)this.rightTypeInfo,
                (GeneratedJoinCondition)this.joinCondition,
                inputSideSpecs[0],
                inputSideSpecs[1],
                (boolean)isOuter[0],
                (boolean)isOuter[1],
                new boolean[]{true},
                leftStateRetentionTime,
                rightStateRetentionTime,
                (CoBundleTrigger)new CountCoBundleTrigger((long)batchSize));
    }

    /**
     * Builds the join output row type: the left input's field types and names followed by the
     * right input's.
     *
     * @return a {@link RowType} over the concatenated field types and field names
     */
    @Override
    public RowType getOutputType() {
        final RowType leftRowType = this.leftTypeInfo.toRowType();
        final RowType rightRowType = this.rightTypeInfo.toRowType();

        final LogicalType[] fieldTypes =
                Stream.concat(leftRowType.getChildren().stream(), rightRowType.getChildren().stream())
                        .toArray(LogicalType[]::new);
        final String[] fieldNames =
                Stream.concat(leftRowType.getFieldNames().stream(), rightRowType.getFieldNames().stream())
                        .toArray(String[]::new);

        return RowType.of(fieldTypes, fieldNames);
    }
}

