/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.rocksdb;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.state.rocksdb.RocksDBStateDataTransferHelper;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CheckedSupplier;

public class RocksDBStateUploader
implements Closeable {
    private static final int READ_BUFFER_SIZE = 16384;
    private final RocksDBStateDataTransferHelper transfer;

    @VisibleForTesting
    public RocksDBStateUploader(int numberOfSnapshottingThreads) {
        this(RocksDBStateDataTransferHelper.forThreadNum(numberOfSnapshottingThreads));
    }

    public RocksDBStateUploader(RocksDBStateDataTransferHelper transfer) {
        this.transfer = transfer;
    }

    public List<IncrementalKeyedStateHandle.HandleAndLocalPath> uploadFilesToCheckpointFs(@Nonnull List<Path> files, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) throws Exception {
        List<CompletableFuture<IncrementalKeyedStateHandle.HandleAndLocalPath>> futures = this.createUploadFutures(files, checkpointStreamFactory, stateScope, closeableRegistry, tmpResourcesRegistry);
        ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath> handles = new ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath>(files.size());
        try {
            FutureUtils.waitForAll(futures).get();
            for (CompletableFuture<IncrementalKeyedStateHandle.HandleAndLocalPath> future : futures) {
                handles.add(future.get());
            }
        }
        catch (ExecutionException e) {
            Throwable throwable = ExceptionUtils.stripExecutionException((Throwable)e);
            throwable = ExceptionUtils.stripException((Throwable)throwable, RuntimeException.class);
            if (throwable instanceof IOException) {
                throw (IOException)throwable;
            }
            throw new FlinkRuntimeException("Failed to upload data for state handles.", (Throwable)e);
        }
        return handles;
    }

    private List<CompletableFuture<IncrementalKeyedStateHandle.HandleAndLocalPath>> createUploadFutures(List<Path> files, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) {
        return files.stream().map(e -> CompletableFuture.supplyAsync(CheckedSupplier.unchecked(() -> this.uploadLocalFileToCheckpointFs((Path)e, checkpointStreamFactory, stateScope, closeableRegistry, tmpResourcesRegistry)), this.transfer.getExecutorService())).collect(Collectors.toList());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private IncrementalKeyedStateHandle.HandleAndLocalPath uploadLocalFileToCheckpointFs(Path filePath, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) throws IOException {
        IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath;
        InputStream inputStream = null;
        CheckpointStateOutputStream outputStream = null;
        try {
            StreamStateHandle result;
            int numBytes;
            byte[] buffer = new byte[16384];
            inputStream = Files.newInputStream(filePath, new OpenOption[0]);
            closeableRegistry.registerCloseable((AutoCloseable)inputStream);
            outputStream = checkpointStreamFactory.createCheckpointStateOutputStream(stateScope);
            closeableRegistry.registerCloseable((AutoCloseable)outputStream);
            while ((numBytes = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, numBytes);
            }
            if (closeableRegistry.unregisterCloseable((AutoCloseable)outputStream)) {
                result = outputStream.closeAndGetHandle();
                outputStream = null;
            } else {
                result = null;
            }
            tmpResourcesRegistry.registerCloseable(() -> StateUtil.discardStateObjectQuietly((StateObject)result));
            handleAndLocalPath = IncrementalKeyedStateHandle.HandleAndLocalPath.of((StreamStateHandle)result, (String)filePath.getFileName().toString());
        }
        catch (Throwable throwable) {
            if (closeableRegistry.unregisterCloseable(inputStream)) {
                IOUtils.closeQuietly((AutoCloseable)inputStream);
            }
            if (closeableRegistry.unregisterCloseable(outputStream)) {
                IOUtils.closeQuietly(outputStream);
            }
            throw throwable;
        }
        if (closeableRegistry.unregisterCloseable((AutoCloseable)inputStream)) {
            IOUtils.closeQuietly((AutoCloseable)inputStream);
        }
        if (closeableRegistry.unregisterCloseable((AutoCloseable)outputStream)) {
            IOUtils.closeQuietly((AutoCloseable)outputStream);
        }
        return handleAndLocalPath;
    }

    @Override
    public void close() throws IOException {
        this.transfer.close();
    }
}

