/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.testutils.statemigration.TestType;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public abstract class StateBackendMigrationTestBase<B extends AbstractStateBackend>
extends TestLogger {
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();
    private CheckpointStorageLocation checkpointStorageLocation;
    private MockEnvironment env;

    protected abstract B getStateBackend() throws Exception;

    @Before
    public void before() {
        this.env = MockEnvironment.builder().build();
    }

    @After
    public void after() {
        IOUtils.closeQuietly((AutoCloseable)this.env);
    }

    @Test
    public void testKeyedValueStateMigration() throws Exception {
        String stateName = "test-name";
        this.testKeyedValueStateUpgrade((ValueStateDescriptor<TestType>)new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ValueStateDescriptor<TestType>)new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.V2TestTypeSerializer()));
    }

    @Test
    public void testKeyedValueStateSerializerReconfiguration() throws Exception {
        String stateName = "test-name";
        this.testKeyedValueStateUpgrade((ValueStateDescriptor<TestType>)new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ValueStateDescriptor<TestType>)new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @Test
    public void testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
        String stateName = "test-name";
        try {
            this.testKeyedValueStateUpgrade((ValueStateDescriptor<TestType>)new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ValueStateDescriptor<TestType>)new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer()));
            Assert.fail((String)"should have failed");
        }
        catch (Exception expected) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)expected, StateMigrationException.class).isPresent());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testKeyedValueStateUpgrade(ValueStateDescriptor<TestType> initialAccessDescriptor, ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueState valueState = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, initialAccessDescriptor);
            backend.setCurrentKey((Object)1);
            valueState.update((Object)new TestType("foo", 1456));
            backend.setCurrentKey((Object)2);
            valueState.update((Object)new TestType("bar", 478));
            backend.setCurrentKey((Object)3);
            valueState.update((Object)new TestType("hello", 189));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            valueState = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, newAccessDescriptorAfterRestore);
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((Object)new TestType("foo", 1456), (Object)valueState.value());
            valueState.update((Object)new TestType("newValue1", 751));
            backend.setCurrentKey((Object)2);
            Assert.assertEquals((Object)new TestType("bar", 478), (Object)valueState.value());
            valueState.update((Object)new TestType("newValue2", 167));
            backend.setCurrentKey((Object)3);
            Assert.assertEquals((Object)new TestType("hello", 189), (Object)valueState.value());
            valueState.update((Object)new TestType("newValue3", 444));
            snapshot = this.runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    public void testKeyedListStateMigration() throws Exception {
        String stateName = "test-name";
        this.testKeyedListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("test-name", (TypeSerializer)new TestType.V2TestTypeSerializer()));
    }

    @Test
    public void testKeyedListStateSerializerReconfiguration() throws Exception {
        String stateName = "test-name";
        this.testKeyedListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("test-name", (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @Test
    public void testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
        String stateName = "test-name";
        try {
            this.testKeyedListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("test-name", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer()));
            Assert.fail((String)"should have failed");
        }
        catch (Exception expected) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)expected, StateMigrationException.class).isPresent());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testKeyedListStateUpgrade(ListStateDescriptor<TestType> initialAccessDescriptor, ListStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListState listState = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, initialAccessDescriptor);
            backend.setCurrentKey((Object)1);
            listState.add((Object)new TestType("key-1", 1));
            listState.add((Object)new TestType("key-1", 2));
            listState.add((Object)new TestType("key-1", 3));
            backend.setCurrentKey((Object)2);
            listState.add((Object)new TestType("key-2", 1));
            backend.setCurrentKey((Object)3);
            listState.add((Object)new TestType("key-3", 1));
            listState.add((Object)new TestType("key-3", 2));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            listState = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, newAccessDescriptorAfterRestore);
            backend.setCurrentKey((Object)1);
            Iterator iterable1 = ((Iterable)listState.get()).iterator();
            Assert.assertEquals((Object)new TestType("key-1", 1), iterable1.next());
            Assert.assertEquals((Object)new TestType("key-1", 2), iterable1.next());
            Assert.assertEquals((Object)new TestType("key-1", 3), iterable1.next());
            Assert.assertFalse((boolean)iterable1.hasNext());
            listState.add((Object)new TestType("new-key-1", 123));
            backend.setCurrentKey((Object)2);
            Iterator iterable2 = ((Iterable)listState.get()).iterator();
            Assert.assertEquals((Object)new TestType("key-2", 1), iterable2.next());
            Assert.assertFalse((boolean)iterable2.hasNext());
            listState.add((Object)new TestType("new-key-2", 456));
            backend.setCurrentKey((Object)3);
            Iterator iterable3 = ((Iterable)listState.get()).iterator();
            Assert.assertEquals((Object)new TestType("key-3", 1), iterable3.next());
            Assert.assertEquals((Object)new TestType("key-3", 2), iterable3.next());
            Assert.assertFalse((boolean)iterable3.hasNext());
            listState.add((Object)new TestType("new-key-3", 777));
            snapshot = this.runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    public void testKeyedMapStateAsIs() throws Exception {
        String stateName = "test-name";
        this.testKeyedMapStateUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()));
    }

    @Test
    public void testKeyedMapStateStateMigration() throws Exception {
        String stateName = "test-name";
        this.testKeyedMapStateUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V2TestTypeSerializer()));
    }

    @Test
    public void testKeyedMapStateSerializerReconfiguration() throws Exception {
        String stateName = "test-name";
        this.testKeyedMapStateUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @Test
    public void testKeyedMapStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
        String stateName = "test-name";
        try {
            this.testKeyedMapStateUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.IncompatibleTestTypeSerializer()));
            Assert.fail((String)"should have failed");
        }
        catch (Exception expected) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)expected, StateMigrationException.class).isPresent());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testKeyedMapStateUpgrade(MapStateDescriptor<Integer, TestType> initialAccessDescriptor, MapStateDescriptor<Integer, TestType> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapState mapState = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, initialAccessDescriptor);
            backend.setCurrentKey((Object)1);
            mapState.put((Object)1, (Object)new TestType("key-1", 1));
            mapState.put((Object)2, (Object)new TestType("key-1", 2));
            mapState.put((Object)3, (Object)new TestType("key-1", 3));
            backend.setCurrentKey((Object)2);
            mapState.put((Object)1, (Object)new TestType("key-2", 1));
            backend.setCurrentKey((Object)3);
            mapState.put((Object)1, (Object)new TestType("key-3", 1));
            mapState.put((Object)2, (Object)new TestType("key-3", 2));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            mapState = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, newAccessDescriptorAfterRestore);
            backend.setCurrentKey((Object)1);
            Iterator iterable1 = mapState.iterator();
            Map.Entry actual = (Map.Entry)iterable1.next();
            Assert.assertEquals((Object)1, actual.getKey());
            Assert.assertEquals((Object)new TestType("key-1", 1), actual.getValue());
            actual = (Map.Entry)iterable1.next();
            Assert.assertEquals((Object)2, actual.getKey());
            Assert.assertEquals((Object)new TestType("key-1", 2), actual.getValue());
            actual = (Map.Entry)iterable1.next();
            Assert.assertEquals((Object)3, actual.getKey());
            Assert.assertEquals((Object)new TestType("key-1", 3), actual.getValue());
            Assert.assertFalse((boolean)iterable1.hasNext());
            mapState.put((Object)123, (Object)new TestType("new-key-1", 123));
            backend.setCurrentKey((Object)2);
            Iterator iterable2 = mapState.iterator();
            actual = (Map.Entry)iterable2.next();
            Assert.assertEquals((Object)1, actual.getKey());
            Assert.assertEquals((Object)new TestType("key-2", 1), actual.getValue());
            Assert.assertFalse((boolean)iterable2.hasNext());
            mapState.put((Object)456, (Object)new TestType("new-key-2", 456));
            backend.setCurrentKey((Object)3);
            Iterator iterable3 = mapState.iterator();
            actual = (Map.Entry)iterable3.next();
            Assert.assertEquals((Object)1, actual.getKey());
            Assert.assertEquals((Object)new TestType("key-3", 1), actual.getValue());
            actual = (Map.Entry)iterable3.next();
            Assert.assertEquals((Object)2, actual.getKey());
            Assert.assertEquals((Object)new TestType("key-3", 2), actual.getValue());
            Assert.assertFalse((boolean)iterable3.hasNext());
            mapState.put((Object)777, (Object)new TestType("new-key-3", 777));
            snapshot = this.runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPriorityQueueStateCreationFailsIfNewSerializerIsNotCompatible() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            KeyGroupedInternalPriorityQueue internalPriorityQueue = backend.create("testPriorityQueue", (TypeSerializer)new TestType.V1TestTypeSerializer());
            internalPriorityQueue.add((Object)new TestType("key-1", 123));
            internalPriorityQueue.add((Object)new TestType("key-2", 346));
            internalPriorityQueue.add((Object)new TestType("key-1", 777));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            backend.create("testPriorityQueue", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer());
            Assert.fail((String)"should have failed");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    public void testStateBackendRestoreFailsIfNewKeySerializerRequiresMigration() throws Exception {
        try {
            this.testKeySerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.V2TestTypeSerializer());
            Assert.fail((String)"should have failed");
        }
        catch (Exception expected) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)expected, StateMigrationException.class).isPresent());
        }
    }

    @Test
    public void testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration() throws Exception {
        this.testKeySerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.ReconfigurationRequiringTestTypeSerializer());
    }

    @Test
    public void testStateBackendRestoreFailsIfNewKeySerializerIsIncompatible() throws Exception {
        try {
            this.testKeySerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.IncompatibleTestTypeSerializer());
            Assert.fail((String)"should have failed");
        }
        catch (Exception expected) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)expected, StateMigrationException.class).isPresent());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testKeySerializerUpgrade(TypeSerializer<TestType> initialKeySerializer, TypeSerializer<TestType> newKeySerializer) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<TestType> backend = this.createKeyedBackend(initialKeySerializer);
        String stateName = "test-name";
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("test-name", Integer.class);
            ValueState valueState = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)new TestType("foo", 123));
            valueState.update((Object)1);
            backend.setCurrentKey((Object)new TestType("bar", 456));
            valueState.update((Object)5);
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend(newKeySerializer, snapshot);
            valueState = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)new TestType("foo", 123));
            Assert.assertEquals((long)1L, (long)((Integer)valueState.value()).intValue());
            backend.setCurrentKey((Object)new TestType("bar", 456));
            Assert.assertEquals((long)5L, (long)((Integer)valueState.value()).intValue());
            snapshot = this.runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    public void testKeyedStateRegistrationFailsIfNewNamespaceSerializerRequiresMigration() throws Exception {
        try {
            this.testNamespaceSerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.V2TestTypeSerializer());
            Assert.fail((String)"should have failed");
        }
        catch (Exception expected) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)expected, StateMigrationException.class).isPresent());
        }
    }

    @Test
    public void testKeyedStateRegistrationSucceedsIfNewNamespaceSerializerRequiresReconfiguration() throws Exception {
        this.testNamespaceSerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.ReconfigurationRequiringTestTypeSerializer());
    }

    @Test
    public void testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsIncompatible() throws Exception {
        try {
            this.testNamespaceSerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.IncompatibleTestTypeSerializer());
            Assert.fail((String)"should have failed");
        }
        catch (Exception expected) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)expected, StateMigrationException.class).isPresent());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testNamespaceSerializerUpgrade(TypeSerializer<TestType> initialNamespaceSerializer, TypeSerializer<TestType> newNamespaceSerializerAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        String stateName = "test-name";
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("test-name", Integer.class);
            ValueState valueState = (ValueState)backend.getPartitionedState((Object)new TestType("namespace", 123), initialNamespaceSerializer, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            valueState.update((Object)10);
            backend.setCurrentKey((Object)5);
            valueState.update((Object)50);
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            valueState = (ValueState)backend.getPartitionedState((Object)new TestType("namespace", 123), newNamespaceSerializerAfterRestore, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assert.assertEquals((long)10L, (long)((Integer)valueState.value()).intValue());
            valueState.update((Object)10);
            backend.setCurrentKey((Object)5);
            Assert.assertEquals((long)50L, (long)((Integer)valueState.value()).intValue());
            snapshot = this.runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    public void testOperatorParitionableListStateMigration() throws Exception {
        String stateName = "partitionable-list-state";
        this.testOperatorPartitionableListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.V2TestTypeSerializer()));
    }

    @Test
    public void testOperatorParitionableListStateSerializerReconfiguration() throws Exception {
        String stateName = "partitionable-list-state";
        this.testOperatorPartitionableListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @Test
    public void testOperatorParitionableListStateRegistrationFailsIfNewSerializerIsIncompatible() throws Exception {
        String stateName = "partitionable-list-state";
        try {
            this.testOperatorPartitionableListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer()));
            Assert.fail((String)"should have failed.");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testOperatorPartitionableListStateUpgrade(ListStateDescriptor<TestType> initialAccessDescriptor, ListStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        try {
            ListState state = backend.getListState(initialAccessDescriptor);
            state.add((Object)new TestType("foo", 13));
            state.add((Object)new TestType("bar", 278));
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            state = backend.getListState(newAccessDescriptorAfterRestore);
            Iterator iterator = ((Iterable)state.get()).iterator();
            Assert.assertEquals((Object)new TestType("foo", 13), iterator.next());
            Assert.assertEquals((Object)new TestType("bar", 278), iterator.next());
            Assert.assertFalse((boolean)iterator.hasNext());
            state.add((Object)new TestType("new-entry", 777));
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    public void testOperatorUnionListStateMigration() throws Exception {
        String stateName = "union-list-state";
        this.testOperatorUnionListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.V2TestTypeSerializer()));
    }

    @Test
    public void testOperatorUnionListStateSerializerReconfiguration() throws Exception {
        String stateName = "union-list-state";
        this.testOperatorUnionListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @Test
    public void testOperatorUnionListStateRegistrationFailsIfNewSerializerIsIncompatible() {
        String stateName = "union-list-state";
        try {
            this.testOperatorUnionListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer()));
            Assert.fail((String)"should have failed.");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testOperatorUnionListStateUpgrade(ListStateDescriptor<TestType> initialAccessDescriptor, ListStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        try {
            ListState state = backend.getUnionListState(initialAccessDescriptor);
            state.add((Object)new TestType("foo", 13));
            state.add((Object)new TestType("bar", 278));
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            state = backend.getUnionListState(newAccessDescriptorAfterRestore);
            Iterator iterator = ((Iterable)state.get()).iterator();
            Assert.assertEquals((Object)new TestType("foo", 13), iterator.next());
            Assert.assertEquals((Object)new TestType("bar", 278), iterator.next());
            Assert.assertFalse((boolean)iterator.hasNext());
            state.add((Object)new TestType("new-entry", 777));
        }
        finally {
            backend.dispose();
        }
    }

    @Test
    public void testBroadcastStateValueMigration() throws Exception {
        String stateName = "broadcast-state";
        this.testBroadcastStateValueUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V2TestTypeSerializer()));
    }

    @Test
    public void testBroadcastStateKeyMigration() throws Exception {
        String stateName = "broadcast-state";
        this.testBroadcastStateKeyUpgrade((MapStateDescriptor<TestType, Integer>)new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.V1TestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE), (MapStateDescriptor<TestType, Integer>)new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.V2TestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE));
    }

    @Test
    public void testBroadcastStateValueSerializerReconfiguration() throws Exception {
        String stateName = "broadcast-state";
        this.testBroadcastStateValueUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @Test
    public void testBroadcastStateKeySerializerReconfiguration() throws Exception {
        String stateName = "broadcast-state";
        this.testBroadcastStateKeyUpgrade((MapStateDescriptor<TestType, Integer>)new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.V1TestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE), (MapStateDescriptor<TestType, Integer>)new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE));
    }

    @Test
    public void testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatible() {
        String stateName = "broadcast-state";
        try {
            this.testBroadcastStateValueUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.IncompatibleTestTypeSerializer()));
            Assert.fail((String)"should have failed.");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
        }
    }

    @Test
    public void testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible() {
        String stateName = "broadcast-state";
        try {
            this.testBroadcastStateKeyUpgrade((MapStateDescriptor<TestType, Integer>)new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.V1TestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE), (MapStateDescriptor<TestType, Integer>)new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE));
            Assert.fail((String)"should have failed.");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, StateMigrationException.class).isPresent());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testBroadcastStateValueUpgrade(MapStateDescriptor<Integer, TestType> initialAccessDescriptor, MapStateDescriptor<Integer, TestType> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        try {
            BroadcastState state = backend.getBroadcastState(initialAccessDescriptor);
            state.put((Object)3, (Object)new TestType("foo", 13));
            state.put((Object)5, (Object)new TestType("bar", 278));
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            state = backend.getBroadcastState(newAccessDescriptorAfterRestore);
            Assert.assertEquals((Object)new TestType("foo", 13), (Object)state.get((Object)3));
            Assert.assertEquals((Object)new TestType("bar", 278), (Object)state.get((Object)5));
            state.put((Object)17, (Object)new TestType("new-entry", 777));
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testBroadcastStateKeyUpgrade(MapStateDescriptor<TestType, Integer> initialAccessDescriptor, MapStateDescriptor<TestType, Integer> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        try {
            BroadcastState state = backend.getBroadcastState(initialAccessDescriptor);
            state.put((Object)new TestType("foo", 13), (Object)3);
            state.put((Object)new TestType("bar", 278), (Object)5);
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            state = backend.getBroadcastState(newAccessDescriptorAfterRestore);
            Assert.assertEquals((Object)3, (Object)state.get((Object)new TestType("foo", 13)));
            Assert.assertEquals((Object)5, (Object)state.get((Object)new TestType("bar", 278)));
            state.put((Object)new TestType("new-entry", 777), (Object)17);
        }
        finally {
            backend.dispose();
        }
    }

    private CheckpointStreamFactory createStreamFactory() throws Exception {
        if (this.checkpointStorageLocation == null) {
            CheckpointStorage checkpointStorage = this.getStateBackend().createCheckpointStorage(new JobID());
            checkpointStorage.initializeBaseLocations();
            this.checkpointStorageLocation = checkpointStorage.initializeLocationForCheckpoint(1L);
        }
        return this.checkpointStorageLocation;
    }

    private <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
        return this.createKeyedBackend(keySerializer, this.env);
    }

    private <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, Environment env) throws Exception {
        return this.createKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), env);
    }

    private <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, Environment env) throws Exception {
        AbstractKeyedStateBackend backend = this.getStateBackend().createKeyedStateBackend(env, new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, (MetricGroup)new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry());
        return backend;
    }

    private <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception {
        return this.restoreKeyedBackend(keySerializer, state, this.env);
    }

    private <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state, Environment env) throws Exception {
        return this.restoreKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), Collections.singletonList(state), env);
    }

    private <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyedStateHandle> state, Environment env) throws Exception {
        AbstractKeyedStateBackend backend = this.getStateBackend().createKeyedStateBackend(env, new JobID(), "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, (MetricGroup)new UnregisteredMetricsGroup(), state, new CloseableRegistry());
        return backend;
    }

    private KeyedStateHandle runSnapshot(RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture, SharedStateRegistry sharedStateRegistry) throws Exception {
        SnapshotResult snapshotResult;
        KeyedStateHandle jobManagerOwnedSnapshot;
        if (!snapshotRunnableFuture.isDone()) {
            snapshotRunnableFuture.run();
        }
        if ((jobManagerOwnedSnapshot = (KeyedStateHandle)(snapshotResult = (SnapshotResult)snapshotRunnableFuture.get()).getJobManagerOwnedSnapshot()) != null) {
            jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry);
        }
        return jobManagerOwnedSnapshot;
    }

    private OperatorStateBackend createOperatorStateBackend() throws Exception {
        return this.getStateBackend().createOperatorStateBackend((Environment)this.env, "test_op", Collections.emptyList(), new CloseableRegistry());
    }

    private OperatorStateBackend createOperatorStateBackend(Collection<OperatorStateHandle> state) throws Exception {
        return this.getStateBackend().createOperatorStateBackend((Environment)this.env, "test_op", state, new CloseableRegistry());
    }

    private OperatorStateBackend restoreOperatorStateBackend(OperatorStateHandle state) throws Exception {
        OperatorStateBackend operatorStateBackend = this.createOperatorStateBackend((Collection<OperatorStateHandle>)StateObjectCollection.singleton((StateObject)state));
        return operatorStateBackend;
    }

    private OperatorStateHandle runSnapshot(RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunnableFuture) throws Exception {
        if (!snapshotRunnableFuture.isDone()) {
            snapshotRunnableFuture.run();
        }
        return (OperatorStateHandle)((SnapshotResult)snapshotRunnableFuture.get()).getJobManagerOwnedSnapshot();
    }

    public static class CustomVoidNamespaceSerializerSnapshot
    implements TypeSerializerSnapshot<VoidNamespace> {
        public TypeSerializer<VoidNamespace> restoreSerializer() {
            return new CustomVoidNamespaceSerializer();
        }

        public TypeSerializerSchemaCompatibility<VoidNamespace> resolveSchemaCompatibility(TypeSerializer<VoidNamespace> newSerializer) {
            return TypeSerializerSchemaCompatibility.compatibleAsIs();
        }

        public void writeSnapshot(DataOutputView out) throws IOException {
        }

        public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
        }

        public boolean equals(Object obj) {
            return obj instanceof CustomVoidNamespaceSerializerSnapshot;
        }

        public int hashCode() {
            return 0;
        }

        public int getCurrentVersion() {
            return 0;
        }
    }

    public static class CustomVoidNamespaceSerializer
    extends TypeSerializer<VoidNamespace> {
        private static final long serialVersionUID = 1L;
        public static final CustomVoidNamespaceSerializer INSTANCE = new CustomVoidNamespaceSerializer();

        public boolean isImmutableType() {
            return true;
        }

        public VoidNamespace createInstance() {
            return VoidNamespace.get();
        }

        public VoidNamespace copy(VoidNamespace from) {
            return VoidNamespace.get();
        }

        public VoidNamespace copy(VoidNamespace from, VoidNamespace reuse) {
            return VoidNamespace.get();
        }

        public int getLength() {
            return 0;
        }

        public void serialize(VoidNamespace record, DataOutputView target) throws IOException {
            target.write(0);
        }

        public VoidNamespace deserialize(DataInputView source) throws IOException {
            source.readByte();
            return VoidNamespace.get();
        }

        public VoidNamespace deserialize(VoidNamespace reuse, DataInputView source) throws IOException {
            source.readByte();
            return VoidNamespace.get();
        }

        public void copy(DataInputView source, DataOutputView target) throws IOException {
            target.write((int)source.readByte());
        }

        public TypeSerializer<VoidNamespace> duplicate() {
            return this;
        }

        public boolean equals(Object obj) {
            return obj instanceof CustomVoidNamespaceSerializer;
        }

        public int hashCode() {
            return ((Object)((Object)this)).getClass().hashCode();
        }

        public TypeSerializerSnapshot<VoidNamespace> snapshotConfiguration() {
            return new CustomVoidNamespaceSerializerSnapshot();
        }
    }
}

