package org.apache.flink.runtime.state.v2;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.state.StateFutureImpl;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.testutils.statemigration.TestType;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.IOUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;

/* loaded from: input_file:org/apache/flink/runtime/state/v2/StateBackendTestV2Base.class */
public abstract class StateBackendTestV2Base<B extends AbstractStateBackend> {
    protected MockEnvironment env;
    protected JobID jobID;
    private CheckpointStreamFactory checkpointStreamFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/v2/StateBackendTestV2Base$TestAsyncFrameworkExceptionHandler.class */
    public static class TestAsyncFrameworkExceptionHandler implements StateFutureImpl.AsyncFrameworkExceptionHandler {
        String message = null;
        Throwable exception = null;

        TestAsyncFrameworkExceptionHandler() {
        }

        public void handleException(String str, Throwable th) {
            this.message = str;
            this.exception = th;
        }
    }

    @BeforeEach
    void before() throws Exception {
        this.jobID = new JobID();
        this.env = buildMockEnv();
    }

    protected MockEnvironment buildMockEnv() throws Exception {
        MockEnvironment build = MockEnvironment.builder().setTaskStateManager(getTestTaskStateManager()).setJobID(this.jobID).build();
        build.setCheckpointStorageAccess(getCheckpointStorageAccess());
        return build;
    }

    protected TestTaskStateManager getTestTaskStateManager() throws IOException {
        return TestTaskStateManager.builder().build();
    }

    @AfterEach
    void after() {
        IOUtils.closeQuietly(this.env);
    }

    protected abstract ConfigurableStateBackend getStateBackend() throws Exception;

    protected abstract void restoreJob() throws Exception;

    protected CheckpointStorage getCheckpointStorage() throws Exception {
        CheckpointStorage stateBackend = getStateBackend();
        if (stateBackend instanceof CheckpointStorage) {
            return stateBackend;
        }
        throw new IllegalStateException("The state backend under test does not implement CheckpointStorage.Please override 'createCheckpointStorage' and provide an appropriatecheckpoint storage instance");
    }

    protected CheckpointStorageAccess getCheckpointStorageAccess() throws Exception {
        return getCheckpointStorage().createCheckpointStorage(this.jobID);
    }

    protected CheckpointStreamFactory createStreamFactory() throws Exception {
        if (this.checkpointStreamFactory == null) {
            this.checkpointStreamFactory = getCheckpointStorage().createCheckpointStorage(this.jobID).resolveCheckpointStorageLocation(1L, CheckpointStorageLocationReference.getDefault());
        }
        return this.checkpointStreamFactory;
    }

    protected <K> AsyncKeyedStateBackend<K> createAsyncKeyedBackend(int i, int i2, TypeSerializer<K> typeSerializer, KeyGroupRange keyGroupRange, Environment environment) throws Exception {
        environment.setCheckpointStorageAccess(getCheckpointStorageAccess());
        return getStateBackend().createAsyncKeyedStateBackend(new KeyedStateBackendParametersImpl(environment, this.jobID, String.format("test_op_%d_%d", Integer.valueOf(i), Integer.valueOf(i2)), typeSerializer, keyGroupRange.getNumberOfKeyGroups(), keyGroupRange, environment.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, getMetricGroup(), getCustomInitializationMetrics(), Collections.emptyList(), new CloseableRegistry(), 1.0d));
    }

    protected StateBackend.CustomInitializationMetrics getCustomInitializationMetrics() {
        return (str, j) -> {
        };
    }

    protected <K> AsyncKeyedStateBackend<K> restoreAsyncKeyedBackend(int i, int i2, TypeSerializer<K> typeSerializer, KeyGroupRange keyGroupRange, List<KeyedStateHandle> list, Environment environment) throws Exception {
        return getStateBackend().createAsyncKeyedStateBackend(new KeyedStateBackendParametersImpl(environment, this.jobID, String.format("test_op_%d_%d", Integer.valueOf(i), Integer.valueOf(i2)), typeSerializer, keyGroupRange.getNumberOfKeyGroups(), keyGroupRange, environment.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, getMetricGroup(), getCustomInitializationMetrics(), list, new CloseableRegistry(), 1.0d));
    }

    protected MetricGroup getMetricGroup() {
        return new UnregisteredMetricsGroup();
    }

