package org.apache.flink.state.rocksdb.sstmerge;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.shaded.guava32.com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/state/rocksdb/sstmerge/CompactionScheduler.class */
/**
 * Drives a repeating "scan for SST files, then compact them" cycle.
 *
 * <p>A scan is triggered after a delay of {@code checkPeriodMs} milliseconds on the scheduled
 * executor, which merely hands the actual work ({@link #maybeScan()}) over to the I/O executor.
 * A scan asks the {@link CompactionTaskProducer} for tasks; each produced task is run on the I/O
 * executor through {@link CompactionTracker#runWithTracking}, with {@link #scheduleScan()} passed
 * as the follow-up action.
 *
 * <p>The {@code running} flag is only read and written while holding {@code lock}.
 */
class CompactionScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(CompactionScheduler.class);

    /** Guards {@link #running} and the decisions that depend on it. */
    private final Object lock = new Object();

    /** Flipped to {@code false} by {@link #stop()}; no scans are scheduled afterwards. */
    private boolean running = true;

    /** Fires the delayed trigger for the next scan. */
    private final ScheduledExecutorService scheduledExecutor;

    /** Runs the scans and the compactions themselves. */
    private final ExecutorService ioExecutor;

    /** Delay before each scan, in milliseconds (taken from the config's {@code minInterval}). */
    private final long checkPeriodMs;

    private final CompactionTracker tracker;
    private final Compactor compactor;
    private final CompactionTaskProducer taskProducer;

    /**
     * Creates a scheduler that owns a freshly created single-threaded scheduled executor.
     *
     * @param rocksDBManualCompactionConfig source of the scan interval ({@code minInterval})
     * @param executorService executor on which scans and compactions run
     * @param compactionTaskProducer produces the compaction tasks for a scan
     * @param compactor performs the compaction of a single task
     * @param compactionTracker consulted before scanning and used to run compactions
     */
    public CompactionScheduler(RocksDBManualCompactionConfig rocksDBManualCompactionConfig, ExecutorService executorService, CompactionTaskProducer compactionTaskProducer, Compactor compactor, CompactionTracker compactionTracker) {
        this(rocksDBManualCompactionConfig, executorService, compactionTaskProducer, compactor, compactionTracker, Executors.newSingleThreadScheduledExecutor());
    }

    /**
     * Creates a scheduler using the supplied scheduled executor for the delayed triggers.
     *
     * @param rocksDBManualCompactionConfig source of the scan interval ({@code minInterval})
     * @param executorService executor on which scans and compactions run
     * @param compactionTaskProducer produces the compaction tasks for a scan
     * @param compactor performs the compaction of a single task
     * @param compactionTracker consulted before scanning and used to run compactions
     * @param scheduledExecutorService executor used for the delayed triggers; it is shut down by
     *     {@link #stop()}
     */
    public CompactionScheduler(RocksDBManualCompactionConfig rocksDBManualCompactionConfig, ExecutorService executorService, CompactionTaskProducer compactionTaskProducer, Compactor compactor, CompactionTracker compactionTracker, ScheduledExecutorService scheduledExecutorService) {
        this.ioExecutor = executorService;
        this.scheduledExecutor = scheduledExecutorService;
        this.checkPeriodMs = rocksDBManualCompactionConfig.minInterval;
        this.tracker = compactionTracker;
        this.compactor = compactor;
        this.taskProducer = compactionTaskProducer;
    }

    /** Kicks off the cycle by scheduling the first scan. */
    public void start() {
        scheduleScan();
    }

    /**
     * Stops scheduling new scans and shuts the scheduled executor down.
     *
     * <p>Only the first call flips the flag and invokes {@code shutdownNow()}; every call then
     * waits up to five seconds for the scheduled executor to terminate and logs a warning if it
     * does not. The I/O executor is not touched here.
     *
     * @throws InterruptedException declared by the signature; no statement in this method throws
     *     it explicitly
     */
    public void stop() throws InterruptedException {
        synchronized (lock) {
            if (running) {
                running = false;
                scheduledExecutor.shutdownNow();
            }
        }
        final boolean terminated =
                Uninterruptibles.awaitTerminationUninterruptibly(scheduledExecutor, 5L, TimeUnit.SECONDS);
        if (!terminated) {
            LOG.warn("Unable to terminate scheduled tasks in 5s");
        }
    }

    /**
     * Schedules the next scan after {@code checkPeriodMs} milliseconds, unless {@link #stop()}
     * has already been called.
     */
    public void scheduleScan() {
        synchronized (lock) {
            if (!running) {
                LOG.debug("Not scheduling next scan: shutting down");
                return;
            }
            LOG.trace("Schedule SST scan in {} ms", checkPeriodMs);
            // The timer thread only forwards the scan to the I/O executor.
            scheduledExecutor.schedule(
                    () -> ioExecutor.execute(this::maybeScan),
                    checkPeriodMs,
                    TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Performs one scan, unless the tracker reports manual compactions or a shutdown.
     *
     * <p>When the scan is skipped nothing is rescheduled by this method (presumably the
     * follow-up action of a compaction that is still tracked takes care of that; confirm against
     * {@code CompactionTracker}). An empty scan result reschedules the next scan directly;
     * otherwise every task is submitted to the I/O executor.
     */
    public void maybeScan() {
        LOG.trace("Starting SST scan");
        final boolean mayScan = !tracker.haveManualCompactions() && !tracker.isShuttingDown();
        if (!mayScan) {
            LOG.trace("Skip SST scan {}", tracker);
            return;
        }

        final List<CompactionTask> targets = scan();
        LOG.trace("SST scan resulted in targets {}", targets);
        if (targets.isEmpty()) {
            scheduleScan();
            return;
        }

        for (CompactionTask target : targets) {
            ioExecutor.execute(() -> compactWithTracking(target));
        }
    }

    /**
     * Runs the compaction of a single task through the tracker, handing it {@link
     * #scheduleScan()} as the follow-up action.
     */
    private void compactWithTracking(CompactionTask target) {
        tracker.runWithTracking(target.columnFamilyHandle, () -> {
            compactor.compact(target.columnFamilyHandle, target.level, target.files);
        }, this::scheduleScan);
    }

    /**
     * Asks the task producer for compaction tasks.
     *
     * @return the produced tasks, or an empty list if the producer threw (the failure is logged
     *     as a warning and not propagated)
     */
    private List<CompactionTask> scan() {
        List<CompactionTask> produced;
        try {
            produced = taskProducer.produce();
        } catch (Exception e) {
            LOG.warn("Unable to scan for compaction targets", e);
            produced = Collections.emptyList();
        }
        return produced;
    }
}
