/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sink;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerStateBackend;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.util.MigrationTest;
import org.apache.flink.types.RowKind;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class SinkUpsertMaterializerMigrationTest
implements MigrationTest {
    private static final String FOLDER_NAME = "sink-upsert-materializer";
    @Parameterized.Parameter(value=0)
    public SinkOperationMode migrateFrom;
    @Parameterized.Parameter(value=1)
    public SinkOperationMode migrateTo;

    @Parameterized.Parameters(name="{0} -> {1}")
    public static List<Object[]> parameters() {
        ArrayList<Object[]> result = new ArrayList<Object[]>();
        Set versions = FlinkVersion.rangeOf((FlinkVersion)FlinkVersion.v2_2, (FlinkVersion)FlinkVersion.v2_2);
        for (FlinkVersion fromVersion : versions) {
            for (SinkUpsertMaterializerStateBackend backend : SinkUpsertMaterializerStateBackend.values()) {
                result.add(new Object[]{new SinkOperationMode(fromVersion, backend), new SinkOperationMode(FlinkVersion.current(), backend)});
            }
        }
        return result;
    }

    @Test
    public void testMigration() throws Exception {
        String path = OperatorSnapshotUtil.getResourceFilename((String)("sink-upsert-materializer/" + SinkUpsertMaterializerMigrationTest.getFileName(this.migrateFrom)));
        try (OneInputStreamOperatorTestHarness<RowData, RowData> harness = this.createHarness(this.migrateTo, path);){
            this.testCorrectnessAfterSnapshot(harness);
        }
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness(SinkOperationMode mode, String snapshotPath) throws Exception {
        int[] inputUpsertKey = new int[]{0};
        SinkUpsertMaterializer materializer = SinkUpsertMaterializer.create((StateTtlConfig)SinkUpsertMaterializerTest.TTL_CONFIG, (RowType)RowType.of((LogicalType[])SinkUpsertMaterializerTest.LOGICAL_TYPES), (GeneratedRecordEqualiser)SinkUpsertMaterializerTest.EQUALISER, (GeneratedRecordEqualiser)SinkUpsertMaterializerTest.UPSERT_KEY_EQUALISER, (int[])inputUpsertKey);
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = SinkUpsertMaterializerTest.createHarness((OneInputStreamOperator<RowData, RowData>)materializer, mode.stateBackend, SinkUpsertMaterializerTest.LOGICAL_TYPES);
        harness.setup((TypeSerializer)new RowDataSerializer(SinkUpsertMaterializerTest.LOGICAL_TYPES));
        if (snapshotPath != null) {
            harness.initializeState(snapshotPath);
        }
        harness.open();
        harness.setStateTtlProcessingTime(1L);
        return harness;
    }

    private void testCorrectnessBeforeSnapshot(OneInputStreamOperatorTestHarness<RowData, RowData> testHarness) throws Exception {
        testHarness.processElement(StreamRecordUtils.insertRecord(1L, 1, "a1"));
        SinkUpsertMaterializerTest.ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
        testHarness.processElement(StreamRecordUtils.updateAfterRecord(1L, 1, "a11"));
        SinkUpsertMaterializerTest.ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11"));
        testHarness.processElement(StreamRecordUtils.insertRecord(3L, 1, "a3"));
        SinkUpsertMaterializerTest.ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
    }

    private void testCorrectnessAfterSnapshot(OneInputStreamOperatorTestHarness<RowData, RowData> testHarness) throws Exception {
        testHarness.processElement(StreamRecordUtils.deleteRecord(1L, 1, "a111"));
        SinkUpsertMaterializerTest.ASSERTOR.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)testHarness);
        testHarness.processElement(StreamRecordUtils.deleteRecord(3L, 1, "a33"));
        SinkUpsertMaterializerTest.ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, 3L, 1, "a33"));
        testHarness.processElement(StreamRecordUtils.insertRecord(4L, 1, "a4"));
        SinkUpsertMaterializerTest.ASSERTOR.shouldEmit((AbstractStreamOperatorTestHarness<RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
        testHarness.setStateTtlProcessingTime(1002L);
        testHarness.processElement(StreamRecordUtils.deleteRecord(4L, 1, "a4"));
        SinkUpsertMaterializerTest.ASSERTOR.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)testHarness);
    }

    private static String getFileName(SinkOperationMode mode) {
        return String.format("migration-flink-%s-%s-%s-snapshot", new Object[]{mode.version, mode.stateBackend, "V1"});
    }

    @MigrationTest.SnapshotsGenerator
    public void writeSnapshot(FlinkVersion version) throws Exception {
        for (SinkUpsertMaterializerStateBackend stateBackend : SinkUpsertMaterializerStateBackend.values()) {
            SinkOperationMode mode = new SinkOperationMode(version, stateBackend);
            try (OneInputStreamOperatorTestHarness<RowData, RowData> harness = this.createHarness(mode, null);){
                this.testCorrectnessBeforeSnapshot(harness);
                Path parent = Paths.get("src/test/resources", FOLDER_NAME);
                Files.createDirectories(parent, new FileAttribute[0]);
                OperatorSnapshotUtil.writeStateHandle((OperatorSubtaskState)harness.snapshot(1L, 1L), (String)parent.resolve(SinkUpsertMaterializerMigrationTest.getFileName(mode)).toString());
            }
        }
    }

    public static void main(String ... s) throws Exception {
        new SinkUpsertMaterializerMigrationTest().writeSnapshot(FlinkVersion.current());
    }

    private static class SinkOperationMode {
        private final FlinkVersion version;
        private final SinkUpsertMaterializerStateBackend stateBackend;

        private SinkOperationMode(FlinkVersion version, SinkUpsertMaterializerStateBackend stateBackend) {
            this.version = version;
            this.stateBackend = stateBackend;
        }

        public String toString() {
            return String.format("flink=%s, state=%s}", new Object[]{this.version, this.stateBackend});
        }
    }
}

