package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.class */
/**
 * Watches segment files below a base remote storage path and notifies a registered callback once
 * a watched segment is found.
 *
 * <p>The scanner is a {@link Runnable} that reschedules itself on a single-threaded scheduled
 * executor. After a scan round in which at least one watched segment was found, the next round is
 * scheduled after {@link #INITIAL_SCAN_INTERVAL_MS}; otherwise the delay is doubled, capped at
 * {@link #MAX_SCAN_INTERVAL_MS}.
 */
public class RemoteStorageScanner implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStorageScanner.class);

    /** Number of consecutive remote file system failures tolerated before a scan fails hard. */
    private static final int MAX_RETRY_TIME = 100;

    /** Delay before the first scan, and after every scan round that found a segment. */
    private static final int INITIAL_SCAN_INTERVAL_MS = 100;

    /** Upper bound of the scan delay. */
    private static final int MAX_SCAN_INTERVAL_MS = 10000;

    private final String baseRemoteStoragePath;

    /** Callback invoked from the scanner thread; must be registered before a segment is found. */
    @Nullable
    private BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId> availabilityNotifier;

    private final ScheduledExecutorService scannerExecutor =
            Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("remote storage scanner"));

    /** Delay in milliseconds that is used to schedule the next scan round. */
    private int lastInterval = INITIAL_SCAN_INTERVAL_MS;

    /** Consecutive failures of remote file system calls; reset to 0 by every successful call. */
    private int currentRetryTime = 0;

    /** Segment id currently being waited for, per (partition, subpartition). */
    private final Map<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>, Integer>
            requiredSegmentIds = new ConcurrentHashMap<>();

    /** Largest finished segment id seen so far, per (partition, subpartition). */
    private final Map<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>, Integer>
            scannedMaxSegmentIds = new ConcurrentHashMap<>();

    private final ScanStrategy scanStrategy = new ScanStrategy(MAX_SCAN_INTERVAL_MS);

    // Assigned in the constructor: it is derived from baseRemoteStoragePath, which is not yet set
    // while field initializers run.
    private final FileSystem remoteFileSystem;

    /** Computes the delay of the next scan round: double the previous delay, capped at a maximum. */
    static class ScanStrategy {
        private final int maxScanInterval;

        /**
         * @param i the maximum scan interval; must be positive
         */
        ScanStrategy(int i) {
            Preconditions.checkArgument(i > 0, "maxScanInterval must be positive, was %s", i);
            this.maxScanInterval = i;
        }

        /**
         * @param i the previously used interval
         * @return twice the previous interval, but at most the configured maximum
         */
        int getInterval(int i) {
            return Math.min(i * 2, this.maxScanInterval);
        }
    }

    /**
     * Creates a scanner for the given base path and resolves the file system of that path.
     *
     * @param str the base remote storage path
     */
    public RemoteStorageScanner(String str) {
        this.baseRemoteStoragePath = str;
        // Must run after the path has been assigned, see the note on the field.
        this.remoteFileSystem = createFileSystem();
    }

    /** Resolves the file system of the base path, rethrowing an {@link IOException} unchecked. */
    private FileSystem createFileSystem() {
        FileSystem fileSystem = null;
        try {
            fileSystem = new Path(this.baseRemoteStoragePath).getFileSystem();
        } catch (IOException e) {
            ExceptionUtils.rethrow(
                    e,
                    "Failed to initialize file system on the path: " + this.baseRemoteStoragePath);
        }
        return fileSystem;
    }

    /**
     * Schedules the next scan round after {@code lastInterval} milliseconds. Does nothing if the
     * executor has already been shut down.
     */
    public void start() {
        // Same monitor as close(), so that a shutdown cannot happen between check and schedule.
        synchronized (this.scannerExecutor) {
            if (!this.scannerExecutor.isShutdown()) {
                this.scannerExecutor.schedule(this, this.lastInterval, TimeUnit.MILLISECONDS);
            }
        }
    }

    /**
     * Registers the given segment as required, unless the recorded maximum scanned segment id of
     * the subpartition is already greater than or equal to {@code i}.
     *
     * @param tieredStoragePartitionId the partition of the segment
     * @param tieredStorageSubpartitionId the subpartition of the segment
     * @param i the id of the segment to watch
     */
    public void watchSegment(
            TieredStoragePartitionId tieredStoragePartitionId,
            TieredStorageSubpartitionId tieredStorageSubpartitionId,
            int i) {
        // compute() is only used to run the check atomically for this key; the stored maximum is
        // returned unchanged.
        this.scannedMaxSegmentIds.compute(
                Tuple2.of(tieredStoragePartitionId, tieredStorageSubpartitionId),
                (key, scannedMaxSegmentId) -> {
                    if (scannedMaxSegmentId == null || scannedMaxSegmentId < i) {
                        this.requiredSegmentIds.put(key, i);
                    }
                    return scannedMaxSegmentId;
                });
    }

    /**
     * Shuts the scanner executor down and waits up to five minutes for its termination. A timeout
     * or an interruption while waiting is rethrown through {@link ExceptionUtils#rethrow}.
     */
    public void close() {
        synchronized (this.scannerExecutor) {
            this.scannerExecutor.shutdownNow();
        }
        try {
            if (!this.scannerExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
                throw new TimeoutException("Timeout to shutdown the flush thread.");
            }
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller after the exception has been wrapped.
            Thread.currentThread().interrupt();
            ExceptionUtils.rethrow(e);
        } catch (TimeoutException e) {
            ExceptionUtils.rethrow(e);
        }
    }

    /**
     * Runs one scan round over all required segments and schedules the next round.
     *
     * <p>A required segment is reported to the notifier and removed once the scanned maximum
     * segment id has reached it and its segment file exists. Otherwise the maximum segment id of
     * the subpartition is scanned again. Any throwable is handed to the
     * {@link FatalExitExceptionHandler}.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            Iterator<Map.Entry<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>, Integer>>
                    iterator = this.requiredSegmentIds.entrySet().iterator();
            boolean scanned = false;
            while (iterator.hasNext()) {
                Map.Entry<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>, Integer>
                        entry = iterator.next();
                Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId> key = entry.getKey();
                TieredStoragePartitionId partitionId = key.f0;
                TieredStorageSubpartitionId subpartitionId = key.f1;
                int requiredSegmentId = entry.getValue();
                int scannedMaxSegmentId = this.scannedMaxSegmentIds.getOrDefault(key, -1);
                if (scannedMaxSegmentId >= requiredSegmentId
                        && checkSegmentExist(partitionId, subpartitionId, requiredSegmentId)) {
                    scanned = true;
                    iterator.remove();
                    Preconditions.checkNotNull(this.availabilityNotifier)
                            .accept(partitionId, subpartitionId);
                } else {
                    // Not available yet: refresh the maximum finished segment id so that a later
                    // round can find the segment.
                    scanMaxSegmentId(partitionId, subpartitionId);
                }
            }
            this.lastInterval =
                    scanned
                            ? INITIAL_SCAN_INTERVAL_MS
                            : this.scanStrategy.getInterval(this.lastInterval);
            start();
        } catch (Throwable th) {
            FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), th);
        }
    }

    /**
     * Registers the callback that is invoked when a watched segment becomes available.
     *
     * @param biConsumer receives the partition id and subpartition id of the found segment
     */
    public void registerAvailabilityAndPriorityNotifier(
            BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId> biConsumer) {
        this.availabilityNotifier = biConsumer;
    }

    /**
     * Reads the segment finish directory of the subpartition and records the segment id found
     * there as the scanned maximum.
     *
     * <p>The directory is only used if it contains exactly one entry; the name of that entry is
     * parsed as the segment id. A missing directory is not an error. Other failures are counted
     * and only rethrown once the retry limit is exceeded.
     */
    private void scanMaxSegmentId(
            TieredStoragePartitionId tieredStoragePartitionId,
            TieredStorageSubpartitionId tieredStorageSubpartitionId) {
        Path segmentFinishDirPath =
                SegmentPartitionFile.getSegmentFinishDirPath(
                        this.baseRemoteStoragePath,
                        tieredStoragePartitionId,
                        tieredStorageSubpartitionId.getSubpartitionId());
        FileStatus[] fileStatuses = new FileStatus[0];
        try {
            if (!this.remoteFileSystem.exists(segmentFinishDirPath)) {
                return;
            }
            fileStatuses = this.remoteFileSystem.listStatus(segmentFinishDirPath);
            this.currentRetryTime = 0;
        } catch (Throwable th) {
            // The directory may disappear between exists() and listStatus().
            if (th instanceof FileNotFoundException) {
                return;
            }
            this.currentRetryTime++;
            tryThrowException(th, "Failed to list the segment finish file.");
        }
        // Also reached after a tolerated failure, in which case the array is still empty.
        if (fileStatuses.length != 1) {
            return;
        }
        this.scannedMaxSegmentIds.put(
                Tuple2.of(tieredStoragePartitionId, tieredStorageSubpartitionId),
                Integer.parseInt(fileStatuses[0].getPath().getName()));
    }

    /**
     * Checks whether the segment file of the given segment exists.
     *
     * @return {@code true} if the file exists; {@code false} if it does not exist or if the check
     *     failed while the retry limit has not been exceeded yet
     */
    private boolean checkSegmentExist(
            TieredStoragePartitionId tieredStoragePartitionId,
            TieredStorageSubpartitionId tieredStorageSubpartitionId,
            int i) {
        boolean exists = false;
        try {
            Path segmentPath =
                    SegmentPartitionFile.getSegmentPath(
                            this.baseRemoteStoragePath,
                            tieredStoragePartitionId,
                            tieredStorageSubpartitionId.getSubpartitionId(),
                            i);
            exists = this.remoteFileSystem.exists(segmentPath);
            this.currentRetryTime = 0;
        } catch (Throwable th) {
            this.currentRetryTime++;
            tryThrowException(th, "Failed to check the status of segment file.");
        }
        return exists;
    }

    /**
     * Logs the failure and rethrows it as a {@link RuntimeException} once more than
     * {@link #MAX_RETRY_TIME} consecutive failures have been counted.
     */
    private void tryThrowException(Throwable th, String str) {
        // Log the cause as well; below the retry limit this is the only trace of the failure.
        LOG.warn(str, th);
        if (this.currentRetryTime > MAX_RETRY_TIME) {
            throw new RuntimeException(str, th);
        }
    }
}
