package org.apache.flink.table.runtime.operators.deduplicate;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.types.RowKind;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Harness tests for {@link RowTimeDeduplicateFunction} and {@link
 * RowTimeMiniBatchDeduplicateFunction}.
 *
 * <p>The class is parameterized on {@code miniBatchEnable}: when {@code true} the mini-batch
 * function is run inside a {@link KeyedMapBundleOperator}, otherwise the per-record function is
 * run inside a {@link KeyedProcessOperator}. Every scenario feeds a first batch of records,
 * snapshots the operator, restores the snapshot into a second harness, feeds the remaining
 * records and finally compares the collected output of both harnesses with the expected output.
 */
@RunWith(Parameterized.class)
public class RowTimeDeduplicateFunctionTest {

    /** Number of buffered records after which the mini-batch bundle trigger fires. */
    private static final long MINI_BATCH_SIZE = 4L;

    /** Position of the row-time field inside the input row. */
    private static final int ROW_TIME_INDEX = 2;

    /** Position of the key field inside the input row. */
    private static final int ROW_KEY_INDEX = 0;

    private final Time minTtlTime = Time.milliseconds(10);
    private final InternalTypeInfo<RowData> inputRowType =
            InternalTypeInfo.ofFields(
                    new LogicalType[] {
                        new VarCharType(Integer.MAX_VALUE), new IntType(), new BigIntType()
                    });
    private final TypeSerializer<RowData> serializer = inputRowType.toSerializer();
    private final RowDataKeySelector rowKeySelector =
            HandwrittenSelectorUtil.getRowDataSelector(
                    new int[] {ROW_KEY_INDEX}, inputRowType.toRowFieldTypes());
    private final RowDataHarnessAssertor assertor =
            new RowDataHarnessAssertor(
                    inputRowType.toRowFieldTypes(),
                    new GenericRowRecordSortComparator(
                            ROW_KEY_INDEX, inputRowType.toRowFieldTypes()[ROW_KEY_INDEX]));
    private final boolean miniBatchEnable;

    /**
     * Creates the test for one run mode.
     *
     * @param miniBatchEnable whether the mini-batch variant of the function is tested
     */
    public RowTimeDeduplicateFunctionTest(boolean miniBatchEnable) {
        this.miniBatchEnable = miniBatchEnable;
    }

    /** Runs the keep-first-row scenario for all four combinations of the two output flags. */
    @Test
    public void testRowTimeDeduplicateKeepFirstRow() throws Exception {
        // Three of the four flag combinations expect the same INSERT-only output.
        List<Object> insertOnly = keepFirstRowExpectedOutput(RowKind.INSERT);
        testRowTimeDeduplicateKeepFirstRow(true, true, insertOnly);
        testRowTimeDeduplicateKeepFirstRow(true, false, insertOnly);
        testRowTimeDeduplicateKeepFirstRow(false, true, insertOnly);

        // With both flags disabled every result row is expected as UPDATE_AFTER.
        testRowTimeDeduplicateKeepFirstRow(
                false, false, keepFirstRowExpectedOutput(RowKind.UPDATE_AFTER));
    }

