/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.window.tvf.operator;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SessionWindowAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.operator.UnalignedWindowTableFunctionOperator;
import org.apache.flink.table.runtime.operators.window.tvf.operator.WindowTableFunctionOperatorTestBase;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class UnalignedWindowTableFunctionOperatorTest
extends WindowTableFunctionOperatorTestBase {
    protected static final RowDataKeySelector KEY_SELECTOR = HandwrittenSelectorUtil.getRowDataSelector(new int[]{0}, INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0]));

    UnalignedWindowTableFunctionOperatorTest(ZoneId shiftTimeZone) {
        super(shiftTimeZone);
    }

    @Parameters(name="TimeZone = {0}")
    private static Collection<Object[]> runMode() {
        return Arrays.asList({UTC_ZONE_ID}, {SHANGHAI_ZONE_ID});
    }

    @TestTemplate
    void testEventTimeSessionWindows() throws Exception {
        SessionWindowAssigner assigner = SessionWindowAssigner.withGap((Duration)Duration.ofSeconds(3L));
        UnalignedWindowTableFunctionOperator operator = this.createOperator((GroupWindowAssigner<TimeWindow>)assigner, 2);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(UnalignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 20L));
        testHarness.processElement(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(UnalignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 20L, this.localMills(20L), this.localMills(3020L), 3019L));
        expectedOutput.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(UnalignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 2000L));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(UnalignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 2000L, this.localMills(2000L), this.localMills(5000L), 4999L));
        expectedOutput.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 2, 7999L));
        testHarness.processElement(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 3, 5999L));
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(6999L));
        expectedOutput.add(new Watermark(6999L));
        testHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(new Watermark(7999L));
        testHarness.processWatermark(new Watermark(8999L));
        expectedOutput.add(new Watermark(8999L));
        testHarness.processWatermark(new Watermark(9999L));
        expectedOutput.add(new Watermark(9999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = this.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processWatermark(new Watermark(10999L));
        testHarness.processElement(UnalignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 999L));
        testHarness.processElement(UnalignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, new Long[]{null}));
        expectedOutput.add(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L, this.localMills(3999L), this.localMills(10999L), 10998L));
        expectedOutput.add(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 2, 7999L, this.localMills(3999L), this.localMills(10999L), 10998L));
        expectedOutput.add(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 3, 5999L, this.localMills(3999L), this.localMills(10999L), 10998L));
        expectedOutput.add(new Watermark(10999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        Assertions.assertThat((long)operator.getNumLateRecordsDropped().getCount()).isEqualTo(1L);
        Assertions.assertThat((long)operator.getNumNullRowTimeRecordsDropped().getCount()).isEqualTo(1L);
        testHarness.close();
    }

    @TestTemplate
    void testEventTimeSessionWindowsWithChangelog() throws Exception {
        SessionWindowAssigner assigner = SessionWindowAssigner.withGap((Duration)Duration.ofSeconds(3L));
        UnalignedWindowTableFunctionOperator operator = this.createOperator((GroupWindowAssigner<TimeWindow>)assigner, 2);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, 20L));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key2", 1, 1999L));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key3", 1, 2999L));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key4", 1, 1999L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key4", 1, 2999L));
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.UPDATE_BEFORE, "key3", 1, 3999L));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.UPDATE_AFTER, "key3", 1, 4999L));
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, 20L, this.localMills(20L), this.localMills(3020L), 3019L));
        expectedOutput.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key2", 1, 1999L, this.localMills(1999L), this.localMills(4999L), 4998L));
        expectedOutput.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key4", 1, 1999L, this.localMills(1999L), this.localMills(5999L), 5998L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key4", 1, 2999L, this.localMills(1999L), this.localMills(5999L), 5998L));
        expectedOutput.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = this.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key3", 1, 2999L, this.localMills(2999L), this.localMills(7999L), 7998L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.UPDATE_BEFORE, "key3", 1, 3999L, this.localMills(2999L), this.localMills(7999L), 7998L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.UPDATE_AFTER, "key3", 1, 4999L, this.localMills(2999L), this.localMills(7999L), 7998L));
        expectedOutput.add(new Watermark(7999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @TestTemplate
    void testProcessTimeSessionWindows() throws Exception {
        SessionWindowAssigner assigner = SessionWindowAssigner.withGap((Duration)Duration.ofSeconds(3L)).withProcessingTime();
        UnalignedWindowTableFunctionOperator operator = this.createOperator((GroupWindowAssigner<TimeWindow>)assigner, -1);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.setProcessingTime(20L);
        testHarness.processElement(UnalignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, Long.MAX_VALUE));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(1999L);
        testHarness.setProcessingTime(2999L);
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(3999L);
        testHarness.processElement(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, Long.MAX_VALUE));
        expectedOutput.add(UnalignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, Long.MAX_VALUE, this.localMills(20L), this.localMills(3020L), 3019L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(4999L);
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(5999L);
        testHarness.processElement(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 3, Long.MAX_VALUE));
        testHarness.setProcessingTime(7999L);
        testHarness.processElement(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 2, Long.MAX_VALUE));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(6999L);
        testHarness.setProcessingTime(7999L);
        testHarness.setProcessingTime(8999L);
        testHarness.setProcessingTime(9999L);
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = this.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.setProcessingTime(10999L);
        expectedOutput.add(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, Long.MAX_VALUE, this.localMills(3999L), this.localMills(10999L), 10998L));
        expectedOutput.add(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 3, Long.MAX_VALUE, this.localMills(3999L), this.localMills(10999L), 10998L));
        expectedOutput.add(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 2, Long.MAX_VALUE, this.localMills(3999L), this.localMills(10999L), 10998L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        Assertions.assertThat((Long)((Long)operator.getWatermarkLatency().getValue())).isEqualTo((Object)0L);
        testHarness.close();
    }

    @TestTemplate
    void testProcessTimeSessionWindowsWithChangelog() throws Exception {
        SessionWindowAssigner assigner = SessionWindowAssigner.withGap((Duration)Duration.ofSeconds(3L)).withProcessingTime();
        UnalignedWindowTableFunctionOperator operator = this.createOperator((GroupWindowAssigner<TimeWindow>)assigner, -1);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.setProcessingTime(20L);
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, Long.MAX_VALUE));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key2", 1, Long.MAX_VALUE));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(1999L);
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key3", 1, Long.MAX_VALUE));
        testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key2", 1, Long.MAX_VALUE));
        testHarness.setProcessingTime(2999L);
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.setProcessingTime(3999L);
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key1", 1, Long.MAX_VALUE, this.localMills(20L), this.localMills(3020L), 3019L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = this.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.setProcessingTime(4999L);
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.INSERT, "key2", 1, Long.MAX_VALUE, this.localMills(20L), this.localMills(4999L), 4998L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key3", 1, Long.MAX_VALUE, this.localMills(1999L), this.localMills(4999L), 4998L));
        expectedOutput.add(StreamRecordUtils.binaryRecord(RowKind.DELETE, "key2", 1, Long.MAX_VALUE, this.localMills(20L), this.localMills(4999L), 4998L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @TestTemplate
    void testSessionWindowsWithoutPartitionKeys() throws Exception {
        SessionWindowAssigner assigner = SessionWindowAssigner.withGap((Duration)Duration.ofSeconds(3L));
        UnalignedWindowTableFunctionOperator operator = this.createOperator((GroupWindowAssigner<TimeWindow>)assigner, 2);
        EmptyRowDataKeySelector keySelector = EmptyRowDataKeySelector.INSTANCE;
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (KeySelector)keySelector, (TypeInformation)keySelector.getProducedType());
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(UnalignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 1999L));
        testHarness.processElement(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(new Watermark(3999L));
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (KeySelector)keySelector, (TypeInformation)keySelector.getProducedType());
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processWatermark(new Watermark(6999L));
        expectedOutput.add(UnalignedWindowTableFunctionOperatorTest.insertRecord("key1", 1, 1999L, this.localMills(1999L), this.localMills(6999L), 6998L));
        expectedOutput.add(UnalignedWindowTableFunctionOperatorTest.insertRecord("key2", 1, 3999L, this.localMills(1999L), this.localMills(6999L), 6998L));
        expectedOutput.add(new Watermark(6999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(UnalignedWindowTableFunctionOperator operator) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (KeySelector)KEY_SELECTOR, (TypeInformation)KEY_SELECTOR.getProducedType());
    }

    private UnalignedWindowTableFunctionOperator createOperator(GroupWindowAssigner<TimeWindow> windowAssigner, int rowTimeIndex) {
        return new UnalignedWindowTableFunctionOperator(windowAssigner, windowAssigner.getWindowSerializer(new ExecutionConfig()), (TypeSerializer)new RowDataSerializer(INPUT_ROW_TYPE), rowTimeIndex, this.shiftTimeZone);
    }
}

