/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.configuration.description.TextElement;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractManagedMemoryStateBackend;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.metrics.SizeTrackingStateConfig;
import org.apache.flink.state.forst.ConfigurableForStOptionsFactory;
import org.apache.flink.state.forst.ForStConfigurableOptions;
import org.apache.flink.state.forst.ForStKeyedStateBackend;
import org.apache.flink.state.forst.ForStKeyedStateBackendBuilder;
import org.apache.flink.state.forst.ForStMemoryConfiguration;
import org.apache.flink.state.forst.ForStMemoryControllerUtils;
import org.apache.flink.state.forst.ForStNativeMetricOptions;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStOptions;
import org.apache.flink.state.forst.ForStOptionsFactory;
import org.apache.flink.state.forst.ForStPathContainer;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.ForStSharedResources;
import org.apache.flink.state.forst.sync.ForStPriorityQueueConfig;
import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackendBuilder;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.concurrent.FutureUtils;
import org.forstdb.NativeLibraryLoader;
import org.forstdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * State backend that creates keyed state backends backed by ForSt (through
 * {@link ForStKeyedStateBackendBuilder} for the asynchronous variant and
 * {@link ForStSyncKeyedStateBackendBuilder} for the synchronous variant) and a default operator
 * state backend.
 *
 * <p>An instance created through the public constructor carries only explicitly set values;
 * {@code configure(ReadableConfig, ClassLoader)} produces a copy in which unset values are
 * filled in from the supplied configuration.
 */
