package org.apache.flink.runtime.state;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.sort.ExternalSortLargeRecordsITCase;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.heap.AbstractHeapState;
import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.NestedStateMap;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase.class */
public abstract class StateBackendTestBase<B extends AbstractStateBackend> extends TestLogger {

    /** JUnit rule used by tests that expect a failure; starts out expecting no exception. */
    @Rule
    public final ExpectedException expectedException = ExpectedException.none();
    /** Lazily created by {@link #createStreamFactory()} and reused on subsequent calls. */
    private CheckpointStreamFactory checkpointStreamFactory;
    /** Environment assigned in {@link #before()} and closed quietly in {@link #after()}. */
    private MockEnvironment env;

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$AppendingFold.class */
    /**
     * Fold function that appends every folded integer to the accumulated string,
     * separating the two with a comma.
     */
    private static class AppendingFold implements FoldFunction<Integer, String> {
        private static final long serialVersionUID = 1;

        private AppendingFold() {
            super();
        }

        /**
         * Appends {@code value} to {@code accumulator}.
         *
         * @param accumulator the string folded so far
         * @param value the next element to append
         * @return {@code accumulator + "," + value}
         */
        public String fold(String accumulator, Integer value) throws Exception {
            StringBuilder joined = new StringBuilder();
            joined.append(accumulator);
            joined.append(",");
            joined.append(value);
            return joined.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$AppendingReduce.class */
    /** Reduce function that joins two strings with a comma between them. */
    private static class AppendingReduce implements ReduceFunction<String> {
        private AppendingReduce() {
            super();
        }

        /**
         * Concatenates the two inputs.
         *
         * @param left the first value
         * @param right the second value
         * @return {@code left + "," + right}
         */
        public String reduce(String left, String right) throws Exception {
            StringBuilder joined = new StringBuilder();
            joined.append(left);
            joined.append(",");
            joined.append(right);
            return joined.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$CustomKryoTestSerializer.class */
    /**
     * Kryo test serializer whose write path delegates to {@link JavaSerializer} and whose read
     * path always fails with an {@link ExpectedKryoTestException}.
     */
    public static class CustomKryoTestSerializer extends JavaSerializer {
        /** Writes the object exactly as the parent serializer does. */
        public void write(Kryo kryo, Output output, Object value) {
            super.write(kryo, output, value);
        }

        /** Never returns a value; always throws {@link ExpectedKryoTestException}. */
        public Object read(Kryo kryo, Input input, Class type) {
            ExpectedKryoTestException failure = new ExpectedKryoTestException();
            throw failure;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$ExceptionThrowingTestSerializer.class */
    /**
     * Kryo test serializer that fails with an {@link ExpectedKryoTestException} on every write
     * and every read, so tests can detect that it was actually invoked.
     */
    public static class ExceptionThrowingTestSerializer extends JavaSerializer {
        /** Always throws {@link ExpectedKryoTestException}. */
        public void write(Kryo kryo, Output output, Object value) {
            ExpectedKryoTestException failure = new ExpectedKryoTestException();
            throw failure;
        }

        /** Always throws {@link ExpectedKryoTestException}. */
        public Object read(Kryo kryo, Input input, Class type) {
            ExpectedKryoTestException failure = new ExpectedKryoTestException();
            throw failure;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$ExpectedKryoTestException.class */
    /** Marker exception thrown by the Kryo test serializers declared in this class. */
    private static class ExpectedKryoTestException extends RuntimeException {
        private ExpectedKryoTestException() {
            super();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$ImmutableAggregatingAddingFunction.class */
    /**
     * Aggregate function that sums {@code Long} inputs using an immutable {@code Long}
     * accumulator: every operation returns a new value instead of mutating its arguments.
     */
    private static class ImmutableAggregatingAddingFunction implements AggregateFunction<Long, Long, Long> {
        private ImmutableAggregatingAddingFunction() {
        }

        /**
         * Implements {@code AggregateFunction#createAccumulator}. The decompiler had renamed this
         * method, which left the interface method unimplemented.
         *
         * @return the initial accumulator, {@code 0}
         */
        public Long createAccumulator() {
            return 0L;
        }

        /**
         * Decompiler-generated name, kept only so that any caller using it keeps working.
         *
         * @return the same value as {@link #createAccumulator()}
         */
        public Long m388createAccumulator() {
            return createAccumulator();
        }

        /**
         * @param value the element to add
         * @param accumulator the running sum
         * @return the new running sum
         */
        public Long add(Long value, Long accumulator) {
            return accumulator + value;
        }

        /** @return the accumulator itself, which already is the sum */
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        /** @return the sum of both partial accumulators */
        public Long merge(Long first, Long second) {
            return first + second;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$MutableAggregatingAddingFunction.class */
    /**
     * Aggregate function that sums {@code Long} inputs into a {@link MutableLong} accumulator,
     * which is updated in place and returned.
     */
    private static class MutableAggregatingAddingFunction implements AggregateFunction<Long, MutableLong, Long> {
        private MutableAggregatingAddingFunction() {
        }

        /**
         * Implements {@code AggregateFunction#createAccumulator}. The decompiler had renamed this
         * method, which left the interface method unimplemented.
         *
         * @return a fresh accumulator holding {@code 0}
         */
        public MutableLong createAccumulator() {
            return new MutableLong();
        }

        /**
         * Decompiler-generated name, kept only so that any caller using it keeps working.
         *
         * @return a fresh accumulator, as {@link #createAccumulator()} does
         */
        public MutableLong m389createAccumulator() {
            return createAccumulator();
        }

        /**
         * Adds {@code value} to the accumulator in place.
         *
         * @return the same accumulator instance that was passed in
         */
        public MutableLong add(Long value, MutableLong accumulator) {
            accumulator.value += value.longValue();
            return accumulator;
        }

        /** @return the boxed current sum */
        public Long getResult(MutableLong accumulator) {
            return Long.valueOf(accumulator.value);
        }

        /**
         * Adds {@code other} into {@code target} in place.
         *
         * @return {@code target}
         */
        public MutableLong merge(MutableLong target, MutableLong other) {
            target.value += other.value;
            return target;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$MutableLong.class */
    /** Mutable holder for a single {@code long}, used as an in-place accumulator. */
    public static final class MutableLong {
        /** Current value; a new instance starts at the field default of {@code 0}. */
        long value;

        private MutableLong() {
            super();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$TestNestedPojoClassA.class */
    /**
     * Serializable test POJO with a {@code Double} and an {@code Integer} field.
     *
     * <p>Both fields may be {@code null} (the no-arg constructor leaves them unset), so
     * {@link #equals(Object)} and {@link #hashCode()} are null-safe.
     */
    public static class TestNestedPojoClassA implements Serializable {
        private Double doubleField;
        private Integer intField;

        /** Creates an instance with both fields {@code null}. */
        public TestNestedPojoClassA() {
        }

        public TestNestedPojoClassA(Double d, Integer num) {
            this.doubleField = d;
            this.intField = num;
        }

        public Double getDoubleField() {
            return this.doubleField;
        }

        public void setDoubleField(Double d) {
            this.doubleField = d;
        }

        public Integer getIntField() {
            return this.intField;
        }

        public void setIntField(Integer num) {
            this.intField = num;
        }

        @Override
        public String toString() {
            return "TestNestedPojoClassA{doubleField='" + this.doubleField + "', intField=" + this.intField + '}';
        }

        /**
         * Two instances are equal when they have the same class and both fields are equal;
         * {@code null} fields compare equal only to {@code null}.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            TestNestedPojoClassA other = (TestNestedPojoClassA) obj;
            return java.util.Objects.equals(this.doubleField, other.doubleField)
                    && java.util.Objects.equals(this.intField, other.intField);
        }

        /** Same formula as before for non-null fields; a {@code null} field contributes 0. */
        @Override
        public int hashCode() {
            return (31 * java.util.Objects.hashCode(this.doubleField)) + java.util.Objects.hashCode(this.intField);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$TestNestedPojoClassB.class */
    /**
     * Serializable test POJO with a {@code Double} and a {@code String} field.
     *
     * <p>Both fields may be {@code null} (the no-arg constructor leaves them unset), so
     * {@link #equals(Object)} and {@link #hashCode()} are null-safe.
     */
    public static class TestNestedPojoClassB implements Serializable {
        private Double doubleField;
        private String strField;

        /** Creates an instance with both fields {@code null}. */
        public TestNestedPojoClassB() {
        }

        public TestNestedPojoClassB(Double d, String str) {
            this.doubleField = d;
            this.strField = str;
        }

        public Double getDoubleField() {
            return this.doubleField;
        }

        public void setDoubleField(Double d) {
            this.doubleField = d;
        }

        public String getStrField() {
            return this.strField;
        }

        public void setStrField(String str) {
            this.strField = str;
        }

        @Override
        public String toString() {
            return "TestNestedPojoClassB{doubleField='" + this.doubleField + "', strField=" + this.strField + '}';
        }

        /**
         * Two instances are equal when they have the same class and both fields are equal;
         * {@code null} fields compare equal only to {@code null}.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            TestNestedPojoClassB other = (TestNestedPojoClassB) obj;
            return java.util.Objects.equals(this.doubleField, other.doubleField)
                    && java.util.Objects.equals(this.strField, other.strField);
        }

        /** Same formula as before for non-null fields; a {@code null} field contributes 0. */
        @Override
        public int hashCode() {
            return (31 * java.util.Objects.hashCode(this.doubleField)) + java.util.Objects.hashCode(this.strField);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$TestPojo.class */
    /**
     * Serializable test POJO with a string, an integer and two optional nested POJO fields.
     *
     * <p>Any field may be {@code null}: the no-arg constructor leaves all of them unset and the
     * two-arg constructor leaves the nested fields {@code null}. {@link #equals(Object)} and
     * {@link #hashCode()} are therefore null-safe.
     */
    public static class TestPojo implements Serializable {
        private String strField;
        private Integer intField;
        private TestNestedPojoClassA kryoClassAField;
        private TestNestedPojoClassB kryoClassBField;

        /** Creates an instance with every field {@code null}. */
        public TestPojo() {
        }

        /** Creates an instance whose nested POJO fields are {@code null}. */
        public TestPojo(String str, Integer num) {
            this.strField = str;
            this.intField = num;
            this.kryoClassAField = null;
            this.kryoClassBField = null;
        }

        public TestPojo(String str, Integer num, TestNestedPojoClassA testNestedPojoClassA, TestNestedPojoClassB testNestedPojoClassB) {
            this.strField = str;
            this.intField = num;
            this.kryoClassAField = testNestedPojoClassA;
            this.kryoClassBField = testNestedPojoClassB;
        }

        public String getStrField() {
            return this.strField;
        }

        public void setStrField(String str) {
            this.strField = str;
        }

        public Integer getIntField() {
            return this.intField;
        }

        public void setIntField(Integer num) {
            this.intField = num;
        }

        public TestNestedPojoClassA getKryoClassAField() {
            return this.kryoClassAField;
        }

        public void setKryoClassAField(TestNestedPojoClassA testNestedPojoClassA) {
            this.kryoClassAField = testNestedPojoClassA;
        }

        public TestNestedPojoClassB getKryoClassBField() {
            return this.kryoClassBField;
        }

        public void setKryoClassBField(TestNestedPojoClassB testNestedPojoClassB) {
            this.kryoClassBField = testNestedPojoClassB;
        }

        /** Renders only the string and integer fields; the nested POJOs are not included. */
        @Override
        public String toString() {
            return "TestPojo{strField='" + this.strField + "', intField=" + this.intField + '}';
        }

        /**
         * Two instances are equal when they have the same class and all four fields are equal.
         * A {@code null} field equals only {@code null}; previously a {@code null} nested field
         * on this side compared against a non-null one on the other side threw an NPE.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            TestPojo other = (TestPojo) obj;
            return java.util.Objects.equals(this.strField, other.strField)
                    && java.util.Objects.equals(this.intField, other.intField)
                    && java.util.Objects.equals(this.kryoClassAField, other.kryoClassAField)
                    && java.util.Objects.equals(this.kryoClassBField, other.kryoClassBField);
        }

        /**
         * Same formula as before for non-null fields. A {@code null} string or integer field
         * contributes 0, and a {@code null} nested field is skipped.
         */
        @Override
        public int hashCode() {
            int result = (31 * java.util.Objects.hashCode(this.strField)) + java.util.Objects.hashCode(this.intField);
            if (this.kryoClassAField != null) {
                result = (31 * result) + this.kryoClassAField.hashCode();
            }
            if (this.kryoClassBField != null) {
                result = (31 * result) + this.kryoClassBField.hashCode();
            }
            return result;
        }
    }

    @Before
    /** Creates the environment for the upcoming test via {@code buildMockEnv()}. */
    public void before() {
        env = buildMockEnv();
    }

    @After
    /** Closes the test environment, ignoring any failure raised while closing it. */
    public void after() {
        IOUtils.closeQuietly(env);
    }

    /**
     * Supplies the state backend under test. It is called every time a checkpoint stream
     * factory or a keyed backend is created by the helpers in this class.
     *
     * @return the state backend to test
     * @throws Exception if the backend cannot be provided
     */
    protected abstract B getStateBackend() throws Exception;

    /**
     * NOTE(review): no caller is visible in this part of the file. Presumably this tells
     * restore-related tests whether the backend under test needs the original serializers to be
     * available when restoring — confirm against the callers and subclasses.
     *
     * @return a backend-specific flag supplied by the concrete test class
     */
    protected abstract boolean isSerializerPresenceRequiredOnRestore();

    /**
     * Returns the checkpoint stream factory used by the tests, creating it on first use.
     *
     * <p>The factory is resolved from the backend's checkpoint storage for a fresh {@link JobID}
     * and checkpoint id {@code 1}, then cached for later calls.
     *
     * @return the cached checkpoint stream factory
     * @throws Exception if the backend or its checkpoint storage cannot be created
     */
    protected CheckpointStreamFactory createStreamFactory() throws Exception {
        if (checkpointStreamFactory != null) {
            return checkpointStreamFactory;
        }
        checkpointStreamFactory = getStateBackend()
                .createCheckpointStorage(new JobID())
                .resolveCheckpointStorageLocation(1L, CheckpointStorageLocationReference.getDefault());
        return checkpointStreamFactory;
    }

    /**
     * Creates a keyed backend for the given key serializer using this test's environment.
     *
     * @param keySerializer serializer for the keys
     * @return the new keyed state backend
     */
    protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
        return createKeyedBackend(keySerializer, env);
    }

    /**
     * Creates a keyed backend in the given environment, passing {@code 10} as the key-group
     * count together with the key-group range {@code [0, 9]}.
     *
     * @param keySerializer serializer for the keys
     * @param environment environment the backend is created in
     * @return the new keyed state backend
     */
    protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, Environment environment) throws Exception {
        KeyGroupRange fullRange = new KeyGroupRange(0, 9);
        return createKeyedBackend(keySerializer, 10, fullRange, environment);
    }

    /**
     * Creates a keyed backend from the backend under test without any state to restore
     * (an empty state-handle collection is passed).
     *
     * <p>A fresh {@link JobID}, the operator name {@code "test_op"}, the environment's task
     * KvState registry, the default TTL time provider, an unregistered metrics group and a new
     * {@link CloseableRegistry} are used.
     *
     * @param keySerializer serializer for the keys
     * @param numberOfKeyGroups integer forwarded next to the key-group range (the callers in
     *     this class pass 10 for the range [0, 9])
     * @param keyGroupRange key-group range forwarded to the backend
     * @param environment environment the backend is created in
     * @return the new keyed state backend
     */
    protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, Environment environment) throws Exception {
        return getStateBackend().createKeyedStateBackend(
                environment,
                new JobID(),
                "test_op",
                keySerializer,
                numberOfKeyGroups,
                keyGroupRange,
                environment.getTaskKvStateRegistry(),
                TtlTimeProvider.DEFAULT,
                new UnregisteredMetricsGroup(),
                Collections.emptyList(),
                new CloseableRegistry());
    }

    /**
     * Restores a keyed backend from a single state handle using this test's environment.
     *
     * @param keySerializer serializer for the keys
     * @param stateHandle snapshot to restore from
     * @return the restored keyed state backend
     */
    protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle stateHandle) throws Exception {
        return restoreKeyedBackend(keySerializer, stateHandle, env);
    }

    /**
     * Restores a keyed backend from a single state handle in the given environment, passing
     * {@code 10} as the key-group count together with the key-group range {@code [0, 9]}.
     *
     * @param keySerializer serializer for the keys
     * @param stateHandle snapshot to restore from
     * @param environment environment the backend is created in
     * @return the restored keyed state backend
     */
    protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle stateHandle, Environment environment) throws Exception {
        KeyGroupRange fullRange = new KeyGroupRange(0, 9);
        List<KeyedStateHandle> handles = Collections.singletonList(stateHandle);
        return restoreKeyedBackend(keySerializer, 10, fullRange, handles, environment);
    }

    /**
     * Creates a keyed backend from the backend under test, handing it the given state handles
     * to restore from.
     *
     * <p>A fresh {@link JobID}, the operator name {@code "test_op"}, the environment's task
     * KvState registry, the default TTL time provider, an unregistered metrics group and a new
     * {@link CloseableRegistry} are used.
     *
     * @param keySerializer serializer for the keys
     * @param numberOfKeyGroups integer forwarded next to the key-group range (the callers in
     *     this class pass 10 for the range [0, 9])
     * @param keyGroupRange key-group range forwarded to the backend
     * @param stateHandles snapshots to restore from
     * @param environment environment the backend is created in
     * @return the restored keyed state backend
     */
    protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyedStateHandle> stateHandles, Environment environment) throws Exception {
        return getStateBackend().createKeyedStateBackend(
                environment,
                new JobID(),
                "test_op",
                keySerializer,
                numberOfKeyGroups,
                keyGroupRange,
                environment.getTaskKvStateRegistry(),
                TtlTimeProvider.DEFAULT,
                new UnregisteredMetricsGroup(),
                stateHandles,
                new CloseableRegistry());
    }

    /* JADX WARN: Type inference failed for: r0v34, types: [java.util.PrimitiveIterator$OfInt] */
    /* JADX WARN: Type inference failed for: r0v54, types: [java.util.PrimitiveIterator$OfInt] */
    @Test
    public void testGetKeys() throws Exception {
        Throwable th;
        AbstractKeyedStateBackend<K> createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ValueState partitionedState = createKeyedBackend.getPartitionedState("ns1", StringSerializer.INSTANCE, new ValueStateDescriptor("get-keys-test", IntSerializer.INSTANCE));
            for (int i = 0; i < 1000; i++) {
                createKeyedBackend.setCurrentKey(Integer.valueOf(i));
                partitionedState.update(Integer.valueOf(i * 2));
            }
            ValueState partitionedState2 = createKeyedBackend.getPartitionedState("ns2", StringSerializer.INSTANCE, new ValueStateDescriptor("get-keys-test", IntSerializer.INSTANCE));
            for (int i2 = 1000; i2 < 2000; i2++) {
                createKeyedBackend.setCurrentKey(Integer.valueOf(i2));
                partitionedState2.update(Integer.valueOf(i2 * 2));
            }
            Stream sorted = createKeyedBackend.getKeys("get-keys-test", "ns1").sorted();
            Throwable th2 = null;
            try {
                try {
                    ?? it = sorted.mapToInt(num -> {
                        return num.intValue();
                    }).iterator();
                    for (int i3 = 0; i3 < 1000; i3++) {
                        Assert.assertTrue(it.hasNext());
                        Assert.assertEquals(i3, it.nextInt());
                    }
                    Assert.assertFalse(it.hasNext());
                    if (sorted != null) {
                        if (0 != 0) {
                            try {
                                sorted.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            sorted.close();
                        }
                    }
                    sorted = createKeyedBackend.getKeys("get-keys-test", "ns2").sorted();
                    th = null;
                } finally {
                }
                try {
                    try {
                        ?? it2 = sorted.mapToInt(num2 -> {
                            return num2.intValue();
                        }).iterator();
                        for (int i4 = 1000; i4 < 2000; i4++) {
                            Assert.assertTrue(it2.hasNext());
                            Assert.assertEquals(i4, it2.nextInt());
                        }
                        Assert.assertFalse(it2.hasNext());
                        if (sorted != null) {
                            if (0 != 0) {
                                try {
                                    sorted.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                sorted.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            IOUtils.closeQuietly(createKeyedBackend);
            createKeyedBackend.dispose();
        }
    }

    /**
     * Verifies that a Kryo default serializer registered in the execution config is the one the
     * backend uses for a generic-typed value state.
     *
     * <p>{@link ExceptionThrowingTestSerializer} is registered for {@link TestPojo}. Exactly one
     * of the two steps, the state update or the snapshot, must then fail with an
     * {@link ExpectedKryoTestException} (directly or as the cause) — presumably depending on
     * whether the backend serializes on update or only on snapshot.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, this.env);
        try {
            // Raw cast: the test serializer extends the raw JavaSerializer and is not typed to TestPojo.
            this.env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class);
            GenericTypeInfo<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
            // Make sure the Kryo serializer is actually the one in use for this type.
            Assert.assertTrue(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer);
            ValueStateDescriptor<TestPojo> descriptor = new ValueStateDescriptor<>("id", pojoType);
            ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

            int numExceptions = 0;
            backend.setCurrentKey(1);
            try {
                state.update(new TestPojo("u1", 1));
            } catch (ExpectedKryoTestException e) {
                numExceptions++;
            } catch (Exception e) {
                if (!(e.getCause() instanceof ExpectedKryoTestException)) {
                    throw e;
                }
                numExceptions++;
            }
            try {
                runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            } catch (ExpectedKryoTestException e) {
                numExceptions++;
            } catch (Exception e) {
                if (!(e.getCause() instanceof ExpectedKryoTestException)) {
                    throw e;
                }
                numExceptions++;
            }
            Assert.assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
        } finally {
            // Dispose also when the test fails, so the backend is not leaked.
            backend.dispose();
        }
    }

    /**
     * Same check as the default-serializer test, but the state is obtained through
     * {@code getOrCreateKeyedState} and the namespace is set manually on the internal state.
     *
     * <p>{@link ExceptionThrowingTestSerializer} is registered as Kryo default serializer for
     * {@link TestPojo}. Exactly one of the two steps, the state update or the snapshot, must
     * fail with an {@link ExpectedKryoTestException} (directly or as the cause).
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, this.env);
        try {
            // Raw cast: the test serializer extends the raw JavaSerializer and is not typed to TestPojo.
            this.env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class);
            GenericTypeInfo<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
            // Make sure the Kryo serializer is actually the one in use for this type.
            Assert.assertTrue(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer);
            // NOTE(review): the result of this call is discarded; it looks redundant with the
            // assertion above — confirm it has no needed side effect before removing it.
            pojoType.createSerializer(this.env.getExecutionConfig());
            ValueStateDescriptor<TestPojo> descriptor = new ValueStateDescriptor<>("id", pojoType);
            ValueState<TestPojo> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, descriptor);
            Assert.assertTrue(state instanceof InternalValueState);
            ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);

            int numExceptions = 0;
            backend.setCurrentKey(1);
            try {
                state.update(new TestPojo("u1", 1));
            } catch (ExpectedKryoTestException e) {
                numExceptions++;
            } catch (Exception e) {
                if (!(e.getCause() instanceof ExpectedKryoTestException)) {
                    throw e;
                }
                numExceptions++;
            }
            try {
                runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            } catch (ExpectedKryoTestException e) {
                numExceptions++;
            } catch (Exception e) {
                if (!(e.getCause() instanceof ExpectedKryoTestException)) {
                    throw e;
                }
                numExceptions++;
            }
            Assert.assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
        } finally {
            // Dispose also when the test fails, so the backend is not leaked.
            backend.dispose();
        }
    }

    /**
     * Verifies that a Kryo serializer registered for a type in the execution config is the one
     * the backend uses for a generic-typed value state.
     *
     * <p>{@link ExceptionThrowingTestSerializer} is registered for {@link TestPojo} via
     * {@code registerTypeWithKryoSerializer}. Exactly one of the two steps, the state update or
     * the snapshot, must fail with an {@link ExpectedKryoTestException} (directly or as the
     * cause) — presumably depending on whether the backend serializes on update or only on
     * snapshot.
     */
    @Test
    public void testBackendUsesRegisteredKryoSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, this.env);
        try {
            this.env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
            GenericTypeInfo<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
            // Make sure the Kryo serializer is actually the one in use for this type.
            Assert.assertTrue(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer);
            ValueStateDescriptor<TestPojo> descriptor = new ValueStateDescriptor<>("id", pojoType);
            ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

            int numExceptions = 0;
            backend.setCurrentKey(1);
            try {
                state.update(new TestPojo("u1", 1));
            } catch (ExpectedKryoTestException e) {
                numExceptions++;
            } catch (Exception e) {
                if (!(e.getCause() instanceof ExpectedKryoTestException)) {
                    throw e;
                }
                numExceptions++;
            }
            try {
                runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            } catch (ExpectedKryoTestException e) {
                numExceptions++;
            } catch (Exception e) {
                if (!(e.getCause() instanceof ExpectedKryoTestException)) {
                    throw e;
                }
                numExceptions++;
            }
            Assert.assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
        } finally {
            // Dispose also when the test fails, so the backend is not leaked.
            backend.dispose();
        }
    }

    /**
     * Same check as the registered-serializer test, but the state is obtained through
     * {@code getOrCreateKeyedState} and the namespace is set manually on the internal state.
     *
     * <p>{@link ExceptionThrowingTestSerializer} is registered for {@link TestPojo} via
     * {@code registerTypeWithKryoSerializer}. Exactly one of the two steps, the state update or
     * the snapshot, must fail with an {@link ExpectedKryoTestException} (directly or as the
     * cause).
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, this.env);
        try {
            this.env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
            GenericTypeInfo<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
            // Make sure the Kryo serializer is actually the one in use for this type.
            Assert.assertTrue(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer);
            ValueStateDescriptor<TestPojo> descriptor = new ValueStateDescriptor<>("id", pojoType);
            ValueState<TestPojo> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, descriptor);
            Assert.assertTrue(state instanceof InternalValueState);
            ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);

            int numExceptions = 0;
            backend.setCurrentKey(1);
            try {
                state.update(new TestPojo("u1", 1));
            } catch (ExpectedKryoTestException e) {
                numExceptions++;
            } catch (Exception e) {
                if (!(e.getCause() instanceof ExpectedKryoTestException)) {
                    throw e;
                }
                numExceptions++;
            }
            try {
                runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            } catch (ExpectedKryoTestException e) {
                numExceptions++;
            } catch (Exception e) {
                if (!(e.getCause() instanceof ExpectedKryoTestException)) {
                    throw e;
                }
                numExceptions++;
            }
            Assert.assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
        } finally {
            // Dispose also when the test fails, so the backend is not leaked.
            backend.dispose();
        }
    }

    /**
     * Verifies that state written with the Kryo serializer can still be read after
     * {@link TestPojo} has been registered as a Kryo type between snapshot and restore.
     *
     * <p>Two values are written and snapshotted, the backend is disposed, the type is
     * registered via {@code registerKryoType}, and a backend restored from the snapshot must
     * return the original values for both keys.
     */
    @Test
    public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, this.env);
        ValueStateDescriptor<TestPojo> descriptor;
        KeyedStateHandle snapshot;
        try {
            GenericTypeInfo<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
            // Make sure the Kryo serializer is actually the one in use for this type.
            Assert.assertTrue(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer);
            descriptor = new ValueStateDescriptor<>("id", pojoType);
            ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
            backend.setCurrentKey(1);
            state.update(new TestPojo("u1", 1));
            backend.setCurrentKey(2);
            state.update(new TestPojo("u2", 2));
            snapshot = runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        } finally {
            // Dispose also when the test fails, so the backend is not leaked.
            backend.dispose();
        }

        // Change the Kryo configuration between snapshot and restore.
        this.env.getExecutionConfig().registerKryoType(TestPojo.class);

        AbstractKeyedStateBackend<Integer> restoredBackend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, this.env);
        try {
            snapshot.discardState();
            ValueState<TestPojo> restoredState = restoredBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
            // Expected value goes first so that failure messages read correctly.
            restoredBackend.setCurrentKey(1);
            Assert.assertEquals(new TestPojo("u1", 1), restoredState.value());
            restoredBackend.setCurrentKey(2);
            Assert.assertEquals(new TestPojo("u2", 2), restoredState.value());
        } finally {
            restoredBackend.dispose();
        }
    }

    /**
     * Stores two {@code TestPojo} values with the Kryo-based serializer, snapshots them, and then
     * restores after adding {@code CustomKryoTestSerializer} as the default Kryo serializer for
     * {@code TestPojo}.
     *
     * <p>The first restore overwrites key 1 and takes a second snapshot. Restoring that second
     * snapshot and reading key 1 is expected to raise an {@code ExpectedKryoTestException},
     * either directly or as the cause of the thrown exception.
     */
    @Test
    public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend failingBackend = null;
        try {
            // Phase 1: write state without any custom Kryo registration and snapshot it.
            AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE, this.env);
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assert.assertTrue(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer);
            ValueState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("id", pojoType));
            backend.setCurrentKey(1);
            state.update(new TestPojo("u1", 1));
            backend.setCurrentKey(2);
            state.update(new TestPojo("u2", 2));
            KeyedStateHandle snapshot = runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();

            // Phase 2: restore with the custom default serializer, update key 1, snapshot again.
            this.env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            AbstractKeyedStateBackend<K> restoredBackend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, this.env);
            ValueStateDescriptor descriptor = new ValueStateDescriptor("id", pojoType);
            ValueState restoredState = restoredBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
            restoredBackend.setCurrentKey(1);
            restoredState.update(new TestPojo("u1", 11));
            KeyedStateHandle secondSnapshot = runSnapshot(restoredBackend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
            restoredBackend.dispose();

            // Phase 3: restoring the second snapshot and reading from it must fail.
            this.env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            this.expectedException.expect(CoreMatchers.anyOf(CoreMatchers.isA(ExpectedKryoTestException.class), Matchers.hasProperty("cause", CoreMatchers.isA(ExpectedKryoTestException.class))));
            failingBackend = restoreKeyedBackend(IntSerializer.INSTANCE, secondSnapshot, this.env);
            ValueState failingState = failingBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
            failingBackend.setCurrentKey(1);
            failingState.value();
            secondSnapshot.discardState();
        } finally {
            // failingBackend is still null when anything up to and including the final restore
            // throws; guarding here keeps an NPE from replacing the real exception.
            if (failingBackend != null) {
                IOUtils.closeQuietly(failingBackend);
                failingBackend.dispose();
            }
        }
    }

    /**
     * Same scenario as the default-serializer variant, but the custom serializer is attached via
     * {@code registerTypeWithKryoSerializer}.
     *
     * <p>State written with the plain Kryo serializer is snapshotted, restored with
     * {@code CustomKryoTestSerializer} registered for {@code TestPojo}, updated and snapshotted
     * again. Restoring the second snapshot and reading key 1 is expected to raise an
     * {@code ExpectedKryoTestException}, directly or as the cause.
     */
    @Test
    public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend failingBackend = null;
        try {
            // Phase 1: write state without any custom Kryo registration and snapshot it.
            AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE, this.env);
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assert.assertTrue(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer);
            ValueState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("id", pojoType));
            backend.setCurrentKey(1);
            state.update(new TestPojo("u1", 1));
            backend.setCurrentKey(2);
            state.update(new TestPojo("u2", 2));
            KeyedStateHandle snapshot = runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();

            // Phase 2: restore with the registered custom serializer, update key 1, snapshot again.
            this.env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            AbstractKeyedStateBackend<K> restoredBackend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, this.env);
            ValueStateDescriptor descriptor = new ValueStateDescriptor("id", pojoType);
            ValueState restoredState = restoredBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
            restoredBackend.setCurrentKey(1);
            restoredState.update(new TestPojo("u1", 11));
            KeyedStateHandle secondSnapshot = runSnapshot(restoredBackend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
            restoredBackend.dispose();

            // Phase 3: restoring the second snapshot and reading from it must fail.
            this.env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
            this.expectedException.expect(CoreMatchers.anyOf(CoreMatchers.isA(ExpectedKryoTestException.class), Matchers.hasProperty("cause", CoreMatchers.isA(ExpectedKryoTestException.class))));
            failingBackend = restoreKeyedBackend(IntSerializer.INSTANCE, secondSnapshot, this.env);
            ValueState failingState = failingBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
            failingBackend.setCurrentKey(1);
            failingState.value();
        } finally {
            // Single cleanup point for both the normal and the exceptional exit.
            if (failingBackend != null) {
                failingBackend.dispose();
            }
        }
    }

    /**
     * Checks that Kryo registration ids are unchanged after a restore when the nested POJO
     * classes are registered in the opposite order.
     *
     * <p>The ids of {@code TestPojo}, {@code TestNestedPojoClassA} and {@code TestNestedPojoClassB}
     * are captured from the value serializer before the snapshot, the environment is rebuilt with
     * B registered before A, and the ids reported by the restored state's serializer are compared
     * with the captured ones. A further update and snapshot are then performed on the restored
     * backend.
     */
    @Test
    public void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        this.env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
        this.env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class);
        AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE, this.env);
        try {
            GenericTypeInfo pojoType = new GenericTypeInfo(TestPojo.class);
            Assert.assertTrue(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof KryoSerializer);
            InternalKvState state = (ValueState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("id", pojoType));

            // Remember the registration ids assigned with order A, B.
            KryoSerializer kryoSerializer = state.getValueSerializer();
            int pojoId = kryoSerializer.getKryo().getRegistration(TestPojo.class).getId();
            int nestedAId = kryoSerializer.getKryo().getRegistration(TestNestedPojoClassA.class).getId();
            int nestedBId = kryoSerializer.getKryo().getRegistration(TestNestedPojoClassB.class).getId();

            backend.setCurrentKey(1);
            state.update(new TestPojo("u1", 1, new TestNestedPojoClassA(Double.valueOf(1.0d), 2), new TestNestedPojoClassB(Double.valueOf(2.3d), "foo")));
            backend.setCurrentKey(2);
            state.update(new TestPojo("u2", 2, new TestNestedPojoClassA(Double.valueOf(2.0d), 5), new TestNestedPojoClassB(Double.valueOf(3.1d), "bar")));
            KeyedStateHandle snapshot = runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();

            // Fresh environment with the registration order flipped to B, A.
            this.env.close();
            this.env = buildMockEnv();
            this.env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class);
            this.env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
            backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, this.env);
            InternalKvState restoredState = (ValueState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("id", pojoType));

            KryoSerializer restoredKryoSerializer = restoredState.getValueSerializer();
            Assert.assertEquals(pojoId, restoredKryoSerializer.getKryo().getRegistration(TestPojo.class).getId());
            Assert.assertEquals(nestedAId, restoredKryoSerializer.getKryo().getRegistration(TestNestedPojoClassA.class).getId());
            Assert.assertEquals(nestedBId, restoredKryoSerializer.getKryo().getRegistration(TestNestedPojoClassB.class).getId());

            backend.setCurrentKey(1);
            restoredState.update(new TestPojo("u1", 11, new TestNestedPojoClassA(Double.valueOf(22.1d), 12), new TestNestedPojoClassB(Double.valueOf(1.23d), "foobar")));
            runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
        } finally {
            // Disposes whichever backend the variable currently refers to, on every exit path.
            backend.dispose();
        }
    }

    /**
     * Checks that state written with a {@code PojoSerializer} can be restored, updated and
     * snapshotted again after the nested POJO classes are registered in the opposite order.
     *
     * <p>The first backend registers A then B; the environment is then rebuilt with B registered
     * before A and the backend is restored from the snapshot.
     */
    @Test
    public void testPojoRestoreResilienceWithDifferentRegistrationOrder() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        this.env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
        this.env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);
        AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE, this.env);
        try {
            TypeInformation pojoType = TypeExtractor.getForClass(TestPojo.class);
            Assert.assertTrue(pojoType.createSerializer(this.env.getExecutionConfig()) instanceof PojoSerializer);
            ValueState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("id", pojoType));
            backend.setCurrentKey(1);
            state.update(new TestPojo("u1", 1, new TestNestedPojoClassA(Double.valueOf(1.0d), 2), new TestNestedPojoClassB(Double.valueOf(2.3d), "foo")));
            backend.setCurrentKey(2);
            state.update(new TestPojo("u2", 2, new TestNestedPojoClassA(Double.valueOf(2.0d), 5), new TestNestedPojoClassB(Double.valueOf(3.1d), "bar")));
            KeyedStateHandle snapshot = runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();

            // Fresh environment with the registration order flipped to B, A.
            this.env.close();
            this.env = buildMockEnv();
            this.env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);
            this.env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
            backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, this.env);
            ValueState restoredState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("id", pojoType));
            backend.setCurrentKey(1);
            restoredState.update(new TestPojo("u1", 11, new TestNestedPojoClassA(Double.valueOf(22.1d), 12), new TestNestedPojoClassB(Double.valueOf(1.23d), "foobar")));
            runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            snapshot.discardState();
        } finally {
            // Disposes whichever backend the variable currently refers to, on every exit path.
            backend.dispose();
        }
    }

    /**
     * Walks a {@code String} value state through two snapshots and two restores.
     *
     * <p>Every read is checked twice: once through {@code value()} and once through
     * {@code getSerializedValue}. The first snapshot captures keys 1 and 2 ("1", "2"); the second
     * captures keys 1 to 3 after they were overwritten with "u1", "u2", "u3". Each snapshot is
     * then restored into its own backend and verified.
     */
    @Test
    public void testValueState() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        ValueStateDescriptor descriptor = new ValueStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        InternalKvState state = (ValueState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        TypeSerializer valueSerializer = descriptor.getSerializer();

        // Both keys start out empty and receive their initial values.
        backend.setCurrentKey(1);
        Assert.assertNull(state.value());
        Assert.assertNull(getSerializedValue(state, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.update("1");
        backend.setCurrentKey(2);
        Assert.assertNull(state.value());
        Assert.assertNull(getSerializedValue(state, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.update("2");
        backend.setCurrentKey(1);
        Assert.assertEquals("1", state.value());
        Assert.assertEquals("1", getSerializedValue(state, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));

        // First snapshot, taken before the values are overwritten.
        KeyedStateHandle firstSnapshot = runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);

        backend.setCurrentKey(1);
        state.update("u1");
        backend.setCurrentKey(2);
        state.update("u2");
        backend.setCurrentKey(3);
        state.update("u3");

        // Second snapshot, containing the overwritten values and the new key 3.
        KeyedStateHandle secondSnapshot = runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);

        // The live backend reports the updated values.
        backend.setCurrentKey(1);
        Assert.assertEquals("u1", state.value());
        Assert.assertEquals("u1", getSerializedValue(state, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey(2);
        Assert.assertEquals("u2", state.value());
        Assert.assertEquals("u2", getSerializedValue(state, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey(3);
        Assert.assertEquals("u3", state.value());
        Assert.assertEquals("u3", getSerializedValue(state, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();

        // Restoring the first snapshot yields the initial values.
        AbstractKeyedStateBackend<K> firstRestore = restoreKeyedBackend(IntSerializer.INSTANCE, firstSnapshot);
        firstSnapshot.discardState();
        InternalKvState firstRestoredState = (ValueState) firstRestore.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        firstRestore.setCurrentKey(1);
        Assert.assertEquals("1", firstRestoredState.value());
        Assert.assertEquals("1", getSerializedValue(firstRestoredState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        firstRestore.setCurrentKey(2);
        Assert.assertEquals("2", firstRestoredState.value());
        Assert.assertEquals("2", getSerializedValue(firstRestoredState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        firstRestore.dispose();

        // Restoring the second snapshot yields the overwritten values.
        AbstractKeyedStateBackend<K> secondRestore = restoreKeyedBackend(IntSerializer.INSTANCE, secondSnapshot);
        secondSnapshot.discardState();
        InternalKvState secondRestoredState = (ValueState) secondRestore.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        secondRestore.setCurrentKey(1);
        Assert.assertEquals("u1", secondRestoredState.value());
        Assert.assertEquals("u1", getSerializedValue(secondRestoredState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        secondRestore.setCurrentKey(2);
        Assert.assertEquals("u2", secondRestoredState.value());
        Assert.assertEquals("u2", getSerializedValue(secondRestoredState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        secondRestore.setCurrentKey(3);
        Assert.assertEquals("u3", secondRestoredState.value());
        Assert.assertEquals("u3", getSerializedValue(secondRestoredState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        secondRestore.dispose();
    }

    /**
     * Smoke test: a {@code MutableLong} value state with a one-second time-to-live can be
     * created, updated and read. The backend is closed and disposed on every exit path.
     */
    @Test
    public void testValueStateWorkWithTtl() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor descriptor = new ValueStateDescriptor("id", MutableLong.class);
            descriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(1L)).build());
            ValueState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
            backend.setCurrentKey(1);
            state.update(new MutableLong());
            state.value();
        } finally {
            backend.close();
            backend.dispose();
        }
    }

    /**
     * Reads the same value state concurrently through {@code value()} and through
     * {@code getSerializedValue} for roughly 100 ms and expects both readers to keep seeing "1".
     *
     * <p>Before the threads start, the test also checks that a serialized lookup for a different
     * key/namespace does not disturb the state's current namespace. A timer interrupts both
     * reader threads after 100 ms; the main thread waits for them via {@code sync()}.
     */
    @Test
    public void testValueStateRace() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        final int namespace = 1;
        ValueStateDescriptor descriptor = new ValueStateDescriptor("id", String.class);
        final IntSerializer keySerializer = IntSerializer.INSTANCE;
        final IntSerializer namespaceSerializer = IntSerializer.INSTANCE;
        final InternalKvState state = (ValueState) backend.getPartitionedState(1, IntSerializer.INSTANCE, descriptor);
        final TypeSerializer valueSerializer = descriptor.getSerializer();

        // A serialized lookup for key 3 / namespace 1 must not change what value() returns
        // for the current key 1 / namespace 2.
        backend.setCurrentKey(1);
        state.setCurrentNamespace(2);
        state.update("2");
        Assert.assertEquals("2", state.value());
        Assert.assertNull(getSerializedValue(state, 3, keySerializer, 1, IntSerializer.INSTANCE, valueSerializer));
        Assert.assertEquals("2", state.value());

        // Prepare the entry (key 10, namespace 1) that both threads will read.
        state.setCurrentNamespace(1);
        backend.setCurrentKey(10);
        Assert.assertNull(state.value());
        Assert.assertNull(getSerializedValue(state, 10, keySerializer, 1, namespaceSerializer, valueSerializer));
        state.update("1");

        final CheckedThread getter = new CheckedThread("State getter") {
            public void go() throws Exception {
                while (!isInterrupted()) {
                    Assert.assertEquals("1", state.value());
                }
            }
        };
        final CheckedThread serializedGetter = new CheckedThread("Serialized state getter") {
            public void go() throws Exception {
                while (!isInterrupted() && getter.isAlive()) {
                    Assert.assertEquals("1", (String) StateBackendTestBase.getSerializedValue(state, 10, keySerializer, namespace, namespaceSerializer, valueSerializer));
                }
            }
        };
        getter.start();
        serializedGetter.start();

        Timer timer = new Timer("stopper");
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                getter.interrupt();
                serializedGetter.interrupt();
                cancel();
            }
        }, 100L);
        try {
            serializedGetter.sync();
            getter.interrupt();
            getter.sync();
        } finally {
            // Cancel the timer on every exit path so its thread does not outlive the test.
            // Once cancelled it can no longer stop the readers, so interrupt them here.
            timer.cancel();
            getter.interrupt();
            serializedGetter.interrupt();
            backend.dispose();
        }
    }

    /**
     * Registers two independent value states ("a-string" and "an-integer") on one backend,
     * verifies that updating one does not affect the other, and checks that both values come
     * back after a snapshot/restore cycle.
     */
    @Test
    public void testMultipleValueStates() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), this.env);
        ValueStateDescriptor stringDescriptor = new ValueStateDescriptor("a-string", StringSerializer.INSTANCE);
        ValueStateDescriptor intDescriptor = new ValueStateDescriptor("an-integer", IntSerializer.INSTANCE);
        ValueState stringState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stringDescriptor);
        ValueState intState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, intDescriptor);

        backend.setCurrentKey(1);
        Assert.assertNull(stringState.value());
        Assert.assertNull(intState.value());

        // Writing the string state leaves the integer state untouched.
        stringState.update("1");
        Assert.assertEquals("1", stringState.value());
        Assert.assertNull(intState.value());

        // Writing the integer state leaves the string state untouched.
        intState.update(13);
        Assert.assertEquals("1", stringState.value());
        Assert.assertEquals(13L, ((Integer) intState.value()).intValue());

        KeyedStateHandle snapshot = runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.dispose();

        AbstractKeyedStateBackend<K> restoredBackend = restoreKeyedBackend(IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), Collections.singletonList(snapshot), this.env);
        snapshot.discardState();
        restoredBackend.setCurrentKey(1);
        ValueState restoredStringState = restoredBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stringDescriptor);
        ValueState restoredIntState = restoredBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, intDescriptor);
        Assert.assertEquals("1", restoredStringState.value());
        Assert.assertEquals(13L, ((Integer) restoredIntState.value()).intValue());
        restoredBackend.dispose();
    }

    /**
     * Verifies how a value state with default value 42 reacts to {@code clear()} and to
     * {@code update(null)}: in both cases the next read returns the default again.
     *
     * <p>As a precondition the test first confirms that {@code LongSerializer} itself rejects
     * {@code null} with a {@code NullPointerException}. At the end the state is snapshotted and
     * the descriptor is registered once more on a restored backend.
     */
    @Test
    public void testValueStateNullUpdate() throws Exception {
        // Precondition: the serializer cannot write null.
        try {
            LongSerializer.INSTANCE.serialize((Long) null, new DataOutputViewStreamWrapper(new ByteArrayOutputStream()));
            Assert.fail("Should fail with NullPointerException");
        } catch (NullPointerException ignored) {
            // expected
        }

        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        ValueStateDescriptor descriptor = new ValueStateDescriptor("id", LongSerializer.INSTANCE, 42L);
        ValueState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

        // Unset keys report the default; an explicit update replaces it.
        backend.setCurrentKey(1);
        Assert.assertEquals(42L, ((Long) state.value()).longValue());
        state.update(1L);
        Assert.assertEquals(1L, ((Long) state.value()).longValue());
        backend.setCurrentKey(2);
        Assert.assertEquals(42L, ((Long) state.value()).longValue());

        // clear() falls back to the default.
        backend.setCurrentKey(1);
        state.clear();
        Assert.assertEquals(42L, ((Long) state.value()).longValue());

        // update(null) falls back to the default as well.
        state.update(17L);
        Assert.assertEquals(17L, ((Long) state.value()).longValue());
        state.update((Object) null);
        Assert.assertEquals(42L, ((Long) state.value()).longValue());

        KeyedStateHandle snapshot = runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
        backend.dispose();

        AbstractKeyedStateBackend<K> restoredBackend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
        snapshot.discardState();
        restoredBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        restoredBackend.dispose();
    }

    /**
     * Walks a {@code String} list state through two snapshots and two restores.
     *
     * <p>List contents are compared as comma-joined strings, once via {@code get()} and once via
     * {@code getSerializedList}. The first snapshot holds "1" for key 1 and "2" for key 2; the
     * second additionally holds the appended "u1", "u2" and a new key 3 with "u3". Each snapshot
     * is restored into its own backend and verified.
     */
    @Test
    public void testListState() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        ListStateDescriptor descriptor = new ListStateDescriptor("id", String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        InternalKvState state = (ListState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        TypeSerializer elementSerializer = descriptor.getElementSerializer();
        Joiner joiner = Joiner.on(",");

        // Both keys start out empty; key 1 is filled with add(), key 2 with update().
        backend.setCurrentKey(1);
        Assert.assertNull(state.get());
        Assert.assertNull(getSerializedList(state, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, elementSerializer));
        state.add("1");
        backend.setCurrentKey(2);
        Assert.assertNull(state.get());
        Assert.assertNull(getSerializedList(state, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, elementSerializer));
        state.update(Arrays.asList("2"));
        backend.setCurrentKey(1);
        Assert.assertEquals("1", joiner.join((Iterable) state.get()));
        Assert.assertEquals("1", joiner.join(getSerializedList(state, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, elementSerializer)));

        // First snapshot, taken before further elements are appended.
        KeyedStateHandle firstSnapshot = runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);

        backend.setCurrentKey(1);
        state.add("u1");
        backend.setCurrentKey(2);
        state.add("u2");
        backend.setCurrentKey(3);
        state.add("u3");

        // Second snapshot, containing the appended elements and the new key 3.
        KeyedStateHandle secondSnapshot = runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);

        // The live backend reports the appended elements.
        backend.setCurrentKey(1);
        Assert.assertEquals("1,u1", joiner.join((Iterable) state.get()));
        Assert.assertEquals("1,u1", joiner.join(getSerializedList(state, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, elementSerializer)));
        backend.setCurrentKey(2);
        Assert.assertEquals("2,u2", joiner.join((Iterable) state.get()));
        Assert.assertEquals("2,u2", joiner.join(getSerializedList(state, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, elementSerializer)));
        backend.setCurrentKey(3);
        Assert.assertEquals("u3", joiner.join((Iterable) state.get()));
        Assert.assertEquals("u3", joiner.join(getSerializedList(state, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, elementSerializer)));
        backend.dispose();

        // Restoring the first snapshot yields the initial lists.
        AbstractKeyedStateBackend<K> firstRestore = restoreKeyedBackend(IntSerializer.INSTANCE, firstSnapshot);
        firstSnapshot.discardState();
        InternalKvState firstRestoredState = (ListState) firstRestore.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        firstRestore.setCurrentKey(1);
        Assert.assertEquals("1", joiner.join((Iterable) firstRestoredState.get()));
        Assert.assertEquals("1", joiner.join(getSerializedList(firstRestoredState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, elementSerializer)));
        firstRestore.setCurrentKey(2);
        Assert.assertEquals("2", joiner.join((Iterable) firstRestoredState.get()));
        Assert.assertEquals("2", joiner.join(getSerializedList(firstRestoredState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, elementSerializer)));
        firstRestore.dispose();

        // Restoring the second snapshot yields the extended lists.
        AbstractKeyedStateBackend<K> secondRestore = restoreKeyedBackend(IntSerializer.INSTANCE, secondSnapshot);
        secondSnapshot.discardState();
        InternalKvState secondRestoredState = (ListState) secondRestore.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        secondRestore.setCurrentKey(1);
        Assert.assertEquals("1,u1", joiner.join((Iterable) secondRestoredState.get()));
        Assert.assertEquals("1,u1", joiner.join(getSerializedList(secondRestoredState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, elementSerializer)));
        secondRestore.setCurrentKey(2);
        Assert.assertEquals("2,u2", joiner.join((Iterable) secondRestoredState.get()));
        Assert.assertEquals("2,u2", joiner.join(getSerializedList(secondRestoredState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, elementSerializer)));
        secondRestore.setCurrentKey(3);
        Assert.assertEquals("u3", joiner.join((Iterable) secondRestoredState.get()));
        Assert.assertEquals("u3", joiner.join(getSerializedList(secondRestoredState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, elementSerializer)));
        secondRestore.dispose();
    }

    /**
     * Expects {@code ListState.add(null)} to be rejected with a {@code NullPointerException}.
     * The backend is closed and disposed on every exit path.
     */
    @Test
    public void testListStateAddNull() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            ListState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor("my-state", Long.class));
            backend.setCurrentKey("abc");
            Assert.assertNull(state.get());
            this.expectedException.expect(NullPointerException.class);
            state.add((Object) null);
        } finally {
            backend.close();
            backend.dispose();
        }
    }

    /**
     * Expects {@link ListState#addAll(List)} to throw a {@link NullPointerException} when the
     * supplied list contains a {@code null} entry between valid entries.
     *
     * <p>The backend is released in a {@code finally} block so that {@code close()} and
     * {@code dispose()} run exactly once, whether or not the expected exception is thrown.
     *
     * @throws Exception if the backend cannot be created or the state cannot be accessed
     */
    @Test
    public void testListStateAddAllNullEntries() throws Exception {
        AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
            ListState<Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("abc");
            // Nothing has been written for this key yet.
            Assert.assertNull(state.get());

            // Register the expectation before the call that is supposed to throw.
            this.expectedException.expect(NullPointerException.class);

            // A null element sandwiched between two valid ones.
            List<Long> adding = new ArrayList<>();
            adding.add(3L);
            adding.add(null);
            adding.add(5L);
            state.addAll(adding);
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Expects {@link ListState#addAll(List)} to throw a {@link NullPointerException} when the
     * list argument itself is {@code null}.
     *
     * <p>The backend is released in a {@code finally} block so that {@code close()} and
     * {@code dispose()} run exactly once, whether or not the expected exception is thrown.
     *
     * @throws Exception if the backend cannot be created or the state cannot be accessed
     */
    @Test
    public void testListStateAddAllNull() throws Exception {
        AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
            ListState<Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("abc");
            // Nothing has been written for this key yet.
            Assert.assertNull(state.get());

            // Register the expectation before the call that is supposed to throw.
            this.expectedException.expect(NullPointerException.class);
            state.addAll(null);
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Expects {@link ListState#update(List)} to throw a {@link NullPointerException} when the
     * supplied list contains a {@code null} entry between valid entries.
     *
     * <p>The backend is released in a {@code finally} block so that {@code close()} and
     * {@code dispose()} run exactly once, whether or not the expected exception is thrown.
     *
     * @throws Exception if the backend cannot be created or the state cannot be accessed
     */
    @Test
    public void testListStateUpdateNullEntries() throws Exception {
        AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
            ListState<Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("abc");
            // Nothing has been written for this key yet.
            Assert.assertNull(state.get());

            // Register the expectation before the call that is supposed to throw.
            this.expectedException.expect(NullPointerException.class);

            // A null element sandwiched between two valid ones.
            List<Long> replacement = new ArrayList<>();
            replacement.add(3L);
            replacement.add(null);
            replacement.add(5L);
            state.update(replacement);
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Expects {@link ListState#update(List)} to throw a {@link NullPointerException} when the
     * list argument itself is {@code null}.
     *
     * <p>The backend is released in a {@code finally} block so that {@code close()} and
     * {@code dispose()} run exactly once, whether or not the expected exception is thrown.
     *
     * @throws Exception if the backend cannot be created or the state cannot be accessed
     */
    @Test
    public void testListStateUpdateNull() throws Exception {
        AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
            ListState<Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);

            keyedBackend.setCurrentKey("abc");
            // Nothing has been written for this key yet.
            Assert.assertNull(state.get());

            // Register the expectation before the call that is supposed to throw.
            this.expectedException.expect(NullPointerException.class);
            state.update(null);
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Exercises {@code add}, {@code addAll}, {@code update}, {@code get} and {@code clear} of a
     * {@link ListState} across the keys {@code "abc"}, {@code "def"} and {@code "g"}.
     *
     * <p>The assertions check that writes under one key are not visible under another, that an
     * empty {@code update}/{@code addAll} leaves {@code get()} returning {@code null} when nothing
     * was stored, and that the backend reports zero entries once every key has been cleared.
     * Several reads are deliberately repeated to check that a second {@code get()} sees the same
     * content as the first.
     *
     * <p>The backend is released in a {@code finally} block so that {@code close()} and
     * {@code dispose()} run exactly once, even if an assertion fails.
     *
     * @throws Exception if the backend cannot be created or the state cannot be accessed
     */
    @Test
    public void testListStateAPIs() throws Exception {
        AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
            ListState<Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);

            // --- key "def": add, then replace via update ---
            keyedBackend.setCurrentKey("def");
            Assert.assertNull(state.get());
            state.add(17L);
            state.add(11L);
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(17L, 11L));

            // Updating with an empty list leaves nothing behind.
            state.update(Collections.emptyList());
            Assert.assertNull(state.get());

            state.update(Arrays.asList(10L, 16L));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(16L, 10L));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(16L, 10L));

            // --- key "abc": untouched, must stay empty ---
            keyedBackend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            // --- key "g": addAll with empty and non-empty lists ---
            keyedBackend.setCurrentKey("g");
            Assert.assertNull(state.get());
            Assert.assertNull(state.get());
            state.addAll(Collections.emptyList());
            Assert.assertNull(state.get());

            state.addAll(Arrays.asList(3L, 4L));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(3L, 4L));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(3L, 4L));

            // Adding an empty list must not change the stored elements.
            state.addAll(new ArrayList<>());
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(3L, 4L));

            state.addAll(Arrays.asList(5L, 6L));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(3L, 4L, 5L, 6L));

            state.addAll(new ArrayList<>());
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(3L, 4L, 5L, 6L));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(3L, 4L, 5L, 6L));

            state.update(Arrays.asList(1L, 2L));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(1L, 2L));

            // --- key "def": still holds its own values; then clear it ---
            keyedBackend.setCurrentKey("def");
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(10L, 16L));
            state.clear();
            Assert.assertNull(state.get());

            // --- key "g": append to the values written by the last update ---
            keyedBackend.setCurrentKey("g");
            state.add(3L);
            state.add(2L);
            state.add(1L);

            keyedBackend.setCurrentKey("def");
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("g");
            // [1, 2] from the update above plus the three appended elements.
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(1L, 2L, 3L, 2L, 1L));
            state.update(Arrays.asList(5L, 6L));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(5L, 6L));
            state.clear();

            // Every key has been cleared, so no entries may remain.
            Assert.assertThat("State backend is not empty.", keyedBackend.numKeyValueStateEntries(), Is.is(0));
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Checks {@code mergeNamespaces} on a list state that uses {@code Integer} namespaces.
     *
     * <p>Five keys are covered: values spread over namespaces 1, 2 and 3 ({@code "abc"}), over
     * namespaces 1 and 3 ({@code "def"}), only in the target namespace 1 ({@code "jkl"}), only in
     * the source namespace 3 ({@code "mno"}), and no values at all ({@code "ghi"}). After merging
     * namespaces 2 and 3 into namespace 1, every populated key must expose 11, 22, 33, 44 and 55
     * in namespace 1, and the empty key must still return {@code null}. Finally namespace 1 is
     * cleared for each key and the backend must report zero entries.
     *
     * <p>The backend is released in a {@code finally} block so that {@code close()} and
     * {@code dispose()} run exactly once, even if an assertion fails.
     *
     * @throws Exception if the backend cannot be created or the state cannot be accessed
     */
    @Test
    public void testListStateMerging() throws Exception {
        AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
            // The internal interface is needed for setCurrentNamespace / mergeNamespaces.
            @SuppressWarnings("unchecked")
            InternalListState<String, Integer, Long> state = (InternalListState<String, Integer, Long>)
                    keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);

            // "abc": values in the target namespace and in both source namespaces
            keyedBackend.setCurrentKey("abc");
            state.setCurrentNamespace(1);
            state.add(33L);
            state.add(55L);
            state.setCurrentNamespace(2);
            state.add(22L);
            state.add(11L);
            state.setCurrentNamespace(3);
            state.add(44L);

            // "def": values in the target namespace and in one source namespace
            keyedBackend.setCurrentKey("def");
            state.setCurrentNamespace(1);
            state.add(11L);
            state.add(44L);
            state.setCurrentNamespace(3);
            state.add(22L);
            state.add(55L);
            state.add(33L);

            // "jkl": everything already in the target namespace
            keyedBackend.setCurrentKey("jkl");
            state.setCurrentNamespace(1);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            // "mno": everything in a single source namespace
            keyedBackend.setCurrentKey("mno");
            state.setCurrentNamespace(3);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            // Merge namespaces 2 and 3 into 1 for every key and check the result.
            keyedBackend.setCurrentKey("abc");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(11L, 22L, 33L, 44L, 55L));

            keyedBackend.setCurrentKey("def");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(11L, 22L, 33L, 44L, 55L));

            // "ghi" never received a value, so merging must leave it empty.
            keyedBackend.setCurrentKey("ghi");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("jkl");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(11L, 22L, 33L, 44L, 55L));

            keyedBackend.setCurrentKey("mno");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(11L, 22L, 33L, 44L, 55L));

            // Clear the merged namespace for each key, in the same order as before.
            for (String key : Arrays.asList("abc", "def", "ghi", "jkl", "mno")) {
                keyedBackend.setCurrentKey(key);
                state.setCurrentNamespace(1);
                state.clear();
            }

            // The source namespaces must have been emptied by the merge.
            Assert.assertThat("State backend is not empty.", keyedBackend.numKeyValueStateEntries(), Is.is(0));
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Checks that a {@link ReducingState} survives two snapshot/restore cycles.
     *
     * <p>The test writes values for keys 1 and 2, draws a first snapshot, appends further values
     * for keys 1, 2 and 3, and draws a second snapshot. It then validates the live backend and two
     * backends restored from the first and second snapshot respectively. Every value is checked
     * twice: through the state's own {@code get()} and through the serialized-value path
     * ({@code getSerializedValue}).
     *
     * @throws Exception if a backend cannot be created or restored, or a snapshot fails
     */
    @Test
    public void testReducingState() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

        ReducingStateDescriptor<String> kvId =
                new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);

        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;

        ReducingState<String> state = backend.getPartitionedState(
                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
        // Same object, viewed through the internal interface used by getSerializedValue.
        @SuppressWarnings("unchecked")
        InternalKvState<Integer, VoidNamespace, String> kvState =
                (InternalKvState<Integer, VoidNamespace, String>) state;

        // Read the serializer only after the state has been obtained from the backend.
        TypeSerializer<String> valueSerializer = kvId.getSerializer();

        // Some modifications to the state.
        backend.setCurrentKey(1);
        Assert.assertNull(state.get());
        Assert.assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add("1");
        backend.setCurrentKey(2);
        Assert.assertNull(state.get());
        Assert.assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add("2");
        backend.setCurrentKey(1);
        Assert.assertEquals("1", state.get());
        Assert.assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));

        // Draw the first snapshot.
        KeyedStateHandle snapshot1 = runSnapshot(
                backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
                sharedStateRegistry);

        // More modifications; these must not appear in the first snapshot.
        backend.setCurrentKey(1);
        state.add("u1");
        backend.setCurrentKey(2);
        state.add("u2");
        backend.setCurrentKey(3);
        state.add("u3");

        // Draw the second snapshot.
        KeyedStateHandle snapshot2 = runSnapshot(
                backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
                sharedStateRegistry);

        // Validate the live state.
        backend.setCurrentKey(1);
        Assert.assertEquals("1,u1", state.get());
        Assert.assertEquals("1,u1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey(2);
        Assert.assertEquals("2,u2", state.get());
        Assert.assertEquals("2,u2", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey(3);
        Assert.assertEquals("u3", state.get());
        Assert.assertEquals("u3", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));

        backend.dispose();

        // Restore the first snapshot and validate it.
        AbstractKeyedStateBackend<Integer> restoredBackend1 = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
        snapshot1.discardState();

        ReducingState<String> restored1 = restoredBackend1.getPartitionedState(
                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
        @SuppressWarnings("unchecked")
        InternalKvState<Integer, VoidNamespace, String> restoredKvState1 =
                (InternalKvState<Integer, VoidNamespace, String>) restored1;

        restoredBackend1.setCurrentKey(1);
        Assert.assertEquals("1", restored1.get());
        Assert.assertEquals("1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        restoredBackend1.setCurrentKey(2);
        Assert.assertEquals("2", restored1.get());
        Assert.assertEquals("2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));

        restoredBackend1.dispose();

        // Restore the second snapshot and validate it.
        AbstractKeyedStateBackend<Integer> restoredBackend2 = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
        snapshot2.discardState();

        ReducingState<String> restored2 = restoredBackend2.getPartitionedState(
                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
        @SuppressWarnings("unchecked")
        InternalKvState<Integer, VoidNamespace, String> restoredKvState2 =
                (InternalKvState<Integer, VoidNamespace, String>) restored2;

        restoredBackend2.setCurrentKey(1);
        Assert.assertEquals("1,u1", restored2.get());
        Assert.assertEquals("1,u1", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        restoredBackend2.setCurrentKey(2);
        Assert.assertEquals("2,u2", restored2.get());
        Assert.assertEquals("2,u2", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        restoredBackend2.setCurrentKey(3);
        Assert.assertEquals("u3", restored2.get());
        Assert.assertEquals("u3", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));

        restoredBackend2.dispose();
    }

    /**
     * Exercises {@code add}, {@code get} and {@code clear} of a {@link ReducingState} whose reduce
     * function sums {@code Long} values, across the keys {@code "abc"}, {@code "def"} and
     * {@code "g"}.
     *
     * <p>The assertions check that each key accumulates independently, that a cleared key reads
     * back as {@code null}, and that the backend reports zero entries once every written key has
     * been cleared.
     *
     * <p>The backend is released in a {@code finally} block so that {@code close()} and
     * {@code dispose()} run exactly once, even if an assertion fails.
     *
     * @throws Exception if the backend cannot be created or the state cannot be accessed
     */
    @Test
    public void testReducingStateAddAndGet() throws Exception {
        ReducingStateDescriptor<Long> stateDescr =
                new ReducingStateDescriptor<>("my-state", (a, b) -> a + b, Long.class);
        AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            ReducingState<Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);

            // "abc" is never written and must stay empty throughout.
            keyedBackend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("def");
            Assert.assertNull(state.get());
            state.add(17L);
            state.add(11L);
            Assert.assertEquals(28L, state.get().longValue());

            keyedBackend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("g");
            Assert.assertNull(state.get());
            state.add(1L);
            state.add(2L);

            // Writes under "g" must not have affected "def".
            keyedBackend.setCurrentKey("def");
            Assert.assertEquals(28L, state.get().longValue());
            state.clear();
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("g");
            state.add(3L);
            state.add(2L);
            state.add(1L);

            keyedBackend.setCurrentKey("def");
            Assert.assertNull(state.get());

            // 1 + 2 from the first round plus 3 + 2 + 1 from the second.
            keyedBackend.setCurrentKey("g");
            Assert.assertEquals(9L, state.get().longValue());
            state.clear();

            Assert.assertThat("State backend is not empty.", keyedBackend.numKeyValueStateEntries(), Is.is(0));
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Checks {@code mergeNamespaces} on a summing reducing state that uses {@code Integer}
     * namespaces.
     *
     * <p>Five keys are covered: values spread over namespaces 1, 2 and 3 ({@code "abc"}), over
     * namespaces 1 and 3 ({@code "def"}), only in the target namespace 1 ({@code "jkl"}), only in
     * the source namespace 3 ({@code "mno"}), and no values at all ({@code "ghi"}). The values
     * written per populated key are always 11, 22, 33, 44 and 55, so after merging namespaces 2
     * and 3 into namespace 1 the reduced value must be 165; the empty key must still return
     * {@code null}. Finally namespace 1 is cleared for each key and the backend must report zero
     * entries.
     *
     * <p>The backend is released in a {@code finally} block so that {@code close()} and
     * {@code dispose()} run exactly once, even if an assertion fails.
     *
     * @throws Exception if the backend cannot be created or the state cannot be accessed
     */
    @Test
    public void testReducingStateMerging() throws Exception {
        ReducingStateDescriptor<Long> stateDescr =
                new ReducingStateDescriptor<>("my-state", (a, b) -> a + b, Long.class);
        // Boxed on purpose: assertEquals(long, Long) would be ambiguous between the
        // (long, long) and (Object, Object) overloads.
        final Long expectedResult = 165L;

        AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            // The internal interface is needed for setCurrentNamespace / mergeNamespaces.
            @SuppressWarnings("unchecked")
            InternalReducingState<String, Integer, Long> state = (InternalReducingState<String, Integer, Long>)
                    keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);

            // "abc": values in the target namespace and in both source namespaces
            keyedBackend.setCurrentKey("abc");
            state.setCurrentNamespace(1);
            state.add(33L);
            state.add(55L);
            state.setCurrentNamespace(2);
            state.add(22L);
            state.add(11L);
            state.setCurrentNamespace(3);
            state.add(44L);

            // "def": values in the target namespace and in one source namespace
            keyedBackend.setCurrentKey("def");
            state.setCurrentNamespace(1);
            state.add(11L);
            state.add(44L);
            state.setCurrentNamespace(3);
            state.add(22L);
            state.add(55L);
            state.add(33L);

            // "jkl": everything already in the target namespace
            keyedBackend.setCurrentKey("jkl");
            state.setCurrentNamespace(1);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            // "mno": everything in a single source namespace
            keyedBackend.setCurrentKey("mno");
            state.setCurrentNamespace(3);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            // Merge namespaces 2 and 3 into 1 for every key and check the result.
            keyedBackend.setCurrentKey("abc");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertEquals(expectedResult, state.get());

            keyedBackend.setCurrentKey("def");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertEquals(expectedResult, state.get());

            // "ghi" never received a value, so merging must leave it empty.
            keyedBackend.setCurrentKey("ghi");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("jkl");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertEquals(expectedResult, state.get());

            keyedBackend.setCurrentKey("mno");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertEquals(expectedResult, state.get());

            // Clear the merged namespace for each key, in the same order as before.
            for (String key : Arrays.asList("abc", "def", "ghi", "jkl", "mno")) {
                keyedBackend.setCurrentKey(key);
                state.setCurrentNamespace(1);
                state.clear();
            }

            // The source namespaces must have been emptied by the merge.
            Assert.assertThat("State backend is not empty.", keyedBackend.numKeyValueStateEntries(), Is.is(0));
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Exercises {@code add}, {@code get} and {@code clear} of an {@link AggregatingState} built
     * from {@code MutableAggregatingAddingFunction} with a {@code MutableLong} accumulator, across
     * the keys {@code "abc"}, {@code "def"} and {@code "g"}.
     *
     * <p>The assertions check that each key aggregates independently, that a cleared key reads
     * back as {@code null}, and that the backend reports zero entries once every written key has
     * been cleared.
     *
     * <p>The backend is released in a {@code finally} block so that {@code close()} and
     * {@code dispose()} run exactly once, even if an assertion fails.
     *
     * @throws Exception if the backend cannot be created or the state cannot be accessed
     */
    @Test
    public void testAggregatingStateAddAndGetWithMutableAccumulator() throws Exception {
        AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
                new AggregatingStateDescriptor<>("my-state", new MutableAggregatingAddingFunction(), MutableLong.class);
        AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            AggregatingState<Long, Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);

            // "abc" is never written and must stay empty throughout.
            keyedBackend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("def");
            Assert.assertNull(state.get());
            state.add(17L);
            state.add(11L);
            Assert.assertEquals(28L, state.get().longValue());

            keyedBackend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("g");
            Assert.assertNull(state.get());
            state.add(1L);
            state.add(2L);

            // Writes under "g" must not have affected "def".
            keyedBackend.setCurrentKey("def");
            Assert.assertEquals(28L, state.get().longValue());
            state.clear();
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("g");
            state.add(3L);
            state.add(2L);
            state.add(1L);

            keyedBackend.setCurrentKey("def");
            Assert.assertNull(state.get());

            // 1 + 2 from the first round plus 3 + 2 + 1 from the second.
            keyedBackend.setCurrentKey("g");
            Assert.assertEquals(9L, state.get().longValue());
            state.clear();

            Assert.assertThat("State backend is not empty.", keyedBackend.numKeyValueStateEntries(), Is.is(0));
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Checks {@code mergeNamespaces} on an aggregating state built from
     * {@code MutableAggregatingAddingFunction} with a {@code MutableLong} accumulator and
     * {@code Integer} namespaces.
     *
     * <p>Five keys are covered: values spread over namespaces 1, 2 and 3 ({@code "abc"}), over
     * namespaces 1 and 3 ({@code "def"}), only in the target namespace 1 ({@code "jkl"}), only in
     * the source namespace 3 ({@code "mno"}), and no values at all ({@code "ghi"}). The values
     * written per populated key are always 11, 22, 33, 44 and 55, so after merging namespaces 2
     * and 3 into namespace 1 the test expects 165; the empty key must still return {@code null}.
     * Finally namespace 1 is cleared for each key and the backend must report zero entries.
     *
     * <p>The backend is released in a {@code finally} block so that {@code close()} and
     * {@code dispose()} run exactly once, even if an assertion fails.
     *
     * @throws Exception if the backend cannot be created or the state cannot be accessed
     */
    @Test
    public void testAggregatingStateMergingWithMutableAccumulator() throws Exception {
        AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
                new AggregatingStateDescriptor<>("my-state", new MutableAggregatingAddingFunction(), MutableLong.class);
        // Boxed on purpose: assertEquals(long, Long) would be ambiguous between the
        // (long, long) and (Object, Object) overloads.
        final Long expectedResult = 165L;

        AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            // The internal interface is needed for setCurrentNamespace / mergeNamespaces.
            @SuppressWarnings("unchecked")
            InternalAggregatingState<String, Integer, Long, MutableLong, Long> state =
                    (InternalAggregatingState<String, Integer, Long, MutableLong, Long>)
                            keyedBackend.getPartitionedState(0, IntSerializer.INSTANCE, stateDescr);

            // "abc": values in the target namespace and in both source namespaces
            keyedBackend.setCurrentKey("abc");
            state.setCurrentNamespace(1);
            state.add(33L);
            state.add(55L);
            state.setCurrentNamespace(2);
            state.add(22L);
            state.add(11L);
            state.setCurrentNamespace(3);
            state.add(44L);

            // "def": values in the target namespace and in one source namespace
            keyedBackend.setCurrentKey("def");
            state.setCurrentNamespace(1);
            state.add(11L);
            state.add(44L);
            state.setCurrentNamespace(3);
            state.add(22L);
            state.add(55L);
            state.add(33L);

            // "jkl": everything already in the target namespace
            keyedBackend.setCurrentKey("jkl");
            state.setCurrentNamespace(1);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            // "mno": everything in a single source namespace
            keyedBackend.setCurrentKey("mno");
            state.setCurrentNamespace(3);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            // Merge namespaces 2 and 3 into 1 for every key and check the result.
            keyedBackend.setCurrentKey("abc");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertEquals(expectedResult, state.get());

            keyedBackend.setCurrentKey("def");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertEquals(expectedResult, state.get());

            // "ghi" never received a value, so merging must leave it empty.
            keyedBackend.setCurrentKey("ghi");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("jkl");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertEquals(expectedResult, state.get());

            keyedBackend.setCurrentKey("mno");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertEquals(expectedResult, state.get());

            // Clear the merged namespace for each key, in the same order as before.
            for (String key : Arrays.asList("abc", "def", "ghi", "jkl", "mno")) {
                keyedBackend.setCurrentKey(key);
                state.setCurrentNamespace(1);
                state.clear();
            }

            // The source namespaces must have been emptied by the merge.
            Assert.assertThat("State backend is not empty.", keyedBackend.numKeyValueStateEntries(), Is.is(0));
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Exercises {@code add}, {@code get} and {@code clear} of an {@link AggregatingState} built
     * from {@code ImmutableAggregatingAddingFunction} with a {@code Long} accumulator, across the
     * keys {@code "abc"}, {@code "def"} and {@code "g"}.
     *
     * <p>The assertions check that each key aggregates independently, that a cleared key reads
     * back as {@code null}, and that the backend reports zero entries once every written key has
     * been cleared.
     *
     * <p>The backend is released in a {@code finally} block so that {@code close()} and
     * {@code dispose()} run exactly once, even if an assertion fails.
     *
     * @throws Exception if the backend cannot be created or the state cannot be accessed
     */
    @Test
    public void testAggregatingStateAddAndGetWithImmutableAccumulator() throws Exception {
        AggregatingStateDescriptor<Long, Long, Long> stateDescr =
                new AggregatingStateDescriptor<>("my-state", new ImmutableAggregatingAddingFunction(), Long.class);
        AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            AggregatingState<Long, Long> state = keyedBackend.getPartitionedState(
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescr);

            // "abc" is never written and must stay empty throughout.
            keyedBackend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("def");
            Assert.assertNull(state.get());
            state.add(17L);
            state.add(11L);
            Assert.assertEquals(28L, state.get().longValue());

            keyedBackend.setCurrentKey("abc");
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("g");
            Assert.assertNull(state.get());
            state.add(1L);
            state.add(2L);

            // Writes under "g" must not have affected "def".
            keyedBackend.setCurrentKey("def");
            Assert.assertEquals(28L, state.get().longValue());
            state.clear();
            Assert.assertNull(state.get());

            keyedBackend.setCurrentKey("g");
            state.add(3L);
            state.add(2L);
            state.add(1L);

            keyedBackend.setCurrentKey("def");
            Assert.assertNull(state.get());

            // 1 + 2 from the first round plus 3 + 2 + 1 from the second.
            keyedBackend.setCurrentKey("g");
            Assert.assertEquals(9L, state.get().longValue());
            state.clear();

            Assert.assertThat("State backend is not empty.", keyedBackend.numKeyValueStateEntries(), Is.is(0));
        } finally {
            keyedBackend.close();
            keyedBackend.dispose();
        }
    }

    /**
     * Populates an aggregating state under several keys and the namespaces 1, 2 and 3, merges
     * namespaces 2 and 3 into namespace 1 for each key, and checks the merged aggregate
     * (165 wherever values were added, {@code null} for the untouched key "ghi"). Finally clears
     * namespace 1 for every key and expects the backend to report zero entries.
     */
    @Test
    public void testAggregatingStateMergingWithImmutableAccumulator() throws Exception {
        AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor("my-state", new ImmutableAggregatingAddingFunction(), Long.class);
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            InternalAggregatingState state = (InternalAggregatingState) backend.getPartitionedState(0, IntSerializer.INSTANCE, descriptor);

            // "abc": values spread over all three namespaces (88 + 33 + 44 = 165).
            backend.setCurrentKey("abc");
            state.setCurrentNamespace(1);
            state.add(33L);
            state.add(55L);
            state.setCurrentNamespace(2);
            state.add(22L);
            state.add(11L);
            state.setCurrentNamespace(3);
            state.add(44L);

            // "def": values in namespaces 1 and 3 only (55 + 110 = 165).
            backend.setCurrentKey("def");
            state.setCurrentNamespace(1);
            state.add(11L);
            state.add(44L);
            state.setCurrentNamespace(3);
            state.add(22L);
            state.add(55L);
            state.add(33L);

            // "jkl": everything already sits in the merge target namespace.
            backend.setCurrentKey("jkl");
            state.setCurrentNamespace(1);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            // "mno": everything sits in a single merge source namespace.
            backend.setCurrentKey("mno");
            state.setCurrentNamespace(3);
            state.add(11L);
            state.add(22L);
            state.add(33L);
            state.add(44L);
            state.add(55L);

            backend.setCurrentKey("abc");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertEquals(165L, state.get());

            backend.setCurrentKey("def");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertEquals(165L, state.get());

            // "ghi" never received a value, so merging yields nothing.
            backend.setCurrentKey("ghi");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertNull(state.get());

            backend.setCurrentKey("jkl");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertEquals(165L, state.get());

            backend.setCurrentKey("mno");
            state.mergeNamespaces(1, Arrays.asList(2, 3));
            state.setCurrentNamespace(1);
            Assert.assertEquals(165L, state.get());

            // Clear the merge target for every key that was visited above.
            for (String key : Arrays.asList("abc", "def", "ghi", "jkl", "mno")) {
                backend.setCurrentKey(key);
                state.setCurrentNamespace(1);
                state.clear();
            }

            Assert.assertThat("State backend is not empty.", Integer.valueOf(backend.numKeyValueStateEntries()), Is.is(0));
        } finally {
            // Single cleanup path: runs exactly once whether the body passed or failed.
            backend.close();
            backend.dispose();
        }
    }

    /**
     * Writes folding state for the keys 1, 2 and 3, takes two snapshots with further updates in
     * between, and verifies that a backend restored from each snapshot exposes exactly the values
     * that were present when that snapshot was taken. Every value is checked both through the
     * state object and through {@code getSerializedValue}.
     */
    @Test
    public void testFoldingState() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        FoldingStateDescriptor descriptor = new FoldingStateDescriptor("id", "Fold-Initial:", new AppendingFold(), String.class);
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;

        FoldingState state = (FoldingState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        // Same object, viewed through the internal interface expected by getSerializedValue.
        InternalKvState kvState = (InternalKvState) state;
        TypeSerializer valueSerializer = descriptor.getSerializer();

        // Initial writes.
        backend.setCurrentKey(1);
        Assert.assertNull(state.get());
        Assert.assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add(1);
        backend.setCurrentKey(2);
        Assert.assertNull(state.get());
        Assert.assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        state.add(2);
        backend.setCurrentKey(1);
        Assert.assertEquals("Fold-Initial:,1", state.get());
        Assert.assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));

        KeyedStateHandle firstSnapshot = runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);

        // More writes after the first snapshot.
        backend.setCurrentKey(1);
        state.clear();
        state.add(101);
        backend.setCurrentKey(2);
        state.add(102);
        backend.setCurrentKey(3);
        state.add(103);

        KeyedStateHandle secondSnapshot = runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);

        // The live backend reflects the latest writes.
        backend.setCurrentKey(1);
        Assert.assertEquals("Fold-Initial:,101", state.get());
        Assert.assertEquals("Fold-Initial:,101", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey(2);
        Assert.assertEquals("Fold-Initial:,2,102", state.get());
        Assert.assertEquals("Fold-Initial:,2,102", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.setCurrentKey(3);
        Assert.assertEquals("Fold-Initial:,103", state.get());
        Assert.assertEquals("Fold-Initial:,103", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        backend.dispose();

        // Restore from the first snapshot: only the initial writes are visible.
        AbstractKeyedStateBackend<K> firstRestored = restoreKeyedBackend(IntSerializer.INSTANCE, firstSnapshot);
        firstSnapshot.discardState();
        FoldingState firstRestoredState = (FoldingState) firstRestored.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        InternalKvState firstRestoredKvState = (InternalKvState) firstRestoredState;
        firstRestored.setCurrentKey(1);
        Assert.assertEquals("Fold-Initial:,1", firstRestoredState.get());
        Assert.assertEquals("Fold-Initial:,1", getSerializedValue(firstRestoredKvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        firstRestored.setCurrentKey(2);
        Assert.assertEquals("Fold-Initial:,2", firstRestoredState.get());
        Assert.assertEquals("Fold-Initial:,2", getSerializedValue(firstRestoredKvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        firstRestored.dispose();

        // Restore from the second snapshot: the later writes are visible.
        AbstractKeyedStateBackend<K> secondRestored = restoreKeyedBackend(IntSerializer.INSTANCE, secondSnapshot);
        // Release the snapshot that was just restored (previously the first one was discarded twice
        // and the second one never).
        secondSnapshot.discardState();
        FoldingState secondRestoredState = (FoldingState) secondRestored.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        InternalKvState secondRestoredKvState = (InternalKvState) secondRestoredState;
        secondRestored.setCurrentKey(1);
        Assert.assertEquals("Fold-Initial:,101", secondRestoredState.get());
        Assert.assertEquals("Fold-Initial:,101", getSerializedValue(secondRestoredKvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        secondRestored.setCurrentKey(2);
        Assert.assertEquals("Fold-Initial:,2,102", secondRestoredState.get());
        Assert.assertEquals("Fold-Initial:,2,102", getSerializedValue(secondRestoredKvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        secondRestored.setCurrentKey(3);
        Assert.assertEquals("Fold-Initial:,103", secondRestoredState.get());
        Assert.assertEquals("Fold-Initial:,103", getSerializedValue(secondRestoredKvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
        secondRestored.dispose();
    }

    /**
     * Writes map state for several string keys, takes two snapshots with further updates in
     * between, exercises {@code keys()}, {@code values()}, {@code iterator()} (including removal
     * and in-place value updates) and {@code entries()}, and finally verifies that backends
     * restored from each snapshot expose exactly the contents present when it was taken.
     */
    @Test
    public void testMapState() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(StringSerializer.INSTANCE);
        MapStateDescriptor descriptor = new MapStateDescriptor("id", Integer.class, String.class);
        StringSerializer keySerializer = StringSerializer.INSTANCE;
        VoidNamespaceSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE;

        MapState state = (MapState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        // Same object, viewed through the internal interface expected by getSerializedMap.
        InternalKvState kvState = (InternalKvState) state;
        TypeSerializer userKeySerializer = descriptor.getKeySerializer();
        TypeSerializer userValueSerializer = descriptor.getValueSerializer();

        // Expected per-key contents at the time of the first and the second snapshot.
        Map<Integer, String> key1AtFirstSnapshot = Collections.singletonMap(1, "1");
        Map<Integer, String> key2AtFirstSnapshot = Collections.singletonMap(2, "2");
        Map<Integer, String> key11AtFirstSnapshot = Collections.singletonMap(11, "11");
        Map<Integer, String> key1AtSecondSnapshot = Collections.singletonMap(1, "101");
        Map<Integer, String> key2AtSecondSnapshot = new HashMap<>();
        key2AtSecondSnapshot.put(2, "2");
        key2AtSecondSnapshot.put(102, "102");
        Map<Integer, String> key3BulkEntries = new HashMap<>();
        key3BulkEntries.put(1031, "1031");
        key3BulkEntries.put(1032, "1032");
        Map<Integer, String> key3AtSecondSnapshot = new HashMap<>(key3BulkEntries);
        key3AtSecondSnapshot.put(103, "103");

        // Initial writes.
        backend.setCurrentKey("1");
        Assert.assertNull(state.get(1));
        Assert.assertNull(getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        state.put(1, "1");
        backend.setCurrentKey("2");
        Assert.assertNull(state.get(2));
        Assert.assertNull(getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        state.put(2, "2");
        backend.setCurrentKey("11");
        state.put(11, "11");
        backend.setCurrentKey("1");
        Assert.assertTrue(state.contains(1));
        Assert.assertEquals("1", state.get(1));
        Assert.assertEquals(key1AtFirstSnapshot, getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        Assert.assertEquals(key11AtFirstSnapshot, getSerializedMap(kvState, "11", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));

        KeyedStateHandle firstSnapshot = runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);

        // More writes after the first snapshot.
        backend.setCurrentKey("1");
        state.put(1, "101");
        backend.setCurrentKey("2");
        state.put(102, "102");
        backend.setCurrentKey("3");
        state.put(103, "103");
        state.putAll(key3BulkEntries);

        KeyedStateHandle secondSnapshot = runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);

        // The live backend reflects the latest writes.
        backend.setCurrentKey("1");
        Assert.assertEquals("101", state.get(1));
        Assert.assertEquals(key1AtSecondSnapshot, getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.setCurrentKey("2");
        Assert.assertEquals("102", state.get(102));
        Assert.assertEquals(key2AtSecondSnapshot, getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        backend.setCurrentKey("3");
        Assert.assertTrue(state.contains(103));
        Assert.assertEquals("103", state.get(103));
        Assert.assertEquals(key3AtSecondSnapshot, getSerializedMap(kvState, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));

        // keys() for key "3": same size as expected and nothing left after removing the expected ones.
        List<Integer> actualKeys = new ArrayList<>();
        for (Object userKey : state.keys()) {
            actualKeys.add((Integer) userKey);
        }
        List<Integer> expectedKeys = Arrays.asList(103, 1031, 1032);
        Assert.assertEquals(expectedKeys.size(), actualKeys.size());
        actualKeys.removeAll(expectedKeys);
        Assert.assertTrue(actualKeys.isEmpty());

        // values() for key "3": same check on the value side.
        List<String> actualValues = new ArrayList<>();
        for (Object userValue : state.values()) {
            actualValues.add((String) userValue);
        }
        List<String> expectedValues = Arrays.asList("103", "1031", "1032");
        Assert.assertEquals(expectedValues.size(), actualValues.size());
        actualValues.removeAll(expectedValues);
        Assert.assertTrue(actualValues.isEmpty());

        // Mutations after the second snapshot: clear, single remove, and iterator remove/update.
        backend.setCurrentKey("1");
        state.clear();
        backend.setCurrentKey("2");
        state.remove(102);
        backend.setCurrentKey("3");
        Iterator entryIterator = state.iterator();
        while (entryIterator.hasNext()) {
            Map.Entry entry = (Map.Entry) entryIterator.next();
            String value = (String) entry.getValue();
            if (value.length() != 4) {
                // Drops "103"; the four-character values are rewritten in place instead.
                entryIterator.remove();
            } else {
                entry.setValue(value + "_updated");
            }
        }

        backend.setCurrentKey("1");
        // The map for key "1" was cleared above, so its former entry must be gone.
        Assert.assertFalse(state.contains(1));
        backend.setCurrentKey("2");
        Assert.assertFalse(state.contains(102));
        backend.setCurrentKey("3");
        for (Object element : state.entries()) {
            Map.Entry entry = (Map.Entry) element;
            String value = (String) entry.getValue();
            Assert.assertEquals(4 + "_updated".length(), value.length());
            Assert.assertTrue(value.endsWith("_updated"));
        }
        backend.dispose();

        // Restore from the first snapshot: only the initial writes are visible.
        AbstractKeyedStateBackend<K> firstRestored = restoreKeyedBackend(StringSerializer.INSTANCE, firstSnapshot);
        firstSnapshot.discardState();
        MapState firstRestoredState = (MapState) firstRestored.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        InternalKvState firstRestoredKvState = (InternalKvState) firstRestoredState;
        firstRestored.setCurrentKey("1");
        Assert.assertEquals("1", firstRestoredState.get(1));
        Assert.assertEquals(key1AtFirstSnapshot, getSerializedMap(firstRestoredKvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        firstRestored.setCurrentKey("2");
        Assert.assertEquals("2", firstRestoredState.get(2));
        Assert.assertEquals(key2AtFirstSnapshot, getSerializedMap(firstRestoredKvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        firstRestored.dispose();

        // Restore from the second snapshot: the later writes (but not the post-snapshot mutations) are visible.
        AbstractKeyedStateBackend<K> secondRestored = restoreKeyedBackend(StringSerializer.INSTANCE, secondSnapshot);
        secondSnapshot.discardState();
        MapState secondRestoredState = (MapState) secondRestored.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        InternalKvState secondRestoredKvState = (InternalKvState) secondRestoredState;
        secondRestored.setCurrentKey("1");
        Assert.assertEquals("101", secondRestoredState.get(1));
        Assert.assertEquals(key1AtSecondSnapshot, getSerializedMap(secondRestoredKvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        secondRestored.setCurrentKey("2");
        Assert.assertEquals("102", secondRestoredState.get(102));
        Assert.assertEquals(key2AtSecondSnapshot, getSerializedMap(secondRestoredKvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        secondRestored.setCurrentKey("3");
        Assert.assertEquals("103", secondRestoredState.get(103));
        Assert.assertEquals(key3AtSecondSnapshot, getSerializedMap(secondRestoredKvState, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
        secondRestored.dispose();
    }

    /**
     * Checks {@code MapState.isEmpty()}: true on a fresh state, false while any of 1024 inserted
     * entries remains, and true again once the last entry has been removed.
     */
    @Test
    public void testMapStateIsEmpty() throws Exception {
        final int entryCount = 1024;
        MapStateDescriptor descriptor = new MapStateDescriptor("id", Integer.class, Long.class);
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            MapState state = (MapState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
            backend.setCurrentKey(1);
            Assert.assertTrue(state.isEmpty());

            for (int i = 0; i < entryCount; i++) {
                state.put(Integer.valueOf(i), Long.valueOf(i * 2));
                Assert.assertFalse(state.isEmpty());
            }

            // Check before each removal so the state is still non-empty at that point.
            for (int i = 0; i < entryCount; i++) {
                Assert.assertFalse(state.isEmpty());
                state.remove(Integer.valueOf(i));
            }
            Assert.assertTrue(state.isEmpty());
        } finally {
            // Single cleanup path: runs exactly once whether the body passed or failed.
            backend.dispose();
        }
    }

    /**
     * Iterates over 4096 map entries and, for each one, randomly performs one of three actions:
     * remove twice (the second call must throw {@link IllegalStateException}), call
     * {@code hasNext()} and then remove, or do nothing. The iterator is expected to return the
     * keys 0..4095 in order regardless of the removals.
     */
    @Test
    public void testMapStateIteratorArbitraryAccess() throws Exception {
        final int entryCount = 4096;
        MapStateDescriptor descriptor = new MapStateDescriptor("id", Integer.class, Long.class);
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            MapState state = (MapState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
            backend.setCurrentKey(1);
            for (int i = 0; i < entryCount; i++) {
                state.put(Integer.valueOf(i), Long.valueOf(i * 2));
            }

            Iterator iterator = state.iterator();
            int visited = 0;
            while (iterator.hasNext()) {
                Map.Entry entry = (Map.Entry) iterator.next();
                Assert.assertEquals(visited, ((Integer) entry.getKey()).intValue());

                // nextInt(3) is uniform over {0, 1, 2}; nextInt() % 3 also produced -1 and -2.
                switch (ThreadLocalRandom.current().nextInt(3)) {
                    case 0:
                        // Remove twice: the second remove without an intervening next() must fail.
                        iterator.remove();
                        try {
                            iterator.remove();
                            Assert.fail();
                        } catch (IllegalStateException expected) {
                            // expected: nothing left to remove until next() is called again
                        }
                        break;
                    case 1:
                        // hasNext() between next() and remove() must not affect which entry is removed.
                        iterator.hasNext();
                        iterator.remove();
                        break;
                    default:
                        // Leave the entry untouched.
                        break;
                }
                visited++;
            }
            Assert.assertEquals(entryCount, visited);
        } finally {
            // Single cleanup path: runs exactly once whether the body passed or failed.
            backend.dispose();
        }
    }

    /**
     * A value state whose descriptor is created with {@code null} as default returns
     * {@code null} before the first update and again after {@code clear()}.
     */
    @Test
    public void testValueStateNullAsDefaultValue() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ValueState state = (ValueState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("id", String.class, (Object) null));
            backend.setCurrentKey(1);
            Assert.assertNull(state.value());
            state.update("Ciao");
            Assert.assertEquals("Ciao", state.value());
            state.clear();
            Assert.assertNull(state.value());
        } finally {
            // Dispose even when an assertion fails so the backend is not leaked.
            backend.dispose();
        }
    }

    /**
     * A value state whose descriptor is created with the default {@code "Hello"} returns that
     * default before the first update and again after {@code clear()}.
     */
    @Test
    public void testValueStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ValueState state = (ValueState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("id", String.class, "Hello"));
            backend.setCurrentKey(1);
            Assert.assertEquals("Hello", state.value());
            state.update("Ciao");
            Assert.assertEquals("Ciao", state.value());
            state.clear();
            Assert.assertEquals("Hello", state.value());
        } finally {
            // Dispose even when an assertion fails so the backend is not leaked.
            backend.dispose();
        }
    }

    /**
     * A reducing state returns {@code null} before the first {@code add} and again after
     * {@code clear()}.
     */
    @Test
    public void testReducingStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ReducingState state = (ReducingState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ReducingStateDescriptor("id", new AppendingReduce(), String.class));
            backend.setCurrentKey(1);
            Assert.assertNull(state.get());
            state.add("Ciao");
            Assert.assertEquals("Ciao", state.get());
            state.clear();
            Assert.assertNull(state.get());
        } finally {
            // Dispose even when an assertion fails so the backend is not leaked.
            backend.dispose();
        }
    }

    /**
     * A folding state returns {@code null} (not the descriptor's initial value) before the first
     * {@code add} and again after {@code clear()}; once values are added, the fold starts from the
     * initial value {@code "Fold-Initial:"}.
     */
    @Test
    public void testFoldingStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            FoldingState state = (FoldingState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new FoldingStateDescriptor("id", "Fold-Initial:", new AppendingFold(), String.class));
            backend.setCurrentKey(1);
            Assert.assertNull(state.get());
            state.add(1);
            state.add(2);
            Assert.assertEquals("Fold-Initial:,1,2", state.get());
            state.clear();
            Assert.assertNull(state.get());
        } finally {
            // Dispose even when an assertion fails so the backend is not leaked.
            backend.dispose();
        }
    }

    /**
     * A list state returns {@code null} before the first update and again after {@code clear()};
     * after {@code update} it contains exactly the supplied elements, in any order.
     */
    @Test
    public void testListStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ListState state = (ListState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor("id", String.class));
            backend.setCurrentKey(1);
            Assert.assertNull(state.get());
            state.update(Arrays.asList("Ciao", "Bello"));
            Assert.assertThat(state.get(), Matchers.containsInAnyOrder(new String[]{"Ciao", "Bello"}));
            state.clear();
            Assert.assertNull(state.get());
        } finally {
            // Dispose even when an assertion fails so the backend is not leaked.
            backend.dispose();
        }
    }

    /**
     * A map state's {@code entries()} is {@code null} before the first {@code put} and again
     * after {@code clear()}; in between, the stored mappings can be read back with {@code get}.
     */
    @Test
    public void testMapStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            MapState state = (MapState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new MapStateDescriptor("id", String.class, String.class));
            backend.setCurrentKey(1);
            Assert.assertNull(state.entries());
            state.put("Ciao", "Hello");
            state.put("Bello", "Nice");
            Assert.assertNotNull(state.entries());
            // Expected value first, so a failure message reports the right side as "expected".
            Assert.assertEquals("Hello", state.get("Ciao"));
            Assert.assertEquals("Nice", state.get("Bello"));
            state.clear();
            Assert.assertNull(state.entries());
        } finally {
            // Dispose even when an assertion fails so the backend is not leaked.
            backend.dispose();
        }
    }

    /**
     * Verifies that state survives a snapshot taken by a backend that restored it but never
     * accessed it: writes map state, snapshots, restores, immediately snapshots again without
     * requesting the state, restores from that second snapshot and checks the original values.
     */
    @Test
    public void testSnapshotNonAccessedState() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        // One variable tracks whichever backend is currently live so the finally block
        // always disposes the right instance, including the intermediate restored one.
        AbstractKeyedStateBackend backend = createKeyedBackend(StringSerializer.INSTANCE);
        try {
            MapStateDescriptor descriptor = new MapStateDescriptor("test-name", Integer.class, String.class);
            MapState state = (MapState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
            backend.setCurrentKey("1");
            state.put(11, "foo");
            backend.setCurrentKey("2");
            state.put(8, "bar");
            backend.setCurrentKey("3");
            state.put(91, "hello world");

            KeyedStateHandle firstSnapshot = runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();

            // Restore and snapshot again without ever requesting the state from this backend.
            backend = restoreKeyedBackend(StringSerializer.INSTANCE, firstSnapshot);
            KeyedStateHandle secondSnapshot = runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry);
            backend.dispose();

            backend = restoreKeyedBackend(StringSerializer.INSTANCE, secondSnapshot);
            MapState restoredState = (MapState) backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
            backend.setCurrentKey("1");
            Assert.assertEquals("foo", restoredState.get(11));
            backend.setCurrentKey("2");
            Assert.assertEquals("bar", restoredState.get(8));
            backend.setCurrentKey("3");
            Assert.assertEquals("hello world", restoredState.get(91));
            secondSnapshot.discardState();
        } finally {
            backend.dispose();
        }
    }

    /**
     * Key-group snapshot/restore when going from 4 snapshotting backends to 2 restoring ones,
     * with a maximum parallelism of 128.
     */
    @Test
    public void testKeyGroupSnapshotRestoreScaleDown() throws Exception {
        final int sourceParallelism = 4;
        final int targetParallelism = 2;
        final int maxParallelism = 128;
        testKeyGroupSnapshotRestore(sourceParallelism, targetParallelism, maxParallelism);
    }

    /**
     * Key-group snapshot/restore when going from 2 snapshotting backends to 4 restoring ones,
     * with a maximum parallelism of 128.
     */
    @Test
    public void testKeyGroupSnapshotRestoreScaleUp() throws Exception {
        final int sourceParallelism = 2;
        final int targetParallelism = 4;
        final int maxParallelism = 128;
        testKeyGroupSnapshotRestore(sourceParallelism, targetParallelism, maxParallelism);
    }

    /**
     * Key-group snapshot/restore with the same parallelism (2) on both sides and a maximum
     * parallelism of 128.
     */
    @Test
    public void testKeyGroupsSnapshotRestoreNoRescale() throws Exception {
        final int parallelism = 2;
        final int maxParallelism = 128;
        testKeyGroupSnapshotRestore(parallelism, parallelism, maxParallelism);
    }

    /**
     * Key-group snapshot/restore when going from 15 snapshotting backends to 77 restoring ones,
     * with a maximum parallelism of 128; neither parallelism divides 128 evenly.
     */
    @Test
    public void testKeyGroupsSnapshotRestoreScaleUpUnEvenDistribute() throws Exception {
        final int sourceParallelism = 15;
        final int targetParallelism = 77;
        final int maxParallelism = 128;
        testKeyGroupSnapshotRestore(sourceParallelism, targetParallelism, maxParallelism);
    }

    /**
     * Key-group snapshot/restore when going from 77 snapshotting backends to 15 restoring ones,
     * with a maximum parallelism of 128; neither parallelism divides 128 evenly.
     */
    @Test
    public void testKeyGroupsSnapshotRestoreScaleDownUnEvenDistribute() throws Exception {
        final int sourceParallelism = 77;
        final int targetParallelism = 15;
        final int maxParallelism = 128;
        testKeyGroupSnapshotRestore(sourceParallelism, targetParallelism, maxParallelism);
    }

    /**
     * Round-trips keyed state through a snapshot while changing the number of backends.
     *
     * <p>One backend per source subtask is created over a contiguous key-group range, one value is
     * written for every key group, and all source backends are snapshotted and then disposed. The
     * snapshot handles are split over the key-group ranges of the target subtasks, each target
     * backend is restored from its share, and every key group is checked for the value written
     * earlier.
     *
     * @param i number of backends that write and snapshot; must be positive and at most {@code i3}
     * @param i2 number of backends that restore; must be positive and at most {@code i3}
     * @param i3 maximum parallelism; one descriptor and one key are used per key group below it
     */
    private void testKeyGroupSnapshotRestore(int i, int i2, int i3) throws Exception {
        final int sourceParallelism = i;
        final int targetParallelism = i2;
        final int maxParallelism = i3;
        Preconditions.checkArgument(sourceParallelism > 0, "parallelism must be positive, current is %s.", sourceParallelism);
        Preconditions.checkArgument(targetParallelism > 0, "parallelism must be positive, current is %s.", targetParallelism);
        Preconditions.checkArgument(sourceParallelism <= maxParallelism, "Maximum parallelism must not be smaller than parallelism.");
        Preconditions.checkArgument(targetParallelism <= maxParallelism, "Maximum parallelism must not be smaller than parallelism.");

        Random random = new Random();
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();

        // Source side: one backend per contiguous slice of the key-group space.
        List<KeyGroupRange> sourceRanges = new ArrayList<>();
        List<AbstractKeyedStateBackend<Integer>> sourceBackends = new ArrayList<>();
        for (int subtask = 0; subtask < sourceParallelism; subtask++) {
            int firstKeyGroup = (maxParallelism * subtask) / sourceParallelism;
            int lastKeyGroup = ((maxParallelism * (subtask + 1)) / sourceParallelism) - 1;
            KeyGroupRange range = KeyGroupRange.of(firstKeyGroup, lastKeyGroup);
            sourceRanges.add(range);
            AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, maxParallelism, range, this.env);
            sourceBackends.add(backend);
        }

        // A dedicated descriptor per key group, named after the key-group index.
        List<ValueStateDescriptor<String>> descriptors = new ArrayList<>(maxParallelism);
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            descriptors.add(new ValueStateDescriptor<String>("state" + keyGroup, String.class));
        }

        // Write "<subtask>:<keyGroup>" under a random key that falls into each key group, and
        // remember key and value by key-group index for the verification below.
        List<Integer> keys = new ArrayList<>(maxParallelism);
        List<String> expectedValues = new ArrayList<>(maxParallelism);
        for (int subtask = 0; subtask < sourceParallelism; subtask++) {
            AbstractKeyedStateBackend<Integer> backend = sourceBackends.get(subtask);
            KeyGroupRange range = sourceRanges.get(subtask);
            for (int keyGroup = range.getStartKeyGroup(); keyGroup <= range.getEndKeyGroup(); keyGroup++) {
                ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptors.get(keyGroup));
                int key = getKeyInKeyGroup(random, maxParallelism, KeyGroupRange.of(keyGroup, keyGroup));
                backend.setCurrentKey(key);
                keys.add(key);
                String value = subtask + ":" + keyGroup;
                state.update(value);
                expectedValues.add(value);
            }
        }

        // Snapshot every source backend before disposing any of them.
        List<KeyedStateHandle> snapshots = new ArrayList<>(sourceParallelism);
        for (AbstractKeyedStateBackend<Integer> backend : sourceBackends) {
            snapshots.add(runSnapshot(backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistry));
        }
        for (AbstractKeyedStateBackend<Integer> backend : sourceBackends) {
            backend.dispose();
        }

        // Target side: compute the new ranges and hand each one the intersecting handles.
        List<KeyGroupRange> targetRanges = new ArrayList<>();
        for (int subtask = 0; subtask < targetParallelism; subtask++) {
            targetRanges.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(maxParallelism, targetParallelism, subtask));
        }
        List<List<KeyedStateHandle>> handlesPerTarget = new ArrayList<>(targetParallelism);
        for (int subtask = 0; subtask < targetParallelism; subtask++) {
            List<KeyedStateHandle> intersecting = new ArrayList<>();
            StateAssignmentOperation.extractIntersectingState(snapshots, targetRanges.get(subtask), intersecting);
            handlesPerTarget.add(intersecting);
        }

        // Restore each target backend and verify every key group it owns.
        List<AbstractKeyedStateBackend<Integer>> targetBackends = new ArrayList<>(targetParallelism);
        for (int subtask = 0; subtask < targetParallelism; subtask++) {
            KeyGroupRange range = targetRanges.get(subtask);
            AbstractKeyedStateBackend<Integer> restored = restoreKeyedBackend(IntSerializer.INSTANCE, maxParallelism, range, handlesPerTarget.get(subtask), this.env);
            targetBackends.add(restored);
            for (int keyGroup = range.getStartKeyGroup(); keyGroup <= range.getEndKeyGroup(); keyGroup++) {
                ValueState<String> state = restored.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptors.get(keyGroup));
                restored.setCurrentKey(keys.get(keyGroup));
                Assert.assertEquals(expectedValues.get(keyGroup), state.value());
            }
        }

        for (AbstractKeyedStateBackend<Integer> backend : targetBackends) {
            backend.dispose();
        }
    }

    /**
     * Snapshots a backend created with an {@code IntSerializer} key serializer and expects a
     * restore attempt with a {@code DoubleSerializer} to be rejected, either directly with a
     * {@link StateMigrationException} or with a {@link BackendBuildingException} caused by one.
     */
    @Test
    public void testRestoreWithWrongKeySerializer() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry registry = new SharedStateRegistry();

        // Populate two keys and take a snapshot with the original key serializer.
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
        ValueState<String> state = backend.getPartitionedState(
                VoidNamespace.INSTANCE,
                VoidNamespaceSerializer.INSTANCE,
                new ValueStateDescriptor<String>("id", String.class));
        backend.setCurrentKey(1);
        state.update("1");
        backend.setCurrentKey(2);
        state.update("2");
        KeyedStateHandle snapshot = runSnapshot(
                backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
                registry);
        backend.dispose();

        // Restoring with a different key serializer has to fail.
        try {
            restoreKeyedBackend(DoubleSerializer.INSTANCE, snapshot);
            Assert.fail("should recognize wrong key serializer");
        } catch (BackendBuildingException wrapped) {
            Assert.assertTrue(wrapped.getCause() instanceof StateMigrationException);
        } catch (StateMigrationException expected) {
            // expected: the mismatch was reported directly
        }
    }

    /**
     * Writes {@code String} values into a value state, snapshots and restores the backend, and
     * expects a {@link StateMigrationException} when the state named "id" is accessed again with a
     * descriptor that carries a {@code FloatSerializer}.
     */
    @Test
    public void testValueStateRestoreWithWrongSerializers() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ValueState<String> state = backend.getPartitionedState(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    new ValueStateDescriptor<String>("id", String.class));
            backend.setCurrentKey(1);
            state.update("1");
            backend.setCurrentKey(2);
            state.update("2");

            KeyedStateHandle snapshot = runSnapshot(
                    backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
                    sharedStateRegistry);
            backend.dispose();

            // From here on "backend" refers to the restored instance; the finally block disposes it.
            backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
            snapshot.discardState();

            try {
                backend.getPartitionedState(
                        VoidNamespace.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE,
                        new ValueStateDescriptor<Float>("id", FloatSerializer.INSTANCE)).value();
                Assert.fail("should recognize wrong serializers");
            } catch (StateMigrationException expected) {
                // expected: the registered serializer does not match the restored one
            }
        } finally {
            backend.dispose();
        }
    }

    /**
     * Writes {@code String} elements into a list state, snapshots and restores the backend, and
     * expects a {@link StateMigrationException} when the state named "id" is accessed again with a
     * descriptor that carries a {@code FloatSerializer}.
     */
    @Test
    public void testListStateRestoreWithWrongSerializers() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ListState<String> state = backend.getPartitionedState(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    new ListStateDescriptor<String>("id", String.class));
            backend.setCurrentKey(1);
            state.add("1");
            backend.setCurrentKey(2);
            state.add("2");

            KeyedStateHandle snapshot = runSnapshot(
                    backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
                    sharedStateRegistry);
            backend.dispose();

            // From here on "backend" refers to the restored instance; the finally block disposes it.
            backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
            snapshot.discardState();

            try {
                backend.getPartitionedState(
                        VoidNamespace.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE,
                        new ListStateDescriptor<Float>("id", FloatSerializer.INSTANCE)).get();
                Assert.fail("should recognize wrong serializers");
            } catch (StateMigrationException expected) {
                // expected: the registered serializer does not match the restored one
            }
        } finally {
            backend.dispose();
        }
    }

    /**
     * Writes {@code String} values into a reducing state, snapshots and restores the backend, and
     * expects a {@link StateMigrationException} when the state named "id" is accessed again with a
     * descriptor that carries a {@code FloatSerializer}.
     */
    @Test
    public void testReducingStateRestoreWithWrongSerializers() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ReducingState<String> state = backend.getPartitionedState(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    new ReducingStateDescriptor<String>("id", new AppendingReduce(), StringSerializer.INSTANCE));
            backend.setCurrentKey(1);
            state.add("1");
            backend.setCurrentKey(2);
            state.add("2");

            KeyedStateHandle snapshot = runSnapshot(
                    backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
                    sharedStateRegistry);
            backend.dispose();

            // From here on "backend" refers to the restored instance; the finally block disposes it.
            backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
            snapshot.discardState();

            // The reduce function works on String, so the deliberately wrong serializer has to be
            // smuggled past the compiler with an unchecked cast.
            @SuppressWarnings("unchecked")
            TypeSerializer<String> mismatchedSerializer = (TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;

            try {
                backend.getPartitionedState(
                        VoidNamespace.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE,
                        new ReducingStateDescriptor<String>("id", new AppendingReduce(), mismatchedSerializer)).get();
                Assert.fail("should recognize wrong serializers");
            } catch (StateMigrationException expected) {
                // expected: the registered serializer does not match the restored one
            }
        } finally {
            backend.dispose();
        }
    }

    /**
     * Writes {@code String -> String} entries into a map state, snapshots and restores the
     * backend, and expects a {@link StateMigrationException} when the state named "id" is accessed
     * again with a descriptor whose key serializer is a {@code FloatSerializer}.
     */
    @Test
    public void testMapStateRestoreWithWrongSerializers() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            MapState<String, String> state = backend.getPartitionedState(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    new MapStateDescriptor<String, String>("id", StringSerializer.INSTANCE, StringSerializer.INSTANCE));
            backend.setCurrentKey(1);
            state.put("1", "First");
            backend.setCurrentKey(2);
            state.put("2", "Second");

            KeyedStateHandle snapshot = runSnapshot(
                    backend.snapshot(682375462378L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
                    sharedStateRegistry);
            backend.dispose();

            // From here on "backend" refers to the restored instance; the finally block disposes it.
            backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
            snapshot.discardState();

            try {
                backend.getPartitionedState(
                        VoidNamespace.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE,
                        new MapStateDescriptor<Float, String>("id", FloatSerializer.INSTANCE, StringSerializer.INSTANCE)).entries();
                Assert.fail("should recognize wrong serializers");
            } catch (StateMigrationException expected) {
                // expected: the registered serializer does not match the restored one
            }
        } finally {
            // Single cleanup point; the restored backend is disposed exactly once here.
            backend.dispose();
        }
    }

    /**
     * Reads a value state with a default value under two different keys and checks that both reads
     * return equal but distinct {@code IntValue} instances.
     */
    @Test
    public void testCopyDefaultValue() throws Exception {
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor<IntValue> descriptor = new ValueStateDescriptor<IntValue>("id", IntValue.class, new IntValue(-1));
            ValueState<IntValue> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

            backend.setCurrentKey(1);
            IntValue defaultForFirstKey = state.value();
            backend.setCurrentKey(2);
            IntValue defaultForSecondKey = state.value();

            Assert.assertNotNull(defaultForFirstKey);
            Assert.assertNotNull(defaultForSecondKey);
            Assert.assertEquals(defaultForFirstKey, defaultForSecondKey);
            // Equal content, but never the same object.
            Assert.assertNotSame(defaultForFirstKey, defaultForSecondKey);
        } finally {
            // Dispose even when one of the assertions above fails.
            backend.dispose();
        }
    }

    /**
     * Checks that {@code getPartitionedState} throws a {@link NullPointerException} when the
     * namespace, the namespace serializer, or both are {@code null}.
     */
    @Test
    public void testRequireNonNullNamespace() throws Exception {
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ValueStateDescriptor<IntValue> descriptor = new ValueStateDescriptor<IntValue>("id", IntValue.class, new IntValue(-1));

            // Plain null literals are used on purpose: a cast to Object on the namespace would
            // clash with the namespace type implied by the serializer argument.
            try {
                backend.getPartitionedState(null, VoidNamespaceSerializer.INSTANCE, descriptor);
                Assert.fail("Did not throw expected NullPointerException");
            } catch (NullPointerException expected) {
                // expected: null namespace
            }

            try {
                backend.getPartitionedState(VoidNamespace.INSTANCE, null, descriptor);
                Assert.fail("Did not throw expected NullPointerException");
            } catch (NullPointerException expected) {
                // expected: null namespace serializer
            }

            try {
                backend.getPartitionedState(null, null, descriptor);
                Assert.fail("Did not throw expected NullPointerException");
            } catch (NullPointerException expected) {
                // expected: both null
            }
        } finally {
            // Dispose even when one of the fail() calls above fires.
            backend.dispose();
        }
    }

    /**
     * Registers one queryable state of each kind (value, list, reducing, folding, map) on a backend
     * with a single key group, writes one entry into each, and passes the backing state table to
     * {@link #checkConcurrentStateTable}.
     *
     * <p>The method carries no {@code @Test} annotation here and asserts that every state is an
     * {@code AbstractHeapState}; presumably it is invoked only from tests of heap-based backends
     * (TODO confirm against the subclasses).
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testConcurrentMapIfQueryable() throws Exception {
        final int numberOfKeyGroups = 1;
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, numberOfKeyGroups, new KeyGroupRange(0, 0), new DummyEnvironment());
        try {
            // ValueState
            ValueStateDescriptor<Integer> valueDescriptor = new ValueStateDescriptor<Integer>("value-state", Integer.class, -1);
            valueDescriptor.setQueryable("my-query");
            ValueState<Integer> valueState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueDescriptor);
            AbstractHeapState valueHeapState = bindHeapStateToVoidNamespace(valueState);
            backend.setCurrentKey(1);
            valueState.update(121818273);
            checkConcurrentStateTable(valueHeapState.getStateTable(), numberOfKeyGroups);

            // ListState
            ListStateDescriptor<Integer> listDescriptor = new ListStateDescriptor<Integer>("list-state", Integer.class);
            listDescriptor.setQueryable("my-query");
            ListState<Integer> listState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listDescriptor);
            AbstractHeapState listHeapState = bindHeapStateToVoidNamespace(listState);
            backend.setCurrentKey(1);
            listState.add(121818273);
            checkConcurrentStateTable(listHeapState.getStateTable(), numberOfKeyGroups);

            // ReducingState
            ReducingStateDescriptor<Integer> reducingDescriptor = new ReducingStateDescriptor<Integer>("reducing-state", new ReduceFunction<Integer>() {
                @Override
                public Integer reduce(Integer left, Integer right) throws Exception {
                    return Integer.valueOf(left.intValue() + right.intValue());
                }
            }, Integer.class);
            reducingDescriptor.setQueryable("my-query");
            ReducingState<Integer> reducingState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, reducingDescriptor);
            AbstractHeapState reducingHeapState = bindHeapStateToVoidNamespace(reducingState);
            backend.setCurrentKey(1);
            reducingState.add(121818273);
            checkConcurrentStateTable(reducingHeapState.getStateTable(), numberOfKeyGroups);

            // FoldingState
            FoldingStateDescriptor<Integer, Integer> foldingDescriptor = new FoldingStateDescriptor<Integer, Integer>("folding-state", 0, new FoldFunction<Integer, Integer>() {
                @Override
                public Integer fold(Integer accumulator, Integer value) throws Exception {
                    return Integer.valueOf(accumulator.intValue() + value.intValue());
                }
            }, Integer.class);
            foldingDescriptor.setQueryable("my-query");
            FoldingState<Integer, Integer> foldingState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, foldingDescriptor);
            AbstractHeapState foldingHeapState = bindHeapStateToVoidNamespace(foldingState);
            backend.setCurrentKey(1);
            foldingState.add(121818273);
            checkConcurrentStateTable(foldingHeapState.getStateTable(), numberOfKeyGroups);

            // MapState
            MapStateDescriptor<Integer, String> mapDescriptor = new MapStateDescriptor<Integer, String>("map-state", Integer.class, String.class);
            mapDescriptor.setQueryable("my-query");
            MapState<Integer, String> mapState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mapDescriptor);
            AbstractHeapState mapHeapState = bindHeapStateToVoidNamespace(mapState);
            backend.setCurrentKey(1);
            mapState.put(121818273, "121818273");
            int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
            // Raw table type: the lookup below passes the key-group index, exactly as before.
            StateTable mapStateTable = mapHeapState.getStateTable();
            Assert.assertNotNull("State not set", mapStateTable.get(Integer.valueOf(keyGroupIndex)));
            checkConcurrentStateTable(mapStateTable, numberOfKeyGroups);
        } finally {
            // Dispose even when one of the assertions above fails.
            backend.dispose();
        }
    }

    /**
     * Views a user-facing state object as its internal key/value state, asserts that it is an
     * {@code AbstractHeapState}, and sets {@code VoidNamespace.INSTANCE} as its current namespace.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static AbstractHeapState bindHeapStateToVoidNamespace(Object state) {
        InternalKvState kvState = (InternalKvState) state;
        Assert.assertTrue(kvState instanceof AbstractHeapState);
        kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
        return (AbstractHeapState) kvState;
    }

    /**
     * Asserts that {@code stateTable} is non-null and, when it is a {@code NestedMapsStateTable},
     * that both the namespace map of the key group for key {@code 1} and the map stored in it under
     * {@code VoidNamespace.INSTANCE} are {@link ConcurrentHashMap} instances.
     *
     * @param stateTable the table to inspect; other table implementations only get the null check
     * @param i the maximum parallelism passed to the key-group assignment
     */
    private void checkConcurrentStateTable(StateTable<?, ?, ?> stateTable, int i) {
        Assert.assertNotNull("State not set", stateTable);
        if (!(stateTable instanceof NestedMapsStateTable)) {
            return;
        }
        int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, i);
        NestedStateMap[] stateMaps = ((NestedMapsStateTable) stateTable).getState();
        Map<?, ?> namespaceMap = stateMaps[keyGroupIndex].getNamespaceMap();
        Assert.assertTrue(namespaceMap instanceof ConcurrentHashMap);
        Assert.assertTrue(namespaceMap.get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
    }

    /**
     * Verifies the notifications a mocked {@code KvStateRegistryListener} receives for a queryable
     * value state named "banana": one registration when the state is first obtained, one
     * unregistration after the backend is disposed, and a second registration once the state is
     * obtained again from a backend restored from the snapshot.
     */
    @Test
    public void testQueryableStateRegistration() throws Exception {
        KvStateRegistry registry = this.env.getKvStateRegistry();
        CheckpointStreamFactory streamFactory = createStreamFactory();
        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, this.env);
        KeyGroupRange expectedKeyGroupRange = backend.getKeyGroupRange();

        KvStateRegistryListener listener = Mockito.mock(KvStateRegistryListener.class);
        registry.registerListener(HighAvailabilityServices.DEFAULT_JOB_ID, listener);

        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("test", IntSerializer.INSTANCE);
        descriptor.setQueryable("banana");

        // Obtaining the state for the first time registers it.
        backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        Mockito.verify(listener, Mockito.times(1)).notifyKvStateRegistered(
                org.mockito.Matchers.eq(this.env.getJobID()),
                org.mockito.Matchers.eq(this.env.getJobVertexId()),
                org.mockito.Matchers.eq(expectedKeyGroupRange),
                org.mockito.Matchers.eq("banana"),
                org.mockito.Matchers.any(KvStateID.class));

        KeyedStateHandle snapshot = runSnapshot(
                backend.snapshot(682375462379L, 4L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
                sharedStateRegistry);

        // Disposing the backend unregisters the state.
        backend.dispose();
        Mockito.verify(listener, Mockito.times(1)).notifyKvStateUnregistered(
                org.mockito.Matchers.eq(this.env.getJobID()),
                org.mockito.Matchers.eq(this.env.getJobVertexId()),
                org.mockito.Matchers.eq(expectedKeyGroupRange),
                org.mockito.Matchers.eq("banana"));
        // NOTE(review): second dispose of the same backend is kept as-is; confirm whether it is intentional.
        backend.dispose();

        // Obtaining the state from the restored backend registers it a second time.
        AbstractKeyedStateBackend<Integer> restored = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, this.env);
        snapshot.discardState();
        restored.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        Mockito.verify(listener, Mockito.times(2)).notifyKvStateRegistered(
                org.mockito.Matchers.eq(this.env.getJobID()),
                org.mockito.Matchers.eq(this.env.getJobVertexId()),
                org.mockito.Matchers.eq(expectedKeyGroupRange),
                org.mockito.Matchers.eq("banana"),
                org.mockito.Matchers.any(KvStateID.class));
        restored.dispose();
    }

    /**
     * Snapshots a backend on which no state was registered, expects the resulting handle to be
     * {@code null}, and checks that a backend can still be restored from that {@code null} handle.
     *
     * @throws AssertionError if any step throws; the original exception is attached as the cause
     */
    @Test
    public void testEmptyStateCheckpointing() {
        try {
            CheckpointStreamFactory streamFactory = createStreamFactory();
            SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
            AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

            KeyedStateHandle snapshot;
            try {
                snapshot = runSnapshot(
                        backend.snapshot(682375462379L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()),
                        sharedStateRegistry);
                Assert.assertNull(snapshot);
            } finally {
                // Dispose even when the snapshot or the assertion fails.
                backend.dispose();
            }

            restoreKeyedBackend(IntSerializer.INSTANCE, snapshot).dispose();
        } catch (Exception e) {
            // Keep the failure message and attach the cause so the stack trace is not lost.
            throw new AssertionError(e.getMessage(), e);
        }
    }

    /**
     * Tracks {@code numKeyValueStateEntries()} while a value state is updated and cleared under two
     * keys: repeated updates of one key count once, and each clear removes one entry.
     */
    @Test
    public void testNumStateEntries() throws Exception {
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
        ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<String>("id", String.class);

        // Nothing written yet.
        Assert.assertEquals(0L, backend.numKeyValueStateEntries());

        ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

        // Two updates under the same key still make a single entry.
        backend.setCurrentKey(0);
        state.update("hello");
        state.update("ciao");
        Assert.assertEquals(1L, backend.numKeyValueStateEntries());

        // A second key adds a second entry.
        backend.setCurrentKey(42);
        state.update("foo");
        Assert.assertEquals(2L, backend.numKeyValueStateEntries());

        // Clearing key by key brings the count back down to zero.
        backend.setCurrentKey(0);
        state.clear();
        Assert.assertEquals(1L, backend.numKeyValueStateEntries());

        backend.setCurrentKey(42);
        state.clear();
        Assert.assertEquals(0L, backend.numKeyValueStateEntries());

        backend.dispose();
    }

    /**
     * Runs two snapshots of the same backend on separate threads, where the second one is started
     * and completed while the first one is still pending, and waits for both to finish.
     *
     * <p>The test is a no-op for backends that report no support for asynchronous snapshots.
     * NOTE(review): the stream factory is presumably what holds the first snapshot back until the
     * blocker latch is triggered; confirm against {@code BlockerCheckpointStreamFactory}.
     */
    @Test
    public void testParallelAsyncSnapshots() throws Exception {
        OneShotLatch blocker = new OneShotLatch();
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1048576);
        streamFactory.setWaiterLatch(waiter);
        streamFactory.setBlockerLatch(blocker);
        streamFactory.setAfterNumberInvocations(10);

        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            if (!backend.supportsAsynchronousSnapshots()) {
                return;
            }

            InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState(
                    VoidNamespaceSerializer.INSTANCE,
                    new ValueStateDescriptor<Integer>("test", IntSerializer.INSTANCE));
            valueState.setCurrentNamespace(VoidNamespace.INSTANCE);

            for (int key = 0; key < 10; key++) {
                backend.setCurrentKey(key);
                valueState.update(key);
            }

            RunnableFuture<SnapshotResult<KeyedStateHandle>> firstSnapshot =
                    backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            new Thread(firstSnapshot, "snapshot-1-runner").start();

            // Continue only once the first snapshot has signalled the waiter latch.
            waiter.await();

            for (int key = 5; key < 15; key++) {
                backend.setCurrentKey(key);
                valueState.update(key + 1);
            }

            // The second snapshot runs without any latches.
            streamFactory.setWaiterLatch(null);
            streamFactory.setBlockerLatch(null);

            RunnableFuture<SnapshotResult<KeyedStateHandle>> secondSnapshot =
                    backend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            new Thread(secondSnapshot, "snapshot-2-runner").start();

            // Finish the second snapshot first, then release and finish the first one.
            secondSnapshot.get();
            blocker.trigger();
            firstSnapshot.get();
        } finally {
            // Single cleanup point for every exit path, including the early return.
            backend.dispose();
        }
    }

    /**
     * Delegates to {@code StateSnapshotTransformerTest#testNonConcurrentSnapshotTransformerAccess}
     * with a fresh keyed backend and a blocking stream factory, then closes and disposes the
     * backend whether or not the delegated test passed.
     */
    @Test
    public void testNonConcurrentSnapshotTransformerAccess() throws Exception {
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1048576);
        // Typed as the backend itself (not AutoCloseable) so that dispose() is available below.
        AbstractKeyedStateBackend<Integer> backend = null;
        try {
            backend = createKeyedBackend(IntSerializer.INSTANCE);
            new StateSnapshotTransformerTest(backend, streamFactory).testNonConcurrentSnapshotTransformerAccess();
        } finally {
            if (backend != null) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }
    }

    /**
     * Checks that a snapshot running on its own thread captures the state as it was when the
     * snapshot was requested, while the backend keeps accepting updates.
     *
     * <p>Keys 0..9 are written with their own value, the snapshot is started, and keys 0..19 are
     * then overwritten with {@code key + 1}. The live backend must show the new values; a backend
     * restored from the snapshot must show the original values for keys 0..9 and nothing for key 11.
     */
    @Test
    public void testAsyncSnapshot() throws Exception {
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1048576);
        streamFactory.setWaiterLatch(waiter);

        KeyedStateHandle stateHandle;
        AbstractKeyedStateBackend<Integer> backend = null;
        try {
            backend = createKeyedBackend(IntSerializer.INSTANCE);
            InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState(
                    VoidNamespaceSerializer.INSTANCE,
                    new ValueStateDescriptor<Integer>("test", IntSerializer.INSTANCE));
            valueState.setCurrentNamespace(VoidNamespace.INSTANCE);

            for (int key = 0; key < 10; key++) {
                backend.setCurrentKey(key);
                valueState.update(key);
            }

            RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot =
                    backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner = new Thread(snapshot);
            runner.start();

            // Modify the state while the snapshot is in flight; halfway through, wait for the
            // snapshot thread to signal the waiter latch.
            for (int key = 0; key < 20; key++) {
                backend.setCurrentKey(key);
                valueState.update(key + 1);
                if (10 == key) {
                    waiter.await();
                }
            }

            runner.join();
            stateHandle = snapshot.get().getJobManagerOwnedSnapshot();

            // The live backend reflects the later updates.
            for (int key = 0; key < 20; key++) {
                backend.setCurrentKey(key);
                Assert.assertEquals(key + 1, valueState.value().intValue());
            }
        } finally {
            if (null != backend) {
                IOUtils.closeQuietly(backend);
                backend.dispose();
            }
        }

        Assert.assertNotNull(stateHandle);

        AbstractKeyedStateBackend<Integer> restoredBackend = null;
        try {
            restoredBackend = restoreKeyedBackend(IntSerializer.INSTANCE, stateHandle);
            InternalValueState<Integer, VoidNamespace, Integer> restoredState = restoredBackend.createInternalState(
                    VoidNamespaceSerializer.INSTANCE,
                    new ValueStateDescriptor<Integer>("test", IntSerializer.INSTANCE));
            restoredState.setCurrentNamespace(VoidNamespace.INSTANCE);

            // The snapshot holds the values from before the concurrent updates.
            for (int key = 0; key < 10; key++) {
                restoredBackend.setCurrentKey(key);
                Assert.assertEquals(key, restoredState.value().intValue());
            }

            // Key 11 was only written after the snapshot had been requested.
            restoredBackend.setCurrentKey(11);
            Assert.assertNull(restoredState.value());
        } finally {
            if (null != restoredBackend) {
                IOUtils.closeQuietly(restoredBackend);
                restoredBackend.dispose();
            }
        }
    }

    /**
     * Exercises {@code applyToAllKeys} with functions that read, clear, and rewrite the list state
     * of the key they are invoked for, and checks after each pass that every one of the 100 keys
     * shows the expected content.
     */
    @Test
    public void testConcurrentModificationWithApplyToAllKeys() throws Exception {
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<String>("foo", StringSerializer.INSTANCE);
            ListState<String> listState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor);

            for (int key = 0; key < 100; key++) {
                backend.setCurrentKey(key);
                listState.add("Hello" + key);
            }

            // Pass 1: every key holds the value written above.
            backend.applyToAllKeys(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor, new KeyedStateFunction<Integer, ListState<String>>() {
                @Override
                public void process(Integer key, ListState<String> state) throws Exception {
                    Assert.assertEquals("Hello" + key, state.get().iterator().next());
                }
            });

            // Pass 2: clear the state of every key while iterating.
            backend.applyToAllKeys(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor, new KeyedStateFunction<Integer, ListState<String>>() {
                @Override
                public void process(Integer key, ListState<String> state) throws Exception {
                    state.clear();
                }
            });

            // Pass 3: no key that is still visited has any element left.
            backend.applyToAllKeys(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor, new KeyedStateFunction<Integer, ListState<String>>() {
                @Override
                public void process(Integer key, ListState<String> state) throws Exception {
                    Assert.assertFalse(state.get().iterator().hasNext());
                }
            });

            // Pass 4: add, clear, and add again within a single callback.
            backend.applyToAllKeys(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor, new KeyedStateFunction<Integer, ListState<String>>() {
                @Override
                public void process(Integer key, ListState<String> state) throws Exception {
                    state.add("Hello" + key);
                    state.clear();
                    state.add("Hello_" + key);
                }
            });

            // Pass 5: only the element added last is present.
            backend.applyToAllKeys(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor, new KeyedStateFunction<Integer, ListState<String>>() {
                @Override
                public void process(Integer key, ListState<String> state) throws Exception {
                    Iterator<String> elements = state.get().iterator();
                    Assert.assertEquals("Hello_" + key, elements.next());
                    Assert.assertFalse(elements.hasNext());
                }
            });
        } finally {
            // Single cleanup point for both the success and the failure path.
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /**
     * Verifies that {@code applyToAllKeys} accepts a lambda as the keyed state function
     * and invokes it with the value stored for each key.
     *
     * @throws Exception if creating the backend or accessing state fails
     */
    @Test
    public void testApplyToAllKeysLambdaFunction() throws Exception {
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("foo", StringSerializer.INSTANCE);
            ListState<String> partitionedState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor);
            for (int i = 0; i < 100; i++) {
                backend.setCurrentKey(i);
                partitionedState.add("Hello" + i);
            }

            backend.applyToAllKeys(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    listStateDescriptor,
                    (key, state) -> Assert.assertEquals("Hello" + key, state.get().iterator().next()));
        } finally {
            // Single cleanup path: runs exactly once on both success and failure.
            IOUtils.closeQuietly(backend);
            backend.dispose();
        }
    }

    /**
     * Verifies that closing the backend while an asynchronous snapshot is blocked inside the
     * checkpoint stream makes the snapshot future fail with a {@link CancellationException}.
     *
     * <p>The test is a no-op for backends that report no support for asynchronous snapshots.
     *
     * @throws Exception if creating the backend, writing state or running the snapshot fails
     */
    @Test
    public void testAsyncSnapshotCancellation() throws Exception {
        OneShotLatch blocker = new OneShotLatch();
        OneShotLatch waiter = new OneShotLatch();
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(1024 * 1024);
        streamFactory.setWaiterLatch(waiter);
        streamFactory.setBlockerLatch(blocker);
        streamFactory.setAfterNumberInvocations(10);

        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            if (!backend.supportsAsynchronousSnapshots()) {
                return;
            }

            ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
            InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState(VoidNamespaceSerializer.INSTANCE, descriptor);
            valueState.setCurrentNamespace(VoidNamespace.INSTANCE);
            for (int i = 0; i < 10; i++) {
                backend.setCurrentKey(i);
                valueState.update(i);
            }

            RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread runner = new Thread(snapshot);
            runner.start();

            // Wait until the snapshot thread has reached the blocking stream.
            waiter.await();

            // Close the backend while the snapshot is still in flight ...
            IOUtils.closeQuietly(backend);

            // ... then release the stream so the snapshot thread can observe the close.
            blocker.trigger();
            runner.join();

            try {
                snapshot.get();
                Assert.fail("Close was not propagated.");
            } catch (CancellationException expected) {
                // Expected: the close must surface as a cancellation of the snapshot future.
            }
        } finally {
            // The only dispose() call; the backend is no longer disposed twice on the success path.
            backend.dispose();
        }
    }

    /* JADX WARN: Type inference failed for: r0v34, types: [java.util.PrimitiveIterator$OfInt] */
    /* JADX WARN: Type inference failed for: r0v54, types: [java.util.PrimitiveIterator$OfInt] */
    @Test
    public void testMapStateGetKeys() throws Exception {
        AbstractKeyedStateBackend<K> createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
        try {
            MapState partitionedState = createKeyedBackend.getPartitionedState("ns1", StringSerializer.INSTANCE, new MapStateDescriptor("get-keys-test", StringSerializer.INSTANCE, IntSerializer.INSTANCE));
            for (int i = 0; i < 1000; i++) {
                createKeyedBackend.setCurrentKey(Integer.valueOf(i));
                partitionedState.put("he", Integer.valueOf(i * 2));
                partitionedState.put("ho", Integer.valueOf(i * 2));
            }
            MapState partitionedState2 = createKeyedBackend.getPartitionedState("ns2", StringSerializer.INSTANCE, new MapStateDescriptor("get-keys-test", StringSerializer.INSTANCE, IntSerializer.INSTANCE));
            for (int i2 = 1000; i2 < 2000; i2++) {
                createKeyedBackend.setCurrentKey(Integer.valueOf(i2));
                partitionedState2.put("he", Integer.valueOf(i2 * 2));
                partitionedState2.put("ho", Integer.valueOf(i2 * 2));
            }
            Stream sorted = createKeyedBackend.getKeys("get-keys-test", "ns1").sorted();
            Throwable th = null;
            try {
                ?? it = sorted.mapToInt(num -> {
                    return num.intValue();
                }).iterator();
                for (int i3 = 0; i3 < 1000; i3++) {
                    Assert.assertTrue(it.hasNext());
                    Assert.assertEquals(i3, it.nextInt());
                }
                Assert.assertFalse(it.hasNext());
                if (sorted != null) {
                    if (0 != 0) {
                        try {
                            sorted.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        sorted.close();
                    }
                }
                Stream sorted2 = createKeyedBackend.getKeys("get-keys-test", "ns2").sorted();
                Throwable th3 = null;
                try {
                    try {
                        ?? it2 = sorted2.mapToInt(num2 -> {
                            return num2.intValue();
                        }).iterator();
                        for (int i4 = 1000; i4 < 2000; i4++) {
                            Assert.assertTrue(it2.hasNext());
                            Assert.assertEquals(i4, it2.nextInt());
                        }
                        Assert.assertFalse(it2.hasNext());
                        if (sorted2 != null) {
                            if (0 != 0) {
                                try {
                                    sorted2.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                sorted2.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            IOUtils.closeQuietly(createKeyedBackend);
            createKeyedBackend.dispose();
        }
    }

    /* JADX WARN: Type inference failed for: r0v12, types: [java.util.List, java.util.concurrent.ExecutorService, java.util.ArrayList] */
    @Test
    public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception {
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        AbstractKeyedStateBackend<K> createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE, this.env);
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(1);
        try {
            try {
                long j = 0;
                ?? arrayList = new ArrayList();
                for (int i = 0; i < 10; i++) {
                    InternalValueState internalValueState = (ValueState) createKeyedBackend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("id" + i, IntSerializer.INSTANCE));
                    internalValueState.setCurrentNamespace(VoidNamespace.INSTANCE);
                    createKeyedBackend.setCurrentKey(Integer.valueOf(i));
                    internalValueState.update(Integer.valueOf(i));
                    long j2 = j;
                    j = j2 + 1;
                    arrayList.add(runSnapshotAsync(arrayList, createKeyedBackend.snapshot(j2, System.currentTimeMillis(), createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())));
                }
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    ((Future) it.next()).get(20L, TimeUnit.SECONDS);
                }
                createKeyedBackend.dispose();
                newScheduledThreadPool.shutdown();
            } catch (Exception e) {
                Assert.fail();
                createKeyedBackend.dispose();
                newScheduledThreadPool.shutdown();
            }
        } catch (Throwable th) {
            createKeyedBackend.dispose();
            newScheduledThreadPool.shutdown();
            throw th;
        }
    }

    /**
     * Runs the given snapshot on {@code executorService} and exposes its outcome as a future.
     *
     * <p>If the snapshot is already done, its result is returned in an already-completed future
     * and nothing is submitted to the executor.
     *
     * @param executorService executor on which a not-yet-done snapshot is run
     * @param runnableFuture the snapshot to run
     * @return a future completed with the snapshot result, or completed exceptionally with
     *     whatever the snapshot threw
     * @throws Exception if the snapshot is already done and retrieving its result fails
     */
    protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync(ExecutorService executorService, RunnableFuture<SnapshotResult<KeyedStateHandle>> runnableFuture) throws Exception {
        if (runnableFuture.isDone()) {
            return CompletableFuture.completedFuture(runnableFuture.get());
        }
        CompletableFuture<SnapshotResult<KeyedStateHandle>> result = new CompletableFuture<>();
        executorService.submit(() -> {
            try {
                runnableFuture.run();
                result.complete(runnableFuture.get());
            } catch (Throwable t) {
                // Catch Throwable, not just Exception: otherwise an Error would be swallowed
                // by the executor and the returned future would never complete.
                result.completeExceptionally(t);
            }
        });
        return result;
    }

    /**
     * Draws random ints until one is assigned to a key group contained in {@code keyGroupRange}.
     *
     * @param random source of candidate keys
     * @param i second argument handed to {@code KeyGroupRangeAssignment.assignToKeyGroup}
     *     (presumably the max parallelism; confirm against callers)
     * @param keyGroupRange range the key's key group must fall into
     * @return the first drawn key whose key group lies in {@code keyGroupRange}
     */
    private int getKeyInKeyGroup(Random random, int i, KeyGroupRange keyGroupRange) {
        int candidate;
        int keyGroup;
        do {
            candidate = random.nextInt();
            keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(candidate), i);
        } while (!keyGroupRange.contains(keyGroup));
        return candidate;
    }

    /**
     * Reads the serialized value stored in {@code internalKvState} for the given key and
     * namespace and deserializes it as a single value.
     *
     * @param internalKvState state to query
     * @param k key to look up
     * @param typeSerializer serializer for the key
     * @param n namespace to look up
     * @param typeSerializer2 serializer for the namespace
     * @param typeSerializer3 serializer used to deserialize the value
     * @return the deserialized value, or {@code null} if the state returned no bytes
     * @throws Exception if serialization, lookup or deserialization fails
     */
    protected static <V, K, N> V getSerializedValue(InternalKvState<K, N, V> internalKvState, K k, TypeSerializer<K> typeSerializer, N n, TypeSerializer<N> typeSerializer2, TypeSerializer<V> typeSerializer3) throws Exception {
        byte[] keyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(k, typeSerializer, n, typeSerializer2);
        byte[] rawValue = internalKvState.getSerializedValue(
                keyAndNamespace,
                internalKvState.getKeySerializer(),
                internalKvState.getNamespaceSerializer(),
                internalKvState.getValueSerializer());
        if (rawValue != null) {
            return (V) KvStateSerializer.deserializeValue(rawValue, typeSerializer3);
        }
        return null;
    }

    /**
     * Reads the serialized value stored in {@code internalKvState} for the given key and
     * namespace and deserializes it as a list.
     *
     * @param internalKvState state to query
     * @param k key to look up
     * @param typeSerializer serializer for the key
     * @param n namespace to look up
     * @param typeSerializer2 serializer for the namespace
     * @param typeSerializer3 serializer handed to {@code KvStateSerializer.deserializeList}
     * @return the deserialized list, or {@code null} if the state returned no bytes
     * @throws Exception if serialization, lookup or deserialization fails
     */
    private static <V, K, N> List<V> getSerializedList(InternalKvState<K, N, V> internalKvState, K k, TypeSerializer<K> typeSerializer, N n, TypeSerializer<N> typeSerializer2, TypeSerializer<V> typeSerializer3) throws Exception {
        byte[] keyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(k, typeSerializer, n, typeSerializer2);
        byte[] rawList = internalKvState.getSerializedValue(
                keyAndNamespace,
                internalKvState.getKeySerializer(),
                internalKvState.getNamespaceSerializer(),
                internalKvState.getValueSerializer());
        if (rawList != null) {
            return KvStateSerializer.deserializeList(rawList, typeSerializer3);
        }
        return null;
    }

    /**
     * Reads the serialized value stored in {@code internalKvState} for the given key and
     * namespace and deserializes it as a map.
     *
     * @param internalKvState state to query
     * @param k key to look up
     * @param typeSerializer serializer for the key
     * @param n namespace to look up
     * @param typeSerializer2 serializer for the namespace
     * @param typeSerializer3 serializer for the map keys
     * @param typeSerializer4 serializer for the map values
     * @return the deserialized map, or {@code null} if the state returned no bytes
     * @throws Exception if serialization, lookup or deserialization fails
     */
    private static <UK, UV, K, N> Map<UK, UV> getSerializedMap(InternalKvState<K, N, Map<UK, UV>> internalKvState, K k, TypeSerializer<K> typeSerializer, N n, TypeSerializer<N> typeSerializer2, TypeSerializer<UK> typeSerializer3, TypeSerializer<UV> typeSerializer4) throws Exception {
        byte[] keyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(k, typeSerializer, n, typeSerializer2);
        byte[] rawMap = internalKvState.getSerializedValue(
                keyAndNamespace,
                internalKvState.getKeySerializer(),
                internalKvState.getNamespaceSerializer(),
                internalKvState.getValueSerializer());
        if (rawMap != null) {
            return KvStateSerializer.deserializeMap(rawMap, typeSerializer3, typeSerializer4);
        }
        return null;
    }

    /**
     * Runs the snapshot on the calling thread unless it is already done, and registers the
     * resulting job-manager-owned handle with the given shared state registry.
     *
     * @param runnableFuture the snapshot to run
     * @param sharedStateRegistry registry the resulting handle registers its shared states with
     * @return the job-manager-owned state handle, or {@code null} if the snapshot produced none
     * @throws Exception if running the snapshot or retrieving its result fails
     */
    protected KeyedStateHandle runSnapshot(RunnableFuture<SnapshotResult<KeyedStateHandle>> runnableFuture, SharedStateRegistry sharedStateRegistry) throws Exception {
        boolean alreadyDone = runnableFuture.isDone();
        if (!alreadyDone) {
            runnableFuture.run();
        }

        SnapshotResult<KeyedStateHandle> snapshotResult = runnableFuture.get();
        KeyedStateHandle handle = snapshotResult.getJobManagerOwnedSnapshot();
        if (handle == null) {
            return null;
        }
        handle.registerSharedStates(sharedStateRegistry);
        return handle;
    }

    /**
     * Builds a {@code MockEnvironment} from a builder with no options customised.
     *
     * @return the newly built mock environment
     */
    private MockEnvironment buildMockEnv() {
        final MockEnvironment mockEnvironment = MockEnvironment.builder().build();
        return mockEnvironment;
    }

    /**
     * Re-creates one of this class's serializable lambdas from its {@link SerializedLambda} form.
     *
     * <p>The implementation method name selects a branch; the branch then checks the remaining
     * fields of the serialized form and returns a lambda that adds two {@code Long} values.
     *
     * <p>NOTE(review): the method is marked synthetic and looks like decompiler output of a
     * compiler-generated method ({@code boolean z = -1}, {@code case true:} and the unrelated
     * constant used as the first case label are not valid hand-written Java). Presumably it
     * should not exist in source form at all - confirm before relying on or editing it.
     *
     * @param serializedLambda serialized form of the lambda to re-create
     * @return the re-created lambda
     * @throws IllegalArgumentException if the serialized form matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Stage 1: map the implementation method name to a selector. The hash code narrows the
        // candidates, equals() confirms the match. `z` is used as an int-like selector here
        // (-1 = no match, false = first lambda, true = second lambda).
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -640350094:
                if (implMethodName.equals("lambda$testReducingStateAddAndGet$e228ea2e$1")) {
                    z = false;
                    break;
                }
                break;
            case -87859171:
                if (implMethodName.equals("lambda$testReducingStateMerging$e228ea2e$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // Stage 2: for the selected lambda, verify method kind, functional interface, its method
        // name and signature, the implementing class and the implementation signature before
        // handing back a replacement lambda.
        switch (z) {
            // NOTE(review): this label is annotated as the value 0 and pairs with `z = false`
            // above; the constant's name appears unrelated to this class.
            case ExternalSortLargeRecordsITCase.SmallOrMediumOrLargeValue.SMALL_SIZE /* 0 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/ReduceFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("reduce") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/runtime/state/StateBackendTestBase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return (l, l2) -> {
                        return Long.valueOf(l.longValue() + l2.longValue());
                    };
                }
                break;
            // Pairs with `z = true` above (the second lambda).
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/ReduceFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("reduce") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/runtime/state/StateBackendTestBase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return (l3, l22) -> {
                        return Long.valueOf(l3.longValue() + l22.longValue());
                    };
                }
                break;
        }
        // Unknown method name, or a known name whose remaining fields did not match.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