    @TestTemplate
    void testAsyncKeyedStateBackendSnapshot() throws Exception {
        ArrayList arrayList = new ArrayList(20);
        AsyncKeyedStateBackend asyncKeyedStateBackend = null;
        TestAsyncFrameworkExceptionHandler testAsyncFrameworkExceptionHandler = new TestAsyncFrameworkExceptionHandler();
        try {
            asyncKeyedStateBackend = createAsyncKeyedBackend(0, 1, IntSerializer.INSTANCE, new KeyGroupRange(0, 128 - 1), this.env);
            AsyncExecutionController asyncExecutionController = new AsyncExecutionController(new SyncMailboxExecutor(), testAsyncFrameworkExceptionHandler, asyncKeyedStateBackend.createStateExecutor(), new DeclarationManager(), 128, 1, 1L, HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS, (AsyncExecutionController.SwitchContextListener) null, (MetricGroup) null);
            asyncKeyedStateBackend.setup(asyncExecutionController);
            ValueState orCreateKeyedState = asyncKeyedStateBackend.getOrCreateKeyedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("test", BasicTypeInfo.INT_TYPE_INFO));
            for (int i = 0; i < 20; i++) {
                arrayList.add(asyncExecutionController.buildContext(Integer.valueOf(i), Integer.valueOf(i)));
                ((RecordContext) arrayList.get(i)).retain();
            }
            for (int i2 = 0; i2 < 20; i2++) {
                asyncExecutionController.setCurrentContext((RecordContext) arrayList.get(i2));
                orCreateKeyedState.update(Integer.valueOf(i2));
            }
            for (int i3 = 0; i3 < 20; i3++) {
                asyncExecutionController.setCurrentContext((RecordContext) arrayList.get(i3));
                Assertions.assertThat((Integer) orCreateKeyedState.value()).isEqualTo(i3);
                ((RecordContext) arrayList.get(i3)).release();
            }
            asyncExecutionController.drainInflightRecords(0);
            RunnableFuture snapshot = asyncKeyedStateBackend.snapshot(1L, System.currentTimeMillis(), createStreamFactory(), CheckpointOptions.forCheckpointWithDefaultLocation());
            if (!snapshot.isDone()) {
                snapshot.run();
            }
            KeyedStateHandle jobManagerOwnedSnapshot = ((SnapshotResult) snapshot.get()).getJobManagerOwnedSnapshot();
            arrayList.clear();
            for (int i4 = 0; i4 < 20; i4++) {
                arrayList.add(asyncExecutionController.buildContext(Integer.valueOf(i4), Integer.valueOf(i4)));
                ((RecordContext) arrayList.get(i4)).retain();
            }
            for (int i5 = 0; i5 < 20; i5++) {
                asyncExecutionController.setCurrentContext((RecordContext) arrayList.get(i5));
                orCreateKeyedState.update(Integer.valueOf(i5 + 1));
            }
            for (int i6 = 0; i6 < 20; i6++) {
                asyncExecutionController.setCurrentContext((RecordContext) arrayList.get(i6));
                Assertions.assertThat((Integer) orCreateKeyedState.value()).isEqualTo(i6 + 1);
                ((RecordContext) arrayList.get(i6)).release();
            }
            if (null != asyncKeyedStateBackend) {
                IOUtils.closeQuietly(asyncKeyedStateBackend);
                asyncKeyedStateBackend.dispose();
            }
            Assertions.assertThat(jobManagerOwnedSnapshot).isNotNull();
            AsyncKeyedStateBackend asyncKeyedStateBackend2 = null;
            restoreJob();
            try {
                asyncKeyedStateBackend2 = restoreAsyncKeyedBackend(0, 1, IntSerializer.INSTANCE, new KeyGroupRange(0, 128 - 1), Collections.singletonList(jobManagerOwnedSnapshot), this.env);
                AsyncExecutionController asyncExecutionController2 = new AsyncExecutionController(new SyncMailboxExecutor(), testAsyncFrameworkExceptionHandler, asyncKeyedStateBackend2.createStateExecutor(), new DeclarationManager(), 128, 1, 1L, HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS, (AsyncExecutionController.SwitchContextListener) null, (MetricGroup) null);
                asyncKeyedStateBackend2.setup(asyncExecutionController2);
                ValueState orCreateKeyedState2 = asyncKeyedStateBackend2.getOrCreateKeyedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("test", BasicTypeInfo.INT_TYPE_INFO));
                arrayList.clear();
                for (int i7 = 0; i7 < 20; i7++) {
                    arrayList.add(asyncExecutionController2.buildContext(Integer.valueOf(i7), Integer.valueOf(i7)));
                    ((RecordContext) arrayList.get(i7)).retain();
                }
                for (int i8 = 0; i8 < 20; i8++) {
                    asyncExecutionController2.setCurrentContext((RecordContext) arrayList.get(i8));
                    Assertions.assertThat((Integer) orCreateKeyedState2.value()).isEqualTo(i8);
                    ((RecordContext) arrayList.get(i8)).release();
                }
                if (null != asyncKeyedStateBackend2) {
                    IOUtils.closeQuietly(asyncKeyedStateBackend2);
                    asyncKeyedStateBackend2.dispose();
                }
                Assertions.assertThat(testAsyncFrameworkExceptionHandler.exception).isNull();
            } finally {
            }
        } finally {
        }
    }

    @TestTemplate
    void testAsyncStateBackendScaleUp() throws Exception {
        testKeyGroupSnapshotRestore(2, 5, 10);
    }

    @TestTemplate
    void testAsyncStateBackendScaleDown() throws Exception {
        testKeyGroupSnapshotRestore(4, 3, 10);
    }

    private void testKeyGroupSnapshotRestore(int i, int i2, int i3) throws Exception {
        Random random = new Random();
        ArrayList arrayList = new ArrayList(i3);
        ArrayList arrayList2 = new ArrayList(i3);
        ArrayList arrayList3 = new ArrayList(i3);
        for (int i4 = 0; i4 < i3; i4++) {
            arrayList.add(new ValueStateDescriptor("state" + i4, BasicTypeInfo.INT_TYPE_INFO));
        }
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistryImpl = new SharedStateRegistryImpl();
        TestAsyncFrameworkExceptionHandler testAsyncFrameworkExceptionHandler = new TestAsyncFrameworkExceptionHandler();
        ArrayList arrayList4 = new ArrayList(i);
        for (int i5 = 0; i5 < i; i5++) {
            KeyGroupRange of = KeyGroupRange.of((i3 * i5) / i, ((i3 * (i5 + 1)) / i) - 1);
            AsyncKeyedStateBackend<K> createAsyncKeyedBackend = createAsyncKeyedBackend(i5, i, IntSerializer.INSTANCE, of, this.env);
            AsyncExecutionController asyncExecutionController = new AsyncExecutionController(new SyncMailboxExecutor(), testAsyncFrameworkExceptionHandler, createAsyncKeyedBackend.createStateExecutor(), new DeclarationManager(), i3, 1, 1L, HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS, (AsyncExecutionController.SwitchContextListener) null, (MetricGroup) null);
            createAsyncKeyedBackend.setup(asyncExecutionController);
            try {
                for (int startKeyGroup = of.getStartKeyGroup(); startKeyGroup <= of.getEndKeyGroup(); startKeyGroup++) {
                    ValueState orCreateKeyedState = createAsyncKeyedBackend.getOrCreateKeyedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, (StateDescriptor) arrayList.get(startKeyGroup));
                    int keyInKeyGroup = getKeyInKeyGroup(random, i3, KeyGroupRange.of(startKeyGroup, startKeyGroup));
                    RecordContext buildContext = asyncExecutionController.buildContext(Integer.valueOf(keyInKeyGroup), Integer.valueOf(keyInKeyGroup));
                    buildContext.retain();
                    asyncExecutionController.setCurrentContext(buildContext);
                    arrayList2.add(Integer.valueOf(keyInKeyGroup));
                    orCreateKeyedState.update(Integer.valueOf(keyInKeyGroup));
                    arrayList3.add(Integer.valueOf(keyInKeyGroup));
                    buildContext.release();
                }
                arrayList4.add(runSnapshot(createAsyncKeyedBackend.snapshot(0L, 0L, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistryImpl));
                IOUtils.closeQuietly(createAsyncKeyedBackend);
                createAsyncKeyedBackend.dispose();
            } catch (Throwable th) {
                IOUtils.closeQuietly(createAsyncKeyedBackend);
                createAsyncKeyedBackend.dispose();
                throw th;
            }
        }
        ArrayList arrayList5 = new ArrayList();
        for (int i6 = 0; i6 < i2; i6++) {
            arrayList5.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(i3, i2, i6));
        }
        ArrayList arrayList6 = new ArrayList(i2);
        for (int i7 = 0; i7 < i2; i7++) {
            ArrayList arrayList7 = new ArrayList();
            StateAssignmentOperation.extractIntersectingState(arrayList4, (KeyGroupRange) arrayList5.get(i7), arrayList7);
            arrayList6.add(arrayList7);
        }
        for (int i8 = 0; i8 < i2; i8++) {
            AsyncKeyedStateBackend<K> restoreAsyncKeyedBackend = restoreAsyncKeyedBackend(i8, i2, IntSerializer.INSTANCE, (KeyGroupRange) arrayList5.get(i8), (List) arrayList6.get(i8), this.env);
            AsyncExecutionController asyncExecutionController2 = new AsyncExecutionController(new SyncMailboxExecutor(), testAsyncFrameworkExceptionHandler, restoreAsyncKeyedBackend.createStateExecutor(), new DeclarationManager(), i3, 1, 1L, HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS, (AsyncExecutionController.SwitchContextListener) null, (MetricGroup) null);
            restoreAsyncKeyedBackend.setup(asyncExecutionController2);
            try {
                KeyGroupRange keyGroupRange = (KeyGroupRange) arrayList5.get(i8);
                for (int startKeyGroup2 = keyGroupRange.getStartKeyGroup(); startKeyGroup2 <= keyGroupRange.getEndKeyGroup(); startKeyGroup2++) {
                    ValueState orCreateKeyedState2 = restoreAsyncKeyedBackend.getOrCreateKeyedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, (StateDescriptor) arrayList.get(startKeyGroup2));
                    RecordContext buildContext2 = asyncExecutionController2.buildContext(arrayList2.get(startKeyGroup2), (Integer) arrayList2.get(startKeyGroup2));
                    buildContext2.retain();
                    asyncExecutionController2.setCurrentContext(buildContext2);
                    Assertions.assertThat((Integer) orCreateKeyedState2.value()).isEqualTo(arrayList3.get(startKeyGroup2));
                    buildContext2.release();
                }
            } finally {
                IOUtils.closeQuietly(restoreAsyncKeyedBackend);
                restoreAsyncKeyedBackend.dispose();
            }
        }
    }

    @TestTemplate
    void testKeyGroupedInternalPriorityQueue() throws Exception {
        testKeyGroupedInternalPriorityQueue(false);
    }

    @TestTemplate
    void testKeyGroupedInternalPriorityQueueAddAll() throws Exception {
        testKeyGroupedInternalPriorityQueue(true);
    }

    void testKeyGroupedInternalPriorityQueue(boolean z) throws Exception {
        AsyncKeyedStateBackend<K> createAsyncKeyedBackend = createAsyncKeyedBackend(0, 1, IntSerializer.INSTANCE, new KeyGroupRange(0, 127), this.env);
        try {
            KeyGroupedInternalPriorityQueue create = createAsyncKeyedBackend.create("key-grouped-priority-queue", new TestType.V1TestTypeSerializer());
            TestType testType = new TestType("a", 42);
            TestType testType2 = new TestType("a", 44);
            TestType testType3 = new TestType("b", 1);
            TestType testType4 = new TestType("b", 3);
            TestType[] testTypeArr = {testType2, testType3, testType3, testType4, testType};
            if (z) {
                create.addAll(Arrays.asList(testTypeArr));
            } else {
                Assertions.assertThat(create.add(testTypeArr[0])).isTrue();
                Assertions.assertThat(create.add(testTypeArr[1])).isTrue();
                Assertions.assertThat(create.add(testTypeArr[2])).isFalse();
                Assertions.assertThat(create.add(testTypeArr[3])).isFalse();
                Assertions.assertThat(create.add(testTypeArr[4])).isFalse();
            }
            Assertions.assertThat(create.isEmpty()).isFalse();
            Assertions.assertThat(create.getSubsetForKeyGroup(81)).containsExactlyInAnyOrder(new TestType[]{testType, testType2});
            Assertions.assertThat(create.getSubsetForKeyGroup(22)).containsExactlyInAnyOrder(new TestType[]{testType3, testType4});
            Assertions.assertThat((TestType) create.peek()).isEqualTo(testType3);
            Assertions.assertThat((TestType) create.poll()).isEqualTo(testType3);
            Assertions.assertThat((TestType) create.peek()).isEqualTo(testType4);
            ArrayList arrayList = new ArrayList();
            CloseableIterator it = create.iterator();
            try {
                Objects.requireNonNull(arrayList);
                it.forEachRemaining((v1) -> {
                    r1.add(v1);
                });
                if (it != null) {
                    it.close();
                }
                Assertions.assertThat(arrayList).containsExactlyInAnyOrder(new TestType[]{testType4, testType, testType2});
                Assertions.assertThat(create.size()).isEqualTo(3);
                Assertions.assertThat(create.remove(testType3)).isFalse();
                Assertions.assertThat(create.remove(testType4)).isTrue();
                Assertions.assertThat((TestType) create.peek()).isEqualTo(testType);
                IOUtils.closeQuietly(createAsyncKeyedBackend);
                createAsyncKeyedBackend.dispose();
            } finally {
            }
        } catch (Throwable th) {
            IOUtils.closeQuietly(createAsyncKeyedBackend);
            createAsyncKeyedBackend.dispose();
            throw th;
        }
    }

    @TestTemplate
    void testValueStateWorkWithTtl() throws Exception {
        TestAsyncFrameworkExceptionHandler testAsyncFrameworkExceptionHandler = new TestAsyncFrameworkExceptionHandler();
        AsyncKeyedStateBackend<K> createAsyncKeyedBackend = createAsyncKeyedBackend(0, 1, LongSerializer.INSTANCE, new KeyGroupRange(0, 127), this.env);
        AsyncExecutionController asyncExecutionController = new AsyncExecutionController(new SyncMailboxExecutor(), testAsyncFrameworkExceptionHandler, createAsyncKeyedBackend.createStateExecutor(), new DeclarationManager(), 128, 1, -1L, 1, (AsyncExecutionController.SwitchContextListener) null, (MetricGroup) null);
        createAsyncKeyedBackend.setup(asyncExecutionController);
        try {
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id", TypeInformation.of(Long.class));
            valueStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Duration.ofSeconds(1L)).build());
            ValueState orCreateKeyedState = createAsyncKeyedBackend.getOrCreateKeyedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
            RecordContext buildContext = asyncExecutionController.buildContext("record-1", 1L);
            buildContext.retain();
            asyncExecutionController.setCurrentContext(buildContext);
            orCreateKeyedState.update(1L);
            Assertions.assertThat((Long) orCreateKeyedState.value()).isEqualTo(1L);
            Thread.sleep(1000L);
            Assertions.assertThat((Long) orCreateKeyedState.value()).isNull();
            buildContext.release();
            RecordContext buildContext2 = asyncExecutionController.buildContext("record-2", 2L);
            asyncExecutionController.setCurrentContext(buildContext2);
            orCreateKeyedState.asyncUpdate(2L).thenAccept(r4 -> {
                orCreateKeyedState.asyncValue().thenAccept(l -> {
                    Assertions.assertThat(l).isEqualTo(2L);
                    Thread.sleep(1000L);
                    orCreateKeyedState.asyncValue().thenAccept(l -> {
                        Assertions.assertThat(l).isNull();
                    });
                });
            });
            Thread.sleep(3000L);
            buildContext2.release();
            IOUtils.closeQuietly(createAsyncKeyedBackend);
            createAsyncKeyedBackend.dispose();
        } catch (Throwable th) {
            IOUtils.closeQuietly(createAsyncKeyedBackend);
            createAsyncKeyedBackend.dispose();
            throw th;
        }
    }

    private int getKeyInKeyGroup(Random random, int i, KeyGroupRange keyGroupRange) {
        int nextInt = random.nextInt();
        int assignToKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(nextInt), i);
        while (!keyGroupRange.contains(assignToKeyGroup)) {
            nextInt = random.nextInt();
            assignToKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(nextInt), i);
        }
        return nextInt;
    }

    private static KeyedStateHandle runSnapshot(RunnableFuture<SnapshotResult<KeyedStateHandle>> runnableFuture, SharedStateRegistry sharedStateRegistry) throws Exception {
        if (!runnableFuture.isDone()) {
            runnableFuture.run();
        }
        KeyedStateHandle jobManagerOwnedSnapshot = runnableFuture.get().getJobManagerOwnedSnapshot();
        if (jobManagerOwnedSnapshot != null) {
            jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry, 0L);
        }
        return jobManagerOwnedSnapshot;
    }
}
