package org.apache.flink.state.forst.datatransfer;

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.state.forst.StateHandleTransferSpec;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Moves ForSt state files between the DB file system and checkpoint storage.
 *
 * <p>Snapshot direction: {@link #transferFileToCheckpointFs}, {@link #transferFilesToCheckpointFs}
 * and {@link #writeFileToCheckpointFs} produce {@link IncrementalKeyedStateHandle.HandleAndLocalPath}
 * entries. Restore direction: {@link #transferAllStateDataToDirectory} materializes the files
 * referenced by a set of {@link StateHandleTransferSpec}s.
 *
 * <p>File transfers are submitted to {@link #executorService}; the public methods block until the
 * submitted work has finished. The instance owns the executor and must be {@link #close() closed}.
 */
public class ForStStateDataTransfer implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(ForStStateDataTransfer.class);

    /** Default number of transfer threads. Not referenced inside this class. */
    public static final int DEFAULT_THREAD_NUM = 4;

    /** Executor on which every individual file transfer runs. */
    protected final ExecutorService executorService;

    /** ForSt file system of the DB; {@code null} makes the local file system the DB file system. */
    @Nullable
    private final ForStFlinkFileSystem forStFs;

    /**
     * Creates a transfer helper without a ForSt file system.
     *
     * @param threadNum number of transfer threads, see {@link #ForStStateDataTransfer(int,
     *     ForStFlinkFileSystem)}
     */
    public ForStStateDataTransfer(int threadNum) {
        this(threadNum, null);
    }

    /**
     * Creates a transfer helper.
     *
     * @param threadNum number of transfer threads; values above 1 create a fixed-size pool of that
     *     many threads, any other value uses Flink's direct executor service instead of a pool
     * @param forStFs ForSt file system handed to the transfer strategies, may be {@code null}
     */
    public ForStStateDataTransfer(int threadNum, @Nullable ForStFlinkFileSystem forStFs) {
        this.forStFs = forStFs;
        if (threadNum > 1) {
            this.executorService =
                    Executors.newFixedThreadPool(
                            threadNum, new ExecutorThreadFactory("Flink-ForStStateDataTransfer"));
        } else {
            this.executorService =
                    org.apache.flink.util.concurrent.Executors.newDirectExecutorService();
        }
    }

    /**
     * Transfers a single file to checkpoint storage and waits for the result.
     *
     * @param sharingFilesStrategy used to select the snapshot transfer strategy
     * @param file the file to transfer
     * @param transferBytes forwarded unchanged to {@code DataTransferStrategy.transferToCheckpoint};
     *     {@link #transferFilesToCheckpointFs} always passes {@code -1}. NOTE(review): presumably a
     *     byte limit where a negative value means "whole file" - confirm against the strategy
     *     implementations.
     * @param checkpointStreamFactory factory handed to the strategy
     * @param stateScope checkpoint scope handed to the strategy
     * @param closeableRegistry registry handed to the strategy
     * @param tmpResourcesRegistry second registry handed to the strategy
     * @return the handle/local-path pair produced by the strategy
     * @throws Exception the underlying {@link IOException} if the transfer failed with one, a
     *     {@link FlinkRuntimeException} for any other transfer failure, or an
     *     {@link InterruptedException} if the wait was interrupted
     */
    public IncrementalKeyedStateHandle.HandleAndLocalPath transferFileToCheckpointFs(
            SnapshotType.SharingFilesStrategy sharingFilesStrategy,
            Path file,
            long transferBytes,
            CheckpointStreamFactory checkpointStreamFactory,
            CheckpointedStateScope stateScope,
            CloseableRegistry closeableRegistry,
            CloseableRegistry tmpResourcesRegistry)
            throws Exception {
        try {
            DataTransferStrategy strategy =
                    DataTransferStrategyBuilder.buildForSnapshot(
                            sharingFilesStrategy, forStFs, checkpointStreamFactory);
            return createTransferFuture(
                            strategy,
                            file,
                            transferBytes,
                            checkpointStreamFactory,
                            stateScope,
                            closeableRegistry,
                            tmpResourcesRegistry)
                    .get();
        } catch (ExecutionException e) {
            throw convertExecutionException(e);
        }
    }

    /**
     * Transfers several files to checkpoint storage and waits for all of them.
     *
     * <p>All transfers are submitted up front and share one strategy instance; results are then
     * collected in the iteration order of {@code files}. Each transfer is started with
     * {@code -1} as its {@code transferBytes} argument.
     *
     * @param sharingFilesStrategy used to select the snapshot transfer strategy
     * @param files the files to transfer
     * @param checkpointStreamFactory factory handed to the strategy
     * @param stateScope checkpoint scope handed to the strategy
     * @param closeableRegistry registry handed to the strategy
     * @param tmpResourcesRegistry second registry handed to the strategy
     * @return one handle/local-path pair per input file, in input order
     * @throws Exception the underlying {@link IOException} if a transfer failed with one, a
     *     {@link FlinkRuntimeException} for any other transfer failure, or an
     *     {@link InterruptedException} if the wait was interrupted
     */
    public List<IncrementalKeyedStateHandle.HandleAndLocalPath> transferFilesToCheckpointFs(
            SnapshotType.SharingFilesStrategy sharingFilesStrategy,
            List<Path> files,
            CheckpointStreamFactory checkpointStreamFactory,
            CheckpointedStateScope stateScope,
            CloseableRegistry closeableRegistry,
            CloseableRegistry tmpResourcesRegistry)
            throws Exception {
        DataTransferStrategy strategy =
                DataTransferStrategyBuilder.buildForSnapshot(
                        sharingFilesStrategy, forStFs, checkpointStreamFactory);

        // Collect eagerly so that every transfer is submitted before the first get() blocks.
        List<CompletableFuture<IncrementalKeyedStateHandle.HandleAndLocalPath>> futures =
                files.stream()
                        .map(
                                file ->
                                        createTransferFuture(
                                                strategy,
                                                file,
                                                -1L,
                                                checkpointStreamFactory,
                                                stateScope,
                                                closeableRegistry,
                                                tmpResourcesRegistry))
                        .collect(Collectors.toList());

        try {
            List<IncrementalKeyedStateHandle.HandleAndLocalPath> handles =
                    new ArrayList<>(files.size());
            for (CompletableFuture<IncrementalKeyedStateHandle.HandleAndLocalPath> future :
                    futures) {
                handles.add(future.get());
            }
            return handles;
        } catch (ExecutionException e) {
            throw convertExecutionException(e);
        }
    }

    /**
     * Writes {@code fileContent} (UTF-8 encoded) into a new checkpoint output stream and pairs the
     * resulting state handle with {@code fileName}.
     *
     * <p>The stream is registered with {@code closeableRegistry} while it is being written. A
     * closeable that quietly discards the resulting handle is registered with
     * {@code tmpResourcesRegistry}.
     *
     * @param fileName local path stored in the returned pair
     * @param fileContent text to write
     * @param checkpointStreamFactory creates the output stream
     * @param stateScope scope the output stream is created for
     * @param closeableRegistry tracks the stream while it is open
     * @param tmpResourcesRegistry receives the discard-on-close hook for the written handle
     * @return the pair of written state handle and {@code fileName}
     * @throws IOException if creating, writing or closing the stream, or a registry operation,
     *     fails
     */
    public IncrementalKeyedStateHandle.HandleAndLocalPath writeFileToCheckpointFs(
            String fileName,
            String fileContent,
            CheckpointStreamFactory checkpointStreamFactory,
            CheckpointedStateScope stateScope,
            CloseableRegistry closeableRegistry,
            CloseableRegistry tmpResourcesRegistry)
            throws IOException {
        CheckpointStateOutputStream outputStream = null;
        try {
            outputStream = checkpointStreamFactory.createCheckpointStateOutputStream(stateScope);
            closeableRegistry.registerCloseable(outputStream);

            byte[] content = fileContent.getBytes(StandardCharsets.UTF_8);
            outputStream.write(content, 0, content.length);

            final StreamStateHandle handle;
            if (closeableRegistry.unregisterCloseable(outputStream)) {
                handle = outputStream.closeAndGetHandle();
                // Already closed: keep the finally block from touching the stream again.
                outputStream = null;
            } else {
                // The registry no longer tracked the stream, so no handle is obtained.
                // NOTE(review): a null handle is passed on to HandleAndLocalPath.of below -
                // confirm that this is the intended failure mode.
                handle = null;
            }

            tmpResourcesRegistry.registerCloseable(
                    () -> StateUtil.discardStateObjectQuietly(handle));
            return IncrementalKeyedStateHandle.HandleAndLocalPath.of(handle, fileName);
        } finally {
            // Only close the stream here if this method still owns it.
            if (closeableRegistry.unregisterCloseable(outputStream)) {
                IOUtils.closeQuietly(outputStream);
            }
        }
    }

    /**
     * Submits one checkpoint-bound file transfer to {@link #executorService}.
     *
     * <p>Checked exceptions thrown by the strategy are rethrown unchecked inside the task and
     * therefore surface as the cause of an {@link ExecutionException} on {@code get()}.
     */
    private CompletableFuture<IncrementalKeyedStateHandle.HandleAndLocalPath> createTransferFuture(
            DataTransferStrategy strategy,
            Path file,
            long transferBytes,
            CheckpointStreamFactory checkpointStreamFactory,
            CheckpointedStateScope stateScope,
            CloseableRegistry closeableRegistry,
            CloseableRegistry tmpResourcesRegistry) {
        return CompletableFuture.supplyAsync(
                CheckedSupplier.unchecked(
                        () ->
                                strategy.transferToCheckpoint(
                                        file,
                                        transferBytes,
                                        checkpointStreamFactory,
                                        stateScope,
                                        closeableRegistry,
                                        tmpResourcesRegistry)),
                executorService);
    }

    /** Returns the ForSt file system if one was configured, otherwise the local file system. */
    private FileSystem getDbFileSystem() {
        return forStFs != null ? forStFs : FileSystem.getLocalFileSystem();
    }

    /**
     * Transfers every shared and private state file referenced by {@code transferSpecs} to the
     * spec's transfer destination and waits for completion.
     *
     * <p>The transfers run under a child registry that is registered with
     * {@code closeableRegistry} for the duration of the call and is unregistered and closed
     * afterwards, on success and on failure alike. If a transfer fails, every spec's transfer
     * destination is deleted recursively (best effort, failures are only logged) before the
     * failure is rethrown.
     *
     * @param transferSpecs state handles together with their transfer destinations
     * @param closeableRegistry parent registry for the resources opened by the transfers
     * @param recoveryClaimMode claim mode used to select the restore transfer strategy
     * @throws Exception the underlying {@link IOException} if a transfer failed with one, a
     *     {@link FlinkRuntimeException} for any other transfer failure, or an
     *     {@link InterruptedException} if the wait was interrupted
     */
    public void transferAllStateDataToDirectory(
            Collection<StateHandleTransferSpec> transferSpecs,
            CloseableRegistry closeableRegistry,
            RecoveryClaimMode recoveryClaimMode)
            throws Exception {
        CloseableRegistry internalCloser = new CloseableRegistry();
        closeableRegistry.registerCloseable(internalCloser);
        try {
            List<CompletableFuture<Void>> futures =
                    transferAllStateDataToDirectoryAsync(
                                    transferSpecs, internalCloser, recoveryClaimMode)
                            .collect(Collectors.toList());
            FutureUtils.completeAll(futures).get();
        } catch (ExecutionException e) {
            // Remove whatever was already written so no partial restore is left behind.
            for (StateHandleTransferSpec spec : transferSpecs) {
                Path destination = spec.getTransferDestination();
                try {
                    getDbFileSystem().delete(destination, true);
                } catch (IOException deleteFailure) {
                    LOG.warn("Failed to delete transfer destination.", deleteFailure);
                }
            }
            throw convertExecutionException(e);
        } finally {
            if (closeableRegistry.unregisterCloseable(internalCloser)) {
                IOUtils.closeQuietly(internalCloser);
            }
        }
    }

    /**
     * Builds one asynchronous restore task per shared and per private state file of every spec.
     *
     * <p>The returned stream is lazy: tasks are submitted to {@link #executorService} only when
     * the stream is consumed.
     */
    private Stream<CompletableFuture<Void>> transferAllStateDataToDirectoryAsync(
            Collection<StateHandleTransferSpec> transferSpecs,
            CloseableRegistry closeableRegistry,
            RecoveryClaimMode recoveryClaimMode) {
        DataTransferStrategy strategy =
                DataTransferStrategyBuilder.buildForRestore(
                        forStFs, transferSpecs, recoveryClaimMode);

        return transferSpecs.stream()
                .flatMap(
                        spec ->
                                Stream.concat(
                                                spec.getStateHandle().getSharedState().stream(),
                                                spec.getStateHandle().getPrivateState().stream())
                                        .map(
                                                handleAndLocalPath -> {
                                                    String localPath =
                                                            handleAndLocalPath.getLocalPath();
                                                    StreamStateHandle remoteHandle =
                                                            handleAndLocalPath.getHandle();
                                                    Path destination =
                                                            new Path(
                                                                    spec.getTransferDestination(),
                                                                    localPath);
                                                    return ThrowingRunnable.unchecked(
                                                            () ->
                                                                    strategy.transferFromCheckpoint(
                                                                            remoteHandle,
                                                                            destination,
                                                                            closeableRegistry));
                                                }))
                .map(task -> CompletableFuture.runAsync(task, executorService));
    }

    /** Shuts the transfer executor down immediately via {@code shutdownNow()}. */
    @Override
    public void close() {
        executorService.shutdownNow();
    }

    /**
     * Maps a failed transfer future to the exception reported to callers.
     *
     * <p>The {@link ExecutionException} and {@link RuntimeException} wrappers are stripped with
     * {@link ExceptionUtils}; if what remains is an {@link IOException} it is returned as is,
     * otherwise the original {@code ExecutionException} is wrapped in a
     * {@link FlinkRuntimeException}.
     */
    private Exception convertExecutionException(ExecutionException e) {
        Throwable cause = ExceptionUtils.stripExecutionException(e);
        cause = ExceptionUtils.stripException(cause, RuntimeException.class);
        if (cause instanceof IOException) {
            return (IOException) cause;
        }
        return new FlinkRuntimeException("Failed to transfer data.", e);
    }
}
