package org.apache.flink.table.runtime.operators.deduplicate;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunctionTest.class */
public class RowTimeMiniBatchLatestChangeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase {
    @Test
    public void testKeepLastRowWithoutGenerateUpdateBeforeAndWithGenerateInsert() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(false, true, true);
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 10, 1L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 11, 2L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 13, 1L));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 12, 1L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord("book", 11, 2L));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 14, 3L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 15, 1L));
        createTestHarness.processWatermark(new Watermark(3L));
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 14, 3L));
        arrayList.add(new Watermark(3L));
        createTestHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }

    @Test
    public void testKeepLastRowWithoutGenerateUpdateBeforeAndWithoutGenerateInsert() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(false, false, true);
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 10, 1L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 11, 2L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 13, 1L));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 12, 1L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 11, 2L));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 14, 3L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 15, 1L));
        createTestHarness.processWatermark(new Watermark(3L));
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 14, 3L));
        arrayList.add(new Watermark(3L));
        createTestHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }

    @Test
    public void testKeepLastRowWithGenerateUpdateBeforeAndWithGenerateInsert() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(true, true, true);
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 10, 1L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 11, 2L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 13, 1L));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 12, 1L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord("book", 11, 2L));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 14, 3L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 15, 1L));
        createTestHarness.processWatermark(new Watermark(3L));
        arrayList.add(StreamRecordUtils.updateBeforeRecord("book", 11, 2L));
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 14, 3L));
        arrayList.add(new Watermark(3L));
        createTestHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }

    @Test
    public void testKeepLastRowWithGenerateUpdateBeforeAndWithoutGenerateInsert() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(true, false, true);
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 10, 1L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 11, 2L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 13, 1L));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 12, 1L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord("book", 11, 2L));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 14, 3L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 15, 1L));
        createTestHarness.processWatermark(new Watermark(3L));
        arrayList.add(StreamRecordUtils.updateBeforeRecord("book", 11, 2L));
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 14, 3L));
        arrayList.add(new Watermark(3L));
        createTestHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }

    @Test
    public void testKeepLastRowWithGenerateUpdateBeforeAndWithGenerateInsertAndStateTtl() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(true, true, true);
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 10, 1L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 11, 2L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 13, 1L));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 12, 1L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord("book", 11, 2L));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.setStateTtlProcessingTime(10L);
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 14, 3L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 15, 1L));
        createTestHarness.processWatermark(new Watermark(3L));
        arrayList.add(StreamRecordUtils.insertRecord("book", 14, 3L));
        arrayList.add(new Watermark(3L));
        createTestHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }

    @Test
    public void testKeepFirstRowWithoutGenerateUpdateBeforeAndWithGenerateInsert() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(false, true, false);
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 10, 1L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 11, 2L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 13, 1L));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 12, 1L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord("book", 10, 1L));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 14, 3L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 15, 1L));
        createTestHarness.processWatermark(new Watermark(3L));
        arrayList.add(new Watermark(3L));
        createTestHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }

    @Test
    public void testKeepFirstRowWithoutGenerateUpdateBeforeAndWithoutGenerateInsert() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(false, false, false);
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 10, 1L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 11, 2L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 13, 1L));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 12, 1L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.updateAfterRecord("book", 10, 1L));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 14, 3L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 15, 1L));
        createTestHarness.processWatermark(new Watermark(3L));
        arrayList.add(new Watermark(3L));
        createTestHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }

    @Test
    public void testKeepFirstRowWithGenerateUpdateBeforeAndWithGenerateInsert() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(true, true, false);
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 10, 1L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 11, 2L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 13, 1L));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 12, 1L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord("book", 10, 1L));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 14, 3L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 15, 1L));
        createTestHarness.processWatermark(new Watermark(3L));
        arrayList.add(new Watermark(3L));
        createTestHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }

    @Test
    public void testKeepFirstRowWithGenerateUpdateBeforeAndWithoutGenerateInsert() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(true, false, false);
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 10, 1L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 11, 2L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 13, 1L));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 12, 1L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord("book", 10, 1L));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 14, 3L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 15, 1L));
        createTestHarness.processWatermark(new Watermark(3L));
        arrayList.add(new Watermark(3L));
        createTestHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }

    @Test
    public void testKeepFirstRowWithGenerateUpdateBeforeAndWithGenerateInsertAndStateTtl() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(true, true, false);
        createTestHarness.open();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 10, 1L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 11, 2L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 13, 1L));
        Assert.assertTrue(createTestHarness.getOutput().isEmpty());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 12, 1L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord("book", 10, 1L));
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
        createTestHarness.setStateTtlProcessingTime(10L);
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 14, 3L));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("book", 15, 1L));
        createTestHarness.processWatermark(new Watermark(3L));
        arrayList.add(StreamRecordUtils.insertRecord("book", 15, 1L));
        arrayList.add(new Watermark(3L));
        createTestHarness.close();
        this.assertor.assertOutputEqualsSorted("output wrong.", arrayList, createTestHarness.getOutput());
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(boolean z, boolean z2, boolean z3) throws Exception {
        return createTestHarness(new KeyedMapBundleOperator<>(new RowTimeMiniBatchLatestChangeDeduplicateFunction(this.inputRowType, this.serializer, this.minTtlTime.toMilliseconds(), this.rowTimeIndex, z, z2, z3), new CountBundleTrigger(4L)));
    }
}
