/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.over;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.operators.over.AbstractNonTimeUnboundedPrecedingOver;
import org.apache.flink.table.runtime.operators.over.NonTimeOverWindowTestBase;
import org.apache.flink.table.runtime.operators.over.NonTimeRangeUnboundedPrecedingFunction;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.Test;

public class NonTimeRangeUnboundedPrecedingFunctionTest
extends NonTimeOverWindowTestBase {
    /**
     * Creates the function under test as an anonymous subclass of
     * {@link NonTimeRangeUnboundedPrecedingFunction}.
     *
     * <p>Only the two parameters vary between tests; every other constructor argument comes from
     * members that are not declared in this class (presumably inherited from
     * {@link NonTimeOverWindowTestBase}).
     *
     * @param retentionTime forwarded unchanged as the first constructor argument; the tests in this
     *     class pass either {@code 0L} or a TTL expressed in milliseconds
     * @param generatedSortKeyComparator comparator forwarded to the function for its sort key
     * @return a new function instance
     */
    private NonTimeRangeUnboundedPrecedingFunction<RowData> getNonTimeRangeUnboundedPrecedingFunction(
            long retentionTime, GeneratedRecordComparator generatedSortKeyComparator) {
        NonTimeRangeUnboundedPrecedingFunction<RowData> functionUnderTest =
                new NonTimeRangeUnboundedPrecedingFunction<RowData>(
                        retentionTime,
                        aggsHandleFunction,
                        GENERATED_ROW_VALUE_EQUALISER,
                        GENERATED_SORT_KEY_EQUALISER,
                        generatedSortKeyComparator,
                        this.accTypes,
                        this.inputFieldTypes,
                        SORT_KEY_TYPES,
                        SORT_KEY_SELECTOR) {};
        return functionUnderTest;
    }

    /**
     * Feeds insert-only rows for two keys, ending with a row for {@code key1} whose sort key (4)
     * falls between sort keys that were already seen (2 and 5).
     *
     * <p>Judging by the expected rows, the last output column is a running sum of the second input
     * field over all rows of the same key whose sort key is not greater than the row's own sort
     * key. The late row is therefore expected to trigger UPDATE_BEFORE/UPDATE_AFTER pairs for the
     * rows with sort keys 5 and 6.
     */
    @Test
    public void testInsertOnlyRecordsWithCustomSortKey() throws Exception {
        // Parameterized types instead of raw types: no unchecked cast is needed below.
        KeyedProcessOperator<RowData, RowData, RowData> operator =
                new KeyedProcessOperator<>(
                        getNonTimeRangeUnboundedPrecedingFunction(0L, GENERATED_SORT_KEY_COMPARATOR_ASC));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.open();

        // Rows arriving in ascending sort-key order for key1 and key2.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 200L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6L, 600L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 2L, 200L));
        // Late row for key1: its sort key sits between 2 and 5.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 4L, 400L));

        List<RowData> expectedRows =
                Arrays.asList(
                        outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 500L, 8L),
                        outputRecord(RowKind.INSERT, "key1", 6L, 600L, 14L),
                        outputRecord(RowKind.INSERT, "key2", 1L, 100L, 1L),
                        outputRecord(RowKind.INSERT, "key2", 2L, 200L, 3L),
                        // Output expected for the late (key1, 4) row.
                        outputRecord(RowKind.INSERT, "key1", 4L, 400L, 7L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 8L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 12L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 14L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 18L));

        List<RowData> actualRows = testHarness.extractOutputValues();
        validateRows(actualRows, expectedRows);
    }

    /**
     * Same input and expected output as the plain insert-only scenario, but the function is built
     * directly with {@code aggsSumLongHandleFunction} instead of the default aggregate handler used
     * by {@code getNonTimeRangeUnboundedPrecedingFunction}.
     */
    @Test
    public void testInsertOnlyRecordsWithCustomSortKeyAndLongSumAgg() throws Exception {
        NonTimeRangeUnboundedPrecedingFunction<RowData> function =
                new NonTimeRangeUnboundedPrecedingFunction<RowData>(
                        0L,
                        aggsSumLongHandleFunction,
                        GENERATED_ROW_VALUE_EQUALISER,
                        GENERATED_SORT_KEY_EQUALISER,
                        GENERATED_SORT_KEY_COMPARATOR_ASC,
                        this.accTypes,
                        this.inputFieldTypes,
                        SORT_KEY_TYPES,
                        SORT_KEY_SELECTOR) {};
        // Parameterized types instead of raw types: no raw KeyedProcessFunction cast is needed.
        KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(function);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.open();

        // Rows arriving in ascending sort-key order for key1 and key2.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 200L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6L, 600L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 2L, 200L));
        // Late row for key1: its sort key sits between 2 and 5.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 4L, 400L));

        List<RowData> expectedRows =
                Arrays.asList(
                        outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 500L, 8L),
                        outputRecord(RowKind.INSERT, "key1", 6L, 600L, 14L),
                        outputRecord(RowKind.INSERT, "key2", 1L, 100L, 1L),
                        outputRecord(RowKind.INSERT, "key2", 2L, 200L, 3L),
                        // Output expected for the late (key1, 4) row.
                        outputRecord(RowKind.INSERT, "key1", 4L, 400L, 7L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 8L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 12L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 14L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 18L));

        List<RowData> actualRows = testHarness.extractOutputValues();
        validateRows(actualRows, expectedRows);
    }

    /**
     * Feeds insert-only rows in which several rows of {@code key1} share the same sort key
     * (2 and 5), and finishes with a row whose sort key (4) falls between existing ones.
     *
     * <p>In the expected output, rows that share a sort key always carry the same aggregate value,
     * and inserting another row with an existing sort key is expected to update the previously
     * emitted rows with that sort key as well as all rows with a greater sort key.
     */
    @Test
    public void testInsertOnlyRecordsWithDuplicateSortKeys() throws Exception {
        // Parameterized types instead of raw types: no unchecked cast is needed below.
        KeyedProcessOperator<RowData, RowData, RowData> operator =
                new KeyedProcessOperator<>(
                        getNonTimeRangeUnboundedPrecedingFunction(0L, GENERATED_SORT_KEY_COMPARATOR_ASC));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.open();

        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 200L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 502L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 501L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6L, 600L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 2L, 200L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 203L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 201L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 4L, 400L));

        List<RowData> expectedRows =
                Arrays.asList(
                        // insert (key1, 1, 100)
                        outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                        // insert (key1, 2, 200)
                        outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                        // insert (key1, 5, 500)
                        outputRecord(RowKind.INSERT, "key1", 5L, 500L, 8L),
                        // insert (key1, 5, 502)
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 8L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 13L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 502L, 13L),
                        // insert (key1, 5, 501)
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 13L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 18L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 502L, 13L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 502L, 18L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 501L, 18L),
                        // insert (key1, 6, 600)
                        outputRecord(RowKind.INSERT, "key1", 6L, 600L, 24L),
                        // insert (key2, 1, 100) and (key2, 2, 200)
                        outputRecord(RowKind.INSERT, "key2", 1L, 100L, 1L),
                        outputRecord(RowKind.INSERT, "key2", 2L, 200L, 3L),
                        // insert (key1, 2, 203)
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 3L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 2L, 200L, 5L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 203L, 5L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 18L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 20L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 502L, 18L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 502L, 20L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 501L, 18L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 501L, 20L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 24L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 26L),
                        // insert (key1, 2, 201)
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 5L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 2L, 200L, 7L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 203L, 5L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 2L, 203L, 7L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 201L, 7L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 20L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 22L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 502L, 20L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 502L, 22L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 501L, 20L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 501L, 22L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 26L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 28L),
                        // insert (key1, 4, 400)
                        outputRecord(RowKind.INSERT, "key1", 4L, 400L, 11L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 22L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 26L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 502L, 22L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 502L, 26L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 501L, 22L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 501L, 26L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 28L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 32L));

        List<RowData> actualRows = testHarness.extractOutputValues();
        validateRows(actualRows, expectedRows);
    }

    /**
     * Mixes inserts with two UPDATE_BEFORE/UPDATE_AFTER pairs for {@code key1}: the first pair
     * moves a row from sort key 2 to sort key 3, the second keeps sort key 3 but changes the third
     * field from 200 to 300.
     *
     * <p>The expected output contains a DELETE for each retracted row, an UPDATE_AFTER for each
     * replacement row, and UPDATE_BEFORE/UPDATE_AFTER pairs for the rows with greater sort keys
     * whose aggregate value changes.
     */
    @Test
    public void testRetractingRecordsWithCustomSortKey() throws Exception {
        // Parameterized types instead of raw types: no unchecked cast is needed below.
        KeyedProcessOperator<RowData, RowData, RowData> operator =
                new KeyedProcessOperator<>(
                        getNonTimeRangeUnboundedPrecedingFunction(0L, GENERATED_SORT_KEY_COMPARATOR_ASC));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.open();

        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 200L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6L, 600L));
        // First update: sort key changes from 2 to 3.
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 2L, 200L));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord("key1", 3L, 200L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 2L, 200L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key3", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 4L, 400L));
        // Second update: same sort key, third field changes from 200 to 300.
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 3L, 200L));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord("key1", 3L, 300L));

        List<RowData> expectedRows =
                Arrays.asList(
                        // initial inserts for key1
                        outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 500L, 8L),
                        outputRecord(RowKind.INSERT, "key1", 6L, 600L, 14L),
                        // updateBefore (key1, 2, 200)
                        outputRecord(RowKind.DELETE, "key1", 2L, 200L, 3L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 8L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 6L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 14L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 12L),
                        // updateAfter (key1, 3, 200)
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 3L, 200L, 4L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 6L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 9L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 12L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 15L),
                        // inserts for key2 and key3
                        outputRecord(RowKind.INSERT, "key2", 1L, 100L, 1L),
                        outputRecord(RowKind.INSERT, "key2", 2L, 200L, 3L),
                        outputRecord(RowKind.INSERT, "key3", 1L, 100L, 1L),
                        // insert (key1, 4, 400)
                        outputRecord(RowKind.INSERT, "key1", 4L, 400L, 8L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 9L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 13L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 15L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 19L),
                        // updateBefore (key1, 3, 200)
                        outputRecord(RowKind.DELETE, "key1", 3L, 200L, 4L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 4L, 400L, 8L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 4L, 400L, 5L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 13L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 10L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 19L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 16L),
                        // updateAfter (key1, 3, 300)
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 3L, 300L, 4L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 4L, 400L, 5L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 4L, 400L, 8L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 10L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 13L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 16L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 19L));

        List<RowData> actualRows = testHarness.extractOutputValues();
        validateRows(actualRows, expectedRows);
    }

    /**
     * Inserts three rows sharing sort key 5 and then retracts the one that was inserted first
     * ({@code 500}).
     *
     * <p>The expected output is a DELETE for the retracted row followed by
     * UPDATE_BEFORE/UPDATE_AFTER pairs for the two remaining rows with sort key 5 and for the row
     * with sort key 6.
     */
    @Test
    public void testRetractWithFirstDuplicateSortKey() throws Exception {
        // Parameterized types instead of raw types: no unchecked cast is needed below.
        KeyedProcessOperator<RowData, RowData, RowData> operator =
                new KeyedProcessOperator<>(
                        getNonTimeRangeUnboundedPrecedingFunction(0L, GENERATED_SORT_KEY_COMPARATOR_ASC));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.open();

        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 200L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 201L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 502L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 501L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6L, 600L));
        // Retract the first-inserted row among those with sort key 5.
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 5L, 500L));

        List<RowData> expectedRows =
                Arrays.asList(
                        // output of the seven inserts
                        outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 3L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 2L, 200L, 5L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 201L, 5L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 500L, 10L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 10L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 15L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 502L, 15L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 15L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 20L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 502L, 15L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 502L, 20L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 501L, 20L),
                        outputRecord(RowKind.INSERT, "key1", 6L, 600L, 26L),
                        // output of the retraction of (key1, 5, 500)
                        outputRecord(RowKind.DELETE, "key1", 5L, 500L, 20L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 502L, 20L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 502L, 15L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 501L, 20L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 501L, 15L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 26L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 21L));

        List<RowData> actualRows = testHarness.extractOutputValues();
        validateRows(actualRows, expectedRows);
    }

    /**
     * Inserts three rows sharing sort key 5 and then retracts the one that was inserted second
     * ({@code 502}).
     *
     * <p>In the expected output the DELETE for the retracted row appears between the update pair of
     * the row inserted before it ({@code 500}) and the update pair of the row inserted after it
     * ({@code 501}), followed by the update pair for the row with sort key 6.
     */
    @Test
    public void testRetractWithMiddleDuplicateSortKey() throws Exception {
        // Parameterized types instead of raw types: no unchecked cast is needed below.
        KeyedProcessOperator<RowData, RowData, RowData> operator =
                new KeyedProcessOperator<>(
                        getNonTimeRangeUnboundedPrecedingFunction(0L, GENERATED_SORT_KEY_COMPARATOR_ASC));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.open();

        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 200L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 201L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 502L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 501L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6L, 600L));
        // Retract the second-inserted row among those with sort key 5.
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 5L, 502L));

        List<RowData> expectedRows =
                Arrays.asList(
                        // output of the seven inserts
                        outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 3L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 2L, 200L, 5L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 201L, 5L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 500L, 10L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 10L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 15L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 502L, 15L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 15L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 20L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 502L, 15L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 502L, 20L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 501L, 20L),
                        outputRecord(RowKind.INSERT, "key1", 6L, 600L, 26L),
                        // output of the retraction of (key1, 5, 502)
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 20L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 15L),
                        outputRecord(RowKind.DELETE, "key1", 5L, 502L, 20L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 501L, 20L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 501L, 15L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 26L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 21L));

        List<RowData> actualRows = testHarness.extractOutputValues();
        validateRows(actualRows, expectedRows);
    }

    /**
     * Inserts three rows sharing sort key 5 and then retracts the one that was inserted last
     * ({@code 501}).
     *
     * <p>In the expected output the update pairs of the two remaining rows with sort key 5 come
     * first, then the DELETE for the retracted row, then the update pair for the row with sort
     * key 6.
     */
    @Test
    public void testRetractWithLastDuplicateSortKey() throws Exception {
        // Parameterized types instead of raw types: no unchecked cast is needed below.
        KeyedProcessOperator<RowData, RowData, RowData> operator =
                new KeyedProcessOperator<>(
                        getNonTimeRangeUnboundedPrecedingFunction(0L, GENERATED_SORT_KEY_COMPARATOR_ASC));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.open();

        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 200L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 201L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 502L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 501L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6L, 600L));
        // Retract the last-inserted row among those with sort key 5.
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 5L, 501L));

        List<RowData> expectedRows =
                Arrays.asList(
                        // output of the seven inserts
                        outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 3L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 2L, 200L, 5L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 201L, 5L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 500L, 10L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 10L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 15L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 502L, 15L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 15L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 20L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 502L, 15L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 502L, 20L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 501L, 20L),
                        outputRecord(RowKind.INSERT, "key1", 6L, 600L, 26L),
                        // output of the retraction of (key1, 5, 501)
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 20L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 15L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 502L, 20L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 502L, 15L),
                        outputRecord(RowKind.DELETE, "key1", 5L, 501L, 20L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 26L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 21L));

        List<RowData> actualRows = testHarness.extractOutputValues();
        validateRows(actualRows, expectedRows);
    }

    /**
     * Runs inserts followed by one retraction with {@code GENERATED_SORT_KEY_COMPARATOR_DESC}
     * instead of the ascending comparator used by the other tests.
     *
     * <p>In the expected output every new row with a greater sort key triggers
     * UPDATE_BEFORE/UPDATE_AFTER pairs for the rows with smaller sort keys, i.e. the reverse of the
     * ascending scenarios.
     */
    @Test
    public void testRetractWithDescendingSort() throws Exception {
        // Parameterized types instead of raw types: no unchecked cast is needed below.
        KeyedProcessOperator<RowData, RowData, RowData> operator =
                new KeyedProcessOperator<>(
                        getNonTimeRangeUnboundedPrecedingFunction(0L, GENERATED_SORT_KEY_COMPARATOR_DESC));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.open();

        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 200L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 201L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6L, 600L));
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 2L, 200L));

        List<RowData> expectedRows =
                Arrays.asList(
                        // insert (key1, 1, 100)
                        outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                        // insert (key1, 2, 200)
                        outputRecord(RowKind.INSERT, "key1", 2L, 200L, 2L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 1L, 100L, 1L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 1L, 100L, 3L),
                        // insert (key1, 2, 201)
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 2L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 2L, 200L, 4L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 201L, 4L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 1L, 100L, 3L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 1L, 100L, 5L),
                        // insert (key1, 5, 500)
                        outputRecord(RowKind.INSERT, "key1", 5L, 500L, 5L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 4L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 2L, 200L, 9L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 201L, 4L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 2L, 201L, 9L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 1L, 100L, 5L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 1L, 100L, 10L),
                        // insert (key1, 6, 600)
                        outputRecord(RowKind.INSERT, "key1", 6L, 600L, 6L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 5L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 11L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 9L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 2L, 200L, 15L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 201L, 9L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 2L, 201L, 15L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 1L, 100L, 10L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 1L, 100L, 16L),
                        // updateBefore (key1, 2, 200)
                        outputRecord(RowKind.DELETE, "key1", 2L, 200L, 15L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 201L, 15L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 2L, 201L, 13L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 1L, 100L, 16L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 1L, 100L, 14L));

        List<RowData> actualRows = testHarness.extractOutputValues();
        validateRows(actualRows, expectedRows);
    }

    /**
     * Retracts a row whose sort key is 0 after rows with greater sort keys have been inserted.
     *
     * <p>The only output expected for the retraction is a single DELETE: no
     * UPDATE_BEFORE/UPDATE_AFTER pairs are expected for any other row. Presumably this is because
     * removing a sort-key value of 0 leaves every aggregate unchanged (hence "early out");
     * confirm against the function implementation.
     */
    @Test
    public void testRetractWithEarlyOut() throws Exception {
        // Parameterized types instead of raw types: no unchecked cast is needed below.
        KeyedProcessOperator<RowData, RowData, RowData> operator =
                new KeyedProcessOperator<>(
                        getNonTimeRangeUnboundedPrecedingFunction(0L, GENERATED_SORT_KEY_COMPARATOR_ASC));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.open();

        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 0L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 0L, 101L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 0L, 102L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1L, 100L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 200L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 201L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 500L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 502L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 501L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6L, 600L));
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 0L, 100L));

        List<RowData> expectedRows =
                Arrays.asList(
                        // output of the ten inserts
                        outputRecord(RowKind.INSERT, "key1", 0L, 100L, 0L),
                        outputRecord(RowKind.INSERT, "key1", 0L, 101L, 0L),
                        outputRecord(RowKind.INSERT, "key1", 0L, 102L, 0L),
                        outputRecord(RowKind.INSERT, "key1", 1L, 100L, 1L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 200L, 3L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 3L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 2L, 200L, 5L),
                        outputRecord(RowKind.INSERT, "key1", 2L, 201L, 5L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 500L, 10L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 10L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 15L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 502L, 15L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 15L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 20L),
                        outputRecord(RowKind.UPDATE_BEFORE, "key1", 5L, 502L, 15L),
                        outputRecord(RowKind.UPDATE_AFTER, "key1", 5L, 502L, 20L),
                        outputRecord(RowKind.INSERT, "key1", 5L, 501L, 20L),
                        outputRecord(RowKind.INSERT, "key1", 6L, 600L, 26L),
                        // output of the retraction of (key1, 0, 100): the DELETE only
                        outputRecord(RowKind.DELETE, "key1", 0L, 100L, 0L));

        List<RowData> actualRows = testHarness.extractOutputValues();
        validateRows(actualRows, expectedRows);
    }

    /**
     * Inserts seven rows for {@code key1} and then retracts all of them, calling
     * {@code validateState} after every element.
     *
     * <p>The numeric arguments passed to {@code validateState} are defined by that helper, which is
     * not declared in this class; see its declaration for their meaning. The final boolean is
     * {@code true} after an insert and {@code false} after a retraction.
     *
     * <p>At the end the test asserts that 40 output rows were produced in total and that both
     * "not found" counters exposed by the function are still zero.
     */
    @Test
    public void testInsertAndRetractAllWithStateValidation() throws Exception {
        NonTimeRangeUnboundedPrecedingFunction<RowData> function =
                getNonTimeRangeUnboundedPrecedingFunction(0L, GENERATED_SORT_KEY_COMPARATOR_ASC);
        // Same instance, statically typed as the base class that the original code cast to for
        // every validateState call.
        AbstractNonTimeUnboundedPrecedingOver<RowData> overFunction = function;
        // Parameterized types instead of raw types: no unchecked cast is needed below.
        KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(function);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.open();

        // ---- inserts ----
        RowData firstRecord = GenericRowData.of("key1", 1L, 100L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1L, 100L));
        validateState(overFunction, firstRecord, 0, 1, 0, 1, 0, 1, true);

        RowData secondRecord = GenericRowData.of("key1", 2L, 200L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 200L));
        validateState(overFunction, secondRecord, 1, 2, 0, 1, 1, 2, true);

        RowData thirdRecord = GenericRowData.of("key1", 2L, 201L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 201L));
        validateState(overFunction, thirdRecord, 1, 2, 1, 2, 2, 3, true);

        RowData fourthRecord = GenericRowData.of("key1", 5L, 500L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 500L));
        validateState(overFunction, fourthRecord, 2, 3, 0, 1, 3, 4, true);

        RowData fifthRecord = GenericRowData.of("key1", 5L, 502L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 502L));
        validateState(overFunction, fifthRecord, 2, 3, 1, 2, 4, 5, true);

        RowData sixthRecord = GenericRowData.of("key1", 5L, 501L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 501L));
        validateState(overFunction, sixthRecord, 2, 3, 2, 3, 5, 6, true);

        RowData seventhRecord = GenericRowData.of("key1", 6L, 600L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 6L, 600L));
        validateState(overFunction, seventhRecord, 3, 4, 0, 1, 6, 7, true);

        // ---- retractions, in a different order than the inserts ----
        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 5L, 502L));
        validateState(overFunction, fifthRecord, 2, 4, 1, 2, 4, 6, false);

        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 6L, 600L));
        validateState(overFunction, seventhRecord, 3, 3, 0, 0, 6, 5, false);

        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 2L, 201L));
        validateState(overFunction, thirdRecord, 1, 3, 1, 1, 2, 4, false);

        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 2L, 200L));
        validateState(overFunction, secondRecord, 1, 2, -1, 0, 1, 3, false);

        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 5L, 500L));
        validateState(overFunction, fourthRecord, 1, 2, 0, 1, 3, 2, false);

        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 5L, 501L));
        validateState(overFunction, sixthRecord, 1, 1, -1, 0, 5, 1, false);

        testHarness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 1L, 100L));
        validateState(overFunction, firstRecord, 0, 0, -1, 0, 0, 0, false);

        List<RowData> actualRows = testHarness.extractOutputValues();
        Assertions.assertThat(actualRows.size()).isEqualTo(40);
        // Every retraction above targets a row that was inserted, so no lookup should have missed.
        Assertions.assertThat((long) function.getNumOfSortKeysNotFound().getCount()).isEqualTo(0L);
        Assertions.assertThat((long) function.getNumOfIdsNotFound().getCount()).isEqualTo(0L);
    }

    /**
     * Exercises insert-only input across a state TTL boundary.
     *
     * <p>Three rows are inserted for {@code key1} with a 10 ms TTL and the state is validated
     * after each one. The harness' state-TTL processing time is then moved to one millisecond
     * past the TTL and a fourth row is inserted; the validation after that insert uses exactly
     * the same expected values as the validation after the very first insert. Finally the test
     * asserts that six rows were emitted overall and that both "not found" counters are zero.
     */
    @Test
    public void testInsertWithStateTTLExpiration() throws Exception {
        long ttlMillis = Duration.ofMillis(10L).toMillis();
        NonTimeRangeUnboundedPrecedingFunction<RowData> function = this.getNonTimeRangeUnboundedPrecedingFunction(ttlMillis, GENERATED_SORT_KEY_COMPARATOR_ASC);
        KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(function);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = this.createTestHarness(operator);
        harness.open();

        // Three inserts while the state is still within its TTL.
        harness.processElement(StreamRecordUtils.insertRecord("key1", 1L, 100L));
        this.validateState(function, GenericRowData.of("key1", 1L, 100L), 0, 1, 0, 1, 0, 1, true);

        harness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 200L));
        this.validateState(function, GenericRowData.of("key1", 2L, 200L), 1, 2, 0, 1, 1, 2, true);

        harness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 201L));
        this.validateState(function, GenericRowData.of("key1", 2L, 201L), 1, 2, 1, 2, 2, 3, true);

        // Move the TTL clock one millisecond beyond the configured retention.
        harness.setStateTtlProcessingTime(ttlMillis + 1L);

        // The expected values below are identical to those used for the first insert above.
        harness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 500L));
        this.validateState(function, GenericRowData.of("key1", 5L, 500L), 0, 1, 0, 1, 0, 1, true);

        int emittedRows = harness.extractOutputValues().size();
        Assertions.assertThat(emittedRows).isEqualTo(6);
        long sortKeysNotFound = function.getNumOfSortKeysNotFound().getCount();
        Assertions.assertThat(sortKeysNotFound).isEqualTo(0L);
        long idsNotFound = function.getNumOfIdsNotFound().getCount();
        Assertions.assertThat(idsNotFound).isEqualTo(0L);
    }

    /**
     * Exercises a retraction that arrives after the state TTL boundary has been crossed.
     *
     * <p>Five rows are inserted for {@code key1} with a 10 ms TTL and the state is validated
     * after each one. The harness' state-TTL processing time is then moved to one millisecond
     * past the TTL and an UPDATE_BEFORE for {@code (key1, 5, 502)} is processed. Afterwards the
     * test asserts that the sorted-list state value is {@code null}, the accumulator map state
     * is empty and the id state value is {@code null}; that nine rows were emitted overall; and
     * that the "sort keys not found" counter is 1 while the "ids not found" counter is 0.
     */
    @Test
    public void testInsertAndRetractWithStateTTLExpiration() throws Exception {
        long ttlMillis = Duration.ofMillis(10L).toMillis();
        NonTimeRangeUnboundedPrecedingFunction<RowData> function = this.getNonTimeRangeUnboundedPrecedingFunction(ttlMillis, GENERATED_SORT_KEY_COMPARATOR_ASC);
        KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(function);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = this.createTestHarness(operator);
        harness.open();

        // Five inserts while the state is still within its TTL.
        harness.processElement(StreamRecordUtils.insertRecord("key1", 1L, 100L));
        this.validateState(function, GenericRowData.of("key1", 1L, 100L), 0, 1, 0, 1, 0, 1, true);

        harness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 200L));
        this.validateState(function, GenericRowData.of("key1", 2L, 200L), 1, 2, 0, 1, 1, 2, true);

        harness.processElement(StreamRecordUtils.insertRecord("key1", 2L, 201L));
        this.validateState(function, GenericRowData.of("key1", 2L, 201L), 1, 2, 1, 2, 2, 3, true);

        harness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 500L));
        this.validateState(function, GenericRowData.of("key1", 5L, 500L), 2, 3, 0, 1, 3, 4, true);

        harness.processElement(StreamRecordUtils.insertRecord("key1", 5L, 502L));
        this.validateState(function, GenericRowData.of("key1", 5L, 502L), 2, 3, 1, 2, 4, 5, true);

        // Move the TTL clock one millisecond beyond the configured retention, then retract.
        harness.setStateTtlProcessingTime(ttlMillis + 1L);
        harness.processElement(StreamRecordUtils.updateBeforeRecord("key1", 5L, 502L));

        // All three pieces of keyed state are expected to be absent/empty at this point.
        List<?> sortedEntries = (List<?>) function.getRuntimeContext().getState(function.sortedListStateDescriptor).value();
        Assertions.assertThat(sortedEntries).isNull();
        MapState<?, ?> accumulatorState = function.getRuntimeContext().getMapState(function.accStateDescriptor);
        Assertions.assertThat(accumulatorState.isEmpty()).isTrue();
        Long currentId = (Long) function.getRuntimeContext().getState(function.idStateDescriptor).value();
        Assertions.assertThat(currentId).isNull();

        int emittedRows = harness.extractOutputValues().size();
        Assertions.assertThat(emittedRows).isEqualTo(9);
        long sortKeysNotFound = function.getNumOfSortKeysNotFound().getCount();
        Assertions.assertThat(sortKeysNotFound).isEqualTo(1L);
        long idsNotFound = function.getNumOfIdsNotFound().getCount();
        Assertions.assertThat(idsNotFound).isEqualTo(0L);
    }

    /**
     * Asserts that the observed number of accumulator rows matches the expected number.
     *
     * @param numAccRows the observed number of accumulator rows
     * @param expectedNumAccRows the number of accumulator rows the caller expects
     * @param totalRows not consulted by this implementation
     */
    @Override
    void validateNumAccRows(int numAccRows, int expectedNumAccRows, int totalRows) {
        Assertions.assertThat(numAccRows).isEqualTo(expectedNumAccRows);
    }

    /**
     * Asserts that the accumulator map state contains a non-null value for the given record's
     * sort key.
     *
     * @param function the function whose runtime context and accumulator state descriptor are used
     * @param record the row whose sort key (extracted with {@code SORT_KEY_SELECTOR}) is looked up
     * @param idOffset not consulted by this implementation
     * @throws Exception if key extraction or the state lookup fails
     */
    @Override
    void validateEntry(AbstractNonTimeUnboundedPrecedingOver<RowData> function, RowData record, int idOffset) throws Exception {
        RowData sortKey = (RowData) SORT_KEY_SELECTOR.getKey(record);
        Object accumulator = function.getRuntimeContext().getMapState(function.accStateDescriptor).get(sortKey);
        Assertions.assertThat(accumulator).isNotNull();
    }
}

