package org.apache.druid.server.coordinator.duty;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/server/coordinator/duty/KillUnusedSegments.class */
/**
 * Coordinator duty that submits kill tasks to the Overlord for intervals of unused segments.
 *
 * <p>{@link #run} is a no-op until the configured kill period has elapsed since the last time kill tasks were
 * scheduled. When it does run, it computes how many kill task slots are free, and for each eligible datasource
 * looks up an interval of unused segments and submits one kill task for it, until the free slots are used up.
 *
 * <p>The end of the last interval submitted for each datasource is remembered in
 * {@link #getDatasourceToLastKillIntervalEnd()} and passed back to the metadata manager on the next lookup.
 */
public class KillUnusedSegments implements CoordinatorDuty {
    public static final String KILL_TASK_TYPE = "kill";
    public static final String TASK_ID_PREFIX = "coordinator-issued";

    /** Matches non-null tasks of type {@link #KILL_TASK_TYPE} whose id starts with {@link #TASK_ID_PREFIX}. */
    public static final Predicate<TaskStatusPlus> IS_AUTO_KILL_TASK = status ->
            status != null
            && KILL_TASK_TYPE.equals(status.getType())
            && status.getId().startsWith(TASK_ID_PREFIX);

    private static final Logger log = new Logger(KillUnusedSegments.class);

    private final long period;
    private final long retainDuration;
    private final boolean ignoreRetainDuration;
    private final int maxSegmentsToKill;

    /** End of the most recently submitted kill interval, keyed by datasource name. */
    private final Map<String, DateTime> datasourceToLastKillIntervalEnd;

    /** Wall-clock millis at which kill tasks were last scheduled; 0 until the first scheduling run. */
    private long lastKillTime = 0;
    private final long bufferPeriod;
    private final SegmentsMetadataManager segmentsMetadataManager;
    private final OverlordClient overlordClient;

    /** Mutable holder for the slot and task counts that are published as coordinator stats after each run. */
    public static class TaskStats {
        int availableTaskSlots = 0;
        int maxSlots = 0;
        int submittedTasks = 0;

        TaskStats() {
        }
    }

    /**
     * Reads the kill-related settings from the coordinator config and validates them.
     *
     * @param segmentsMetadataManager used to list datasource names and unused segment intervals
     * @param overlordClient          used to query task slots and to submit kill tasks
     * @param druidCoordinatorConfig  source of the kill period, retain duration, buffer period and max segments
     * @throws IllegalArgumentException if the kill period is shorter than the indexing period, or if the
     *                                  configured max segments per kill task is not positive
     */
    public KillUnusedSegments(SegmentsMetadataManager segmentsMetadataManager, OverlordClient overlordClient, DruidCoordinatorConfig druidCoordinatorConfig) {
        this.period = druidCoordinatorConfig.getCoordinatorKillPeriod().getMillis();
        Preconditions.checkArgument(
                this.period >= druidCoordinatorConfig.getCoordinatorIndexingPeriod().getMillis(),
                "coordinator kill period must be greater than or equal to druid.coordinator.period.indexingPeriod"
        );

        this.ignoreRetainDuration = druidCoordinatorConfig.getCoordinatorKillIgnoreDurationToRetain();
        this.retainDuration = druidCoordinatorConfig.getCoordinatorKillDurationToRetain().getMillis();
        if (this.ignoreRetainDuration) {
            log.debug(
                    "druid.coordinator.kill.durationToRetain [%s] will be ignored when discovering segments to kill because you have set druid.coordinator.kill.ignoreDurationToRetain to True.",
                    this.retainDuration
            );
        }

        this.bufferPeriod = druidCoordinatorConfig.getCoordinatorKillBufferPeriod().getMillis();
        this.maxSegmentsToKill = druidCoordinatorConfig.getCoordinatorKillMaxSegments();
        Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");

        this.datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();

        final Object retainDescription = this.ignoreRetainDuration ? "IGNORING" : Long.valueOf(this.retainDuration);
        log.info(
                "Kill Task scheduling enabled with period [%s], retainDuration [%s], bufferPeriod [%s], maxSegmentsToKill [%s]",
                this.period,
                retainDescription,
                this.bufferPeriod,
                this.maxSegmentsToKill
        );

        this.segmentsMetadataManager = segmentsMetadataManager;
        this.overlordClient = overlordClient;
    }

