/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.deduplicate;

import java.util.ArrayList;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunctionTestBase;
import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateKeepFirstRowFunction;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/**
 * Harness-driven test for {@link RowTimeDeduplicateKeepFirstRowFunction}.
 *
 * <p>The scenario feeds keyed insert records and watermarks through a
 * {@link KeyedProcessOperator} wrapping the function and compares everything the
 * operator emitted against a hand-built list of expected records and watermarks.
 */
public class RowTimeDeduplicateKeepFirstRowFunctionTest
        extends RowTimeDeduplicateFunctionTestBase {

    /**
     * Runs one continuous scenario in five phases: a late record, keep-first-row
     * emission on watermark, snapshot/restore, and two state-TTL clock advances.
     *
     * @throws Exception if the test harness fails while processing or snapshotting
     */
    @Test
    public void testRowTimeDeduplicateKeepFirstRow() throws Exception {
        final long ttlMillis = this.minTtlTime.toMillis();
        ArrayList<Object> expected = new ArrayList<>();
        ArrayList<Object> actual = new ArrayList<>();

        RowTimeDeduplicateKeepFirstRowFunction keepFirstRow = this.newKeepFirstRowFunction();
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = this.harnessFor(keepFirstRow);
        harness.open();

        // Phase 0: a record whose row time (1) is behind the watermark (50) is
        // expected to bump the late-records counter and produce no output row.
        harness.processWatermark(new Watermark(50L));
        Assertions.assertThat(this.droppedLateRecordCount(keepFirstRow)).isEqualTo(0L);
        harness.processElement(StreamRecordUtils.insertRecord("key1", 0, 1L));
        expected.add(new Watermark(50L));
        Assertions.assertThat(this.droppedLateRecordCount(keepFirstRow)).isEqualTo(1L);

        // Phase 1: several rows per key, then a watermark past all of them.
        // Expected: ("key1", 13, 99L) and ("key2", 11, 101L) are the only rows emitted.
        harness.processElement(StreamRecordUtils.insertRecord("key1", 14, 101L));
        harness.processElement(StreamRecordUtils.insertRecord("key1", 13, 99L));
        harness.processElement(StreamRecordUtils.insertRecord("key1", 15, 99L));
        harness.processElement(StreamRecordUtils.insertRecord("key1", 12, 100L));
        harness.processElement(StreamRecordUtils.insertRecord("key2", 11, 101L));
        harness.processWatermark(new Watermark(102L));
        // Collect the first harness's output before it is closed and replaced below.
        actual.addAll(harness.getOutput());
        expected.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 13, 99L));
        expected.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 101L));
        expected.add(new Watermark(102L));
        // None of the phase-1 rows should have been counted as late.
        Assertions.assertThat(this.droppedLateRecordCount(keepFirstRow)).isEqualTo(1L);

        // Phase 2: snapshot, close, and restore into a brand-new function and harness.
        OperatorSubtaskState checkpoint = harness.snapshot(0L, 0L);
        harness.close();

        keepFirstRow = this.newKeepFirstRowFunction();
        harness = this.harnessFor(keepFirstRow);
        harness.setup();
        harness.initializeState(checkpoint);
        harness.open();

        // After the restore only the previously unseen key3 is expected in the output;
        // key1 and key2 were already emitted before the snapshot.
        harness.processElement(StreamRecordUtils.insertRecord("key1", 12, 300L));
        harness.processElement(StreamRecordUtils.insertRecord("key2", 11, 301L));
        harness.processElement(StreamRecordUtils.insertRecord("key3", 5, 299L));
        harness.processWatermark(new Watermark(302L));
        expected.add(StreamRecordUtils.record(RowKind.INSERT, "key3", 5, 299L));
        expected.add(new Watermark(302L));

        // Phase 3: move the state-TTL clock just past the minimum TTL; key1 and key2
        // are then expected to be emitted again.
        harness.setStateTtlProcessingTime(ttlMillis + 1L);
        harness.processElement(StreamRecordUtils.insertRecord("key1", 12, 400L));
        harness.processElement(StreamRecordUtils.insertRecord("key2", 11, 401L));
        harness.processWatermark(402L);
        expected.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 12, 400L));
        expected.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 401L));
        expected.add(new Watermark(402L));

        // Phase 4: advance the TTL clock once before the rows arrive and once more
        // before the watermark; the buffered rows are still expected in the output.
        harness.setStateTtlProcessingTime(2L * ttlMillis + 1L);
        harness.processElement(StreamRecordUtils.insertRecord("key1", 22, 500L));
        harness.processElement(StreamRecordUtils.insertRecord("key2", 21, 501L));
        harness.setStateTtlProcessingTime(3L * ttlMillis + 1L);
        harness.processWatermark(502L);
        expected.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 22, 500L));
        expected.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 21, 501L));
        expected.add(new Watermark(502L));

        // Output of the restored harness is appended to what the first harness produced.
        actual.addAll(harness.getOutput());
        this.assertor.assertOutputEqualsSorted("output wrong.", expected, actual);
        harness.close();
    }

    /** Builds a fresh function under test from the row type, TTL and row-time index of the base class. */
    private RowTimeDeduplicateKeepFirstRowFunction newKeepFirstRowFunction() {
        return new RowTimeDeduplicateKeepFirstRowFunction(
                this.inputRowType, this.minTtlTime.toMillis(), this.rowTimeIndex);
    }

    /** Wraps the given function in a {@link KeyedProcessOperator} and creates a test harness for it. */
    private OneInputStreamOperatorTestHarness<RowData, RowData> harnessFor(
            RowTimeDeduplicateKeepFirstRowFunction function) throws Exception {
        return this.createTestHarness(
                (OneInputStreamOperator<RowData, RowData>)
                        new KeyedProcessOperator((KeyedProcessFunction) function));
    }

    /** Reads the current value of the function's late-records-dropped counter. */
    private long droppedLateRecordCount(RowTimeDeduplicateKeepFirstRowFunction function) {
        return function.getNumLateRecordsDropped().getCount();
    }
}

