/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate.window;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.runtime.operators.aggregate.window.WindowAggOperatorTestBase;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigners;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class SlicingWindowAggOperatorTest
extends WindowAggOperatorTestBase {
    public SlicingWindowAggOperatorTest(ZoneId shiftTimeZone, boolean enableAsyncState) {
        super(shiftTimeZone, enableAsyncState);
    }

    @Parameters(name="TimeZone = {0}, EnableAsyncState = {1}")
    private static Collection<Object[]> runMode() {
        return Arrays.asList({UTC_ZONE_ID, false}, {UTC_ZONE_ID, true}, {SHANGHAI_ZONE_ID, false}, {SHANGHAI_ZONE_ID, true});
    }

    @TestTemplate
    void testEventTimeHoppingWindows() throws Exception {
        SliceAssigners.HoppingSliceAssigner assigner = SliceAssigners.hopping((int)2, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofSeconds(3L), (Duration)Duration.ofSeconds(1L));
        SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction((SliceAssigner)assigner);
        OneInputStreamOperator<RowData, RowData> operator = this.buildWindowOperator((WindowAssigner)assigner, aggsFunction, 1);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = SlicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)3999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)3000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)20L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)0L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1998L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1000L)));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(-2000L), this.localMills(1000L)));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(-1000L), this.localMills(2000L)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(-1000L), this.localMills(2000L)));
        expectedOutput.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(3000L)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(3000L)));
        expectedOutput.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        ((AbstractIntegerAssert)Assertions.assertThat((int)aggsFunction.closeCalled.get()).as("Close was not called.", new Object[0])).isGreaterThan(0);
        expectedOutput.clear();
        testHarness = SlicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 5L, 5L, this.localMills(1000L), this.localMills(4000L)));
        expectedOutput.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)3500L)));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(2000L), this.localMills(5000L)));
        expectedOutput.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)2999L)));
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(3000L), this.localMills(6000L)));
        expectedOutput.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(6999L));
        testHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(new Watermark(6999L));
        expectedOutput.add(new Watermark(7999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        Assertions.assertThat((long)SlicingWindowAggOperatorTest.getNumLateRecordsDroppedCount(operator)).isEqualTo(1L);
        testHarness.close();
    }

    @TestTemplate
    public void testEventTimeHoppingWindowWithExpiredSliceAndRestore() throws Exception {
        SliceAssigners.HoppingSliceAssigner assigner = SliceAssigners.hopping((int)2, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofSeconds(3L), (Duration)Duration.ofSeconds(1L));
        SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction((SliceAssigner)assigner);
        OneInputStreamOperator<RowData, RowData> operator = this.buildWindowOperator((WindowAssigner)assigner, aggsFunction, 1);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = SlicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)1020L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)1001L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)1999L)));
        testHarness.processWatermark(new Watermark(2001L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(-1000L), this.localMills(2000L)));
        expectedOutput.add(new Watermark(2001L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        ((AbstractIntegerAssert)Assertions.assertThat((int)aggsFunction.closeCalled.get()).as("Close was not called.", new Object[0])).isGreaterThan(0);
        expectedOutput.clear();
        testHarness = SlicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1500L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)2998L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)2999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)2000L)));
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(3000L)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 4L, 4L, this.localMills(0L), this.localMills(3000L)));
        expectedOutput.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @TestTemplate
    public void testEventTimeHoppingWindowWithExpiredSliceAndNoRestore() throws Exception {
        SliceAssigners.HoppingSliceAssigner assigner = SliceAssigners.hopping((int)2, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofSeconds(3L), (Duration)Duration.ofSeconds(1L));
        SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction((SliceAssigner)assigner);
        OneInputStreamOperator<RowData, RowData> operator = this.buildWindowOperator((WindowAssigner)assigner, aggsFunction, 1);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = SlicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)1020L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)1001L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)1999L)));
        testHarness.processWatermark(new Watermark(2001L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(-1000L), this.localMills(2000L)));
        expectedOutput.add(new Watermark(2001L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1500L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)2998L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)2999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)2000L)));
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(3000L)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 4L, 4L, this.localMills(0L), this.localMills(3000L)));
        expectedOutput.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @TestTemplate
    void testProcessingTimeHoppingWindows() throws Exception {
        SliceAssigners.HoppingSliceAssigner assigner = SliceAssigners.hopping((int)-1, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofHours(3L), (Duration)Duration.ofHours(1L));
        SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction((SliceAssigner)assigner);
        OneInputStreamOperator<RowData, RowData> operator = this.buildWindowOperator((WindowAssigner)assigner, aggsFunction, 1);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = SlicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.setProcessingTime(SlicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:00.003"));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.setProcessingTime(SlicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T01:00:00"));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1969-12-31T22:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T01:00:00")));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.setProcessingTime(SlicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T02:00:00"));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1969-12-31T23:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T02:00:00")));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.setProcessingTime(SlicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T03:00:00"));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T03:00:00")));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T03:00:00")));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.setProcessingTime(SlicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T07:00:00"));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 2L, 2L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T01:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T04:00:00")));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 5L, 5L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T01:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T04:00:00")));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 5L, 5L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T02:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T05:00:00")));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T03:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T06:00:00")));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
        ((AbstractIntegerAssert)Assertions.assertThat((int)aggsFunction.closeCalled.get()).as("Close was not called.", new Object[0])).isGreaterThan(0);
    }

    @TestTemplate
    void testEventTimeCumulativeWindows() throws Exception {
        SliceAssigners.CumulativeSliceAssigner assigner = SliceAssigners.cumulative((int)2, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofSeconds(3L), (Duration)Duration.ofSeconds(1L));
        SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction((SliceAssigner)assigner);
        OneInputStreamOperator<RowData, RowData> operator = this.buildWindowOperator((WindowAssigner)assigner, aggsFunction, null);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = SlicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)2999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)3000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)20L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)0L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1998L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1000L)));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(1000L)));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(2000L)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(2000L)));
        expectedOutput.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        ((AbstractIntegerAssert)Assertions.assertThat((int)aggsFunction.closeCalled.get()).as("Close was not called.", new Object[0])).isGreaterThan(0);
        expectedOutput.clear();
        testHarness = SlicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1000L)));
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(3000L)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 5L, 5L, this.localMills(0L), this.localMills(3000L)));
        expectedOutput.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, this.localMills(3000L), this.localMills(4000L)));
        expectedOutput.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2, TimestampData.fromEpochMillis((long)3500L)));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, this.localMills(3000L), this.localMills(5000L)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2L, 1L, this.localMills(3000L), this.localMills(5000L)));
        expectedOutput.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)2999L)));
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, this.localMills(3000L), this.localMills(6000L)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2L, 1L, this.localMills(3000L), this.localMills(6000L)));
        expectedOutput.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(6999L));
        testHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(new Watermark(6999L));
        expectedOutput.add(new Watermark(7999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        Assertions.assertThat((long)SlicingWindowAggOperatorTest.getNumLateRecordsDroppedCount(operator)).isEqualTo(1L);
        testHarness.close();
    }

    @TestTemplate
    void testProcessingTimeCumulativeWindows() throws Exception {
        SliceAssigners.CumulativeSliceAssigner assigner = SliceAssigners.cumulative((int)-1, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofDays(1L), (Duration)Duration.ofHours(8L));
        SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction((SliceAssigner)assigner);
        OneInputStreamOperator<RowData, RowData> operator = this.buildWindowOperator((WindowAssigner)assigner, aggsFunction, null);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = SlicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.setProcessingTime(SlicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:00.003"));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.setProcessingTime(SlicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T08:00:00"));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T08:00:00")));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.setProcessingTime(SlicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T16:00:00"));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T16:00:00")));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.setProcessingTime(SlicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-02T00:00:00"));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00")));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00")));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.setProcessingTime(SlicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-03T08:00:00"));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-02T08:00:00")));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-02T08:00:00")));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-02T16:00:00")));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-02T16:00:00")));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-03T00:00:00")));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-03T00:00:00")));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
        ((AbstractIntegerAssert)Assertions.assertThat((int)aggsFunction.closeCalled.get()).as("Close was not called.", new Object[0])).isGreaterThan(0);
    }

    @TestTemplate
    void testEventTimeTumblingWindows() throws Exception {
        SliceAssigners.TumblingSliceAssigner assigner = SliceAssigners.tumbling((int)2, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofSeconds(3L));
        SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction((SliceAssigner)assigner);
        OneInputStreamOperator<RowData, RowData> operator = this.buildWindowOperator((WindowAssigner)assigner, aggsFunction, null);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = SlicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)3999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)3000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)20L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)0L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1998L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1999L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)1000L)));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        ((AbstractIntegerAssert)Assertions.assertThat((int)aggsFunction.closeCalled.get()).as("Close was not called.", new Object[0])).isGreaterThan(0);
        expectedOutput.clear();
        testHarness = SlicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, this.localMills(0L), this.localMills(3000L)));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, this.localMills(0L), this.localMills(3000L)));
        expectedOutput.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)2500L)));
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)2999L)));
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 2L, 2L, this.localMills(3000L), this.localMills(6000L)));
        expectedOutput.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(6999L));
        testHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(new Watermark(6999L));
        expectedOutput.add(new Watermark(7999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        Assertions.assertThat((long)SlicingWindowAggOperatorTest.getNumLateRecordsDroppedCount(operator)).isEqualTo(2L);
        testHarness.close();
    }

    @TestTemplate
    void testProcessingTimeTumblingWindows() throws Exception {
        SliceAssigners.TumblingSliceAssigner assigner = SliceAssigners.tumbling((int)-1, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofHours(5L));
        SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction((SliceAssigner)assigner);
        OneInputStreamOperator<RowData, RowData> operator = this.buildWindowOperator((WindowAssigner)assigner, aggsFunction, null);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = SlicingWindowAggOperatorTest.createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.setProcessingTime(SlicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T00:00:00.003"));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)Long.MAX_VALUE)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.setProcessingTime(SlicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T05:00:00"));
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T05:00:00")));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T05:00:00")));
        testHarness.endInput();
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis((long)7000L)));
        testHarness.setProcessingTime(SlicingWindowAggOperatorTest.epochMills(this.shiftTimeZone, "1970-01-01T10:00:01"));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T05:00:00"), SlicingWindowAggOperatorTest.epochMills(UTC_ZONE_ID, "1970-01-01T10:00:00")));
        Assertions.assertThat((long)SlicingWindowAggOperatorTest.getWatermarkLatency(operator)).isEqualTo((Object)0L);
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @TestTemplate
    void testInvalidWindows() {
        SliceAssigners.HoppingSliceAssigner assigner = SliceAssigners.hopping((int)2, (ZoneId)this.shiftTimeZone, (Duration)Duration.ofSeconds(3L), (Duration)Duration.ofSeconds(1L));
        SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction((SliceAssigner)assigner);
        Assertions.assertThatThrownBy(() -> this.lambda$testInvalidWindows$0((SliceAssigner)assigner, aggsFunction)).hasMessageContaining("Hopping window requires a COUNT(*) in the aggregate functions.");
    }

    private /* synthetic */ void lambda$testInvalidWindows$0(SliceAssigner assigner, SlicingSumAndCountAggsFunction aggsFunction) throws Throwable {
        this.buildWindowOperator((WindowAssigner)assigner, aggsFunction, null);
    }

    protected static class SlicingSumAndCountAggsFunction
    extends WindowAggOperatorTestBase.SumAndCountAggsFunctionBase<Long> {
        private final SliceAssigner assigner;

        public SlicingSumAndCountAggsFunction(SliceAssigner assigner) {
            this.assigner = assigner;
        }

        @Override
        protected long getWindowStart(Long window) {
            return this.assigner.getWindowStart(window.longValue());
        }

        @Override
        protected long getWindowEnd(Long window) {
            return window;
        }
    }
}

