package org.apache.flink.state.forst.datatransfer;

import java.io.IOException;
import java.util.Objects;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.apache.flink.state.forst.fs.filemapping.FileOwnership;
import org.apache.flink.state.forst.fs.filemapping.FileOwnershipDecider;
import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/state/forst/datatransfer/ReusableDataTransferStrategy.class */
public class ReusableDataTransferStrategy extends CopyDataTransferStrategy {
    /* JADX INFO: Access modifiers changed from: package-private */
    public ReusableDataTransferStrategy(FileSystem fileSystem) {
        super(fileSystem);
        Preconditions.checkArgument(fileSystem instanceof ForStFlinkFileSystem, "Unexpected dbFileSystem type: " + fileSystem.getClass() + ", expected: " + ForStFlinkFileSystem.class);
    }

    private ForStFlinkFileSystem getForStFlinkFileSystem() {
        return (ForStFlinkFileSystem) this.dbFileSystem;
    }

    private IncrementalKeyedStateHandle.HandleAndLocalPath reuseFileToCheckpoint(Path path, ForStFlinkFileSystem forStFlinkFileSystem) throws IOException {
        LOG.trace("Reuse file to checkpoint: {}", path);
        StreamStateHandle stateHandle = ((MappingEntry) Objects.requireNonNull(forStFlinkFileSystem.getMappingEntry(path))).getSource().toStateHandle();
        forStFlinkFileSystem.giveUpOwnership(path, stateHandle);
        return IncrementalKeyedStateHandle.HandleAndLocalPath.of(stateHandle, path.getName());
    }

    @Override // org.apache.flink.state.forst.datatransfer.CopyDataTransferStrategy, org.apache.flink.state.forst.datatransfer.DataTransferStrategy
    public IncrementalKeyedStateHandle.HandleAndLocalPath transferToCheckpoint(Path path, long j, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope checkpointedStateScope, CloseableRegistry closeableRegistry, CloseableRegistry closeableRegistry2) throws IOException {
        if (j < 0) {
            j = Long.MAX_VALUE;
        }
        ForStFlinkFileSystem forStFlinkFileSystem = getForStFlinkFileSystem();
        MappingEntry mappingEntry = forStFlinkFileSystem.getMappingEntry(path);
        Preconditions.checkNotNull(mappingEntry);
        if (mappingEntry.getFileOwnership() != FileOwnership.PRIVATE_OWNED_BY_DB) {
            return reuseFileToCheckpoint(path, forStFlinkFileSystem);
        }
        LOG.trace("Do not reuse file to checkpoint because the file is privately owned by DB: {}", path);
        return super.transferToCheckpoint(path, j, checkpointStreamFactory, checkpointedStateScope, closeableRegistry, closeableRegistry2);
    }

    @Override // org.apache.flink.state.forst.datatransfer.CopyDataTransferStrategy, org.apache.flink.state.forst.datatransfer.DataTransferStrategy
    public void transferFromCheckpoint(StreamStateHandle streamStateHandle, Path path, CloseableRegistry closeableRegistry) throws IOException {
        if (streamStateHandle instanceof ByteStreamStateHandle) {
            LOG.trace("Not reusing file from checkpoint because it is stored in Memory: {}", path);
            super.transferFromCheckpoint(streamStateHandle, path, closeableRegistry);
        } else if (FileOwnershipDecider.decideForRestoredFile(path) != FileOwnership.PRIVATE_OWNED_BY_DB) {
            reuseFileFromCheckpoint(streamStateHandle, path);
        } else {
            LOG.trace("Not reusing file from checkpoint because the file is privately owned by DB: {}", path);
            super.transferFromCheckpoint(streamStateHandle, path, closeableRegistry);
        }
    }

    private void reuseFileFromCheckpoint(StreamStateHandle streamStateHandle, Path path) throws IOException {
        LOG.trace("Reuse file from checkpoint: {}, {}", streamStateHandle, path);
        String physicalStateHandleID = streamStateHandle.getStreamStateHandleID().toString();
        getForStFlinkFileSystem().registerReusedRestoredFile(physicalStateHandleID, streamStateHandle, path);
        getForStFlinkFileSystem().link(physicalStateHandleID, path);
    }

    @Override // org.apache.flink.state.forst.datatransfer.CopyDataTransferStrategy
    public String toString() {
        return "ReusableDataTransferStrategy{dbFileSystem=" + this.dbFileSystem + "}";
    }
}
