/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst.datatransfer;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.state.forst.datatransfer.DataTransferStrategy;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

public class CopyDataTransferStrategy
extends DataTransferStrategy {
    private static final int READ_BUFFER_SIZE = 65536;

    CopyDataTransferStrategy() {
        super((FileSystem)new LocalFileSystem());
    }

    CopyDataTransferStrategy(@Nonnull FileSystem dbFileSystem) {
        super(dbFileSystem);
    }

    @Override
    public IncrementalKeyedStateHandle.HandleAndLocalPath transferToCheckpoint(Path dbFilePath, long maxTransferBytes, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) throws IOException {
        if (maxTransferBytes < 0L) {
            maxTransferBytes = Long.MAX_VALUE;
        }
        return this.copyFileToCheckpoint(dbFilePath, maxTransferBytes, checkpointStreamFactory, stateScope, closeableRegistry, tmpResourcesRegistry);
    }

    @Override
    public void transferFromCheckpoint(StreamStateHandle sourceHandle, Path targetPath, CloseableRegistry closeableRegistry) throws IOException {
        LOG.trace("Copy file from checkpoint: {}, {}, {}", new Object[]{sourceHandle, targetPath, this.dbFileSystem});
        this.copyFileFromCheckpoint(sourceHandle, targetPath, closeableRegistry);
    }

    public String toString() {
        return "CopyDataTransferStrategy{, dbFileSystem=" + this.dbFileSystem + "}";
    }

    private IncrementalKeyedStateHandle.HandleAndLocalPath copyFileToCheckpoint(Path dbFilePath, long maxTransferBytes, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) throws IOException {
        FileStateHandle sourceStateHandle;
        if (this.dbFileSystem instanceof ForStFlinkFileSystem) {
            MappingEntry mappingEntry = ((ForStFlinkFileSystem)this.dbFileSystem).getMappingEntry(dbFilePath);
            Preconditions.checkNotNull((Object)((Object)mappingEntry), (String)("dbFile not found: " + dbFilePath));
            sourceStateHandle = mappingEntry.getSource().toStateHandle();
        } else {
            FileSystem sourceFileSystem = dbFilePath.getFileSystem();
            long fileLength = sourceFileSystem.getFileStatus(dbFilePath).getLen();
            sourceStateHandle = new FileStateHandle(dbFilePath, fileLength);
        }
        StreamStateHandle targetStateHandle = this.tryPathCopyingToCheckpoint((StreamStateHandle)sourceStateHandle, checkpointStreamFactory, stateScope, tmpResourcesRegistry, maxTransferBytes);
        if (targetStateHandle != null) {
            LOG.trace("Path-copy file to checkpoint: {} {}", (Object)dbFilePath, (Object)targetStateHandle);
        } else {
            targetStateHandle = this.bytesCopyingToCheckpoint(dbFilePath, maxTransferBytes, checkpointStreamFactory, stateScope, closeableRegistry, tmpResourcesRegistry);
            LOG.trace("Bytes-copy file to checkpoint: {}, {}", (Object)dbFilePath, (Object)targetStateHandle);
        }
        return IncrementalKeyedStateHandle.HandleAndLocalPath.of((StreamStateHandle)targetStateHandle, (String)dbFilePath.getName());
    }

    @Nullable
    private StreamStateHandle tryPathCopyingToCheckpoint(@Nonnull StreamStateHandle sourceHandle, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry tmpResourcesRegistry, long maxTransferBytes) {
        try {
            if (maxTransferBytes > 0L && maxTransferBytes != Long.MAX_VALUE) {
                return null;
            }
            if (!checkpointStreamFactory.canFastDuplicate(sourceHandle, stateScope)) {
                return null;
            }
            List result = checkpointStreamFactory.duplicate(Collections.singletonList(sourceHandle), stateScope);
            StreamStateHandle resultStateHandle = (StreamStateHandle)result.get(0);
            tmpResourcesRegistry.registerCloseable(() -> StateUtil.discardStateObjectQuietly((StateObject)resultStateHandle));
            return resultStateHandle;
        }
        catch (Exception e) {
            LOG.warn("Failed to duplicate file to checkpoint: {} {}", new Object[]{sourceHandle, stateScope, e});
            return null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    private StreamStateHandle bytesCopyingToCheckpoint(Path filePath, long maxTransferBytes, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) throws IOException {
        StreamStateHandle streamStateHandle;
        FSDataInputStream inputStream = null;
        CheckpointStateOutputStream outputStream = null;
        try {
            StreamStateHandle result;
            int maxReadBytes;
            int readBytes;
            byte[] buffer = new byte[65536];
            inputStream = this.dbFileSystem.open(filePath, 65536);
            closeableRegistry.registerCloseable((AutoCloseable)inputStream);
            outputStream = checkpointStreamFactory.createCheckpointStateOutputStream(stateScope);
            closeableRegistry.registerCloseable((AutoCloseable)outputStream);
            while (maxTransferBytes > 0L && (readBytes = inputStream.read(buffer, 0, maxReadBytes = (int)Math.min(maxTransferBytes, 65536L))) != -1) {
                outputStream.write(buffer, 0, readBytes);
                maxTransferBytes -= (long)readBytes;
            }
            if (closeableRegistry.unregisterCloseable((AutoCloseable)outputStream)) {
                result = outputStream.closeAndGetHandle();
                outputStream = null;
            } else {
                result = null;
            }
            tmpResourcesRegistry.registerCloseable(() -> StateUtil.discardStateObjectQuietly((StateObject)result));
            streamStateHandle = result;
        }
        catch (Throwable throwable) {
            if (closeableRegistry.unregisterCloseable(inputStream)) {
                IOUtils.closeQuietly(inputStream);
            }
            if (closeableRegistry.unregisterCloseable(outputStream)) {
                IOUtils.closeQuietly(outputStream);
            }
            throw throwable;
        }
        if (closeableRegistry.unregisterCloseable((AutoCloseable)inputStream)) {
            IOUtils.closeQuietly((AutoCloseable)inputStream);
        }
        if (closeableRegistry.unregisterCloseable((AutoCloseable)outputStream)) {
            IOUtils.closeQuietly((AutoCloseable)outputStream);
        }
        return streamStateHandle;
    }

    private void copyFileFromCheckpoint(StreamStateHandle sourceHandle, Path targetPath, CloseableRegistry closeableRegistry) throws IOException {
        if (closeableRegistry.isClosed()) {
            return;
        }
        try {
            int numBytes;
            FSDataInputStream input = sourceHandle.openInputStream();
            closeableRegistry.registerCloseable((AutoCloseable)input);
            FSDataOutputStream output = this.dbFileSystem.create(targetPath, FileSystem.WriteMode.NO_OVERWRITE);
            closeableRegistry.registerCloseable((AutoCloseable)output);
            byte[] buffer = new byte[65536];
            while ((numBytes = input.read(buffer)) != -1) {
                output.write(buffer, 0, numBytes);
            }
            closeableRegistry.unregisterAndCloseAll(new Closeable[]{output, input});
        }
        catch (Exception ex) {
            LOG.info("closing: {}, {}, {}", new Object[]{sourceHandle, targetPath, ex});
            IOUtils.closeQuietly((AutoCloseable)closeableRegistry);
            throw new IOException(ex);
        }
    }
}

