package org.apache.flink.state.forst.datatransfer;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
import org.apache.flink.state.forst.StateHandleTransferSpec;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.class */
public class DataTransferStrategyBuilder {
    private static final Logger LOG = LoggerFactory.getLogger(DataTransferStrategyBuilder.class);

    public static DataTransferStrategy buildForSnapshot(SnapshotType.SharingFilesStrategy sharingFilesStrategy, @Nullable ForStFlinkFileSystem forStFlinkFileSystem, @Nullable CheckpointStreamFactory checkpointStreamFactory) {
        return buildForSnapshot(sharingFilesStrategy, forStFlinkFileSystem, isDbPathUnderCheckpointPathForSnapshot(forStFlinkFileSystem, checkpointStreamFactory));
    }

    @VisibleForTesting
    static DataTransferStrategy buildForSnapshot(SnapshotType.SharingFilesStrategy sharingFilesStrategy, @Nullable ForStFlinkFileSystem forStFlinkFileSystem, boolean z) {
        if (sharingFilesStrategy == SnapshotType.SharingFilesStrategy.FORWARD_BACKWARD && forStFlinkFileSystem != null && z) {
            ReusableDataTransferStrategy reusableDataTransferStrategy = new ReusableDataTransferStrategy(forStFlinkFileSystem);
            LOG.info("Build DataTransferStrategy for Snapshot: {}", reusableDataTransferStrategy);
            return reusableDataTransferStrategy;
        }
        CopyDataTransferStrategy copyDataTransferStrategy = forStFlinkFileSystem == null ? new CopyDataTransferStrategy() : new CopyDataTransferStrategy(forStFlinkFileSystem);
        LOG.info("Build DataTransferStrategy for Snapshot: {}", copyDataTransferStrategy);
        return copyDataTransferStrategy;
    }

    private static boolean isDbPathUnderCheckpointPathForSnapshot(@Nullable ForStFlinkFileSystem forStFlinkFileSystem, @Nullable CheckpointStreamFactory checkpointStreamFactory) {
        if (forStFlinkFileSystem == null || checkpointStreamFactory == null || checkpointStreamFactory.getClass() != FsCheckpointStorageLocation.class) {
            return false;
        }
        Path targetPath = ((FsCheckpointStorageLocation) checkpointStreamFactory).getTargetPath(CheckpointedStateScope.SHARED);
        try {
            if (!targetPath.getFileSystem().getUri().equals(forStFlinkFileSystem.getDelegateFS().getUri())) {
                return false;
            }
            String remoteBase = forStFlinkFileSystem.getRemoteBase();
            String path = targetPath.toString();
            return path.equals(remoteBase) || remoteBase.startsWith(path);
        } catch (IOException e) {
            throw new RuntimeException("Failed to get FileSystem from cpSharedPath: " + targetPath, e);
        }
    }

    public static DataTransferStrategy buildForRestore(@Nullable ForStFlinkFileSystem forStFlinkFileSystem, Collection<StateHandleTransferSpec> collection, RecoveryClaimMode recoveryClaimMode) {
        FileSystem sharedStateFileSystem = getSharedStateFileSystem(collection);
        if (forStFlinkFileSystem == null || sharedStateFileSystem == null || !forStFlinkFileSystem.getUri().equals(sharedStateFileSystem.getUri()) || recoveryClaimMode != RecoveryClaimMode.CLAIM) {
            CopyDataTransferStrategy copyDataTransferStrategy = forStFlinkFileSystem == null ? new CopyDataTransferStrategy() : new CopyDataTransferStrategy(forStFlinkFileSystem);
            LOG.info("Build DataTransferStrategy for Restore: {}, forStFlinkFileSystem: {}, cpSharedFs:{}, recoveryClaimMode:{}", new Object[]{copyDataTransferStrategy, forStFlinkFileSystem, sharedStateFileSystem, recoveryClaimMode});
            return copyDataTransferStrategy;
        }
        ReusableDataTransferStrategy reusableDataTransferStrategy = new ReusableDataTransferStrategy(forStFlinkFileSystem);
        LOG.info("Build DataTransferStrategy for Restore: {}", reusableDataTransferStrategy);
        return reusableDataTransferStrategy;
    }

    @Nullable
    private static FileSystem getSharedStateFileSystem(Collection<StateHandleTransferSpec> collection) {
        Iterator<StateHandleTransferSpec> it = collection.iterator();
        while (it.hasNext()) {
            Iterator it2 = it.next().getStateHandle().getSharedState().iterator();
            while (it2.hasNext()) {
                FileStateHandle handle = ((IncrementalKeyedStateHandle.HandleAndLocalPath) it2.next()).getHandle();
                if (handle instanceof FileStateHandle) {
                    try {
                        return handle.getFilePath().getFileSystem();
                    } catch (IOException e) {
                        throw new RuntimeException("Failed to get FileSystem from handle: " + handle, e);
                    }
                }
            }
        }
        return null;
    }
}
