package org.apache.flink.state.rocksdb;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendTestBase;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.sstmerge.RocksDBManualCompactionOptions;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.RocksObject;
import org.rocksdb.Snapshot;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest.class */
public class EmbeddedRocksDBStateBackendTest extends StateBackendTestBase<EmbeddedRocksDBStateBackend> {

    @TempDir
    private static Path tempFolder;
    private OneShotLatch blocker;
    private OneShotLatch waiter;
    private BlockerCheckpointStreamFactory testStreamFactory;
    private RocksDBKeyedStateBackend<Integer> keyedStateBackend;
    private List<RocksObject> allCreatedCloseables;
    private ValueState<Integer> testState1;
    private ValueState<String> testState2;

    @Parameter(0)
    public boolean enableIncrementalCheckpointing;

    @Parameter(1)
    public SupplierWithException<CheckpointStorage, IOException> storageSupplier;

    @Parameter(2)
    public boolean useIngestDB;
    private String dbPath;
    private RocksDB db = null;
    private ColumnFamilyHandle defaultCFHandle = null;
    private RocksDBStateUploader rocksDBStateUploader = null;
    private final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer();
    private final HashMap<String, Long> initMetricBackingMap = new HashMap<>();

    /* loaded from: input_file:org/apache/flink/state/rocksdb/EmbeddedRocksDBStateBackendTest$AcceptAllFilter.class */
    private static class AcceptAllFilter implements IOFileFilter {
        private AcceptAllFilter() {
        }

        public boolean accept(File file) {
            return true;
        }

        public boolean accept(File file, String str) {
            return true;
        }
    }

