/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.window;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.window.WindowJoinOperatorBuilder;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class WindowJoinOperatorTest {
    /** Row type of each join input: a BIGINT column followed by a STRING column. */
    private static final InternalTypeInfo<RowData> INPUT_ROW_TYPE =
            InternalTypeInfo.ofFields(
                    new LogicalType[] {new BigIntType(), VarCharType.STRING_TYPE});

    /** Row type of a joined row: the two input columns repeated for the left and right side. */
    private static final InternalTypeInfo<RowData> OUTPUT_ROW_TYPE =
            InternalTypeInfo.ofFields(
                    new LogicalType[] {
                        new BigIntType(),
                        VarCharType.STRING_TYPE,
                        new BigIntType(),
                        VarCharType.STRING_TYPE
                    });

    /** Output checker configured with the four field types of {@link #OUTPUT_ROW_TYPE}. */
    private static final RowDataHarnessAssertor ASSERTER =
            new RowDataHarnessAssertor(OUTPUT_ROW_TYPE.toRowFieldTypes());

    /** Output checker configured with the two field types of {@link #INPUT_ROW_TYPE}. */
    private static final RowDataHarnessAssertor SEMI_ANTI_JOIN_ASSERTER =
            new RowDataHarnessAssertor(INPUT_ROW_TYPE.toRowFieldTypes());

    // Time zones the test suite is parameterized over (see runMode()).
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");

    /** Zone used to shift test timestamps and handed to the operator builder. */
    private final ZoneId shiftTimeZone;

    /** When true, the harness is created through the async-state code path. */
    private final boolean enableAsyncState;

    /**
     * Creates one parameterized instance of the test class.
     *
     * @param shiftTimeZone zone used to shift test timestamps and passed to the operator builder
     * @param enableAsyncState whether the harness is built through the async-state code path
     */
    WindowJoinOperatorTest(ZoneId shiftTimeZone, boolean enableAsyncState) {
        this.enableAsyncState = enableAsyncState;
        this.shiftTimeZone = shiftTimeZone;
    }

    @Parameters(name="TimeZone = {0}, EnableAsyncState = {1}")
    private static Collection<Object[]> runMode() {
        return Arrays.asList({UTC_ZONE_ID, false}, {UTC_ZONE_ID, true}, {SHANGHAI_ZONE_ID, false}, {SHANGHAI_ZONE_ID, true});
    }

    /**
     * Exercises the operator built for {@link FlinkJoinType#SEMI}.
     *
     * <p>Expected output: a left row is emitted only for windows in which the right input also
     * has a row (windows ending at 3 and 15 in this scenario); left-only and right-only windows
     * produce nothing.
     *
     * <p>The emitted rows have the two-column input schema, so every output comparison uses
     * {@link #SEMI_ANTI_JOIN_ASSERTER} (built from {@code INPUT_ROW_TYPE}) rather than the
     * four-column {@code ASSERTER}.
     */
    @TestTemplate
    void testSemiJoin() throws Exception {
        // Window-end values shifted for the configured zone; computed once and reused below.
        final long end1 = TimeWindowUtil.toUtcTimestampMills(1L, shiftTimeZone);
        final long end3 = TimeWindowUtil.toUtcTimestampMills(3L, shiftTimeZone);
        final long end6 = TimeWindowUtil.toUtcTimestampMills(6L, shiftTimeZone);
        final long end9 = TimeWindowUtil.toUtcTimestampMills(9L, shiftTimeZone);
        final long end12 = TimeWindowUtil.toUtcTimestampMills(12L, shiftTimeZone);
        final long end15 = TimeWindowUtil.toUtcTimestampMills(15L, shiftTimeZone);

        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> harness =
                createTestHarness(FlinkJoinType.SEMI);
        harness.open();
        harness.processWatermark1(new Watermark(1L));
        harness.processWatermark2(new Watermark(1L));

        // A record for the window ending at 1 arrives at watermark 1: no timer is registered
        // and the record never shows up in the expected output.
        harness.processElement1(StreamRecordUtils.insertRecord(end1, "k1"));
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(0);

        harness.processElement1(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end6, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end9, "k1"));
        // Windows ending at 3, 6 and 9 are pending.
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(3);
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(4);

        harness.processWatermark1(new Watermark(10L));
        harness.processWatermark2(new Watermark(10L));

        Collection<Object> expected = new ArrayList<>();
        expected.add(new Watermark(1L));
        expected.add(StreamRecordUtils.insertRecord(end3, "k1"));
        expected.add(StreamRecordUtils.insertRecord(end3, "k1"));
        expected.add(new Watermark(10L));
        SEMI_ANTI_JOIN_ASSERTER.assertOutputEqualsSorted(
                "output wrong.", expected, harness.getOutput());
        // All pending windows have fired and their state has been released.
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(0);
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(0);

        harness.processElement1(StreamRecordUtils.insertRecord(end12, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end15, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end15, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end15, "k1"));
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(3);

        harness.processWatermark1(new Watermark(13L));
        harness.processWatermark2(new Watermark(13L));
        // The left-only window ending at 12 yields no semi-join output, only the watermark.
        expected.add(new Watermark(13L));
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(2);
        SEMI_ANTI_JOIN_ASSERTER.assertOutputEquals(
                "output wrong.", expected, harness.getOutput());

        harness.processWatermark1(new Watermark(18L));
        harness.processWatermark2(new Watermark(18L));
        // One left row and two right rows in the window ending at 15: the left row is
        // expected exactly once.
        expected.add(StreamRecordUtils.insertRecord(end15, "k1"));
        expected.add(new Watermark(18L));
        SEMI_ANTI_JOIN_ASSERTER.assertOutputEquals(
                "output wrong.", expected, harness.getOutput());

        harness.close();
    }

    /**
     * Exercises the operator built for {@link FlinkJoinType#ANTI}.
     *
     * <p>Expected output: a left row is emitted only for windows in which the right input has
     * no row (windows ending at 6 and 12 in this scenario).
     *
     * <p>The emitted rows have the two-column input schema, so every output comparison uses
     * {@link #SEMI_ANTI_JOIN_ASSERTER} (built from {@code INPUT_ROW_TYPE}) rather than the
     * four-column {@code ASSERTER}.
     */
    @TestTemplate
    void testAntiJoin() throws Exception {
        // Window-end values shifted for the configured zone; computed once and reused below.
        final long end1 = TimeWindowUtil.toUtcTimestampMills(1L, shiftTimeZone);
        final long end3 = TimeWindowUtil.toUtcTimestampMills(3L, shiftTimeZone);
        final long end6 = TimeWindowUtil.toUtcTimestampMills(6L, shiftTimeZone);
        final long end9 = TimeWindowUtil.toUtcTimestampMills(9L, shiftTimeZone);
        final long end12 = TimeWindowUtil.toUtcTimestampMills(12L, shiftTimeZone);
        final long end15 = TimeWindowUtil.toUtcTimestampMills(15L, shiftTimeZone);

        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> harness =
                createTestHarness(FlinkJoinType.ANTI);
        harness.open();
        harness.processWatermark1(new Watermark(1L));
        harness.processWatermark2(new Watermark(1L));

        // A record for the window ending at 1 arrives at watermark 1: no timer is registered
        // and the record never shows up in the expected output.
        harness.processElement1(StreamRecordUtils.insertRecord(end1, "k1"));
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(0);

        harness.processElement1(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end6, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end9, "k1"));
        // Windows ending at 3, 6 and 9 are pending.
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(3);
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(4);

        harness.processWatermark1(new Watermark(10L));
        harness.processWatermark2(new Watermark(10L));

        Collection<Object> expected = new ArrayList<>();
        expected.add(new Watermark(1L));
        // Only the window ending at 6 has a left row without any right row.
        expected.add(StreamRecordUtils.insertRecord(end6, "k1"));
        expected.add(new Watermark(10L));
        SEMI_ANTI_JOIN_ASSERTER.assertOutputEqualsSorted(
                "output wrong.", expected, harness.getOutput());
        // All pending windows have fired and their state has been released.
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(0);
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(0);

        harness.processElement1(StreamRecordUtils.insertRecord(end12, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end15, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end15, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end15, "k1"));
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(3);

        harness.processWatermark1(new Watermark(13L));
        harness.processWatermark2(new Watermark(13L));
        // The left-only window ending at 12 is expected in the anti-join output.
        expected.add(StreamRecordUtils.insertRecord(end12, "k1"));
        expected.add(new Watermark(13L));
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(2);
        SEMI_ANTI_JOIN_ASSERTER.assertOutputEquals(
                "output wrong.", expected, harness.getOutput());

        harness.processWatermark1(new Watermark(18L));
        harness.processWatermark2(new Watermark(18L));
        // The window ending at 15 has rows on both sides, so only the watermark is expected.
        expected.add(new Watermark(18L));
        SEMI_ANTI_JOIN_ASSERTER.assertOutputEquals(
                "output wrong.", expected, harness.getOutput());

        harness.close();
    }

    /**
     * Exercises the operator built for {@link FlinkJoinType#INNER}.
     *
     * <p>Expected output: one joined row per (left, right) pair that shares a window; windows
     * with rows on only one side produce nothing.
     */
    @TestTemplate
    void testInnerJoin() throws Exception {
        // Window-end values shifted for the configured zone; computed once and reused below.
        final long end1 = TimeWindowUtil.toUtcTimestampMills(1L, shiftTimeZone);
        final long end3 = TimeWindowUtil.toUtcTimestampMills(3L, shiftTimeZone);
        final long end6 = TimeWindowUtil.toUtcTimestampMills(6L, shiftTimeZone);
        final long end9 = TimeWindowUtil.toUtcTimestampMills(9L, shiftTimeZone);
        final long end12 = TimeWindowUtil.toUtcTimestampMills(12L, shiftTimeZone);
        final long end15 = TimeWindowUtil.toUtcTimestampMills(15L, shiftTimeZone);

        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> harness =
                createTestHarness(FlinkJoinType.INNER);
        harness.open();
        harness.processWatermark1(new Watermark(1L));
        harness.processWatermark2(new Watermark(1L));

        // A record for the window ending at 1 arrives at watermark 1: no timer is registered
        // and the record never shows up in the expected output.
        harness.processElement1(StreamRecordUtils.insertRecord(end1, "k1"));
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(0);

        harness.processElement1(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end6, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end9, "k1"));
        // Windows ending at 3, 6 and 9 are pending.
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(3);
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(4);

        harness.processWatermark1(new Watermark(10L));
        harness.processWatermark2(new Watermark(10L));

        Collection<Object> expected = new ArrayList<>();
        expected.add(new Watermark(1L));
        // Two left rows x one right row in the window ending at 3.
        expected.add(StreamRecordUtils.insertRecord(end3, "k1", end3, "k1"));
        expected.add(StreamRecordUtils.insertRecord(end3, "k1", end3, "k1"));
        expected.add(new Watermark(10L));
        ASSERTER.assertOutputEqualsSorted("output wrong.", expected, harness.getOutput());
        // All pending windows have fired and their state has been released.
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(0);
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(0);

        harness.processElement1(StreamRecordUtils.insertRecord(end12, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end15, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end15, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end15, "k1"));
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(3);

        harness.processWatermark1(new Watermark(13L));
        harness.processWatermark2(new Watermark(13L));
        // The left-only window ending at 12 yields no inner-join output, only the watermark.
        expected.add(new Watermark(13L));
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(2);
        ASSERTER.assertOutputEquals("output wrong.", expected, harness.getOutput());

        harness.processWatermark1(new Watermark(18L));
        harness.processWatermark2(new Watermark(18L));
        // One left row x two right rows in the window ending at 15.
        expected.add(StreamRecordUtils.insertRecord(end15, "k1", end15, "k1"));
        expected.add(StreamRecordUtils.insertRecord(end15, "k1", end15, "k1"));
        expected.add(new Watermark(18L));
        ASSERTER.assertOutputEquals("output wrong.", expected, harness.getOutput());

        harness.close();
    }

    /**
     * Exercises the operator built for {@link FlinkJoinType#LEFT}.
     *
     * <p>Expected output: joined rows for windows populated on both sides, plus left rows padded
     * with nulls on the right for windows that only the left input contributed to.
     */
    @TestTemplate
    void testLeftOuterJoin() throws Exception {
        // Window-end values shifted for the configured zone; computed once and reused below.
        final long end1 = TimeWindowUtil.toUtcTimestampMills(1L, shiftTimeZone);
        final long end3 = TimeWindowUtil.toUtcTimestampMills(3L, shiftTimeZone);
        final long end6 = TimeWindowUtil.toUtcTimestampMills(6L, shiftTimeZone);
        final long end9 = TimeWindowUtil.toUtcTimestampMills(9L, shiftTimeZone);
        final long end12 = TimeWindowUtil.toUtcTimestampMills(12L, shiftTimeZone);
        final long end15 = TimeWindowUtil.toUtcTimestampMills(15L, shiftTimeZone);

        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> harness =
                createTestHarness(FlinkJoinType.LEFT);
        harness.open();
        harness.processWatermark1(new Watermark(1L));
        harness.processWatermark2(new Watermark(1L));

        // A record for the window ending at 1 arrives at watermark 1: no timer is registered
        // and the record never shows up in the expected output.
        harness.processElement1(StreamRecordUtils.insertRecord(end1, "k1"));
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(0);

        harness.processElement1(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end6, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end9, "k1"));
        // Windows ending at 3, 6 and 9 are pending.
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(3);
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(4);

        harness.processWatermark1(new Watermark(10L));
        harness.processWatermark2(new Watermark(10L));

        Collection<Object> expected = new ArrayList<>();
        expected.add(new Watermark(1L));
        // Two left rows x one right row in the window ending at 3.
        expected.add(StreamRecordUtils.insertRecord(end3, "k1", end3, "k1"));
        expected.add(StreamRecordUtils.insertRecord(end3, "k1", end3, "k1"));
        // Left-only window ending at 6: right side padded with nulls.
        expected.add(StreamRecordUtils.insertRecord(end6, "k1", null, null));
        expected.add(new Watermark(10L));
        ASSERTER.assertOutputEqualsSorted("output wrong.", expected, harness.getOutput());
        // All pending windows have fired and their state has been released.
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(0);
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(0);

        harness.processElement1(StreamRecordUtils.insertRecord(end12, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end15, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end15, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end15, "k1"));
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(3);

        harness.processWatermark1(new Watermark(13L));
        harness.processWatermark2(new Watermark(13L));
        // Left-only window ending at 12: right side padded with nulls.
        expected.add(StreamRecordUtils.insertRecord(end12, "k1", null, null));
        expected.add(new Watermark(13L));
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(2);
        ASSERTER.assertOutputEquals("output wrong.", expected, harness.getOutput());

        harness.processWatermark1(new Watermark(18L));
        harness.processWatermark2(new Watermark(18L));
        // One left row x two right rows in the window ending at 15.
        expected.add(StreamRecordUtils.insertRecord(end15, "k1", end15, "k1"));
        expected.add(StreamRecordUtils.insertRecord(end15, "k1", end15, "k1"));
        expected.add(new Watermark(18L));
        ASSERTER.assertOutputEquals("output wrong.", expected, harness.getOutput());

        harness.close();
    }

    /**
     * Exercises the operator built for {@link FlinkJoinType#RIGHT}.
     *
     * <p>Expected output: joined rows for windows populated on both sides, plus right rows
     * padded with nulls on the left for windows that only the right input contributed to.
     */
    @TestTemplate
    void testRightOuterJoin() throws Exception {
        // Window-end values shifted for the configured zone; computed once and reused below.
        final long end1 = TimeWindowUtil.toUtcTimestampMills(1L, shiftTimeZone);
        final long end3 = TimeWindowUtil.toUtcTimestampMills(3L, shiftTimeZone);
        final long end6 = TimeWindowUtil.toUtcTimestampMills(6L, shiftTimeZone);
        final long end9 = TimeWindowUtil.toUtcTimestampMills(9L, shiftTimeZone);
        final long end12 = TimeWindowUtil.toUtcTimestampMills(12L, shiftTimeZone);
        final long end15 = TimeWindowUtil.toUtcTimestampMills(15L, shiftTimeZone);

        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> harness =
                createTestHarness(FlinkJoinType.RIGHT);
        harness.open();
        harness.processWatermark1(new Watermark(1L));
        harness.processWatermark2(new Watermark(1L));

        // A record for the window ending at 1 arrives at watermark 1: no timer is registered
        // and the record never shows up in the expected output.
        harness.processElement1(StreamRecordUtils.insertRecord(end1, "k1"));
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(0);

        harness.processElement1(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end6, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end9, "k1"));
        // Windows ending at 3, 6 and 9 are pending.
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(3);
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(4);

        harness.processWatermark1(new Watermark(10L));
        harness.processWatermark2(new Watermark(10L));

        Collection<Object> expected = new ArrayList<>();
        expected.add(new Watermark(1L));
        // Two left rows x one right row in the window ending at 3.
        expected.add(StreamRecordUtils.insertRecord(end3, "k1", end3, "k1"));
        expected.add(StreamRecordUtils.insertRecord(end3, "k1", end3, "k1"));
        // Right-only window ending at 9: left side padded with nulls.
        expected.add(StreamRecordUtils.insertRecord(null, null, end9, "k1"));
        expected.add(new Watermark(10L));
        ASSERTER.assertOutputEqualsSorted("output wrong.", expected, harness.getOutput());
        // All pending windows have fired and their state has been released.
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(0);
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(0);

        harness.processElement1(StreamRecordUtils.insertRecord(end12, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end15, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end15, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end15, "k1"));
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(3);

        harness.processWatermark1(new Watermark(13L));
        harness.processWatermark2(new Watermark(13L));
        // The left-only window ending at 12 yields no right-join output, only the watermark.
        expected.add(new Watermark(13L));
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(2);
        ASSERTER.assertOutputEquals("output wrong.", expected, harness.getOutput());

        harness.processWatermark1(new Watermark(18L));
        harness.processWatermark2(new Watermark(18L));
        // One left row x two right rows in the window ending at 15.
        expected.add(StreamRecordUtils.insertRecord(end15, "k1", end15, "k1"));
        expected.add(StreamRecordUtils.insertRecord(end15, "k1", end15, "k1"));
        expected.add(new Watermark(18L));
        ASSERTER.assertOutputEquals("output wrong.", expected, harness.getOutput());

        harness.close();
    }

    /**
     * Exercises the operator built for {@link FlinkJoinType#FULL}.
     *
     * <p>Expected output: joined rows for windows populated on both sides, plus null-padded rows
     * for windows that only one of the two inputs contributed to.
     */
    @TestTemplate
    void testOuterJoin() throws Exception {
        // Window-end values shifted for the configured zone; computed once and reused below.
        final long end1 = TimeWindowUtil.toUtcTimestampMills(1L, shiftTimeZone);
        final long end3 = TimeWindowUtil.toUtcTimestampMills(3L, shiftTimeZone);
        final long end6 = TimeWindowUtil.toUtcTimestampMills(6L, shiftTimeZone);
        final long end9 = TimeWindowUtil.toUtcTimestampMills(9L, shiftTimeZone);
        final long end12 = TimeWindowUtil.toUtcTimestampMills(12L, shiftTimeZone);
        final long end15 = TimeWindowUtil.toUtcTimestampMills(15L, shiftTimeZone);

        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> harness =
                createTestHarness(FlinkJoinType.FULL);
        harness.open();
        harness.processWatermark1(new Watermark(1L));
        harness.processWatermark2(new Watermark(1L));

        // A record for the window ending at 1 arrives at watermark 1: no timer is registered
        // and the record never shows up in the expected output.
        harness.processElement1(StreamRecordUtils.insertRecord(end1, "k1"));
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(0);

        harness.processElement1(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end3, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end6, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end9, "k1"));
        // Windows ending at 3, 6 and 9 are pending.
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(3);
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(4);

        harness.processWatermark1(new Watermark(10L));
        harness.processWatermark2(new Watermark(10L));

        Collection<Object> expected = new ArrayList<>();
        expected.add(new Watermark(1L));
        // Two left rows x one right row in the window ending at 3.
        expected.add(StreamRecordUtils.insertRecord(end3, "k1", end3, "k1"));
        expected.add(StreamRecordUtils.insertRecord(end3, "k1", end3, "k1"));
        // Left-only window ending at 6 and right-only window ending at 9, null-padded.
        expected.add(StreamRecordUtils.insertRecord(end6, "k1", null, null));
        expected.add(StreamRecordUtils.insertRecord(null, null, end9, "k1"));
        expected.add(new Watermark(10L));
        ASSERTER.assertOutputEqualsSorted("output wrong.", expected, harness.getOutput());
        // All pending windows have fired and their state has been released.
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(0);
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(0);

        harness.processElement1(StreamRecordUtils.insertRecord(end12, "k1"));
        harness.processElement1(StreamRecordUtils.insertRecord(end15, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end15, "k1"));
        harness.processElement2(StreamRecordUtils.insertRecord(end15, "k1"));
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(3);

        harness.processWatermark1(new Watermark(13L));
        harness.processWatermark2(new Watermark(13L));
        // Left-only window ending at 12: right side padded with nulls.
        expected.add(StreamRecordUtils.insertRecord(end12, "k1", null, null));
        expected.add(new Watermark(13L));
        Assertions.assertThat(harness.numKeyedStateEntries()).isEqualTo(2);
        ASSERTER.assertOutputEquals("output wrong.", expected, harness.getOutput());

        harness.processWatermark1(new Watermark(18L));
        harness.processWatermark2(new Watermark(18L));
        // One left row x two right rows in the window ending at 15.
        expected.add(StreamRecordUtils.insertRecord(end15, "k1", end15, "k1"));
        expected.add(StreamRecordUtils.insertRecord(end15, "k1", end15, "k1"));
        expected.add(new Watermark(18L));
        ASSERTER.assertOutputEquals("output wrong.", expected, harness.getOutput());

        harness.close();
    }

    /**
     * Builds a keyed two-input test harness around a window join operator of the given type.
     *
     * <p>The operator is configured with a generated join condition that always returns
     * {@code true}, window-end column index 0 on both inputs, and the instance's
     * {@code shiftTimeZone}. Both inputs are keyed on column 1. When {@code enableAsyncState} is
     * set, async state is enabled on the builder and the async harness factory is used.
     *
     * @param joinType join type handed to the operator builder
     * @return an unopened harness wrapping the built operator
     * @throws Exception if the harness cannot be created
     */
    private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness(
            FlinkJoinType joinType) throws Exception {
        // Source of the generated condition class; apply(...) unconditionally returns true.
        final String conditionSource =
                "public class TestWindowJoinCondition extends org.apache.flink.api.common.functions.AbstractRichFunction "
                        + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n"
                        + "\n"
                        + "    public TestWindowJoinCondition(Object[] reference) {\n"
                        + "    }\n"
                        + "\n"
                        + "    @Override\n"
                        + "    public boolean apply(org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2) {\n"
                        + "        return true;\n"
                        + "    }\n"
                        + "}\n";
        final GeneratedJoinCondition alwaysTrueCondition =
                new GeneratedJoinCondition(
                        "TestWindowJoinCondition", conditionSource, new Object[0]);

        // Both inputs are keyed on column 1 (the STRING column of INPUT_ROW_TYPE).
        final RowDataKeySelector keySelector =
                HandwrittenSelectorUtil.getRowDataSelector(
                        new int[] {1}, INPUT_ROW_TYPE.toRowFieldTypes());
        // NOTE(review): the key type is declared with zero fields although the selector above
        // projects one column - confirm this is intentional.
        final TypeInformation<RowData> keyTypeInfo =
                InternalTypeInfo.ofFields(new LogicalType[0]);

        // Each side gets its own serializer instance, as in the original call chain.
        final TypeSerializer<RowData> leftRowSerializer = INPUT_ROW_TYPE.toRowSerializer();
        final TypeSerializer<RowData> rightRowSerializer = INPUT_ROW_TYPE.toRowSerializer();

        final WindowJoinOperatorBuilder builder =
                WindowJoinOperatorBuilder.builder()
                        .leftSerializer(leftRowSerializer)
                        .rightSerializer(rightRowSerializer)
                        .generatedJoinCondition(alwaysTrueCondition)
                        .leftWindowEndIndex(0)
                        .rightWindowEndIndex(0)
                        .filterNullKeys(new boolean[] {true})
                        .joinType(joinType)
                        .withShiftTimezone(shiftTimeZone);

        if (enableAsyncState) {
            builder.enableAsyncState();
        }
        final TwoInputStreamOperator<RowData, RowData, RowData> joinOperator = builder.build();

        if (!enableAsyncState) {
            return new KeyedTwoInputStreamOperatorTestHarness<>(
                    joinOperator, keySelector, keySelector, keyTypeInfo);
        }
        // Trailing ints are presumably max parallelism, parallelism and subtask index - confirm
        // against the async harness factory.
        return AsyncKeyedTwoInputStreamOperatorTestHarness.create(
                joinOperator, keySelector, keySelector, keyTypeInfo, 1, 1, 0);
    }
}

