/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.deduplicate;

import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateFunctionTestBase;
import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class ProcTimeDeduplicateKeepLastRowFunctionTest
extends ProcTimeDeduplicateFunctionTestBase {
    ProcTimeDeduplicateKeepLastRowFunctionTest() {
    }

    private ProcTimeDeduplicateKeepLastRowFunction createFunctionWithoutStateTtl(boolean generateUpdateBefore, boolean generateInsert) {
        return new ProcTimeDeduplicateKeepLastRowFunction(this.inputRowType, 0L, generateUpdateBefore, generateInsert, true, generatedEqualiser, null);
    }

    private ProcTimeDeduplicateKeepLastRowFunction createFunction(boolean generateUpdateBefore, boolean generateInsert) {
        return new ProcTimeDeduplicateKeepLastRowFunction(this.inputRowType, this.minTime.toMillis(), generateUpdateBefore, generateInsert, true, generatedEqualiser, null);
    }

    private ProcTimeDeduplicateKeepLastRowFunction createFunctionWithFilter(boolean generateUpdateBefore) {
        return new ProcTimeDeduplicateKeepLastRowFunction(this.inputRowType, this.minTime.toMillis(), generateUpdateBefore, true, false, generatedEqualiser, generatedFilterCondition);
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(ProcTimeDeduplicateKeepLastRowFunction func) throws Exception {
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)func);
        return new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (KeySelector)this.rowKeySelector, (TypeInformation)this.rowKeySelector.getProducedType());
    }

    @Test
    void testWithoutGenerateUpdateBefore() throws Exception {
        ProcTimeDeduplicateKeepLastRowFunction func = this.createFunction(false, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 13));
        testHarness.close();
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 1L, 12));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 11));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 1L, 13));
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
    }

    @Test
    void testWithoutGenerateUpdateBeforeAndInsert() throws Exception {
        ProcTimeDeduplicateKeepLastRowFunction func = this.createFunction(false, false);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 13));
        testHarness.close();
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 1L, 12));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 2L, 11));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 1L, 13));
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
    }

    @Test
    void testWithGenerateUpdateBefore() throws Exception {
        ProcTimeDeduplicateKeepLastRowFunction func = this.createFunction(true, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        testHarness.close();
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 1L, 12));
        expectedOutput.add(StreamRecordUtils.updateBeforeRecord("book", 1L, 12));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 1L, 12));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 11));
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
    }

    @Test
    void testWithStateTtlDisabled() throws Exception {
        ProcTimeDeduplicateKeepLastRowFunction func = this.createFunctionWithoutStateTtl(true, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        testHarness.close();
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 1L, 12));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 11));
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
    }

    @Test
    void testWithGenerateUpdateBeforeAndStateTtl() throws Exception {
        ProcTimeDeduplicateKeepLastRowFunction func = this.createFunction(true, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 13));
        testHarness.setStateTtlProcessingTime(30L);
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 17));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 18));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 1L, 19));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 1L, 12));
        expectedOutput.add(StreamRecordUtils.updateBeforeRecord("book", 1L, 12));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 1L, 13));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 11));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 1L, 17));
        expectedOutput.add(StreamRecordUtils.updateBeforeRecord("book", 1L, 17));
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 1L, 19));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 18));
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testWithFilterCondition(boolean updateBefore) throws Exception {
        ProcTimeDeduplicateKeepLastRowFunction func = this.createFunctionWithFilter(updateBefore);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.updateAfterRecord("book", 1L, 12));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord("book", 1L, 15));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord("book", 1L, 8));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord("book", 1L, 9));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord("book", 1L, 13));
        testHarness.processElement(StreamRecordUtils.deleteRecord("book", 1L, null));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 1L, 12));
        if (updateBefore) {
            expectedOutput.add(StreamRecordUtils.updateBeforeRecord("book", 1L, 12));
        }
        expectedOutput.add(StreamRecordUtils.updateAfterRecord("book", 1L, 15));
        expectedOutput.add(StreamRecordUtils.deleteRecord("book", 1L, 15));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 1L, 13));
        expectedOutput.add(StreamRecordUtils.deleteRecord("book", 1L, 13));
        this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }
}

