/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.rocksdb;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendTestBase;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBConfigurableOptions;
import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend;
import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackendBuilder;
import org.apache.flink.state.rocksdb.RocksDBOperationUtils;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.state.rocksdb.RocksDBResourceContainer;
import org.apache.flink.state.rocksdb.RocksDBStateUploader;
import org.apache.flink.state.rocksdb.RocksDBTestUtils;
import org.apache.flink.state.rocksdb.sstmerge.RocksDBManualCompactionOptions;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.RocksObject;
import org.rocksdb.Snapshot;

@ExtendWith(value={ParameterizedTestExtension.class})
public class EmbeddedRocksDBStateBackendTest
extends StateBackendTestBase<EmbeddedRocksDBStateBackend> {
    @TempDir
    private static java.nio.file.Path tempFolder;
    private OneShotLatch blocker;
    private OneShotLatch waiter;
    private BlockerCheckpointStreamFactory testStreamFactory;
    private RocksDBKeyedStateBackend<Integer> keyedStateBackend;
    private List<RocksObject> allCreatedCloseables;
    private ValueState<Integer> testState1;
    private ValueState<String> testState2;
    @Parameter(value=0)
    public boolean enableIncrementalCheckpointing;
    @Parameter(value=1)
    public SupplierWithException<CheckpointStorage, IOException> storageSupplier;
    @Parameter(value=2)
    public boolean useIngestDB;
    private String dbPath;
    private RocksDB db = null;
    private ColumnFamilyHandle defaultCFHandle = null;
    private RocksDBStateUploader rocksDBStateUploader = null;
    private final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer();
    private final HashMap<String, Long> initMetricBackingMap = new HashMap();

    @Parameters
    public static List<Object[]> modes() {
        return Arrays.asList({true, JobManagerCheckpointStorage::new, false}, {true, JobManagerCheckpointStorage::new, true}, {false, () -> {
            String checkpointPath = TempDirUtils.newFolder((java.nio.file.Path)tempFolder).toURI().toString();
            return new FileSystemCheckpointStorage(new Path(checkpointPath), 0, -1);
        }, false});
    }

    public void prepareRocksDB() throws Exception {
        String dbPath = RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath((File)TempDirUtils.newFolder((java.nio.file.Path)tempFolder)).getAbsolutePath();
        ColumnFamilyOptions columnOptions = this.optionsContainer.getColumnOptions();
        ArrayList columnFamilyHandles = new ArrayList(1);
        this.db = RocksDBOperationUtils.openDB((String)dbPath, Collections.emptyList(), columnFamilyHandles, (ColumnFamilyOptions)columnOptions, (DBOptions)this.optionsContainer.getDbOptions());
        this.defaultCFHandle = (ColumnFamilyHandle)columnFamilyHandles.remove(0);
    }

    protected ConfigurableStateBackend getStateBackend() throws IOException {
        this.dbPath = TempDirUtils.newFolder((java.nio.file.Path)tempFolder).getAbsolutePath();
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(this.enableIncrementalCheckpointing);
        Configuration configuration = this.createBackendConfig();
        backend = backend.configure((ReadableConfig)configuration, Thread.currentThread().getContextClassLoader());
        backend.setDbStoragePath(this.dbPath);
        return backend;
    }

    private Configuration createBackendConfig() {
        Configuration configuration = new Configuration();
        configuration.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, (Object)this.useIngestDB);
        configuration.set(RocksDBOptions.TIMER_SERVICE_FACTORY, (Object)EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
        configuration.set(RocksDBManualCompactionOptions.MIN_INTERVAL, (Object)Duration.ofMillis(1L));
        return configuration;
    }

    protected StateBackend.CustomInitializationMetrics getCustomInitializationMetrics() {
        return (name, value) -> this.initMetricBackingMap.compute(name, (key, oldValue) -> oldValue == null ? value : value + oldValue);
    }

    protected CheckpointStorage getCheckpointStorage() throws Exception {
        return (CheckpointStorage)this.storageSupplier.get();
    }

    protected boolean isSerializerPresenceRequiredOnRestore() {
        return false;
    }

    protected boolean supportsAsynchronousSnapshots() {
        return true;
    }

    protected boolean isSafeToReuseKVState() {
        return true;
    }

    @AfterEach
    public void cleanupRocksDB() {
        if (this.keyedStateBackend != null) {
            IOUtils.closeQuietly(this.keyedStateBackend);
            this.keyedStateBackend.dispose();
        }
        IOUtils.closeQuietly((AutoCloseable)this.defaultCFHandle);
        IOUtils.closeQuietly((AutoCloseable)this.db);
        IOUtils.closeQuietly((AutoCloseable)this.optionsContainer);
        if (this.allCreatedCloseables != null) {
            for (RocksObject rocksCloseable : this.allCreatedCloseables) {
                ((RocksObject)Mockito.verify((Object)rocksCloseable, (VerificationMode)VerificationModeFactory.times((int)1))).close();
            }
            this.allCreatedCloseables = null;
        }
    }

    public void setupRocksKeyedStateBackend() throws Exception {
        this.blocker = new OneShotLatch();
        this.waiter = new OneShotLatch();
        this.testStreamFactory = new BlockerCheckpointStreamFactory(0x100000);
        this.testStreamFactory.setBlockerLatch(this.blocker);
        this.testStreamFactory.setWaiterLatch(this.waiter);
        this.testStreamFactory.setAfterNumberInvocations(10);
        this.prepareRocksDB();
        RocksDBKeyedStateBackendBuilder keyedStateBackendBuilder = RocksDBTestUtils.builderForTestDB(TempDirUtils.newFolder((java.nio.file.Path)tempFolder), IntSerializer.INSTANCE, (RocksDB)Mockito.spy((Object)this.db), this.defaultCFHandle, this.optionsContainer.getColumnOptions()).setEnableIncrementalCheckpointing(this.enableIncrementalCheckpointing).setUseIngestDbRestoreMode(this.useIngestDB);
        if (this.enableIncrementalCheckpointing) {
            this.rocksDBStateUploader = (RocksDBStateUploader)Mockito.spy((Object)new RocksDBStateUploader(((Integer)RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()).intValue()));
            keyedStateBackendBuilder.setRocksDBStateUploader(this.rocksDBStateUploader);
        }
        this.keyedStateBackend = keyedStateBackendBuilder.build();
        this.testState1 = (ValueState)this.keyedStateBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("TestState-1", Integer.class, (Object)0));
        this.testState2 = (ValueState)this.keyedStateBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ValueStateDescriptor("TestState-2", String.class, (Object)""));
        this.allCreatedCloseables = new ArrayList<RocksObject>();
        ((RocksDB)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                RocksIterator rocksIterator = (RocksIterator)Mockito.spy((Object)((RocksIterator)invocationOnMock.callRealMethod()));
                EmbeddedRocksDBStateBackendTest.this.allCreatedCloseables.add((RocksObject)rocksIterator);
                return rocksIterator;
            }
        }).when((Object)this.keyedStateBackend.db)).newIterator((ColumnFamilyHandle)ArgumentMatchers.any(ColumnFamilyHandle.class), (ReadOptions)ArgumentMatchers.any(ReadOptions.class));
        ((RocksDB)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                Snapshot snapshot = (Snapshot)Mockito.spy((Object)((Snapshot)invocationOnMock.callRealMethod()));
                EmbeddedRocksDBStateBackendTest.this.allCreatedCloseables.add((RocksObject)snapshot);
                return snapshot;
            }
        }).when((Object)this.keyedStateBackend.db)).getSnapshot();
        ((RocksDB)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ColumnFamilyHandle snapshot = (ColumnFamilyHandle)Mockito.spy((Object)((ColumnFamilyHandle)invocationOnMock.callRealMethod()));
                EmbeddedRocksDBStateBackendTest.this.allCreatedCloseables.add((RocksObject)snapshot);
                return snapshot;
            }
        }).when((Object)this.keyedStateBackend.db)).createColumnFamily((ColumnFamilyDescriptor)ArgumentMatchers.any(ColumnFamilyDescriptor.class));
        for (int i = 0; i < 100; ++i) {
            this.keyedStateBackend.setCurrentKey((Object)i);
            this.testState1.update((Object)(4200 + i));
            this.testState2.update((Object)("S-" + (4200 + i)));
        }
    }

    @TestTemplate
    public void testCorrectMergeOperatorSet() throws Exception {
        this.prepareRocksDB();
        try (ColumnFamilyOptions columnFamilyOptions = (ColumnFamilyOptions)Mockito.spy((Object)new ColumnFamilyOptions());
             RocksDBKeyedStateBackend test = RocksDBTestUtils.builderForTestDB(TempDirUtils.newFolder((java.nio.file.Path)tempFolder), IntSerializer.INSTANCE, this.db, this.defaultCFHandle, columnFamilyOptions).setEnableIncrementalCheckpointing(this.enableIncrementalCheckpointing).setUseIngestDbRestoreMode(this.useIngestDB).build();){
            ValueStateDescriptor stubState1 = new ValueStateDescriptor("StubState-1", (TypeSerializer)StringSerializer.INSTANCE);
            test.createOrUpdateInternalState((TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)stubState1);
            ValueStateDescriptor stubState2 = new ValueStateDescriptor("StubState-2", (TypeSerializer)StringSerializer.INSTANCE);
            test.createOrUpdateInternalState((TypeSerializer)StringSerializer.INSTANCE, (StateDescriptor)stubState2);
            ((ColumnFamilyOptions)Mockito.verify((Object)columnFamilyOptions, (VerificationMode)Mockito.times((int)2))).setMergeOperatorName("stringappendtest");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    public void testReleasingSnapshotAfterBackendClosed() throws Exception {
        this.setupRocksKeyedStateBackend();
        try {
            RunnableFuture snapshot = this.keyedStateBackend.snapshot(0L, 0L, (CheckpointStreamFactory)this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            RocksDB spyDB = this.keyedStateBackend.db;
            for (RocksObject rocksCloseable : this.allCreatedCloseables) {
                ((RocksObject)Mockito.verify((Object)rocksCloseable, (VerificationMode)VerificationModeFactory.times((int)0))).close();
            }
            snapshot.cancel(true);
            this.keyedStateBackend.dispose();
            ((RocksDB)Mockito.verify((Object)spyDB, (VerificationMode)VerificationModeFactory.times((int)1))).close();
            Assertions.assertThat((boolean)this.keyedStateBackend.isDisposed()).isTrue();
            for (RocksObject rocksCloseable : this.allCreatedCloseables) {
                ((RocksObject)Mockito.verify((Object)rocksCloseable, (VerificationMode)VerificationModeFactory.times((int)1))).close();
            }
        }
        finally {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
        }
        this.verifyRocksDBStateUploaderClosed();
    }

    @TestTemplate
    public void testDismissingSnapshot() throws Exception {
        this.setupRocksKeyedStateBackend();
        try {
            RunnableFuture snapshot = this.keyedStateBackend.snapshot(0L, 0L, (CheckpointStreamFactory)this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            snapshot.cancel(true);
            this.verifyRocksObjectsReleased();
        }
        finally {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
        }
        this.verifyRocksDBStateUploaderClosed();
    }

    @TestTemplate
    public void testDismissingSnapshotNotRunnable() throws Exception {
        this.setupRocksKeyedStateBackend();
        try {
            RunnableFuture snapshot = this.keyedStateBackend.snapshot(0L, 0L, (CheckpointStreamFactory)this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            snapshot.cancel(true);
            Thread asyncSnapshotThread = new Thread(snapshot);
            asyncSnapshotThread.start();
            Assertions.assertThatThrownBy(snapshot::get);
            asyncSnapshotThread.join();
            this.verifyRocksObjectsReleased();
        }
        finally {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
        }
        this.verifyRocksDBStateUploaderClosed();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    public void testCompletingSnapshot() throws Exception {
        this.setupRocksKeyedStateBackend();
        try {
            RunnableFuture snapshot = this.keyedStateBackend.snapshot(0L, 0L, (CheckpointStreamFactory)this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread asyncSnapshotThread = new Thread(snapshot);
            asyncSnapshotThread.start();
            this.waiter.await();
            this.waiter.reset();
            this.runStateUpdates();
            this.blocker.trigger();
            this.waiter.await();
            SnapshotResult snapshotResult = (SnapshotResult)snapshot.get();
            KeyedStateHandle keyedStateHandle = (KeyedStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
            Assertions.assertThat((Object)keyedStateHandle).isNotNull();
            Assertions.assertThat((long)keyedStateHandle.getStateSize()).isGreaterThan(0L);
            Assertions.assertThat((int)keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups()).isEqualTo(2);
            for (BlockingCheckpointOutputStream stream : this.testStreamFactory.getAllCreatedStreams()) {
                Assertions.assertThat((boolean)stream.isClosed()).isTrue();
            }
            asyncSnapshotThread.join();
            this.verifyRocksObjectsReleased();
        }
        finally {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
        }
        this.verifyRocksDBStateUploaderClosed();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    public void testCancelRunningSnapshot() throws Exception {
        this.setupRocksKeyedStateBackend();
        try {
            RunnableFuture snapshot = this.keyedStateBackend.snapshot(0L, 0L, (CheckpointStreamFactory)this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread asyncSnapshotThread = new Thread(snapshot);
            asyncSnapshotThread.start();
            this.waiter.await();
            this.waiter.reset();
            this.runStateUpdates();
            snapshot.cancel(true);
            this.blocker.trigger();
            for (BlockingCheckpointOutputStream stream : this.testStreamFactory.getAllCreatedStreams()) {
                Assertions.assertThat((boolean)stream.isClosed()).isTrue();
            }
            this.waiter.await();
            Assertions.assertThatThrownBy(snapshot::get);
            asyncSnapshotThread.join();
            this.verifyRocksObjectsReleased();
        }
        finally {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
        }
        this.verifyRocksDBStateUploaderClosed();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    public void testDisposeDeletesAllDirectories() throws Exception {
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        Collection allFilesInDbDir = FileUtils.listFilesAndDirs((File)new File(this.dbPath), (IOFileFilter)new AcceptAllFilter(), (IOFileFilter)new AcceptAllFilter());
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, null);
            kvId.initializeSerializerUnlessSet(new ExecutionConfig());
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            state.update((Object)"Hello");
            Assertions.assertThat((int)allFilesInDbDir.size()).isGreaterThan(1);
        }
        finally {
            IOUtils.closeQuietly((AutoCloseable)backend);
            backend.dispose();
        }
        allFilesInDbDir = FileUtils.listFilesAndDirs((File)new File(this.dbPath), (IOFileFilter)new AcceptAllFilter(), (IOFileFilter)new AcceptAllFilter());
        Assertions.assertThat((Collection)allFilesInDbDir).hasSize(1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    public void testSharedIncrementalStateDeRegistration() throws Exception {
        if (this.enableIncrementalCheckpointing) {
            CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
            try {
                ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class, null);
                kvId.initializeSerializerUnlessSet(new ExecutionConfig());
                ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
                LinkedList<IncrementalRemoteKeyedStateHandle> previousStateHandles = new LinkedList<IncrementalRemoteKeyedStateHandle>();
                SharedStateRegistry sharedStateRegistry = (SharedStateRegistry)Mockito.spy((Object)new SharedStateRegistryImpl());
                for (int checkpointId = 0; checkpointId < 3; ++checkpointId) {
                    Mockito.reset((Object[])new SharedStateRegistry[]{sharedStateRegistry});
                    backend.setCurrentKey((Object)checkpointId);
                    state.update((Object)("Hello-" + checkpointId));
                    RunnableFuture snapshot = backend.snapshot((long)checkpointId, (long)checkpointId, this.createStreamFactory(), CheckpointOptions.forCheckpointWithDefaultLocation());
                    snapshot.run();
                    SnapshotResult snapshotResult = (SnapshotResult)snapshot.get();
                    IncrementalRemoteKeyedStateHandle stateHandle = (IncrementalRemoteKeyedStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
                    List sharedState = stateHandle.getSharedState().stream().map(e -> IncrementalKeyedStateHandle.HandleAndLocalPath.of((StreamStateHandle)e.getHandle(), (String)e.getLocalPath())).collect(Collectors.toList());
                    stateHandle.registerSharedStates(sharedStateRegistry, (long)checkpointId);
                    for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath : sharedState) {
                        ((SharedStateRegistry)Mockito.verify((Object)sharedStateRegistry)).registerReference(SharedStateRegistryKey.forStreamStateHandle((StreamStateHandle)handleAndLocalPath.getHandle()), handleAndLocalPath.getHandle(), (long)checkpointId);
                    }
                    previousStateHandles.add(stateHandle);
                    ((CheckpointListener)backend).notifyCheckpointComplete((long)checkpointId);
                    if (previousStateHandles.size() <= 1) continue;
                    ((IncrementalRemoteKeyedStateHandle)previousStateHandles.remove()).discardState();
                }
                while (!previousStateHandles.isEmpty()) {
                    Mockito.reset((Object[])new SharedStateRegistry[]{sharedStateRegistry});
                    ((IncrementalRemoteKeyedStateHandle)previousStateHandles.remove()).discardState();
                }
            }
            finally {
                IOUtils.closeQuietly((AutoCloseable)backend);
                backend.dispose();
            }
        }
    }

    @TestTemplate
    public void testMapStateClear() throws Exception {
        this.setupRocksKeyedStateBackend();
        MapStateDescriptor kvId = new MapStateDescriptor("id", Integer.class, String.class);
        MapState state = (MapState)this.keyedStateBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
        ((RocksDB)Mockito.doAnswer(invocationOnMock -> {
            throw new RocksDBException("Artificial failure");
        }).when((Object)this.keyedStateBackend.db)).newIterator((ColumnFamilyHandle)ArgumentMatchers.any(ColumnFamilyHandle.class), (ReadOptions)ArgumentMatchers.any(ReadOptions.class));
        Assertions.assertThatThrownBy(() -> ((MapState)state).clear()).isInstanceOf(FlinkRuntimeException.class);
    }

    @TestTemplate
    public void testConfigureTernaryBooleanConfigs() throws Exception {
        ConfigurableStateBackend stateBackend = this.getStateBackend();
        if (!(stateBackend instanceof EmbeddedRocksDBStateBackend)) {
            return;
        }
        EmbeddedRocksDBStateBackend rocksDBStateBackend = (EmbeddedRocksDBStateBackend)stateBackend;
        Configuration baseConfig = this.createBackendConfig();
        Configuration testConfig = new Configuration();
        testConfig.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, (Object)((Boolean)RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE.defaultValue() == false ? 1 : 0));
        testConfig.set(RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE, (Object)((Boolean)RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue() == false ? 1 : 0));
        testConfig.set(RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, (Object)((Boolean)RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue() == false ? 1 : 0));
        EmbeddedRocksDBStateBackend configuredBackend = rocksDBStateBackend.configure((ReadableConfig)testConfig, Thread.currentThread().getContextClassLoader());
        this.checkBooleanWithBaseConf(baseConfig, (ConfigOption<Boolean>)RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, configuredBackend.getUseIngestDbRestoreMode());
        this.checkBooleanWithBaseConf(baseConfig, (ConfigOption<Boolean>)RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, configuredBackend.isRescalingUseDeleteFilesInRange());
        this.checkBooleanWithBaseConf(baseConfig, (ConfigOption<Boolean>)RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE, configuredBackend.getIncrementalRestoreAsyncCompactAfterRescale());
    }

    private void checkBooleanWithBaseConf(Configuration testConfig, ConfigOption<Boolean> option, boolean value) {
        org.junit.jupiter.api.Assertions.assertEquals((Object)testConfig.getOptional(option).orElse((Boolean)option.defaultValue() == false), (Object)value);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    public void testSmallFilesCompaction() throws Exception {
        ValueStateDescriptor kvId = new ValueStateDescriptor("id", String.class);
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        KeyGroupRange range = KeyGroupRange.of((int)0, (int)49);
        double expectedNumSstFiles = (double)range.getNumberOfKeyGroups() * 0.5;
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, range.getEndKeyGroup() - range.getStartKeyGroup() + 1, range, (Environment)this.env);
        try {
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            for (int i = range.getStartKeyGroup(); i < range.getEndKeyGroup(); ++i) {
                backend.setCurrentKey((Object)i);
                state.update((Object)Integer.toString(i));
                EmbeddedRocksDBStateBackendTest.runSnapshot((RunnableFuture)backend.snapshot((long)i, (long)i, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            }
            File sstPath = new File(this.dbPath).listFiles()[0].listFiles()[0];
            int length = sstPath.listFiles((dir, name) -> name.endsWith(".sst")).length;
            Assertions.assertThat((int)length).isLessThanOrEqualTo((int)expectedNumSstFiles).withFailMessage("actual: " + length + ", expected: " + expectedNumSstFiles, new Object[0]);
        }
        finally {
            IOUtils.closeQuietly((AutoCloseable)backend);
            backend.dispose();
        }
        Thread.sleep(100L);
    }

    private void runStateUpdates() throws Exception {
        for (int i = 50; i < 150; ++i) {
            if (i % 10 == 0) {
                Thread.sleep(1L);
            }
            this.keyedStateBackend.setCurrentKey((Object)i);
            this.testState1.update((Object)(4200 + i));
            this.testState2.update((Object)("S-" + (4200 + i)));
        }
    }

    private void verifyRocksObjectsReleased() {
        for (RocksObject rocksCloseable : this.allCreatedCloseables) {
            ((RocksObject)Mockito.verify((Object)rocksCloseable, (VerificationMode)VerificationModeFactory.times((int)1))).close();
        }
        Assertions.assertThat((Object)this.keyedStateBackend.db).isNotNull();
        RocksDB spyDB = this.keyedStateBackend.db;
        this.keyedStateBackend.dispose();
        ((RocksDB)Mockito.verify((Object)spyDB, (VerificationMode)VerificationModeFactory.times((int)1))).close();
        Assertions.assertThat((boolean)this.keyedStateBackend.isDisposed()).isTrue();
    }

    private void verifyRocksDBStateUploaderClosed() throws IOException {
        if (this.enableIncrementalCheckpointing) {
            ((RocksDBStateUploader)Mockito.verify((Object)this.rocksDBStateUploader, (VerificationMode)VerificationModeFactory.times((int)1))).close();
        }
    }

    protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyedStateHandle> state, Environment env) throws Exception {
        CheckpointableKeyedStateBackend restoreResult = super.restoreKeyedBackend(keySerializer, numberOfKeyGroups, keyGroupRange, state, env);
        if (this.checkMetrics() && !CollectionUtil.isEmptyOrAllElementsNull(state)) {
            Assertions.assertThat(this.initMetricBackingMap.keySet()).containsExactlyInAnyOrder((Object[])new String[]{"RestoreStateDurationMs", "DownloadStateDurationMs"});
        }
        return restoreResult;
    }

    protected boolean checkMetrics() {
        return true;
    }

    private static class AcceptAllFilter
    implements IOFileFilter {
        private AcceptAllFilter() {
        }

        public boolean accept(File file) {
            return true;
        }

        public boolean accept(File file, String s) {
            return true;
        }
    }
}

