package org.apache.flink.state.forst.datatransfer;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.apache.flink.state.forst.fs.filemapping.FileOwnership;
import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.class */
public class CopyDataTransferStrategy extends DataTransferStrategy {
    private static final int READ_BUFFER_SIZE = 65536;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CopyDataTransferStrategy() {
        super(new LocalFileSystem());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CopyDataTransferStrategy(@Nonnull FileSystem fileSystem) {
        super(fileSystem);
    }

    @Override // org.apache.flink.state.forst.datatransfer.DataTransferStrategy
    public IncrementalKeyedStateHandle.HandleAndLocalPath transferToCheckpoint(Path path, long j, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope checkpointedStateScope, CloseableRegistry closeableRegistry, CloseableRegistry closeableRegistry2) throws IOException {
        if (j < 0) {
            j = Long.MAX_VALUE;
        }
        return copyFileToCheckpoint(path, j, checkpointStreamFactory, checkpointedStateScope, closeableRegistry, closeableRegistry2);
    }

    @Override // org.apache.flink.state.forst.datatransfer.DataTransferStrategy
    public void transferFromCheckpoint(StreamStateHandle streamStateHandle, Path path, CloseableRegistry closeableRegistry) throws IOException {
        LOG.trace("Copy file from checkpoint: {}, {}, {}", new Object[]{streamStateHandle, path, this.dbFileSystem});
        copyFileFromCheckpoint(streamStateHandle, path, closeableRegistry);
    }

    public String toString() {
        return "CopyDataTransferStrategy{, dbFileSystem=" + this.dbFileSystem + "}";
    }

    private IncrementalKeyedStateHandle.HandleAndLocalPath copyFileToCheckpoint(Path path, long j, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope checkpointedStateScope, CloseableRegistry closeableRegistry, CloseableRegistry closeableRegistry2) throws IOException {
        StreamStateHandle fileStateHandle;
        if (this.dbFileSystem instanceof ForStFlinkFileSystem) {
            MappingEntry mappingEntry = ((ForStFlinkFileSystem) this.dbFileSystem).getMappingEntry(path);
            Preconditions.checkNotNull(mappingEntry, "dbFile not found: " + path);
            fileStateHandle = mappingEntry.getSource().toStateHandle();
            if (mappingEntry.getFileOwnership() == FileOwnership.NOT_OWNED) {
                return IncrementalKeyedStateHandle.HandleAndLocalPath.of(fileStateHandle, path.getName());
            }
        } else {
            fileStateHandle = new FileStateHandle(path, path.getFileSystem().getFileStatus(path).getLen());
        }
        StreamStateHandle tryPathCopyingToCheckpoint = tryPathCopyingToCheckpoint(fileStateHandle, checkpointStreamFactory, checkpointedStateScope);
        if (tryPathCopyingToCheckpoint != null) {
            LOG.trace("Path-copy file to checkpoint: {} {}", path, tryPathCopyingToCheckpoint);
        } else {
            tryPathCopyingToCheckpoint = bytesCopyingToCheckpoint(path, j, checkpointStreamFactory, checkpointedStateScope, closeableRegistry, closeableRegistry2);
            LOG.trace("Bytes-copy file to checkpoint: {}, {}", path, tryPathCopyingToCheckpoint);
        }
        return IncrementalKeyedStateHandle.HandleAndLocalPath.of(tryPathCopyingToCheckpoint, path.getName());
    }

    @Nullable
    private StreamStateHandle tryPathCopyingToCheckpoint(@Nonnull StreamStateHandle streamStateHandle, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope checkpointedStateScope) {
        try {
            if (checkpointStreamFactory.canFastDuplicate(streamStateHandle, checkpointedStateScope)) {
                return (StreamStateHandle) checkpointStreamFactory.duplicate(Collections.singletonList(streamStateHandle), checkpointedStateScope).get(0);
            }
            return null;
        } catch (Exception e) {
            LOG.warn("Failed to duplicate file to checkpoint: {} {}", new Object[]{streamStateHandle, checkpointedStateScope, e});
            return null;
        }
    }

    @Nullable
    private StreamStateHandle bytesCopyingToCheckpoint(Path path, long j, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope checkpointedStateScope, CloseableRegistry closeableRegistry, CloseableRegistry closeableRegistry2) throws IOException {
        StreamStateHandle streamStateHandle;
        int read;
        InputStream inputStream = null;
        AutoCloseable autoCloseable = null;
        try {
            byte[] bArr = new byte[READ_BUFFER_SIZE];
            inputStream = this.dbFileSystem.open(path, READ_BUFFER_SIZE);
            closeableRegistry.registerCloseable(inputStream);
            autoCloseable = checkpointStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
            closeableRegistry.registerCloseable(autoCloseable);
            while (j > 0 && (read = inputStream.read(bArr, 0, (int) Math.min(j, 65536L))) != -1) {
                autoCloseable.write(bArr, 0, read);
                j -= read;
            }
            if (closeableRegistry.unregisterCloseable(autoCloseable)) {
                streamStateHandle = autoCloseable.closeAndGetHandle();
                autoCloseable = null;
            } else {
                streamStateHandle = null;
            }
            StreamStateHandle streamStateHandle2 = streamStateHandle;
            closeableRegistry2.registerCloseable(() -> {
                StateUtil.discardStateObjectQuietly(streamStateHandle2);
            });
            StreamStateHandle streamStateHandle3 = streamStateHandle;
            if (closeableRegistry.unregisterCloseable(inputStream)) {
                IOUtils.closeQuietly(inputStream);
            }
            if (closeableRegistry.unregisterCloseable(autoCloseable)) {
                IOUtils.closeQuietly(autoCloseable);
            }
            return streamStateHandle3;
        } catch (Throwable th) {
            if (closeableRegistry.unregisterCloseable(inputStream)) {
                IOUtils.closeQuietly(inputStream);
            }
            if (closeableRegistry.unregisterCloseable(autoCloseable)) {
                IOUtils.closeQuietly(autoCloseable);
            }
            throw th;
        }
    }

    private void copyFileFromCheckpoint(StreamStateHandle streamStateHandle, Path path, CloseableRegistry closeableRegistry) throws IOException {
        if (closeableRegistry.isClosed()) {
            return;
        }
        try {
            Closeable openInputStream = streamStateHandle.openInputStream();
            closeableRegistry.registerCloseable(openInputStream);
            Closeable create = this.dbFileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);
            closeableRegistry.registerCloseable(create);
            byte[] bArr = new byte[READ_BUFFER_SIZE];
            while (true) {
                int read = openInputStream.read(bArr);
                if (read == -1) {
                    closeableRegistry.unregisterAndCloseAll(new Closeable[]{create, openInputStream});
                    return;
                }
                create.write(bArr, 0, read);
            }
        } catch (Exception e) {
            LOG.info("closing: {}, {}, {}", new Object[]{streamStateHandle, path, e});
            IOUtils.closeQuietly(closeableRegistry);
            throw new IOException(e);
        }
    }
}