    /**
     * Runs the duty if at least one kill period has elapsed since kill tasks were last scheduled; otherwise
     * returns the params untouched.
     *
     * @param druidCoordinatorRuntimeParams params of the current coordinator run
     * @return the same params instance that was passed in
     */
    @Override // org.apache.druid.server.coordinator.duty.CoordinatorDuty
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams) {
        if (this.lastKillTime + this.period > System.currentTimeMillis()) {
            log.debug("Skipping kill of unused segments as kill period has not elapsed yet.");
            return druidCoordinatorRuntimeParams;
        }
        return runInternal(druidCoordinatorRuntimeParams);
    }

    /**
     * Schedules kill tasks without checking whether the kill period has elapsed, then records slot and task
     * counts in the run's coordinator stats.
     *
     * @param druidCoordinatorRuntimeParams params of the current coordinator run
     * @return the same params instance that was passed in
     */
    @VisibleForTesting
    public DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams druidCoordinatorRuntimeParams) {
        final TaskStats taskStats = new TaskStats();

        final Set<String> configuredDataSources =
                druidCoordinatorRuntimeParams.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
        // A null or empty configured set means every datasource is eligible.
        final boolean allDataSourcesEligible = CollectionUtils.isNullOrEmpty(configuredDataSources);

        final int killTaskCapacity = getKillTaskCapacity(
                CoordinatorDutyUtils.getTotalWorkerCapacity(this.overlordClient),
                druidCoordinatorRuntimeParams.getCoordinatorDynamicConfig().getKillTaskSlotRatio(),
                druidCoordinatorRuntimeParams.getCoordinatorDynamicConfig().getMaxKillTaskSlots()
        );
        final int availableKillTaskSlots = getAvailableKillTaskSlots(
                killTaskCapacity,
                CoordinatorDutyUtils.getNumActiveTaskSlots(this.overlordClient, IS_AUTO_KILL_TASK).size()
        );
        final CoordinatorRunStats coordinatorStats = druidCoordinatorRuntimeParams.getCoordinatorStats();

        taskStats.availableTaskSlots = availableKillTaskSlots;
        taskStats.maxSlots = killTaskCapacity;

        Set<String> dataSourcesToKill = configuredDataSources;
        final boolean schedulingKillTasks = availableKillTaskSlots > 0;
        if (schedulingKillTasks) {
            if (allDataSourcesEligible) {
                dataSourcesToKill = this.segmentsMetadataManager.retrieveAllDataSourceNames();
            }
            log.debug("Killing unused segments in datasources: %s", dataSourcesToKill);
            this.lastKillTime = System.currentTimeMillis();
            taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
        }

        // Forget the last kill interval of datasources that are no longer eligible. This is only done when the
        // eligible set is actually known: if no slot was available and no explicit list is configured, the full
        // datasource list was never fetched, and pruning against the empty/null configured set would wipe (or
        // fail on) state that is still needed on the next run.
        final boolean eligibleSetKnown = schedulingKillTasks || !allDataSourcesEligible;
        if (eligibleSetKnown && dataSourcesToKill != null) {
            this.datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
        }

        addStats(taskStats, coordinatorStats);
        return druidCoordinatorRuntimeParams;
    }

    /** Adds the available-slot, submitted-task and max-slot counts to the given run stats. */
    private void addStats(TaskStats taskStats, CoordinatorRunStats coordinatorRunStats) {
        coordinatorRunStats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
        coordinatorRunStats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
        coordinatorRunStats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
    }

    /**
     * Submits at most {@code availableKillTaskSlots} kill tasks, one per datasource that has an interval of
     * unused segments to kill, visiting datasources in the collection's iteration order.
     *
     * <p>A datasource with no interval to kill has its remembered last kill interval end removed. A failed
     * submission is logged and the next datasource is tried, unless the current thread has been interrupted, in
     * which case scheduling stops.
     *
     * @param dataSourcesToKill      datasources to consider; may be null or empty
     * @param availableKillTaskSlots maximum number of kill tasks to submit
     * @return number of kill tasks submitted successfully
     */
    private int killUnusedSegments(Collection<String> dataSourcesToKill, int availableKillTaskSlots) {
        if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
            log.debug("Submitted [%d] kill tasks for [%d] datasources.%s", 0, 0, "");
            return 0;
        }

        int submittedTasks = 0;
        final Iterator<String> pending = dataSourcesToKill.iterator();
        while (pending.hasNext()) {
            // Check the limit before consuming the next element so that "pending" ends up holding exactly the
            // datasources that were never attempted.
            if (submittedTasks >= availableKillTaskSlots) {
                log.debug(
                        "Submitted [%d] kill tasks and reached kill task slot limit [%d]. Will resume on the next coordinator cycle.",
                        submittedTasks,
                        availableKillTaskSlots
                );
                break;
            }

            final String dataSource = pending.next();
            final Interval intervalToKill = findIntervalForKill(dataSource);
            if (intervalToKill == null) {
                this.datasourceToLastKillIntervalEnd.remove(dataSource);
                continue;
            }

            try {
                FutureUtils.getUnchecked(
                        this.overlordClient.runKillTask(TASK_ID_PREFIX, dataSource, intervalToKill, this.maxSegmentsToKill),
                        true
                );
                submittedTasks++;
                this.datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
            } catch (Exception e) {
                log.error(e, "Failed to submit kill task for dataSource [%s]", dataSource);
                if (Thread.currentThread().isInterrupted()) {
                    log.warn("skipping kill task scheduling because thread is interrupted.");
                    break;
                }
            }
        }

        if (log.isDebugEnabled()) {
            final List<String> skipped = ImmutableList.copyOf(pending);
            log.debug(
                    "Submitted [%d] kill tasks for [%d] datasources.%s",
                    submittedTasks,
                    dataSourcesToKill.size(),
                    skipped.isEmpty() ? "" : StringUtils.format(" Datasources skipped: %s", skipped)
            );
        }
        return submittedTasks;
    }

    /**
     * Looks up unused segment intervals for the datasource and collapses them into one interval to kill.
     *
     * <p>The metadata manager is queried with the remembered end of the last kill interval for this datasource
     * (null if none), a cutoff of now minus the retain duration (or {@code DateTimes.COMPARE_DATE_AS_STRING_MAX}
     * when the retain duration is ignored), {@code maxSegmentsToKill} as the limit, and now minus the buffer
     * period.
     *
     * @param dataSource datasource to look up
     * @return the single interval found, the umbrella interval when several are found, or null when none are
     */
    @Nullable
    private Interval findIntervalForKill(String dataSource) {
        // Sample the clock once so both cutoffs are derived from the same instant.
        final DateTime now = DateTimes.nowUtc();
        final DateTime retainCutoff = this.ignoreRetainDuration
                ? DateTimes.COMPARE_DATE_AS_STRING_MAX
                : now.minus(this.retainDuration);
        final DateTime bufferCutoff = now.minus(this.bufferPeriod);

        final List<Interval> unusedSegmentIntervals = this.segmentsMetadataManager.getUnusedSegmentIntervals(
                dataSource,
                this.datasourceToLastKillIntervalEnd.get(dataSource),
                retainCutoff,
                this.maxSegmentsToKill,
                bufferCutoff
        );

        if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
            return null;
        }
        if (unusedSegmentIntervals.size() == 1) {
            return unusedSegmentIntervals.get(0);
        }
        return JodaUtils.umbrellaInterval(unusedSegmentIntervals);
    }

    /** Returns the capacity minus the slots already in use, never less than zero. */
    private int getAvailableKillTaskSlots(int killTaskCapacity, int numActiveKillTasks) {
        return Math.max(0, killTaskCapacity - numActiveKillTasks);
    }

    /**
     * Computes how many task slots kill tasks may occupy.
     *
     * @param totalWorkerCapacity total number of worker task slots
     * @param killTaskSlotRatio   fraction of the total capacity usable by kill tasks; values above 1.0 are
     *                            treated as 1.0
     * @param maxKillTaskSlots    upper bound on the result
     * @return {@code totalWorkerCapacity * ratio} truncated to an int and capped at {@code maxKillTaskSlots};
     *         never negative
     */
    @VisibleForTesting
    public static int getKillTaskCapacity(int totalWorkerCapacity, double killTaskSlotRatio, int maxKillTaskSlots) {
        final int ratioBasedCapacity = (int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 1.0d));
        // Clamp so that a negative ratio or limit cannot yield a negative capacity.
        return Math.max(0, Math.min(ratioBasedCapacity, maxKillTaskSlots));
    }

    /** Returns the live (not copied) map of datasource name to the end of its last submitted kill interval. */
    @VisibleForTesting
    public Map<String, DateTime> getDatasourceToLastKillIntervalEnd() {
        return this.datasourceToLastKillIntervalEnd;
    }
}