@Experimental
public class ForStStateBackend
extends AbstractManagedMemoryStateBackend
implements ConfigurableStateBackend {
    /** Value of {@code ForStOptions.PRIMARY_DIRECTORY} that makes the remote directory share the checkpoint directory. */
    public static final String CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT = "checkpoint-dir";
    /** Value of {@code ForStOptions.PRIMARY_DIRECTORY} that leaves the remote directory unset. */
    public static final String LOCAL_DIR_AS_PRIMARY_SHORTCUT = "local-dir";
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ForStStateBackend.class);
    /** Sentinel meaning "no overlap fraction threshold has been set on this instance". */
    private static final double UNDEFINED_OVERLAP_FRACTION_THRESHOLD = -1.0;
    /** Number of attempts to load the ForSt native library. */
    private static final int FORST_LIB_LOADING_ATTEMPTS = 3;
    /** Set to true once the native library was loaded successfully by ensureForStIsLoaded. */
    private static boolean forStInitialized = false;
    /** Whether incremental checkpointing is enabled; UNDEFINED until resolved against a configuration. */
    private final TernaryBoolean enableIncrementalCheckpointing;
    /** Remote (primary) directory; null when none is configured. */
    @Nullable
    private Path remoteForStDirectory;
    /** Explicitly configured local directories; null means the TaskManager's temp working directory is used. */
    @Nullable
    private File[] localForStDirectories;
    /** Merged ForSt options selected from {@code ForStConfigurableOptions.CANDIDATE_CONFIGS}. */
    @Nullable
    private ReadableConfig configurableOptions;
    /** Optional user-supplied or configured options factory. */
    @Nullable
    private ForStOptionsFactory forStOptionsFactory;
    private final ForStMemoryConfiguration memoryConfiguration;
    private final ForStNativeMetricOptions nativeMetricOptions;
    /** Usable local base directories, populated by lazyInitializeForJob. */
    private transient File[] initializedDbBasePaths;
    /** Job id captured from the environment by lazyInitializeForJob. */
    private transient JobID jobId;
    /** Index into initializedDbBasePaths of the directory handed out most recently. */
    private transient int nextDirectory;
    /** Guards lazyInitializeForJob so that it runs its body only once per instance. */
    private transient boolean isInitialized;
    private final ForStMemoryControllerUtils.ForStMemoryFactory forStMemoryFactory;
    private final ForStPriorityQueueConfig priorityQueueConfig;
    /** Restore overlap fraction threshold, or UNDEFINED_OVERLAP_FRACTION_THRESHOLD when unset. */
    private final double overlapFractionThreshold;
    private final TernaryBoolean useIngestDbRestoreMode;
    private final TernaryBoolean rescalingUseDeleteFilesInRange;
    private RecoveryClaimMode recoveryClaimMode = RecoveryClaimMode.DEFAULT;
    /** True when the primary directory option equals CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT. */
    private boolean remoteShareWithCheckpoint;
    /** Passed as the "force local" flag when building the path container for the synchronous keyed backend. */
    private boolean forceSyncLocal;

    /**
     * Creates an unconfigured backend: every tri-state setting is {@code UNDEFINED}, the overlap
     * fraction threshold is marked as undefined, no remote directory is shared with the
     * checkpoint storage, and the synchronous keyed backend is forced to local storage.
     */
    public ForStStateBackend() {
        // Tri-state settings that are resolved later against a configuration.
        this.enableIncrementalCheckpointing = TernaryBoolean.UNDEFINED;
        this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED;
        this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED;
        this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD;

        // Default-constructed helper configurations.
        this.nativeMetricOptions = new ForStNativeMetricOptions();
        this.memoryConfiguration = new ForStMemoryConfiguration();
        this.priorityQueueConfig = new ForStPriorityQueueConfig();
        this.forStMemoryFactory = ForStMemoryControllerUtils.ForStMemoryFactory.DEFAULT;

        // Directory handling defaults.
        this.remoteShareWithCheckpoint = false;
        this.forceSyncLocal = true;
    }

    private ForStStateBackend(ForStStateBackend original, ReadableConfig config, ClassLoader classLoader) {
        this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(((Boolean)config.get(CheckpointingOptions.INCREMENTAL_CHECKPOINTS)).booleanValue());
        this.memoryConfiguration = ForStMemoryConfiguration.fromOtherAndConfiguration(original.memoryConfiguration, config);
        this.memoryConfiguration.validate();
        this.remoteShareWithCheckpoint = false;
        if (original.remoteForStDirectory != null) {
            this.remoteForStDirectory = original.remoteForStDirectory;
        } else {
            String remoteDirStr = (String)config.get(ForStOptions.PRIMARY_DIRECTORY);
            if (CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT.equals(remoteDirStr)) {
                this.remoteForStDirectory = null;
                this.remoteShareWithCheckpoint = true;
            } else {
                this.remoteForStDirectory = remoteDirStr == null || LOCAL_DIR_AS_PRIMARY_SHORTCUT.equals(remoteDirStr) ? null : new Path(remoteDirStr);
            }
        }
        this.forceSyncLocal = (Boolean)config.get(ForStOptions.SYNC_ENFORCE_LOCAL);
        this.priorityQueueConfig = ForStPriorityQueueConfig.fromOtherAndConfiguration(original.priorityQueueConfig, config);
        if (original.localForStDirectories != null) {
            this.localForStDirectories = original.localForStDirectories;
        } else {
            String forStLocalPaths = (String)config.get(ForStOptions.LOCAL_DIRECTORIES);
            if (forStLocalPaths != null) {
                String[] directories = forStLocalPaths.split(",|" + File.pathSeparator);
                try {
                    this.setLocalDbStoragePaths(directories);
                }
                catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException("Invalid configuration for ForSt state backend's local storage directories: " + e.getMessage(), (Throwable)e);
                }
            }
        }
        this.nativeMetricOptions = ForStNativeMetricOptions.fromConfig(config);
        this.configurableOptions = this.mergeConfigurableOptions(original.configurableOptions, config);
        try {
            this.forStOptionsFactory = this.configureOptionsFactory(original.forStOptionsFactory, (String)config.get(ForStOptions.OPTIONS_FACTORY), config, classLoader);
        }
        catch (DynamicCodeLoadingException e) {
            throw new FlinkRuntimeException((Throwable)e);
        }
        this.latencyTrackingConfigBuilder = original.latencyTrackingConfigBuilder.configure(config);
        this.sizeTrackingConfigBuilder = original.sizeTrackingConfigBuilder.configure(config);
        this.forStMemoryFactory = original.forStMemoryFactory;
        this.overlapFractionThreshold = original.overlapFractionThreshold == -1.0 ? (Double)config.get(ForStConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD) : original.overlapFractionThreshold;
        Preconditions.checkArgument((this.overlapFractionThreshold >= 0.0 && this.overlapFractionThreshold <= 1.0 ? 1 : 0) != 0, (Object)"Overlap fraction threshold of restoring should be between 0 and 1");
        this.useIngestDbRestoreMode = TernaryBoolean.mergeTernaryBooleanWithConfig((TernaryBoolean)original.useIngestDbRestoreMode, ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, (ReadableConfig)config);
        this.rescalingUseDeleteFilesInRange = TernaryBoolean.mergeTernaryBooleanWithConfig((TernaryBoolean)original.rescalingUseDeleteFilesInRange, ForStConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, (ReadableConfig)config);
        if (config.getOptional(StateRecoveryOptions.RESTORE_MODE).isPresent()) {
            this.recoveryClaimMode = (RecoveryClaimMode)config.get(StateRecoveryOptions.RESTORE_MODE);
        }
    }

    /**
     * Returns a new backend that keeps this instance's explicit settings and takes all remaining
     * settings from {@code config}. This instance is not modified.
     *
     * @param config the configuration to read unset values from
     * @param classLoader the class loader used to load a configured options factory
     * @return the re-configured copy
     */
    public ForStStateBackend configure(ReadableConfig config, ClassLoader classLoader) {
        final ForStStateBackend reconfigured = new ForStStateBackend(this, config, classLoader);
        return reconfigured;
    }

    /**
     * Performs the one-time, per-instance initialization: records the job id and determines the
     * usable local base directories. Subsequent calls return immediately.
     *
     * @param env the task environment providing the job id and the temp working directory
     * @param operatorIdentifier currently unused
     * @throws IOException if local directories are configured but none of them is usable
     */
    private void lazyInitializeForJob(Environment env, String operatorIdentifier) throws IOException {
        if (this.isInitialized) {
            return;
        }
        this.jobId = env.getJobID();

        final File[] configuredDirs = this.localForStDirectories;
        if (configuredDirs == null) {
            // Nothing configured: fall back to the TaskManager's temp working directory.
            final File tmpWorkingDir = env.getTaskManagerInfo().getTmpWorkingDirectory();
            this.initializedDbBasePaths = new File[]{tmpWorkingDir};
        } else {
            final ArrayList<File> usableDirs = new ArrayList<File>(configuredDirs.length);
            final StringBuilder failures = new StringBuilder();
            for (File candidate : configuredDirs) {
                // Probe the directory by creating (and then removing) a uniquely named child.
                final File probe = new File(candidate, UUID.randomUUID().toString());
                if (probe.mkdirs()) {
                    usableDirs.add(candidate);
                } else {
                    final String problem = "Local DB files directory '" + candidate + "' does not exist and cannot be created. ";
                    LOG.error(problem);
                    failures.append(problem);
                }
                probe.delete();
            }
            if (usableDirs.isEmpty()) {
                throw new IOException("No local storage directories available. " + failures);
            }
            this.initializedDbBasePaths = usableDirs.toArray(new File[0]);
        }

        // Start the rotation at a random position.
        this.nextDirectory = new Random().nextInt(this.initializedDbBasePaths.length);
        this.isInitialized = true;
    }

    /**
     * Advances the rotation index by one, wrapping to the first entry after the last, and
     * returns the base directory at the new position.
     *
     * @return the next local base directory
     */
    private File getNextStoragePath() {
        int candidate = this.nextDirectory + 1;
        if (candidate >= this.initializedDbBasePaths.length) {
            candidate = 0;
        }
        this.nextDirectory = candidate;
        return this.initializedDbBasePaths[candidate];
    }

    /**
     * Reports that this backend can create an asynchronous keyed state backend.
     *
     * @return always {@code true}
     */
    public boolean supportsAsyncKeyedStateBackend() {
        return true;
    }

    /**
     * Creates the asynchronous ForSt keyed state backend for the given parameters.
     *
     * <p>Ensures the native library is loaded, initializes the local directories on first use,
     * builds the path and resource containers, and configures the builder. Incremental
     * checkpointing is always enabled for this variant.
     *
     * @param parameters the keyed state backend parameters
     * @return the built keyed state backend
     * @throws IOException if the native library cannot be loaded or no local directory is usable
     */
    public <K> ForStKeyedStateBackend<K> createAsyncKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) throws IOException {
        final Environment env = parameters.getEnv();
        final File tmpWorkingDir = env.getTaskManagerInfo().getTmpWorkingDirectory();
        ForStStateBackend.ensureForStIsLoaded(tmpWorkingDir.getAbsolutePath(), env.getAsyncOperationsThreadPool());

        // Operator identifiers may contain characters that are not safe in file names.
        final String fileCompatibleIdentifier = parameters.getOperatorIdentifier().replaceAll("[^a-zA-Z0-9\\-]", "_");
        this.lazyInitializeForJob(env, fileCompatibleIdentifier);
        final ForStPathContainer pathContainer = this.createForStPathContainer(fileCompatibleIdentifier, env, false);

        final OpaqueMemoryResource<ForStSharedResources> sharedResources = ForStOperationUtils.allocateSharedCachesIfConfigured(this.memoryConfiguration, env, parameters.getManagedMemoryFraction(), LOG, this.forStMemoryFactory);
        if (sharedResources != null) {
            LOG.info("Obtained shared ForSt cache of size {} bytes", (Object) sharedResources.getSize());
        }
        final ForStResourceContainer resourceContainer = this.createOptionsAndResourceContainer(sharedResources, pathContainer, env.getCheckpointStorageAccess(), parameters.getMetricGroup(), this.nativeMetricOptions.isStatisticsEnabled());

        // Settings that fall back to the option defaults when this instance leaves them undefined.
        final double restoreOverlapThreshold = this.overlapFractionThreshold == UNDEFINED_OVERLAP_FRACTION_THRESHOLD
                ? (Double) ForStConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue()
                : this.overlapFractionThreshold;
        final boolean ingestDbRestore = this.useIngestDbRestoreMode.getOrDefault(
                ((Boolean) ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE.defaultValue()).booleanValue());
        final boolean deleteFilesInRange = this.rescalingUseDeleteFilesInRange.getOrDefault(
                ((Boolean) ForStConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue()).booleanValue());

        ForStKeyedStateBackendBuilder builder = new ForStKeyedStateBackendBuilder(
                        parameters.getOperatorIdentifier(),
                        env.getUserCodeClassLoader().asClassLoader(),
                        resourceContainer,
                        stateName -> resourceContainer.getColumnOptions(),
                        parameters.getKeySerializer(),
                        parameters.getNumberOfKeyGroups(),
                        parameters.getKeyGroupRange(),
                        env.getExecutionConfig(),
                        this.priorityQueueConfig,
                        parameters.getTtlTimeProvider(),
                        parameters.getMetricGroup(),
                        parameters.getCustomInitializationMetrics(),
                        parameters.getStateHandles(),
                        parameters.getCancelStreamRegistry())
                .setEnableIncrementalCheckpointing(true)
                .setNativeMetricOptions(resourceContainer.getMemoryWatcherOptions(this.nativeMetricOptions))
                .setOverlapFractionThreshold(restoreOverlapThreshold)
                .setUseIngestDbRestoreMode(ingestDbRestore)
                .setRescalingUseDeleteFilesInRange(deleteFilesInRange)
                .setRecoveryClaimMode(this.recoveryClaimMode);
        return builder.build();
    }

    /**
     * Creates the synchronous ForSt keyed state backend for the given parameters.
     *
     * <p>Ensures the native library is loaded, initializes the local directories on first use,
     * builds the path and resource containers (forcing local storage when
     * {@code forceSyncLocal} is set), and configures the builder with latency/size tracking,
     * compression and the restore-related settings of this instance.
     *
     * @param parameters the keyed state backend parameters
     * @return the built keyed state backend
     * @throws IOException if the native library cannot be loaded or no local directory is usable
     */
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) throws IOException {
        final Environment env = parameters.getEnv();
        final File tmpWorkingDir = env.getTaskManagerInfo().getTmpWorkingDirectory();
        ForStStateBackend.ensureForStIsLoaded(tmpWorkingDir.getAbsolutePath(), env.getAsyncOperationsThreadPool());

        // Operator identifiers may contain characters that are not safe in file names.
        final String fileCompatibleIdentifier = parameters.getOperatorIdentifier().replaceAll("[^a-zA-Z0-9\\-]", "_");
        this.lazyInitializeForJob(env, fileCompatibleIdentifier);
        final ForStPathContainer pathContainer = this.createForStPathContainer(fileCompatibleIdentifier, env, this.forceSyncLocal);
        final LocalRecoveryConfig localRecoveryConfig = env.getTaskStateManager().createLocalRecoveryConfig();

        final OpaqueMemoryResource<ForStSharedResources> sharedResources = ForStOperationUtils.allocateSharedCachesIfConfigured(this.memoryConfiguration, env, parameters.getManagedMemoryFraction(), LOG, this.forStMemoryFactory);
        if (sharedResources != null) {
            // Names the actual backend (ForSt), consistent with createAsyncKeyedStateBackend.
            LOG.info("Obtained shared ForSt cache of size {} bytes", (Object) sharedResources.getSize());
        }
        final ForStResourceContainer resourceContainer = this.createOptionsAndResourceContainer(sharedResources, pathContainer, env.getCheckpointStorageAccess(), parameters.getMetricGroup(), this.nativeMetricOptions.isStatisticsEnabled());

        final ExecutionConfig executionConfig = env.getExecutionConfig();
        final StreamCompressionDecorator keyGroupCompressionDecorator = ForStStateBackend.getCompressionDecorator((ExecutionConfig) executionConfig);
        final LatencyTrackingStateConfig latencyTrackingStateConfig = ((LatencyTrackingStateConfig.Builder) this.latencyTrackingConfigBuilder.setMetricGroup(parameters.getMetricGroup())).build();
        final SizeTrackingStateConfig sizeTrackingStateConfig = ((SizeTrackingStateConfig.Builder) this.sizeTrackingConfigBuilder.setMetricGroup(parameters.getMetricGroup())).build();

        ForStSyncKeyedStateBackendBuilder builder = new ForStSyncKeyedStateBackendBuilder(
                        parameters.getOperatorIdentifier(),
                        env.getUserCodeClassLoader().asClassLoader(),
                        resourceContainer,
                        stateName -> resourceContainer.getColumnOptions(),
                        parameters.getKvStateRegistry(),
                        parameters.getKeySerializer(),
                        parameters.getNumberOfKeyGroups(),
                        parameters.getKeyGroupRange(),
                        executionConfig,
                        localRecoveryConfig,
                        this.priorityQueueConfig,
                        parameters.getTtlTimeProvider(),
                        latencyTrackingStateConfig,
                        sizeTrackingStateConfig,
                        parameters.getMetricGroup(),
                        parameters.getCustomInitializationMetrics(),
                        parameters.getStateHandles(),
                        keyGroupCompressionDecorator,
                        parameters.getCancelStreamRegistry())
                .setEnableIncrementalCheckpointing(this.isIncrementalCheckpointsEnabled())
                .setNativeMetricOptions(resourceContainer.getMemoryWatcherOptions(this.nativeMetricOptions))
                // Undefined settings fall back to the defaults of their configuration options.
                .setOverlapFractionThreshold(this.overlapFractionThreshold == UNDEFINED_OVERLAP_FRACTION_THRESHOLD
                        ? (Double) ForStConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue()
                        : this.overlapFractionThreshold)
                .setUseIngestDbRestoreMode(this.useIngestDbRestoreMode.getOrDefault(
                        ((Boolean) ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE.defaultValue()).booleanValue()))
                .setRescalingUseDeleteFilesInRange(this.rescalingUseDeleteFilesInRange.getOrDefault(
                        ((Boolean) ForStConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue()).booleanValue()))
                .setRecoveryClaimMode(this.recoveryClaimMode);
        return builder.build();
    }

    /**
     * Creates the operator state backend using {@link DefaultOperatorStateBackendBuilder} with
     * asynchronous snapshots enabled.
     *
     * @param parameters the operator state backend parameters
     * @return the built operator state backend
     * @throws Exception if building the backend fails
     */
    public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters parameters) throws Exception {
        final boolean asyncSnapshots = true;
        final DefaultOperatorStateBackendBuilder backendBuilder = new DefaultOperatorStateBackendBuilder(
                parameters.getEnv().getUserCodeClassLoader().asClassLoader(),
                parameters.getEnv().getExecutionConfig(),
                asyncSnapshots,
                parameters.getStateHandles(),
                parameters.getCancelStreamRegistry());
        return backendBuilder.build();
    }

    /**
     * Determines the options factory to use.
     *
     * <p>An application-defined factory takes precedence; otherwise the class named by
     * {@code factoryClassName} is loaded and instantiated through its no-argument constructor.
     * In both cases a {@link ConfigurableForStOptionsFactory} is additionally configured with
     * {@code config}.
     *
     * @param originalOptionsFactory the factory set on the original backend, or null
     * @param factoryClassName the configured factory class name, or null
     * @param config the configuration handed to configurable factories
     * @param classLoader the class loader used to resolve {@code factoryClassName}
     * @return the factory to use, or null if neither source provides one
     * @throws DynamicCodeLoadingException if the configured class cannot be found, is not an
     *     options factory, has no accessible no-argument constructor, or fails to instantiate
     */
    private ForStOptionsFactory configureOptionsFactory(@Nullable ForStOptionsFactory originalOptionsFactory, @Nullable String factoryClassName, ReadableConfig config, ClassLoader classLoader) throws DynamicCodeLoadingException {
        if (originalOptionsFactory != null) {
            ForStOptionsFactory applicationFactory = originalOptionsFactory;
            if (applicationFactory instanceof ConfigurableForStOptionsFactory) {
                applicationFactory = ((ConfigurableForStOptionsFactory) applicationFactory).configure(config);
            }
            LOG.info("Using application-defined options factory: {}.", (Object) applicationFactory);
            return applicationFactory;
        }
        if (factoryClassName == null) {
            return null;
        }
        try {
            Class<? extends ForStOptionsFactory> factoryClass = Class.forName(factoryClassName, false, classLoader).asSubclass(ForStOptionsFactory.class);
            // Class.newInstance() is deprecated because it rethrows any exception raised by the
            // constructor unwrapped, including checked ones; go through the Constructor instead.
            ForStOptionsFactory configuredFactory = factoryClass.getDeclaredConstructor().newInstance();
            if (configuredFactory instanceof ConfigurableForStOptionsFactory) {
                configuredFactory = ((ConfigurableForStOptionsFactory) configuredFactory).configure(config);
            }
            LOG.info("Using configured options factory: {}.", (Object) configuredFactory);
            return configuredFactory;
        } catch (ClassNotFoundException e) {
            throw new DynamicCodeLoadingException("Cannot find configured options factory class: " + factoryClassName, (Throwable) e);
        } catch (ClassCastException | ReflectiveOperationException e) {
            // Covers a wrong type, a missing or inaccessible no-arg constructor, an abstract
            // class, and an exception thrown by the constructor itself.
            throw new DynamicCodeLoadingException("The class configured under '" + ForStOptions.OPTIONS_FACTORY.key() + "' is not a valid options factory (" + factoryClassName + ")", (Throwable) e);
        }
    }

    /**
     * Tells whether incremental checkpoints are enabled. When the setting is undefined on this
     * instance, the default value of {@code CheckpointingOptions.INCREMENTAL_CHECKPOINTS} is used.
     *
     * @return true if incremental checkpoints are enabled
     */
    public boolean isIncrementalCheckpointsEnabled() {
        final boolean optionDefault = ((Boolean) CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue()).booleanValue();
        return this.enableIncrementalCheckpointing.getOrDefault(optionDefault);
    }

    /**
     * Reports that this backend supports the no-claim restore mode.
     *
     * @return always {@code true}
     */
    public boolean supportsNoClaimRestoreMode() {
        return true;
    }

    /**
     * Tells whether the given savepoint format is supported; only the native format is.
     *
     * @param formatType the savepoint format to check
     * @return true if {@code formatType} is {@code SavepointFormatType.NATIVE}
     */
    public boolean supportsSavepointFormat(SavepointFormatType formatType) {
        return SavepointFormatType.NATIVE == formatType;
    }

    /**
     * Sets a single local storage directory, or clears the setting when {@code path} is null.
     *
     * @param path the local directory, or null to reset to the default
     * @throws IllegalArgumentException if the path is not a valid absolute local path
     */
    public void setLocalDbStoragePath(String path) {
        final String[] singlePath = path == null ? null : new String[]{path};
        this.setLocalDbStoragePaths(singlePath);
    }

    /**
     * Sets the local storage directories. Passing null clears the setting. Each entry must be
     * either a plain absolute path or a URI with the {@code file} scheme.
     *
     * @param paths the local directories, or null to reset to the default
     * @throws IllegalArgumentException if the array is empty, contains null, contains a path
     *     with a non-{@code file} scheme, or contains a relative path
     */
    public void setLocalDbStoragePaths(String ... paths) {
        if (paths == null) {
            this.localForStDirectories = null;
            return;
        }
        if (paths.length == 0) {
            throw new IllegalArgumentException("empty paths");
        }

        final File[] directories = new File[paths.length];
        int index = 0;
        for (String rawPath : paths) {
            if (rawPath == null) {
                throw new IllegalArgumentException("null path");
            }

            // Try to interpret the entry as a URI; if that fails it is used verbatim.
            URI uri;
            try {
                uri = new Path(rawPath).toUri();
            } catch (Exception ignored) {
                uri = null;
            }

            String localPath = rawPath;
            if (uri != null && uri.getScheme() != null) {
                if (!"file".equalsIgnoreCase(uri.getScheme())) {
                    throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
                }
                localPath = uri.getPath();
            }

            final File directory = new File(localPath);
            if (!directory.isAbsolute()) {
                throw new IllegalArgumentException("Relative paths are not supported");
            }
            directories[index++] = directory;
        }
        // Only publish the array once every entry has been validated.
        this.localForStDirectories = directories;
    }

    /**
     * Returns the configured local storage directories as strings.
     *
     * @return a new array with one string per configured directory, or null if none are configured
     */
    public String[] getLocalDbStoragePaths() {
        final File[] directories = this.localForStDirectories;
        if (directories == null) {
            return null;
        }
        return Arrays.stream(directories).map(File::toString).toArray(String[]::new);
    }

    /**
     * Sets the options factory used when creating ForSt resources.
     *
     * @param optionsFactory the factory to use; may be null to clear it
     */
    public void setForStOptions(ForStOptionsFactory optionsFactory) {
        forStOptionsFactory = optionsFactory;
    }

    /**
     * Returns the options factory set on this backend.
     *
     * @return the options factory, or null if none is set
     */
    @Nullable
    public ForStOptionsFactory getForStOptions() {
        return forStOptionsFactory;
    }

    /**
     * Merges the ForSt options of two configurations. For every option in
     * {@code ForStConfigurableOptions.CANDIDATE_CONFIGS} that is set in either configuration, the
     * value from {@code onTop} wins over the one from {@code base}. The chosen value is validated
     * and then copied in its original string form.
     *
     * @param base the lower-priority configuration; null is treated as empty
     * @param onTop the higher-priority configuration
     * @return a new configuration containing only the merged candidate options
     */
    private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableConfig onTop) {
        if (base == null) {
            base = new Configuration();
        }
        Configuration merged = new Configuration();
        Map<String, String> baseMap = base.toMap();
        Map<String, String> onTopMap = onTop.toMap();
        for (ConfigOption<?> option : ForStConfigurableOptions.CANDIDATE_CONFIGS) {
            Optional<?> baseValue = base.getOptional(option);
            Optional<?> topValue = onTop.getOptional(option);
            if (!topValue.isPresent() && !baseValue.isPresent()) {
                continue;
            }
            boolean takeFromTop = topValue.isPresent();
            Object validValue = takeFromTop ? topValue.get() : baseValue.get();
            ForStConfigurableOptions.checkArgumentValid(option, validValue);
            // Write the option exactly once, using the raw string from the winning configuration.
            // The earlier extra write of validValue.toString() was always overwritten by this one.
            String valueString = (takeFromTop ? onTopMap : baseMap).get(option.key());
            merged.setString(option.key(), valueString);
        }
        return merged;
    }

    /**
     * Builds the path container for one operator instance.
     *
     * <p>The local paths are always derived from the next local base directory and the job id.
     * Unless {@code forceLocal} is set, remote paths are derived either from the configured
     * remote directory or, when sharing with the checkpoint storage is enabled and that storage
     * is file-system based, from the checkpoint directories.
     *
     * @param operatorIdentifier the file-name-safe operator identifier
     * @param env the task environment
     * @param forceLocal whether to return a local-only container
     * @return the path container
     */
    ForStPathContainer createForStPathContainer(String operatorIdentifier, Environment env, boolean forceLocal) {
        final String opChildPath = String.format("op_%s_attempt_%s", operatorIdentifier, env.getTaskInfo().getAttemptNumber());

        final File localJobDir = new File(this.getNextStoragePath(), this.jobId.toHexString());
        final File localOperatorDir = new File(localJobDir, opChildPath);
        final Path localJobPath = new Path(localJobDir.getPath());
        final Path localBasePath = new Path(localOperatorDir.getAbsolutePath());
        if (forceLocal) {
            return ForStPathContainer.ofLocal(localJobPath, localBasePath);
        }

        Path remoteJobPath = null;
        Path remoteBasePath = null;
        if (this.remoteForStDirectory != null) {
            remoteJobPath = new Path(this.remoteForStDirectory, this.jobId.toHexString());
            remoteBasePath = new Path(remoteJobPath, opChildPath);
        } else if (this.remoteShareWithCheckpoint) {
            final CheckpointStorageAccess storageAccess = env.getCheckpointStorageAccess();
            if (storageAccess instanceof FsCheckpointStorageAccess) {
                final FsCheckpointStorageAccess fsAccess = (FsCheckpointStorageAccess) storageAccess;
                remoteJobPath = fsAccess.getCheckpointsDirectory();
                remoteBasePath = new Path(fsAccess.getSharedStateDirectory(), opChildPath);
                LOG.info("Set remote ForSt directory to checkpoint directory {}", (Object) remoteBasePath);
            } else {
                // Remote paths stay null in this case.
                LOG.warn("Remote ForSt directory can't be set, because checkpoint directory isn't on file system.");
            }
        }
        return ForStPathContainer.of(localJobPath, localBasePath, remoteJobPath, remoteBasePath);
    }

    /**
     * Test helper that creates a resource container for a local-only path container without
     * shared resources, checkpoint storage access, metric group or statistics.
     *
     * @param localBasePath the local base path, may be null
     * @return the resource container
     */
    @VisibleForTesting
    ForStResourceContainer createOptionsAndResourceContainer(@Nullable Path localBasePath) {
        final ForStPathContainer localOnlyPaths = ForStPathContainer.ofLocal(null, localBasePath);
        return this.createOptionsAndResourceContainer(null, localOnlyPaths, null, null, false);
    }

    /**
     * Creates the resource container from this backend's options, options factory and recovery
     * claim mode. An empty configuration is used when no configurable options are set.
     *
     * @param sharedResources the shared memory resources, or null
     * @param pathContainer the local/remote paths to use
     * @param checkpointStorageAccess the checkpoint storage access, or null
     * @param metricGroup the metric group, or null
     * @param enableStatistics whether statistics are enabled
     * @return the resource container
     */
    @VisibleForTesting
    private ForStResourceContainer createOptionsAndResourceContainer(@Nullable OpaqueMemoryResource<ForStSharedResources> sharedResources, ForStPathContainer pathContainer, @Nullable CheckpointStorageAccess checkpointStorageAccess, @Nullable MetricGroup metricGroup, boolean enableStatistics) {
        final ReadableConfig effectiveOptions = this.configurableOptions == null ? new Configuration() : this.configurableOptions;
        return new ForStResourceContainer(effectiveOptions, this.forStOptionsFactory, sharedResources, pathContainer, this.recoveryClaimMode, checkpointStorageAccess, metricGroup, enableStatistics);
    }

    /**
     * Returns a description listing the configured local and remote directories.
     *
     * @return the string representation of this backend
     */
    @Override
    public String toString() {
        // No separator before the first attribute; the previous output began with "{, ".
        return "ForStStateBackend{localForStDirectories=" + Arrays.toString(this.localForStDirectories) + ", remoteForStDirectory=" + this.remoteForStDirectory + "}";
    }

    /**
     * Ensures the ForSt native library is loaded, using {@code NativeLibraryLoader.getInstance()}
     * as the loader.
     *
     * @param tempDirectory the directory under which the native library is stored
     * @param executor the executor on which the loading runs
     * @throws IOException if the library could not be loaded
     */
    @VisibleForTesting
    static void ensureForStIsLoaded(String tempDirectory, Executor executor) throws IOException {
        final Supplier<NativeLibraryLoader> defaultLoader = NativeLibraryLoader::getInstance;
        ensureForStIsLoaded(tempDirectory, defaultLoader, executor);
    }

    /**
     * Test hook that overrides the flag recording whether the native library has been loaded.
     *
     * @param initialized the new flag value
     */
    @VisibleForTesting
    static void setForStInitialized(boolean initialized) {
        ForStStateBackend.forStInitialized = initialized;
    }

    /**
     * Loads the ForSt native library if it has not been loaded yet. The whole operation runs
     * while holding the class lock.
     *
     * <p>Each attempt extracts the library into a fresh folder below {@code tempDirectory} on the
     * given executor and waits for completion. On failure the loader's "initialized" flag is
     * reset, the folder is removed, and the next attempt starts.
     *
     * @param tempDirectory the directory under which the native library is stored
     * @param nativeLibraryLoaderSupplier supplies the loader used to extract the library
     * @param executor the executor on which the loading runs
     * @throws IOException if all attempts fail; the last failure is attached as the cause
     */
    @VisibleForTesting
    static void ensureForStIsLoaded(String tempDirectory, Supplier<NativeLibraryLoader> nativeLibraryLoaderSupplier, Executor executor) throws IOException {
        synchronized (ForStStateBackend.class) {
            if (forStInitialized) {
                return;
            }

            final File tempDirParent = new File(tempDirectory).getAbsoluteFile();
            LOG.info("Attempting to load ForSt native library and store it under '{}'", (Object) tempDirParent);

            Throwable lastException = null;
            for (int attempt = 1; attempt <= FORST_LIB_LOADING_ATTEMPTS; ++attempt) {
                // Filled in by the async task so that the folder can be cleaned up on failure.
                final AtomicReference<File> rocksLibFolder = new AtomicReference<>();
                try {
                    FutureUtils.runAsync(() -> {
                        File libFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID());
                        rocksLibFolder.set(libFolder);
                        LOG.debug("Attempting to create ForSt native library folder {}", (Object) libFolder);
                        libFolder.mkdirs();
                        nativeLibraryLoaderSupplier.get().loadLibrary(libFolder.getAbsolutePath());
                        RocksDB.loadLibrary();
                    }, executor).get();

                    LOG.info("Successfully loaded ForSt native library");
                    forStInitialized = true;
                    return;
                } catch (Throwable t) {
                    lastException = t;
                    LOG.debug("ForSt JNI library loading attempt {} failed", (Object) attempt, (Object) t);
                    try {
                        ForStStateBackend.resetForStLoadedFlag();
                    } catch (Throwable tt) {
                        LOG.debug("Failed to reset 'initialized' flag in ForSt native code loader", tt);
                    }
                    FileUtils.deleteDirectoryQuietly(rocksLibFolder.get());
                }
            }
            throw new IOException("Could not load the native ForSt library", lastException);
        }
    }

    /**
     * Resets the {@code initialized} field of {@link NativeLibraryLoader} to false via
     * reflection, so that a subsequent load attempt is not skipped by the loader.
     *
     * @throws Exception if the field cannot be found or modified
     */
    @VisibleForTesting
    static void resetForStLoadedFlag() throws Exception {
        final Field loaderInitializedFlag = NativeLibraryLoader.class.getDeclaredField("initialized");
        loaderInitializedFlag.setAccessible(true);
        // Null receiver: the field is written as a static member.
        loaderInitializedFlag.setBoolean(null, false);
    }

    /** The available priority queue state implementations, each with a textual description. */
    public enum PriorityQueueStateType implements DescribedEnum {
        HEAP(TextElement.text("Heap-based")),
        ForStDB(TextElement.text("Implementation based on RocksDB"));

        /** Description of this option. */
        private final InlineElement description;

        PriorityQueueStateType(InlineElement description) {
            this.description = description;
        }

        /**
         * Returns the description of this option.
         *
         * @return the description element
         */
        public InlineElement getDescription() {
            return description;
        }
    }
}

