package org.apache.flink.runtime.checkpoint;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.checkpoint.Checkpoint;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointsCleaner.class */
public class CheckpointsCleaner implements Serializable, AutoCloseableAsync {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointsCleaner.class);
    private static final long serialVersionUID = 2545865801947537790L;
    private final boolean parallelMode;
    private final Object lock;

    @GuardedBy("lock")
    private int numberOfCheckpointsToClean;

    @GuardedBy("lock")
    @Nullable
    private CompletableFuture<Void> cleanUpFuture;

    @GuardedBy("lock")
    private final List<CompletedCheckpoint> subsumedCheckpoints;

    public CheckpointsCleaner() {
        this.lock = new Object();
        this.subsumedCheckpoints = new ArrayList();
        this.parallelMode = CheckpointingOptions.CLEANER_PARALLEL_MODE.defaultValue().booleanValue();
    }

    public CheckpointsCleaner(boolean z) {
        this.lock = new Object();
        this.subsumedCheckpoints = new ArrayList();
        this.parallelMode = z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getNumberOfCheckpointsToClean() {
        int i;
        synchronized (this.lock) {
            i = this.numberOfCheckpointsToClean;
        }
        return i;
    }

    public void cleanCheckpoint(Checkpoint checkpoint, boolean z, Runnable runnable, Executor executor) {
        CompletableFuture<Void> runAsync;
        LOG.debug("Clean checkpoint {} parallel-mode={} shouldDiscard={}", new Object[]{Long.valueOf(checkpoint.getCheckpointID()), Boolean.valueOf(this.parallelMode), Boolean.valueOf(z)});
        if (!z) {
            executor.execute(runnable);
            return;
        }
        incrementNumberOfCheckpointsToClean();
        Checkpoint.DiscardObject markAsDiscarded = checkpoint.markAsDiscarded();
        if (this.parallelMode) {
            runAsync = markAsDiscarded.discardAsync(executor);
        } else {
            Objects.requireNonNull(markAsDiscarded);
            runAsync = FutureUtils.runAsync(markAsDiscarded::discard, executor);
        }
        runAsync.handle((obj, th) -> {
            if (th != null) {
                LOG.warn("Could not properly discard completed checkpoint {}.", Long.valueOf(checkpoint.getCheckpointID()), th);
            }
            decrementNumberOfCheckpointsToClean();
            runnable.run();
            return null;
        });
    }

    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
        synchronized (this.lock) {
            this.subsumedCheckpoints.add(completedCheckpoint);
        }
    }

    public void cleanSubsumedCheckpoints(long j, Set<Long> set, Runnable runnable, Executor executor) {
        synchronized (this.lock) {
            Iterator<CompletedCheckpoint> it = this.subsumedCheckpoints.iterator();
            while (it.hasNext()) {
                CompletedCheckpoint next = it.next();
                if (next.getCheckpointID() < j && !set.contains(Long.valueOf(next.getCheckpointID()))) {
                    try {
                        LOG.debug("Try to discard checkpoint {}.", Long.valueOf(next.getCheckpointID()));
                        cleanCheckpoint(next, next.shouldBeDiscardedOnSubsume(), runnable, executor);
                        it.remove();
                    } catch (Exception e) {
                        LOG.warn("Fail to discard the old checkpoint {}.", next);
                    }
                }
            }
        }
    }

    public void cleanCheckpointOnFailedStoring(CompletedCheckpoint completedCheckpoint, Executor executor) {
        cleanCheckpoint(completedCheckpoint, true, () -> {
        }, executor);
    }

    private void incrementNumberOfCheckpointsToClean() {
        synchronized (this.lock) {
            Preconditions.checkState(this.cleanUpFuture == null, "CheckpointsCleaner has already been closed");
            this.numberOfCheckpointsToClean++;
        }
    }

    private void decrementNumberOfCheckpointsToClean() {
        synchronized (this.lock) {
            this.numberOfCheckpointsToClean--;
            maybeCompleteCloseUnsafe();
        }
    }

    private void maybeCompleteCloseUnsafe() {
        if (this.numberOfCheckpointsToClean != 0 || this.cleanUpFuture == null) {
            return;
        }
        this.cleanUpFuture.complete(null);
    }

    @Override // org.apache.flink.util.AutoCloseableAsync
    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> completableFuture;
        synchronized (this.lock) {
            if (this.cleanUpFuture == null) {
                this.cleanUpFuture = new CompletableFuture<>();
            }
            maybeCompleteCloseUnsafe();
            this.subsumedCheckpoints.clear();
            completableFuture = this.cleanUpFuture;
        }
        return completableFuture;
    }
}