    @Parameters
    public static List<Object[]> modes() {
        return Arrays.asList(new Object[]{true, JobManagerCheckpointStorage::new, false}, new Object[]{true, JobManagerCheckpointStorage::new, true}, new Object[]{false, () -> {
            return new FileSystemCheckpointStorage(new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(tempFolder).toURI().toString()), 0, -1);
        }, false});
    }

    public void prepareRocksDB() throws Exception {
        String absolutePath = RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(TempDirUtils.newFolder(tempFolder)).getAbsolutePath();
        ColumnFamilyOptions columnOptions = this.optionsContainer.getColumnOptions();
        ArrayList arrayList = new ArrayList(1);
        this.db = RocksDBOperationUtils.openDB(absolutePath, Collections.emptyList(), arrayList, columnOptions, this.optionsContainer.getDbOptions());
        this.defaultCFHandle = (ColumnFamilyHandle) arrayList.remove(0);
    }

    protected ConfigurableStateBackend getStateBackend() throws IOException {
        this.dbPath = TempDirUtils.newFolder(tempFolder).getAbsolutePath();
        EmbeddedRocksDBStateBackend configure = new EmbeddedRocksDBStateBackend(this.enableIncrementalCheckpointing).configure(createBackendConfig(), Thread.currentThread().getContextClassLoader());
        configure.setDbStoragePath(this.dbPath);
        return configure;
    }

    private Configuration createBackendConfig() {
        Configuration configuration = new Configuration();
        configuration.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, Boolean.valueOf(this.useIngestDB));
        configuration.set(RocksDBOptions.TIMER_SERVICE_FACTORY, EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
        configuration.set(RocksDBManualCompactionOptions.MIN_INTERVAL, Duration.ofMillis(1L));
        return configuration;
    }

    protected StateBackend.CustomInitializationMetrics getCustomInitializationMetrics() {
        return (str, j) -> {
            this.initMetricBackingMap.compute(str, (str, l) -> {
                return Long.valueOf(l == null ? j : j + l.longValue());
            });
        };
    }

    protected CheckpointStorage getCheckpointStorage() throws Exception {
        return (CheckpointStorage) this.storageSupplier.get();
    }

    protected boolean isSerializerPresenceRequiredOnRestore() {
        return false;
    }

    protected boolean supportsAsynchronousSnapshots() {
        return true;
    }

    protected boolean isSafeToReuseKVState() {
        return true;
    }

    @AfterEach
    public void cleanupRocksDB() {
        if (this.keyedStateBackend != null) {
            IOUtils.closeQuietly(this.keyedStateBackend);
            this.keyedStateBackend.dispose();
        }
        IOUtils.closeQuietly(this.defaultCFHandle);
        IOUtils.closeQuietly(this.db);
        IOUtils.closeQuietly(this.optionsContainer);
        if (this.allCreatedCloseables != null) {
            Iterator<RocksObject> it = this.allCreatedCloseables.iterator();
            while (it.hasNext()) {
                ((RocksObject) Mockito.verify(it.next(), VerificationModeFactory.times(1))).close();
            }
            this.allCreatedCloseables = null;
        }
    }

    public void setupRocksKeyedStateBackend() throws Exception {
        this.blocker = new OneShotLatch();
        this.waiter = new OneShotLatch();
        this.testStreamFactory = new BlockerCheckpointStreamFactory(1048576);
        this.testStreamFactory.setBlockerLatch(this.blocker);
        this.testStreamFactory.setWaiterLatch(this.waiter);
        this.testStreamFactory.setAfterNumberInvocations(10);
        prepareRocksDB();
        RocksDBKeyedStateBackendBuilder useIngestDbRestoreMode = RocksDBTestUtils.builderForTestDB(TempDirUtils.newFolder(tempFolder), IntSerializer.INSTANCE, (RocksDB) Mockito.spy(this.db), this.defaultCFHandle, this.optionsContainer.getColumnOptions()).setEnableIncrementalCheckpointing(this.enableIncrementalCheckpointing).setUseIngestDbRestoreMode(this.useIngestDB);
        if (this.enableIncrementalCheckpointing) {
            this.rocksDBStateUploader = (RocksDBStateUploader) Mockito.spy(new RocksDBStateUploader(((Integer) RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()).intValue()));
            useIngestDbRestoreMode.setRocksDBStateUploader(this.rocksDBStateUploader);
        }
        this.keyedStateBackend = useIngestDbRestoreMode.build();
        this.testState1 = this.keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("TestState-1", Integer.class, 0));
        this.testState2 = this.keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("TestState-2", String.class, ""));
        this.allCreatedCloseables = new ArrayList();
        ((RocksDB) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendTest.1
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                RocksObject rocksObject = (RocksIterator) Mockito.spy((RocksIterator) invocationOnMock.callRealMethod());
                EmbeddedRocksDBStateBackendTest.this.allCreatedCloseables.add(rocksObject);
                return rocksObject;
            }
        }).when(this.keyedStateBackend.db)).newIterator((ColumnFamilyHandle) ArgumentMatchers.any(ColumnFamilyHandle.class), (ReadOptions) ArgumentMatchers.any(ReadOptions.class));
        ((RocksDB) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendTest.2
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                RocksObject rocksObject = (Snapshot) Mockito.spy((Snapshot) invocationOnMock.callRealMethod());
                EmbeddedRocksDBStateBackendTest.this.allCreatedCloseables.add(rocksObject);
                return rocksObject;
            }
        }).when(this.keyedStateBackend.db)).getSnapshot();
        ((RocksDB) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendTest.3
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                RocksObject rocksObject = (ColumnFamilyHandle) Mockito.spy((ColumnFamilyHandle) invocationOnMock.callRealMethod());
                EmbeddedRocksDBStateBackendTest.this.allCreatedCloseables.add(rocksObject);
                return rocksObject;
            }
        }).when(this.keyedStateBackend.db)).createColumnFamily((ColumnFamilyDescriptor) ArgumentMatchers.any(ColumnFamilyDescriptor.class));
        for (int i = 0; i < 100; i++) {
            this.keyedStateBackend.setCurrentKey(Integer.valueOf(i));
            this.testState1.update(Integer.valueOf(4200 + i));
            this.testState2.update("S-" + (4200 + i));
        }
    }

    @TestTemplate
    public void testCorrectMergeOperatorSet() throws Exception {
        prepareRocksDB();
        ColumnFamilyOptions columnFamilyOptions = (ColumnFamilyOptions) Mockito.spy(new ColumnFamilyOptions());
        try {
            RocksDBKeyedStateBackend build = RocksDBTestUtils.builderForTestDB(TempDirUtils.newFolder(tempFolder), IntSerializer.INSTANCE, this.db, this.defaultCFHandle, columnFamilyOptions).setEnableIncrementalCheckpointing(this.enableIncrementalCheckpointing).setUseIngestDbRestoreMode(this.useIngestDB).build();
            try {
                build.createOrUpdateInternalState(StringSerializer.INSTANCE, new ValueStateDescriptor("StubState-1", StringSerializer.INSTANCE));
                build.createOrUpdateInternalState(StringSerializer.INSTANCE, new ValueStateDescriptor("StubState-2", StringSerializer.INSTANCE));
                ((ColumnFamilyOptions) Mockito.verify(columnFamilyOptions, Mockito.times(2))).setMergeOperatorName("stringappendtest");
                if (build != null) {
                    build.close();
                }
                if (columnFamilyOptions != null) {
                    columnFamilyOptions.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (columnFamilyOptions != null) {
                try {
                    columnFamilyOptions.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @TestTemplate
    public void testReleasingSnapshotAfterBackendClosed() throws Exception {
        setupRocksKeyedStateBackend();
        try {
            RunnableFuture snapshot = this.keyedStateBackend.snapshot(0L, 0L, this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            RocksDB rocksDB = this.keyedStateBackend.db;
            Iterator<RocksObject> it = this.allCreatedCloseables.iterator();
            while (it.hasNext()) {
                ((RocksObject) Mockito.verify(it.next(), VerificationModeFactory.times(0))).close();
            }
            snapshot.cancel(true);
            this.keyedStateBackend.dispose();
            ((RocksDB) Mockito.verify(rocksDB, VerificationModeFactory.times(1))).close();
            Assertions.assertThat(this.keyedStateBackend.isDisposed()).isTrue();
            Iterator<RocksObject> it2 = this.allCreatedCloseables.iterator();
            while (it2.hasNext()) {
                ((RocksObject) Mockito.verify(it2.next(), VerificationModeFactory.times(1))).close();
            }
            verifyRocksDBStateUploaderClosed();
        } finally {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
        }
    }

    @TestTemplate
    public void testDismissingSnapshot() throws Exception {
        setupRocksKeyedStateBackend();
        try {
            this.keyedStateBackend.snapshot(0L, 0L, this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()).cancel(true);
            verifyRocksObjectsReleased();
            verifyRocksDBStateUploaderClosed();
        } finally {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
        }
    }

    @TestTemplate
    public void testDismissingSnapshotNotRunnable() throws Exception {
        setupRocksKeyedStateBackend();
        try {
            RunnableFuture snapshot = this.keyedStateBackend.snapshot(0L, 0L, this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            snapshot.cancel(true);
            Thread thread = new Thread(snapshot);
            thread.start();
            Objects.requireNonNull(snapshot);
            Assertions.assertThatThrownBy(snapshot::get);
            thread.join();
            verifyRocksObjectsReleased();
            verifyRocksDBStateUploaderClosed();
        } finally {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
        }
    }

    @TestTemplate
    public void testCompletingSnapshot() throws Exception {
        setupRocksKeyedStateBackend();
        try {
            RunnableFuture snapshot = this.keyedStateBackend.snapshot(0L, 0L, this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread thread = new Thread(snapshot);
            thread.start();
            this.waiter.await();
            this.waiter.reset();
            runStateUpdates();
            this.blocker.trigger();
            this.waiter.await();
            KeyedStateHandle jobManagerOwnedSnapshot = ((SnapshotResult) snapshot.get()).getJobManagerOwnedSnapshot();
            Assertions.assertThat(jobManagerOwnedSnapshot).isNotNull();
            Assertions.assertThat(jobManagerOwnedSnapshot.getStateSize()).isGreaterThan(0L);
            Assertions.assertThat(jobManagerOwnedSnapshot.getKeyGroupRange().getNumberOfKeyGroups()).isEqualTo(2);
            Iterator it = this.testStreamFactory.getAllCreatedStreams().iterator();
            while (it.hasNext()) {
                Assertions.assertThat(((BlockingCheckpointOutputStream) it.next()).isClosed()).isTrue();
            }
            thread.join();
            verifyRocksObjectsReleased();
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
            verifyRocksDBStateUploaderClosed();
        } catch (Throwable th) {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
            throw th;
        }
    }

    @TestTemplate
    public void testCancelRunningSnapshot() throws Exception {
        setupRocksKeyedStateBackend();
        try {
            RunnableFuture snapshot = this.keyedStateBackend.snapshot(0L, 0L, this.testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            Thread thread = new Thread(snapshot);
            thread.start();
            this.waiter.await();
            this.waiter.reset();
            runStateUpdates();
            snapshot.cancel(true);
            this.blocker.trigger();
            Iterator it = this.testStreamFactory.getAllCreatedStreams().iterator();
            while (it.hasNext()) {
                Assertions.assertThat(((BlockingCheckpointOutputStream) it.next()).isClosed()).isTrue();
            }
            this.waiter.await();
            Objects.requireNonNull(snapshot);
            Assertions.assertThatThrownBy(snapshot::get);
            thread.join();
            verifyRocksObjectsReleased();
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
            verifyRocksDBStateUploaderClosed();
        } catch (Throwable th) {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
            throw th;
        }
    }

    @TestTemplate
    public void testDisposeDeletesAllDirectories() throws Exception {
        CheckpointableKeyedStateBackend createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
        Collection listFilesAndDirs = FileUtils.listFilesAndDirs(new File(this.dbPath), new AcceptAllFilter(), new AcceptAllFilter());
        try {
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id", String.class, (Object) null);
            valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
            ValueState partitionedState = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
            createKeyedBackend.setCurrentKey(1);
            partitionedState.update("Hello");
            Assertions.assertThat(listFilesAndDirs.size()).isGreaterThan(1);
            IOUtils.closeQuietly(createKeyedBackend);
            createKeyedBackend.dispose();
            Assertions.assertThat(FileUtils.listFilesAndDirs(new File(this.dbPath), new AcceptAllFilter(), new AcceptAllFilter())).hasSize(1);
        } catch (Throwable th) {
            IOUtils.closeQuietly(createKeyedBackend);
            createKeyedBackend.dispose();
            throw th;
        }
    }

    @TestTemplate
    public void testSharedIncrementalStateDeRegistration() throws Exception {
        if (this.enableIncrementalCheckpointing) {
            CheckpointListener createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
            try {
                ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id", String.class, (Object) null);
                valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
                ValueState partitionedState = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
                LinkedList linkedList = new LinkedList();
                SharedStateRegistry sharedStateRegistry = (SharedStateRegistry) Mockito.spy(new SharedStateRegistryImpl());
                for (int i = 0; i < 3; i++) {
                    Mockito.reset(new SharedStateRegistry[]{sharedStateRegistry});
                    createKeyedBackend.setCurrentKey(Integer.valueOf(i));
                    partitionedState.update("Hello-" + i);
                    RunnableFuture snapshot = createKeyedBackend.snapshot(i, i, createStreamFactory(), CheckpointOptions.forCheckpointWithDefaultLocation());
                    snapshot.run();
                    IncrementalRemoteKeyedStateHandle jobManagerOwnedSnapshot = ((SnapshotResult) snapshot.get()).getJobManagerOwnedSnapshot();
                    List<IncrementalKeyedStateHandle.HandleAndLocalPath> list = (List) jobManagerOwnedSnapshot.getSharedState().stream().map(handleAndLocalPath -> {
                        return IncrementalKeyedStateHandle.HandleAndLocalPath.of(handleAndLocalPath.getHandle(), handleAndLocalPath.getLocalPath());
                    }).collect(Collectors.toList());
                    jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry, i);
                    for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath2 : list) {
                        ((SharedStateRegistry) Mockito.verify(sharedStateRegistry)).registerReference(SharedStateRegistryKey.forStreamStateHandle(handleAndLocalPath2.getHandle()), handleAndLocalPath2.getHandle(), i);
                    }
                    linkedList.add(jobManagerOwnedSnapshot);
                    createKeyedBackend.notifyCheckpointComplete(i);
                    if (linkedList.size() > 1) {
                        ((IncrementalRemoteKeyedStateHandle) linkedList.remove()).discardState();
                    }
                }
                while (!linkedList.isEmpty()) {
                    Mockito.reset(new SharedStateRegistry[]{sharedStateRegistry});
                    ((IncrementalRemoteKeyedStateHandle) linkedList.remove()).discardState();
                }
            } finally {
                IOUtils.closeQuietly(createKeyedBackend);
                createKeyedBackend.dispose();
            }
        }
    }

    @TestTemplate
    public void testMapStateClear() throws Exception {
        setupRocksKeyedStateBackend();
        MapState partitionedState = this.keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new MapStateDescriptor("id", Integer.class, String.class));
        ((RocksDB) Mockito.doAnswer(invocationOnMock -> {
            throw new RocksDBException("Artificial failure");
        }).when(this.keyedStateBackend.db)).newIterator((ColumnFamilyHandle) ArgumentMatchers.any(ColumnFamilyHandle.class), (ReadOptions) ArgumentMatchers.any(ReadOptions.class));
        Objects.requireNonNull(partitionedState);
        Assertions.assertThatThrownBy(partitionedState::clear).isInstanceOf(FlinkRuntimeException.class);
    }

    @TestTemplate
    public void testConfigureTernaryBooleanConfigs() throws Exception {
        EmbeddedRocksDBStateBackend stateBackend = getStateBackend();
        if (stateBackend instanceof EmbeddedRocksDBStateBackend) {
            EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = stateBackend;
            Configuration createBackendConfig = createBackendConfig();
            Configuration configuration = new Configuration();
            configuration.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, Boolean.valueOf(!((Boolean) RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE.defaultValue()).booleanValue()));
            configuration.set(RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE, Boolean.valueOf(!((Boolean) RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue()).booleanValue()));
            configuration.set(RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, Boolean.valueOf(!((Boolean) RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue()).booleanValue()));
            EmbeddedRocksDBStateBackend configure = embeddedRocksDBStateBackend.configure(configuration, Thread.currentThread().getContextClassLoader());
            checkBooleanWithBaseConf(createBackendConfig, RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, configure.getUseIngestDbRestoreMode());
            checkBooleanWithBaseConf(createBackendConfig, RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, configure.isRescalingUseDeleteFilesInRange());
            checkBooleanWithBaseConf(createBackendConfig, RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE, configure.getIncrementalRestoreAsyncCompactAfterRescale());
        }
    }

    private void checkBooleanWithBaseConf(Configuration configuration, ConfigOption<Boolean> configOption, boolean z) {
        org.junit.jupiter.api.Assertions.assertEquals(configuration.getOptional(configOption).orElse(Boolean.valueOf(!((Boolean) configOption.defaultValue()).booleanValue())), Boolean.valueOf(z));
    }

    @TestTemplate
    public void testSmallFilesCompaction() throws Exception {
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id", String.class);
        SharedStateRegistryImpl sharedStateRegistryImpl = new SharedStateRegistryImpl();
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        KeyGroupRange of = KeyGroupRange.of(0, 49);
        double numberOfKeyGroups = of.getNumberOfKeyGroups() * 0.5d;
        CheckpointableKeyedStateBackend createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE, (of.getEndKeyGroup() - of.getStartKeyGroup()) + 1, of, this.env);
        try {
            ValueState partitionedState = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
            for (int startKeyGroup = of.getStartKeyGroup(); startKeyGroup < of.getEndKeyGroup(); startKeyGroup++) {
                createKeyedBackend.setCurrentKey(Integer.valueOf(startKeyGroup));
                partitionedState.update(Integer.toString(startKeyGroup));
                runSnapshot(createKeyedBackend.snapshot(startKeyGroup, startKeyGroup, createStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), sharedStateRegistryImpl);
            }
            int length = new File(this.dbPath).listFiles()[0].listFiles()[0].listFiles((file, str) -> {
                return str.endsWith(".sst");
            }).length;
            Assertions.assertThat(length).isLessThanOrEqualTo((int) numberOfKeyGroups).withFailMessage("actual: " + length + ", expected: " + numberOfKeyGroups, new Object[0]);
            IOUtils.closeQuietly(createKeyedBackend);
            createKeyedBackend.dispose();
            Thread.sleep(100L);
        } catch (Throwable th) {
            IOUtils.closeQuietly(createKeyedBackend);
            createKeyedBackend.dispose();
            throw th;
        }
    }

    private void runStateUpdates() throws Exception {
        for (int i = 50; i < 150; i++) {
            if (i % 10 == 0) {
                Thread.sleep(1L);
            }
            this.keyedStateBackend.setCurrentKey(Integer.valueOf(i));
            this.testState1.update(Integer.valueOf(4200 + i));
            this.testState2.update("S-" + (4200 + i));
        }
    }

    private void verifyRocksObjectsReleased() {
        Iterator<RocksObject> it = this.allCreatedCloseables.iterator();
        while (it.hasNext()) {
            ((RocksObject) Mockito.verify(it.next(), VerificationModeFactory.times(1))).close();
        }
        Assertions.assertThat(this.keyedStateBackend.db).isNotNull();
        RocksDB rocksDB = this.keyedStateBackend.db;
        this.keyedStateBackend.dispose();
        ((RocksDB) Mockito.verify(rocksDB, VerificationModeFactory.times(1))).close();
        Assertions.assertThat(this.keyedStateBackend.isDisposed()).isTrue();
    }

    private void verifyRocksDBStateUploaderClosed() throws IOException {
        if (this.enableIncrementalCheckpointing) {
            ((RocksDBStateUploader) Mockito.verify(this.rocksDBStateUploader, VerificationModeFactory.times(1))).close();
        }
    }

    protected <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, List<KeyedStateHandle> list, Environment environment) throws Exception {
        CheckpointableKeyedStateBackend<K> restoreKeyedBackend = super.restoreKeyedBackend(typeSerializer, i, keyGroupRange, list, environment);
        if (checkMetrics() && !CollectionUtil.isEmptyOrAllElementsNull(list)) {
            Assertions.assertThat(this.initMetricBackingMap.keySet()).containsExactlyInAnyOrder(new String[]{"RestoreStateDurationMs", "DownloadStateDurationMs"});
        }
        return restoreKeyedBackend;
    }

    protected boolean checkMetrics() {
        return true;
    }
}
