/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst.datatransfer;

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.state.forst.ForStPathContainer;
import org.apache.flink.state.forst.StateHandleTransferSpec;
import org.apache.flink.state.forst.datatransfer.DataTransferStrategy;
import org.apache.flink.state.forst.datatransfer.DataTransferStrategyBuilder;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Moves ForSt state files between the database file system and checkpoint storage.
 *
 * <p>Snapshot-side methods hand files to a {@link DataTransferStrategy} built by
 * {@link DataTransferStrategyBuilder#buildForSnapshot}; the restore-side method uses a strategy
 * built by {@link DataTransferStrategyBuilder#buildForRestore}. Individual transfers are submitted
 * to {@link #executorService}, and the public methods block until the submitted work has finished.
 *
 * <p>Calling {@link #close()} shuts the executor down via {@link ExecutorService#shutdownNow()}.
 */
public class ForStStateDataTransfer implements Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(ForStStateDataTransfer.class);

    /**
     * Presumably the thread count callers use when they have no explicit configuration; the
     * constant is not referenced inside this class.
     */
    public static final int DEFAULT_THREAD_NUM = 4;

    /** Executor on which every individual file transfer is scheduled. */
    protected final ExecutorService executorService;

    /** ForSt file system to operate on; when {@code null} the local file system is used instead. */
    @Nullable
    private final ForStFlinkFileSystem forStFs;

    /**
     * Creates a transfer utility without a ForSt file system.
     *
     * @param threadNum number of transfer threads, see {@link #ForStStateDataTransfer(int,
     *     ForStFlinkFileSystem)}
     */
    public ForStStateDataTransfer(int threadNum) {
        this(threadNum, null);
    }

    /**
     * Creates a transfer utility.
     *
     * @param threadNum when greater than one, a fixed thread pool of that size is created;
     *     otherwise a direct executor service is used
     * @param forStFs the ForSt file system handed to the transfer strategies, or {@code null}
     */
    public ForStStateDataTransfer(int threadNum, @Nullable ForStFlinkFileSystem forStFs) {
        this.forStFs = forStFs;
        if (threadNum > 1) {
            this.executorService =
                    Executors.newFixedThreadPool(
                            threadNum, new ExecutorThreadFactory("Flink-ForStStateDataTransfer"));
        } else {
            this.executorService =
                    org.apache.flink.util.concurrent.Executors.newDirectExecutorService();
        }
    }

    /**
     * Transfers a single file to checkpoint storage and waits for the transfer to finish.
     *
     * @param sharingFilesStrategy sharing strategy used to select the transfer strategy
     * @param file the file to transfer
     * @param transferBytes passed through unchanged to
     *     {@code DataTransferStrategy#transferToCheckpoint}
     * @param checkpointStreamFactory factory for the checkpoint output streams
     * @param stateScope scope of the state being written
     * @param snapshotCloseableRegistry registry passed through to the transfer strategy
     * @param tmpResourcesRegistry registry passed through to the transfer strategy
     * @param forceCopy passed to the strategy builder
     * @return the resulting handle together with its local path
     * @throws Exception an {@link IOException} if the transfer failed with one, a
     *     {@link FlinkRuntimeException} for any other transfer failure, or whatever
     *     {@link CompletableFuture#get()} throws besides {@link ExecutionException}
     */
    public IncrementalKeyedStateHandle.HandleAndLocalPath transferFileToCheckpointFs(SnapshotType.SharingFilesStrategy sharingFilesStrategy, Path file, long transferBytes, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry snapshotCloseableRegistry, CloseableRegistry tmpResourcesRegistry, boolean forceCopy) throws Exception {
        try {
            DataTransferStrategy strategy =
                    DataTransferStrategyBuilder.buildForSnapshot(
                            sharingFilesStrategy, this.forStFs, checkpointStreamFactory, forceCopy);
            return createTransferFuture(
                            strategy,
                            file,
                            transferBytes,
                            checkpointStreamFactory,
                            stateScope,
                            snapshotCloseableRegistry,
                            tmpResourcesRegistry)
                    .get();
        } catch (ExecutionException e) {
            throw convertExecutionException(e);
        }
    }

    /**
     * Transfers several files to checkpoint storage and waits for all of them.
     *
     * <p>All transfers are submitted before the first result is awaited. Each file is transferred
     * with a {@code transferBytes} argument of {@code -1}.
     *
     * @param sharingFilesStrategy sharing strategy used to select the transfer strategy
     * @param files the files to transfer
     * @param checkpointStreamFactory factory for the checkpoint output streams
     * @param stateScope scope of the state being written
     * @param closeableRegistry registry passed through to the transfer strategy
     * @param tmpResourcesRegistry registry passed through to the transfer strategy
     * @param forceCopy passed to the strategy builder
     * @return one handle per input file, in the encounter order of {@code files}
     * @throws Exception an {@link IOException} if a transfer failed with one, a
     *     {@link FlinkRuntimeException} for any other transfer failure, or whatever
     *     {@link CompletableFuture#get()} throws besides {@link ExecutionException}
     */
    public List<IncrementalKeyedStateHandle.HandleAndLocalPath> transferFilesToCheckpointFs(SnapshotType.SharingFilesStrategy sharingFilesStrategy, List<Path> files, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry, boolean forceCopy) throws Exception {
        DataTransferStrategy strategy =
                DataTransferStrategyBuilder.buildForSnapshot(
                        sharingFilesStrategy, this.forStFs, checkpointStreamFactory, forceCopy);

        // Fully parameterized: a raw List would make the loop below iterate over Object.
        List<CompletableFuture<IncrementalKeyedStateHandle.HandleAndLocalPath>> futures =
                files.stream()
                        .map(
                                file ->
                                        createTransferFuture(
                                                strategy,
                                                file,
                                                -1L,
                                                checkpointStreamFactory,
                                                stateScope,
                                                closeableRegistry,
                                                tmpResourcesRegistry))
                        .collect(Collectors.toList());

        try {
            List<IncrementalKeyedStateHandle.HandleAndLocalPath> handles =
                    new ArrayList<>(files.size());
            for (CompletableFuture<IncrementalKeyedStateHandle.HandleAndLocalPath> future :
                    futures) {
                handles.add(future.get());
            }
            return handles;
        } catch (ExecutionException e) {
            throw convertExecutionException(e);
        }
    }

    /**
     * Writes {@code fileContent}, encoded as UTF-8, to a new checkpoint output stream.
     *
     * <p>The stream is registered with {@code closeableRegistry} while it is being written. After
     * the write, a closeable that quietly discards the resulting handle is registered with
     * {@code tmpResourcesRegistry}.
     *
     * @param filename local path to associate with the returned handle
     * @param fileContent text to write
     * @param checkpointStreamFactory factory used to create the output stream
     * @param stateScope scope of the state being written
     * @param closeableRegistry registry guarding the open output stream
     * @param tmpResourcesRegistry registry that receives the discard action for the written handle
     * @return the written handle paired with {@code filename}
     * @throws IOException if creating, registering, writing or closing the stream fails
     */
    public IncrementalKeyedStateHandle.HandleAndLocalPath writeFileToCheckpointFs(String filename, String fileContent, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) throws IOException {
        CheckpointStateOutputStream outputStream = null;
        try {
            outputStream = checkpointStreamFactory.createCheckpointStateOutputStream(stateScope);
            closeableRegistry.registerCloseable(outputStream);

            byte[] content = fileContent.getBytes(StandardCharsets.UTF_8);
            outputStream.write(content, 0, content.length);

            final StreamStateHandle result;
            if (closeableRegistry.unregisterCloseable(outputStream)) {
                result = outputStream.closeAndGetHandle();
                // Ownership has moved to the handle; keep the finally block from touching it.
                outputStream = null;
            } else {
                // The stream was no longer registered (presumably the registry was closed in the
                // meantime), so no handle is obtained.
                result = null;
            }

            tmpResourcesRegistry.registerCloseable(
                    () -> StateUtil.discardStateObjectQuietly(result));
            return IncrementalKeyedStateHandle.HandleAndLocalPath.of(result, filename);
        } finally {
            // outputStream is null when creation failed or the handle was already obtained.
            if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) {
                IOUtils.closeQuietly(outputStream);
            }
        }
    }

    /**
     * Schedules one file transfer on {@link #executorService}. Checked exceptions thrown by the
     * strategy are rethrown unchecked via {@link CheckedSupplier#unchecked}.
     */
    private CompletableFuture<IncrementalKeyedStateHandle.HandleAndLocalPath> createTransferFuture(DataTransferStrategy strategy, Path file, long transferBytes, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) {
        return CompletableFuture.supplyAsync(
                CheckedSupplier.unchecked(
                        () ->
                                strategy.transferToCheckpoint(
                                        file,
                                        transferBytes,
                                        checkpointStreamFactory,
                                        stateScope,
                                        closeableRegistry,
                                        tmpResourcesRegistry)),
                this.executorService);
    }

    /** Returns the ForSt file system if one was supplied, otherwise the local file system. */
    private FileSystem getDbFileSystem() {
        return this.forStFs != null ? this.forStFs : FileSystem.getLocalFileSystem();
    }

    /**
     * Transfers all shared and private state of the given specs into their transfer destinations
     * and waits for completion.
     *
     * <p>An internal registry is registered with {@code closeableRegistry} for the duration of the
     * call and is handed to the individual transfers. If waiting for the transfers fails with an
     * {@link ExecutionException}, every spec's transfer destination is deleted on a best-effort
     * basis before the failure is rethrown.
     *
     * @param forStPathContainer passed to the restore strategy builder
     * @param transferSpecs state handles and their transfer destinations
     * @param closeableRegistry caller's registry; the internal registry is attached to it
     * @param recoveryClaimMode passed to the restore strategy builder
     * @throws Exception an {@link IOException} if a transfer failed with one, a
     *     {@link FlinkRuntimeException} for any other transfer failure, or anything else thrown
     *     while registering the internal registry or waiting for the transfers
     */
    public void transferAllStateDataToDirectory(ForStPathContainer forStPathContainer, Collection<StateHandleTransferSpec> transferSpecs, CloseableRegistry closeableRegistry, RecoveryClaimMode recoveryClaimMode) throws Exception {
        CloseableRegistry internalCloser = new CloseableRegistry();
        closeableRegistry.registerCloseable(internalCloser);
        try {
            List<CompletableFuture<Void>> futures =
                    transferAllStateDataToDirectoryAsync(
                                    forStPathContainer,
                                    transferSpecs,
                                    internalCloser,
                                    recoveryClaimMode)
                            .collect(Collectors.toList());
            FutureUtils.completeAll(futures).get();
        } catch (ExecutionException e) {
            // Best-effort cleanup of the destinations; a failed delete is only logged.
            transferSpecs.stream()
                    .map(StateHandleTransferSpec::getTransferDestination)
                    .forEach(
                            dir -> {
                                try {
                                    getDbFileSystem().delete(dir, true);
                                } catch (IOException deleteFailure) {
                                    LOG.warn(
                                            "Failed to delete transfer destination.",
                                            deleteFailure);
                                }
                            });
            throw convertExecutionException(e);
        } finally {
            if (closeableRegistry.unregisterCloseable(internalCloser)) {
                IOUtils.closeQuietly(internalCloser);
            }
        }
    }

    /**
     * Builds one asynchronous transfer per shared and per private state entry of every spec.
     *
     * <p>The returned stream is lazy: transfers are only submitted to {@link #executorService}
     * once the stream is consumed.
     */
    private Stream<CompletableFuture<Void>> transferAllStateDataToDirectoryAsync(ForStPathContainer forStPathContainer, Collection<StateHandleTransferSpec> transferSpecs, CloseableRegistry closeableRegistry, RecoveryClaimMode recoveryClaimMode) {
        DataTransferStrategy strategy =
                DataTransferStrategyBuilder.buildForRestore(
                        this.forStFs, forStPathContainer, transferSpecs, recoveryClaimMode);
        return transferSpecs.stream()
                .flatMap(
                        spec ->
                                Stream.concat(
                                                spec.getStateHandle().getSharedState().stream(),
                                                spec.getStateHandle().getPrivateState().stream())
                                        .map(
                                                entry -> {
                                                    String localPath = entry.getLocalPath();
                                                    StreamStateHandle sourceHandle =
                                                            entry.getHandle();
                                                    Path targetPath =
                                                            new Path(
                                                                    spec.getTransferDestination(),
                                                                    localPath);
                                                    return ThrowingRunnable.unchecked(
                                                            () ->
                                                                    strategy.transferFromCheckpoint(
                                                                            sourceHandle,
                                                                            targetPath,
                                                                            closeableRegistry));
                                                }))
                .map(runnable -> CompletableFuture.runAsync(runnable, this.executorService));
    }

    /** Shuts the transfer executor down via {@link ExecutorService#shutdownNow()}. */
    @Override
    public void close() {
        this.executorService.shutdownNow();
    }

    /**
     * Maps a failed transfer future to the exception reported to callers.
     *
     * <p>{@link ExecutionException} and {@link RuntimeException} wrappers are stripped first. If
     * what remains is an {@link IOException} it is returned as is; otherwise the original
     * {@code e} is wrapped in a {@link FlinkRuntimeException}.
     */
    private Exception convertExecutionException(ExecutionException e) {
        Throwable throwable = ExceptionUtils.stripExecutionException(e);
        throwable = ExceptionUtils.stripException(throwable, RuntimeException.class);
        if (throwable instanceof IOException) {
            return (IOException) throwable;
        }
        return new FlinkRuntimeException("Failed to transfer data.", e);
    }
}