    /** Runs the keep-last-row scenario for all four combinations of the two output flags. */
    @Test
    public void testRowTimeDeduplicateKeepLastRow() throws Exception {
        // Both flags enabled (and update-before only): retractions and inserts are expected.
        List<Object> withRetraction = new ArrayList<>();
        withRetraction.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 13, 99L));
        withRetraction.add(StreamRecordUtils.record(RowKind.UPDATE_BEFORE, "key1", 13, 99L));
        withRetraction.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 100L));
        withRetraction.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 101L));
        withRetraction.add(new Watermark(102L));
        withRetraction.add(StreamRecordUtils.record(RowKind.UPDATE_BEFORE, "key1", 12, 100L));
        withRetraction.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 300L));
        withRetraction.add(StreamRecordUtils.record(RowKind.UPDATE_BEFORE, "key2", 11, 101L));
        withRetraction.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 301L));
        withRetraction.add(StreamRecordUtils.record(RowKind.INSERT, "key3", 5, 299L));
        withRetraction.add(new Watermark(302L));
        withRetraction.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 12, 400L));
        withRetraction.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 401L));
        withRetraction.add(new Watermark(402L));
        testRowTimeDeduplicateKeepLastRow(true, true, withRetraction);
        testRowTimeDeduplicateKeepLastRow(true, false, withRetraction);

        // Insert flag only: no UPDATE_BEFORE rows are expected.
        List<Object> withoutRetraction = new ArrayList<>();
        withoutRetraction.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 13, 99L));
        withoutRetraction.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 100L));
        withoutRetraction.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 101L));
        withoutRetraction.add(new Watermark(102L));
        withoutRetraction.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 300L));
        withoutRetraction.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 301L));
        withoutRetraction.add(StreamRecordUtils.record(RowKind.INSERT, "key3", 5, 299L));
        withoutRetraction.add(new Watermark(302L));
        withoutRetraction.add(StreamRecordUtils.record(RowKind.INSERT, "key1", 12, 400L));
        withoutRetraction.add(StreamRecordUtils.record(RowKind.INSERT, "key2", 11, 401L));
        withoutRetraction.add(new Watermark(402L));
        testRowTimeDeduplicateKeepLastRow(false, true, withoutRetraction);

        // Both flags disabled: every result row is expected as UPDATE_AFTER.
        List<Object> updateAfterOnly = new ArrayList<>();
        updateAfterOnly.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 13, 99L));
        updateAfterOnly.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 100L));
        updateAfterOnly.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 101L));
        updateAfterOnly.add(new Watermark(102L));
        updateAfterOnly.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 300L));
        updateAfterOnly.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 301L));
        updateAfterOnly.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key3", 5, 299L));
        updateAfterOnly.add(new Watermark(302L));
        updateAfterOnly.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key1", 12, 400L));
        updateAfterOnly.add(StreamRecordUtils.record(RowKind.UPDATE_AFTER, "key2", 11, 401L));
        updateAfterOnly.add(new Watermark(402L));
        testRowTimeDeduplicateKeepLastRow(false, false, updateAfterOnly);
    }

    /**
     * Builds the expected keep-first-row output in which every record carries the given kind.
     *
     * @param kind row kind assigned to all expected records
     * @return a new mutable list of expected records and watermarks
     */
    private static List<Object> keepFirstRowExpectedOutput(RowKind kind) {
        List<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.record(kind, "key1", 13, 99L));
        expected.add(StreamRecordUtils.record(kind, "key2", 11, 101L));
        expected.add(new Watermark(102L));
        expected.add(StreamRecordUtils.record(kind, "key3", 5, 299L));
        expected.add(new Watermark(302L));
        expected.add(StreamRecordUtils.record(kind, "key1", 12, 400L));
        expected.add(StreamRecordUtils.record(kind, "key2", 11, 401L));
        expected.add(new Watermark(402L));
        return expected;
    }

    /**
     * Runs the keep-first-row scenario; the first record is sent twice before the snapshot.
     *
     * @param generateUpdateBefore first output flag handed to the function under test (named
     *     after its visible effect on the expected outputs)
     * @param generateInsert second output flag handed to the function under test
     * @param expectedOutput records and watermarks the two harnesses must emit in total
     */
    private void testRowTimeDeduplicateKeepFirstRow(
            boolean generateUpdateBefore, boolean generateInsert, List<Object> expectedOutput)
            throws Exception {
        runDeduplicateScenario(generateUpdateBefore, generateInsert, false, true, expectedOutput);
    }

    /**
     * Runs the keep-last-row scenario.
     *
     * @param generateUpdateBefore first output flag handed to the function under test (named
     *     after its visible effect on the expected outputs)
     * @param generateInsert second output flag handed to the function under test
     * @param expectedOutput records and watermarks the two harnesses must emit in total
     */
    private void testRowTimeDeduplicateKeepLastRow(
            boolean generateUpdateBefore, boolean generateInsert, List<Object> expectedOutput)
            throws Exception {
        runDeduplicateScenario(generateUpdateBefore, generateInsert, true, false, expectedOutput);
    }

    /**
     * Shared driver: processes a first batch, snapshots, restores into a fresh harness built
     * around the same operator instance, processes the rest and asserts the combined output.
     *
     * @param generateUpdateBefore first output flag handed to the function under test
     * @param generateInsert second output flag handed to the function under test
     * @param keepLastRow last constructor flag of the function under test
     * @param replayFirstRecord whether the very first record is sent a second time
     * @param expectedOutput expected combined output, compared after sorting by the assertor
     */
    private void runDeduplicateScenario(
            boolean generateUpdateBefore,
            boolean generateInsert,
            boolean keepLastRow,
            boolean replayFirstRecord,
            List<Object> expectedOutput)
            throws Exception {
        KeyedMapBundleOperator<RowData, RowData, RowData, RowData> miniBatchOperator = null;
        KeyedProcessOperator<RowData, RowData, RowData> perRecordOperator = null;
        if (miniBatchEnable) {
            miniBatchOperator =
                    new KeyedMapBundleOperator<>(
                            new RowTimeMiniBatchDeduplicateFunction(
                                    inputRowType,
                                    serializer,
                                    minTtlTime.toMilliseconds(),
                                    ROW_TIME_INDEX,
                                    generateUpdateBefore,
                                    generateInsert,
                                    keepLastRow),
                            new CountBundleTrigger(MINI_BATCH_SIZE));
        } else {
            perRecordOperator =
                    new KeyedProcessOperator<>(
                            new RowTimeDeduplicateFunction(
                                    inputRowType,
                                    minTtlTime.toMilliseconds(),
                                    ROW_TIME_INDEX,
                                    generateUpdateBefore,
                                    generateInsert,
                                    keepLastRow));
        }

        List<Object> actualOutput = new ArrayList<>();
        OperatorSubtaskState snapshot;

        // Phase 1: first batch, then snapshot. The harness is closed even if processing fails.
        try (OneInputStreamOperatorTestHarness<RowData, RowData> harness =
                newHarness(miniBatchOperator, perRecordOperator)) {
            harness.open();
            harness.processElement(StreamRecordUtils.insertRecord("key1", 13, 99L));
            if (replayFirstRecord) {
                harness.processElement(StreamRecordUtils.insertRecord("key1", 13, 99L));
            }
            harness.processElement(StreamRecordUtils.insertRecord("key1", 12, 100L));
            harness.processElement(StreamRecordUtils.insertRecord("key2", 11, 101L));
            harness.processWatermark(new Watermark(102L));
            actualOutput.addAll(harness.getOutput());
            snapshot = harness.snapshot(0L, 0L);
        }

        // Phase 2: restore the snapshot and continue; closing also happens on assertion failure.
        try (OneInputStreamOperatorTestHarness<RowData, RowData> restored =
                newHarness(miniBatchOperator, perRecordOperator)) {
            restored.setup();
            restored.initializeState(snapshot);
            restored.open();
            restored.processElement(StreamRecordUtils.insertRecord("key1", 12, 300L));
            restored.processElement(StreamRecordUtils.insertRecord("key2", 11, 301L));
            restored.processElement(StreamRecordUtils.insertRecord("key3", 5, 299L));
            restored.processWatermark(new Watermark(302L));

            // Move the state TTL clock just past the configured minimum retention time.
            restored.setStateTtlProcessingTime(minTtlTime.toMilliseconds() + 1);
            restored.processElement(StreamRecordUtils.insertRecord("key1", 12, 400L));
            restored.processElement(StreamRecordUtils.insertRecord("key2", 11, 401L));
            restored.processWatermark(new Watermark(402L));
            actualOutput.addAll(restored.getOutput());

            assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, actualOutput);
        }
    }

    /** Wraps whichever of the two operators matches the current run mode in a new harness. */
    private OneInputStreamOperatorTestHarness<RowData, RowData> newHarness(
            KeyedMapBundleOperator<RowData, RowData, RowData, RowData> miniBatchOperator,
            KeyedProcessOperator<RowData, RowData, RowData> perRecordOperator)
            throws Exception {
        return miniBatchEnable
                ? createTestHarness(miniBatchOperator)
                : createTestHarness(perRecordOperator);
    }

    /** Creates a keyed harness around the per-record operator, keyed by the row key selector. */
    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
            KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness<>(
                operator, rowKeySelector, rowKeySelector.getProducedType());
    }

    /** Creates a keyed harness around the mini-batch operator, keyed by the row key selector. */
    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
            KeyedMapBundleOperator<RowData, RowData, RowData, RowData> operator)
            throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness<>(
                operator, rowKeySelector, rowKeySelector.getProducedType());
    }

    /**
     * Supplies the run modes for the parameterized runner.
     *
     * @return one parameter set per value of {@code miniBatchEnable}
     */
    @Parameterized.Parameters(name = "miniBatchEnable = {0}")
    public static Collection<Boolean[]> runMode() {
        return Arrays.asList(new Boolean[] {false}, new Boolean[] {true});
    }
}
