/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerStateBackend;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.types.RowKind;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class SinkUpsertMaterializerTest {
    static final int UPSERT_KEY = 0;
    @Parameterized.Parameter
    public SinkUpsertMaterializerStateBackend stateBackend;
    static final StateTtlConfig TTL_CONFIG = StateConfigUtil.createTtlConfig((long)1000L);
    static final LogicalType[] LOGICAL_TYPES = new LogicalType[]{new BigIntType(), new IntType(), new VarCharType()};
    static final RowDataHarnessAssertor ASSERTOR = new RowDataHarnessAssertor(LOGICAL_TYPES);
    static final GeneratedRecordEqualiser EQUALISER = new GeneratedRecordEqualiser("", "", new Object[0]){

        public RecordEqualiser newInstance(ClassLoader classLoader) {
            return new TestRecordEqualiser();
        }
    };
    static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER = new GeneratedRecordEqualiser("", "", new Object[0]){

        public RecordEqualiser newInstance(ClassLoader classLoader) {
            return new TestUpsertKeyEqualiser();
        }
    };

    @Parameterized.Parameters(name="stateBackend={0}")
    public static Object[][] generateTestParameters() {
        ArrayList<Object[]> result = new ArrayList<Object[]>();
        for (SinkUpsertMaterializerStateBackend backend : SinkUpsertMaterializerStateBackend.values()) {
            result.add(new Object[]{backend});
        }
        return (Object[][])result.toArray((T[])new Object[0][]);
    }

    @Test
    public void testUpsertKeySerializerFailure() throws Exception {
        LogicalType[] types = new LogicalType[]{new VarCharType(), new IntType()};
        OneInputStreamOperator<RowData, RowData> materializer = this.createOperator(types, 1);
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = SinkUpsertMaterializerTest.createHarness(materializer, this.stateBackend, types);){
            testHarness.open();
            RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types);
            testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "any string", -1));
            assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, "any string", -1));
            testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "any string", 999));
            assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, "any string", 999));
        }
    }

    @Test
    public void testUpsertKeySerializerSilentCorruption() throws Exception {
        LogicalType[] types = new LogicalType[]{new VarCharType(), new BigIntType(), new IntType()};
        OneInputStreamOperator<RowData, RowData> materializer = this.createOperator(types, 1);
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = SinkUpsertMaterializerTest.createHarness(materializer, this.stateBackend, types);){
            testHarness.open();
            RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types);
            testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, "any string", 32L, 97));
            assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, "any string", 32L, 97));
            testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.DELETE, "any string", 32L, 98));
            assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, "any string", 32L, 98));
        }
    }

    @Test
    public void testUpsertEqualizer() throws Exception {
        LogicalType[] types = new LogicalType[]{new IntType(), new BigIntType()};
        OneInputStreamOperator<RowData, RowData> materializer = this.createOperator(types, 1);
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = SinkUpsertMaterializerTest.createHarness(materializer, this.stateBackend, types);){
            testHarness.open();
            RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types);
            testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.INSERT, 0, 33L));
            assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 0, 33L));
            testHarness.processElement(StreamRecordUtils.binaryRecord(RowKind.DELETE, 1, 33L));
            assertor.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, 1, 33L));
        }
    }

    @Test
    public void testNoUpsertKeyFlow() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(this.createOperatorWithoutUpsertKey());
        testHarness.open();
        testHarness.setStateTtlProcessingTime(1L);
        testHarness.processElement(StreamRecordUtils.insertRecord(1L, 1, "a1"));
        ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
        testHarness.processElement(StreamRecordUtils.insertRecord(2L, 1, "a2"));
        ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3L, 1, "a3"));
        ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(2L, 1, "a2"));
        ASSERTOR.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)testHarness);
        testHarness.processElement(StreamRecordUtils.deleteRecord(3L, 1, "a3"));
        ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(1L, 1, "a1"));
        ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4L, 1, "a4"));
        ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
        testHarness.setStateTtlProcessingTime(1002L);
        testHarness.processElement(StreamRecordUtils.deleteRecord(4L, 1, "a4"));
        ASSERTOR.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)testHarness);
        testHarness.close();
    }

    @Test
    public void testInputHasUpsertKeyWithNonDeterministicColumn() throws Exception {
        OneInputStreamOperator<RowData, RowData> materializer = this.createOperator(LOGICAL_TYPES, 0);
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(materializer);
        testHarness.open();
        testHarness.setStateTtlProcessingTime(1L);
        testHarness.processElement(StreamRecordUtils.insertRecord(1L, 1, "a1"));
        ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(1L, 1, "a11"));
        ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3L, 1, "a3"));
        ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(1L, 1, "a111"));
        ASSERTOR.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)testHarness);
        testHarness.processElement(StreamRecordUtils.deleteRecord(3L, 1, "a33"));
        ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, 3L, 1, "a33"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4L, 1, "a4"));
        ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
        testHarness.setStateTtlProcessingTime(1002L);
        testHarness.processElement(StreamRecordUtils.deleteRecord(4L, 1, "a4"));
        ASSERTOR.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)testHarness);
        testHarness.close();
    }

    @Test
    public void testRetractionWithoutUpsertKey() throws Exception {
        this.testRetractions((int[])null);
    }

    @Test
    public void testRetractionWithUpsertKey() throws Exception {
        this.testRetractions(0);
    }

    public void testRetractions(int ... upsertKey) throws Exception {
        this.testThreeElementProcessing("retract first - should emit nothing until empty - then delete", upsertKey, Tuple2.of(StreamRecordUtils.deleteRecord(1L, 1, "a1"), null), Tuple2.of(StreamRecordUtils.deleteRecord(2L, 1, "a2"), null), Tuple2.of(StreamRecordUtils.deleteRecord(3L, 1, "a3"), (Object)StreamRecordUtils.rowOfKind(RowKind.DELETE, 3L, 1, "a3")));
        this.testThreeElementProcessing("retract middle - should emit nothing until empty - then delete", upsertKey, Tuple2.of(StreamRecordUtils.deleteRecord(2L, 1, "a2"), null), Tuple2.of(StreamRecordUtils.deleteRecord(1L, 1, "a1"), null), Tuple2.of(StreamRecordUtils.deleteRecord(3L, 1, "a3"), (Object)StreamRecordUtils.rowOfKind(RowKind.DELETE, 3L, 1, "a3")));
        this.testThreeElementProcessing("retract last - should emit penultimate until empty - then delete", upsertKey, Tuple2.of(StreamRecordUtils.deleteRecord(3L, 1, "a3"), (Object)StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")), Tuple2.of(StreamRecordUtils.deleteRecord(2L, 1, "a2"), (Object)StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")), Tuple2.of(StreamRecordUtils.deleteRecord(1L, 1, "a1"), (Object)StreamRecordUtils.rowOfKind(RowKind.DELETE, 1L, 1, "a1")));
        this.testThreeElementProcessing("retract in arbitrary order: 1,3,2", upsertKey, Tuple2.of(StreamRecordUtils.deleteRecord(1L, 1, "a1"), null), Tuple2.of(StreamRecordUtils.deleteRecord(3L, 1, "a3"), (Object)StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")), Tuple2.of(StreamRecordUtils.deleteRecord(2L, 1, "a2"), (Object)StreamRecordUtils.rowOfKind(RowKind.DELETE, 2L, 1, "a2")));
        this.testThreeElementProcessing("retract in arbitrary order: 2,3,1", upsertKey, Tuple2.of(StreamRecordUtils.deleteRecord(2L, 1, "a2"), null), Tuple2.of(StreamRecordUtils.deleteRecord(3L, 1, "a3"), (Object)StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")), Tuple2.of(StreamRecordUtils.deleteRecord(1L, 1, "a1"), (Object)StreamRecordUtils.rowOfKind(RowKind.DELETE, 1L, 1, "a1")));
        this.testThreeElementProcessing("retract in arbitrary order: 3,1,2", upsertKey, Tuple2.of(StreamRecordUtils.deleteRecord(3L, 1, "a3"), (Object)StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")), Tuple2.of(StreamRecordUtils.deleteRecord(1L, 1, "a1"), null), Tuple2.of(StreamRecordUtils.deleteRecord(2L, 1, "a2"), (Object)StreamRecordUtils.rowOfKind(RowKind.DELETE, 2L, 1, "a2")));
    }

    @SafeVarargs
    private void testThreeElementProcessing(String description, int[] upsertKey, Tuple2<StreamRecord<RowData>, RowData> ... inputAndOutput) throws Exception {
        Tuple2[] merged = new Tuple2[inputAndOutput.length + 3];
        merged[0] = Tuple2.of(StreamRecordUtils.insertRecord(1L, 1, "a1"), (Object)StreamRecordUtils.rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
        merged[1] = Tuple2.of(StreamRecordUtils.insertRecord(2L, 1, "a2"), (Object)StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));
        merged[2] = Tuple2.of(StreamRecordUtils.insertRecord(3L, 1, "a3"), (Object)StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
        System.arraycopy(inputAndOutput, 0, merged, 3, inputAndOutput.length);
        this.testElementProcessing(description, upsertKey, merged);
    }

    @SafeVarargs
    private void testElementProcessing(String description, int[] upsertKey, Tuple2<StreamRecord<RowData>, RowData> ... inputAndOutput) throws Exception {
        OneInputStreamOperator<RowData, RowData> materializer = this.createOperator(LOGICAL_TYPES, upsertKey);
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(materializer);
        testHarness.open();
        for (Tuple2<StreamRecord<RowData>, RowData> el0 : inputAndOutput) {
            testHarness.processElement((StreamRecord)el0.f0);
            if (el0.f1 == null) {
                ASSERTOR.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)testHarness);
                continue;
            }
            ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, description, (RowData)el0.f1);
        }
        testHarness.close();
    }

    private OneInputStreamOperator<RowData, RowData> createOperatorWithoutUpsertKey() {
        return this.createOperator(LOGICAL_TYPES, (int[])null);
    }

    private OneInputStreamOperator<RowData, RowData> createOperator(LogicalType[] types, int ... upsertKey) {
        return SinkUpsertMaterializer.create((StateTtlConfig)TTL_CONFIG, (RowType)RowType.of((LogicalType[])types), (GeneratedRecordEqualiser)EQUALISER, (GeneratedRecordEqualiser)UPSERT_KEY_EQUALISER, (int[])upsertKey);
    }

    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(OneInputStreamOperator<RowData, RowData> m2) throws Exception {
        return SinkUpsertMaterializerTest.createHarness(m2, this.stateBackend, LOGICAL_TYPES);
    }

    static KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(OneInputStreamOperator<RowData, RowData> materializer, SinkUpsertMaterializerStateBackend backend, LogicalType[] types) throws Exception {
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(materializer, (KeySelector)HandwrittenSelectorUtil.getRowDataSelector(new int[]{1}, types), (TypeInformation)HandwrittenSelectorUtil.getRowDataSelector(new int[]{1}, types).getProducedType());
        testHarness.setStateBackend(backend.create(true));
        return testHarness;
    }

    @Test
    public void testEmptyUpsertKey() throws Exception {
        this.testRecovery(this.createOperator(LOGICAL_TYPES, new int[0]), this.createOperatorWithoutUpsertKey());
        this.testRecovery(this.createOperatorWithoutUpsertKey(), this.createOperator(LOGICAL_TYPES, new int[0]));
    }

    private void testRecovery(OneInputStreamOperator<RowData, RowData> from, OneInputStreamOperator<RowData, RowData> to) throws Exception {
        OperatorSubtaskState snapshot;
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = this.createHarness(from);){
            testHarness.open();
            snapshot = testHarness.snapshot(1L, 1L);
        }
        testHarness = this.createHarness(to);
        try {
            testHarness.initializeState(snapshot);
            testHarness.open();
        }
        finally {
            if (testHarness != null) {
                testHarness.close();
            }
        }
    }

    @Test
    public void testStateIsBounded() throws Exception {
        int dop = 2;
        int numIterations = 10;
        OperatorSnapshotFinalizer[] snapshots = new OperatorSnapshotFinalizer[dop];
        long[] prevStateSizes = new long[dop];
        for (int i = 0; i < numIterations; ++i) {
            for (int subtask = 0; subtask < dop; ++subtask) {
                snapshots[subtask] = this.initAndSnapshot(snapshots[subtask], i);
                long currentStateSize = snapshots[subtask].getJobManagerOwnedState().getManagedOperatorState().stream().mapToLong(StateObject::getStateSize).sum();
                if (i > 0) {
                    Assertions.assertEquals((long)prevStateSizes[subtask], (long)currentStateSize);
                }
                prevStateSizes[subtask] = currentStateSize;
            }
            List union = Arrays.stream(snapshots).flatMap(s -> s.getJobManagerOwnedState().getManagedOperatorState().stream()).collect(Collectors.toList());
            for (int j = 0; j < dop; ++j) {
                snapshots[j] = new OperatorSnapshotFinalizer(snapshots[j].getJobManagerOwnedState().toBuilder().setManagedOperatorState(new StateObjectCollection(union)).build(), snapshots[j].getTaskLocalState());
            }
        }
    }

    private OperatorSnapshotFinalizer initAndSnapshot(OperatorSnapshotFinalizer from, int newCheckpointID) throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = SinkUpsertMaterializerTest.createHarness(this.createOperator(LOGICAL_TYPES, 0), this.stateBackend, LOGICAL_TYPES);){
            if (from != null) {
                harness.initializeState(from.getJobManagerOwnedState());
            }
            harness.open();
            OperatorSnapshotFinalizer operatorSnapshotFinalizer = harness.snapshotWithLocalState((long)newCheckpointID, (long)newCheckpointID);
            return operatorSnapshotFinalizer;
        }
    }

    private static class TestUpsertKeyEqualiser
    implements RecordEqualiser {
        private TestUpsertKeyEqualiser() {
        }

        public boolean equals(RowData row1, RowData row2) {
            return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0);
        }
    }

    private static class TestRecordEqualiser
    implements RecordEqualiser {
        private TestRecordEqualiser() {
        }

        public boolean equals(RowData row1, RowData row2) {
            return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0) && row1.getInt(1) == row2.getInt(1) && row1.getString(2).equals(row2.getString(2));
        }
    }
}

