package org.apache.flink.test.checkpointing;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
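/*
 * Parameterized snapshot-migration test for a job that uses keyed broadcast state. Depending on
 * the executionMode constant it either creates snapshots of the job or restores existing snapshots
 * and verifies the restored broadcast state.
 */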
public class StatefulJobWBroadcastStateMigrationITCase extends SnapshotMigrationTestBase {
    // Value handed to every test source's constructor (presumably the number of elements each
    // source emits - confirm in MigrationTestUtils); also used as the job's parallelism and
    // max parallelism in testSavepoint().
    private static final int NUM_SOURCE_ELEMENTS = 4;
    // Flink version used as the end of the version ranges in parameters(), and the only version
    // kept when the test runs in CREATE_SNAPSHOT mode.
    private static final FlinkVersion currentVersion = FlinkVersion.v1_16;
    // Selects whether the test creates snapshots (CREATE_SNAPSHOT) or restores and verifies
    // existing ones (VERIFY_SNAPSHOT).
    private static final SnapshotMigrationTestBase.ExecutionMode executionMode = SnapshotMigrationTestBase.ExecutionMode.VERIFY_SNAPSHOT;
    // Snapshot specification this parameterized test instance was constructed with.
    private final SnapshotMigrationTestBase.SnapshotSpec snapshotSpec;

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase$CheckingKeyedBroadcastFunction.class */
    private static class CheckingKeyedBroadcastFunction extends KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1333992081671604521L;
        private final Map<Long, Long> expectedFirstState;
        private final Map<String, Long> expectedSecondState;
        private MapStateDescriptor<Long, Long> firstStateDesc;
        private MapStateDescriptor<String, Long> secondStateDesc;

        CheckingKeyedBroadcastFunction(Map<Long, Long> map, Map<String, Long> map2) {
            this.expectedFirstState = map;
            this.expectedSecondState = map2;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.firstStateDesc = new MapStateDescriptor<>("broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
            this.secondStateDesc = new MapStateDescriptor<>("broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
        }

        public void processElement(Tuple2<Long, Long> tuple2, KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.ReadOnlyContext readOnlyContext, Collector<Tuple2<Long, Long>> collector) throws Exception {
            HashMap hashMap = new HashMap();
            for (Map.Entry entry : readOnlyContext.getBroadcastState(this.firstStateDesc).immutableEntries()) {
                hashMap.put(entry.getKey(), entry.getValue());
            }
            Assert.assertEquals(this.expectedFirstState, hashMap);
            HashMap hashMap2 = new HashMap();
            for (Map.Entry entry2 : readOnlyContext.getBroadcastState(this.secondStateDesc).immutableEntries()) {
                hashMap2.put(entry2.getKey(), entry2.getValue());
            }
            Assert.assertEquals(this.expectedSecondState, hashMap2);
            collector.collect(tuple2);
        }

        public void processBroadcastElement(Tuple2<Long, Long> tuple2, KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.Context context, Collector<Tuple2<Long, Long>> collector) throws Exception {
        }

        public /* bridge */ /* synthetic */ void processBroadcastElement(Object obj, KeyedBroadcastProcessFunction.Context context, Collector collector) throws Exception {
            processBroadcastElement((Tuple2<Long, Long>) obj, (KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.Context) context, (Collector<Tuple2<Long, Long>>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedBroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
            processElement((Tuple2<Long, Long>) obj, (KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.ReadOnlyContext) readOnlyContext, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase$CheckingKeyedSingleBroadcastFunction.class */
    private static class CheckingKeyedSingleBroadcastFunction extends KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1333992081671604521L;
        private final Map<Long, String> expectedState;
        private MapStateDescriptor<Long, String> stateDesc;

        CheckingKeyedSingleBroadcastFunction(Map<Long, String> map) {
            this.expectedState = map;
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.stateDesc = new MapStateDescriptor<>("broadcast-state-3", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        }

        public void processElement(Tuple2<Long, Long> tuple2, KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.ReadOnlyContext readOnlyContext, Collector<Tuple2<Long, Long>> collector) throws Exception {
            HashMap hashMap = new HashMap();
            for (Map.Entry entry : readOnlyContext.getBroadcastState(this.stateDesc).immutableEntries()) {
                hashMap.put(entry.getKey(), entry.getValue());
            }
            Assert.assertEquals(this.expectedState, hashMap);
            collector.collect(tuple2);
        }

        public void processBroadcastElement(Tuple2<Long, Long> tuple2, KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.Context context, Collector<Tuple2<Long, Long>> collector) throws Exception {
        }

        public /* bridge */ /* synthetic */ void processBroadcastElement(Object obj, KeyedBroadcastProcessFunction.Context context, Collector collector) throws Exception {
            processBroadcastElement((Tuple2<Long, Long>) obj, (KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.Context) context, (Collector<Tuple2<Long, Long>>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedBroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
            processElement((Tuple2<Long, Long>) obj, (KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.ReadOnlyContext) readOnlyContext, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase$CheckpointingKeyedBroadcastFunction.class */
    private static class CheckpointingKeyedBroadcastFunction extends KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1333992081671604521L;
        private MapStateDescriptor<Long, Long> firstStateDesc;
        private MapStateDescriptor<String, Long> secondStateDesc;

        private CheckpointingKeyedBroadcastFunction() {
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.firstStateDesc = new MapStateDescriptor<>("broadcast-state-1", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
            this.secondStateDesc = new MapStateDescriptor<>("broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
        }

        public void processElement(Tuple2<Long, Long> tuple2, KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.ReadOnlyContext readOnlyContext, Collector<Tuple2<Long, Long>> collector) throws Exception {
            collector.collect(tuple2);
        }

        public void processBroadcastElement(Tuple2<Long, Long> tuple2, KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.Context context, Collector<Tuple2<Long, Long>> collector) throws Exception {
            context.getBroadcastState(this.firstStateDesc).put(tuple2.f0, tuple2.f1);
            context.getBroadcastState(this.secondStateDesc).put(Long.toString(((Long) tuple2.f0).longValue()), tuple2.f1);
        }

        public /* bridge */ /* synthetic */ void processBroadcastElement(Object obj, KeyedBroadcastProcessFunction.Context context, Collector collector) throws Exception {
            processBroadcastElement((Tuple2<Long, Long>) obj, (KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.Context) context, (Collector<Tuple2<Long, Long>>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedBroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
            processElement((Tuple2<Long, Long>) obj, (KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.ReadOnlyContext) readOnlyContext, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase$CheckpointingKeyedSingleBroadcastFunction.class */
    private static class CheckpointingKeyedSingleBroadcastFunction extends KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1333992081671604521L;
        private MapStateDescriptor<Long, String> stateDesc;

        private CheckpointingKeyedSingleBroadcastFunction() {
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.stateDesc = new MapStateDescriptor<>("broadcast-state-3", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        }

        public void processElement(Tuple2<Long, Long> tuple2, KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.ReadOnlyContext readOnlyContext, Collector<Tuple2<Long, Long>> collector) throws Exception {
            collector.collect(tuple2);
        }

        public void processBroadcastElement(Tuple2<Long, Long> tuple2, KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.Context context, Collector<Tuple2<Long, Long>> collector) throws Exception {
            context.getBroadcastState(this.stateDesc).put(tuple2.f0, Long.toString(((Long) tuple2.f1).longValue()));
        }

        public /* bridge */ /* synthetic */ void processBroadcastElement(Object obj, KeyedBroadcastProcessFunction.Context context, Collector collector) throws Exception {
            processBroadcastElement((Tuple2<Long, Long>) obj, (KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.Context) context, (Collector<Tuple2<Long, Long>>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedBroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
            processElement((Tuple2<Long, Long>) obj, (KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>.ReadOnlyContext) readOnlyContext, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    /**
     * Supplies the snapshot specifications the parameterized runner instantiates this test with.
     *
     * <p>The list combines state backend names ({@code "jobmanager"}, {@code "hashmap"},
     * {@code "rocksdb"}) with snapshot types (canonical savepoint, native savepoint, checkpoint)
     * over the Flink version ranges given below. In {@code CREATE_SNAPSHOT} mode only the specs
     * whose Flink version equals {@code currentVersion} are kept.
     *
     * @return the snapshot specifications to run the test against
     */
    @Parameterized.Parameters(name = "Test snapshot: {0}")
    public static Collection<SnapshotMigrationTestBase.SnapshotSpec> parameters() {
        // Declared as Collection (not LinkedList) so the filtered list below is assignable.
        Collection<SnapshotMigrationTestBase.SnapshotSpec> specs = new LinkedList<>();
        specs.addAll(
                SnapshotMigrationTestBase.SnapshotSpec.withVersions(
                        "jobmanager",
                        SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_CANONICAL,
                        FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_14)));
        specs.addAll(
                SnapshotMigrationTestBase.SnapshotSpec.withVersions(
                        "hashmap",
                        SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_CANONICAL,
                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
        specs.addAll(
                SnapshotMigrationTestBase.SnapshotSpec.withVersions(
                        "rocksdb",
                        SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_CANONICAL,
                        FlinkVersion.rangeOf(FlinkVersion.v1_8, currentVersion)));
        specs.addAll(
                SnapshotMigrationTestBase.SnapshotSpec.withVersions(
                        "hashmap",
                        SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_NATIVE,
                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
        specs.addAll(
                SnapshotMigrationTestBase.SnapshotSpec.withVersions(
                        "rocksdb",
                        SnapshotMigrationTestBase.SnapshotType.SAVEPOINT_NATIVE,
                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
        specs.addAll(
                SnapshotMigrationTestBase.SnapshotSpec.withVersions(
                        "hashmap",
                        SnapshotMigrationTestBase.SnapshotType.CHECKPOINT,
                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));
        specs.addAll(
                SnapshotMigrationTestBase.SnapshotSpec.withVersions(
                        "rocksdb",
                        SnapshotMigrationTestBase.SnapshotType.CHECKPOINT,
                        FlinkVersion.rangeOf(FlinkVersion.v1_15, currentVersion)));

        if (executionMode == SnapshotMigrationTestBase.ExecutionMode.CREATE_SNAPSHOT) {
            // When creating snapshots, keep only the specs for the current version.
            specs =
                    specs.stream()
                            .filter(spec -> spec.getFlinkVersion().equals(currentVersion))
                            .collect(Collectors.toList());
        }
        return specs;
    }

    /**
     * Creates a test instance for one snapshot specification.
     *
     * @param snapshotSpec the snapshot specification this instance runs against; stored as-is
     */
    public StatefulJobWBroadcastStateMigrationITCase(SnapshotMigrationTestBase.SnapshotSpec snapshotSpec) throws Exception {
        this.snapshotSpec = snapshotSpec;
    }

    /**
     * Builds a job with two keyed-stream/broadcast-stream pipelines and, depending on
     * {@code executionMode}, either runs it and takes a snapshot or restores it from an existing
     * snapshot and verifies the restored state.
     *
     * <ul>
     *   <li>Pipeline 1: non-parallel keyed source connected to a broadcast stream carrying
     *       {@code "broadcast-state-1"} and {@code "broadcast-state-2"}.
     *   <li>Pipeline 2: parallel keyed source connected to a broadcast stream carrying
     *       {@code "broadcast-state-3"}.
     * </ul>
     *
     * @throws UnsupportedOperationException if the spec names an unknown state backend type
     * @throws IllegalStateException if {@code executionMode} is neither {@code CREATE_SNAPSHOT}
     *     nor {@code VERIFY_SNAPSHOT}
     */
    @Test
    public void testSavepoint() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.noRestart());

        // Plain string switch; the decompiled hashCode()/boolean two-step form was not valid Java.
        final String stateBackendType = snapshotSpec.getStateBackendType();
        switch (stateBackendType) {
            case "rocksdb":
                env.setStateBackend(new EmbeddedRocksDBStateBackend());
                break;
            case "jobmanager":
                env.setStateBackend(new MemoryStateBackend());
                break;
            case "hashmap":
                env.setStateBackend(new HashMapStateBackend());
                break;
            default:
                throw new UnsupportedOperationException(
                        "Unsupported state backend type: " + stateBackendType);
        }

        env.enableCheckpointing(500L);
        env.setParallelism(NUM_SOURCE_ELEMENTS);
        env.setMaxParallelism(NUM_SOURCE_ELEMENTS);

        // Expected broadcast state contents, used only in VERIFY_SNAPSHOT mode.
        final Map<Long, Long> expectedFirstState = new HashMap<>();
        expectedFirstState.put(0L, 0L);
        expectedFirstState.put(1L, 1L);
        expectedFirstState.put(2L, 2L);
        expectedFirstState.put(3L, 3L);

        final Map<String, Long> expectedSecondState = new HashMap<>();
        expectedSecondState.put("0", 0L);
        expectedSecondState.put("1", 1L);
        expectedSecondState.put("2", 2L);
        expectedSecondState.put("3", 3L);

        final Map<Long, String> expectedThirdState = new HashMap<>();
        expectedThirdState.put(0L, "0");
        expectedThirdState.put(1L, "1");
        expectedThirdState.put(2L, "2");
        expectedThirdState.put(3L, "3");

        final SourceFunction<Tuple2<Long, Long>> nonParallelSource;
        final SourceFunction<Tuple2<Long, Long>> nonParallelBroadcastSource;
        final SourceFunction<Tuple2<Long, Long>> parallelSource;
        final SourceFunction<Tuple2<Long, Long>> parallelBroadcastSource;
        final KeyedBroadcastProcessFunction<
                        Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>
                firstBroadcastFunction;
        final KeyedBroadcastProcessFunction<
                        Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>
                secondBroadcastFunction;

        if (executionMode == SnapshotMigrationTestBase.ExecutionMode.CREATE_SNAPSHOT) {
            // State-writing variants: populate the state that ends up in the snapshot.
            nonParallelSource =
                    new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(
                            NUM_SOURCE_ELEMENTS);
            nonParallelBroadcastSource =
                    new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(
                            NUM_SOURCE_ELEMENTS);
            parallelSource =
                    new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(
                            NUM_SOURCE_ELEMENTS);
            parallelBroadcastSource =
                    new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(
                            NUM_SOURCE_ELEMENTS);
            firstBroadcastFunction = new CheckpointingKeyedBroadcastFunction();
            secondBroadcastFunction = new CheckpointingKeyedSingleBroadcastFunction();
        } else if (executionMode == SnapshotMigrationTestBase.ExecutionMode.VERIFY_SNAPSHOT) {
            // Verifying variants: assert on the state restored from the snapshot.
            nonParallelSource =
                    new MigrationTestUtils.CheckingNonParallelSourceWithListState(
                            NUM_SOURCE_ELEMENTS);
            nonParallelBroadcastSource =
                    new MigrationTestUtils.CheckingNonParallelSourceWithListState(
                            NUM_SOURCE_ELEMENTS);
            parallelSource =
                    new MigrationTestUtils.CheckingParallelSourceWithUnionListState(
                            NUM_SOURCE_ELEMENTS);
            parallelBroadcastSource =
                    new MigrationTestUtils.CheckingParallelSourceWithUnionListState(
                            NUM_SOURCE_ELEMENTS);
            firstBroadcastFunction =
                    new CheckingKeyedBroadcastFunction(expectedFirstState, expectedSecondState);
            secondBroadcastFunction = new CheckingKeyedSingleBroadcastFunction(expectedThirdState);
        } else {
            throw new IllegalStateException("Unknown ExecutionMode " + executionMode);
        }

        // Anonymous classes are kept (in the same order), so their generated class names and
        // serialVersionUIDs stay as they were.
        final KeyedStream<Tuple2<Long, Long>, Long> nonParallelKeyedStream =
                env.addSource(nonParallelSource)
                        .uid("CheckpointingSource1")
                        .keyBy(
                                new KeySelector<Tuple2<Long, Long>, Long>() {
                                    private static final long serialVersionUID =
                                            -4514793867774977152L;

                                    @Override
                                    public Long getKey(Tuple2<Long, Long> value) throws Exception {
                                        return value.f0;
                                    }
                                });

        final KeyedStream<Tuple2<Long, Long>, Long> parallelKeyedStream =
                env.addSource(parallelSource)
                        .uid("CheckpointingSource2")
                        .keyBy(
                                new KeySelector<Tuple2<Long, Long>, Long>() {
                                    private static final long serialVersionUID =
                                            4940496713319948104L;

                                    @Override
                                    public Long getKey(Tuple2<Long, Long> value) throws Exception {
                                        return value.f0;
                                    }
                                });

        final MapStateDescriptor<Long, Long> firstBroadcastStateDesc =
                new MapStateDescriptor<>(
                        "broadcast-state-1",
                        BasicTypeInfo.LONG_TYPE_INFO,
                        BasicTypeInfo.LONG_TYPE_INFO);
        final MapStateDescriptor<String, Long> secondBroadcastStateDesc =
                new MapStateDescriptor<>(
                        "broadcast-state-2",
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.LONG_TYPE_INFO);
        final MapStateDescriptor<Long, String> thirdBroadcastStateDesc =
                new MapStateDescriptor<>(
                        "broadcast-state-3",
                        BasicTypeInfo.LONG_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        final BroadcastStream<Tuple2<Long, Long>> nonParallelBroadcastStream =
                env.addSource(nonParallelBroadcastSource)
                        .uid("BrCheckpointingSource1")
                        .broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc);
        final BroadcastStream<Tuple2<Long, Long>> parallelBroadcastStream =
                env.addSource(parallelBroadcastSource)
                        .uid("BrCheckpointingSource2")
                        .broadcast(thirdBroadcastStateDesc);

        nonParallelKeyedStream
                .connect(nonParallelBroadcastStream)
                .process(firstBroadcastFunction)
                .uid("BrProcess1")
                .addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

        parallelKeyedStream
                .connect(parallelBroadcastStream)
                .process(secondBroadcastFunction)
                .uid("BrProcess2")
                .addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

        // NOTE(review): the literal 8 below presumably corresponds to 2 * NUM_SOURCE_ELEMENTS
        // (two pipelines) - confirm against MigrationTestUtils before replacing the literals.
        if (executionMode == SnapshotMigrationTestBase.ExecutionMode.CREATE_SNAPSHOT) {
            executeAndSnapshot(
                    env,
                    "src/test/resources/" + getSnapshotPath(snapshotSpec),
                    snapshotSpec.getSnapshotType(),
                    new Tuple2<>(
                            MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
                            8));
        } else {
            restoreAndExecute(
                    env,
                    getResourceFilename(getSnapshotPath(snapshotSpec)),
                    new Tuple2<>(
                            MigrationTestUtils.CheckingNonParallelSourceWithListState
                                    .SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
                            2),
                    new Tuple2<>(
                            MigrationTestUtils.CheckingParallelSourceWithUnionListState
                                    .SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR,
                            8),
                    new Tuple2<>(
                            MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
                            8));
        }
    }

    /**
     * Returns the snapshot resource name for a spec: a fixed prefix followed by the spec's
     * string representation.
     */
    private String getSnapshotPath(SnapshotMigrationTestBase.SnapshotSpec snapshotSpec) {
        final String prefix = "new-stateful-broadcast-udf-migration-itcase-";
        return prefix + snapshotSpec;
    }
}
