/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.tasks;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import java.io.Serializable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.metrics.KafkaMetricsGroup;
import kafka.server.ReplicaManager;
import kafka.tier.TierDeletedPartitionsCoordinator;
import kafka.tier.TierReplicaManager;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.store.TierObjectStore;
import kafka.tier.tasks.ChangeManager;
import kafka.tier.tasks.TierTaskQueue;
import kafka.tier.tasks.TierTasks$;
import kafka.tier.tasks.TierTasksConfig;
import kafka.tier.tasks.archive.ArchiveTask;
import kafka.tier.tasks.archive.ArchiverTaskQueue;
import kafka.tier.tasks.archive.TierArchiver;
import kafka.tier.tasks.delete.DeletionTask;
import kafka.tier.tasks.delete.TierDeletionManager;
import kafka.tier.topic.TierTopicAppender;
import kafka.utils.ShutdownableThread;
import kafka.utils.ShutdownableThread$;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Predef$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005Uh\u0001B\u0013'\u00015B\u0001\"\u0010\u0001\u0003\u0002\u0003\u0006IA\u0010\u0005\t\u0005\u0002\u0011\t\u0011)A\u0005\u0007\"A\u0011\n\u0001B\u0001B\u0003%!\n\u0003\u0005O\u0001\t\u0005\t\u0015!\u0003P\u0011!\u0011\u0006A!A!\u0002\u0013\u0019\u0006\u0002C-\u0001\u0005\u0003\u0005\u000b\u0011\u0002.\t\u0011\u0001\u0004!\u0011!Q\u0001\n\u0005DQ!\u001c\u0001\u0005\u00029DQa\u001e\u0001\u0005RaD\u0011\"!\u0004\u0001\u0001\u0004%I!a\u0004\t\u0013\u0005e\u0001\u00011A\u0005\n\u0005m\u0001\u0002CA\u0014\u0001\u0001\u0006K!!\u0005\t\u0013\u0005%\u0002A1A\u0005\n\u0005-\u0002\u0002CA\u001d\u0001\u0001\u0006I!!\f\t\u0013\u0005m\u0002A1A\u0005\n\u0005u\u0002\u0002CA*\u0001\u0001\u0006I!a\u0010\t\u0013\u0005U\u0003A1A\u0005\f\u0005]\u0003\u0002CA2\u0001\u0001\u0006I!!\u0017\t\u0013\u0005\u0015\u0004A1A\u0005\n\u0005\u001d\u0004\u0002CA;\u0001\u0001\u0006I!!\u001b\t\u0013\u0005]\u0004A1A\u0005\n\u0005e\u0004\u0002CAD\u0001\u0001\u0006I!a\u001f\t\u0013\u0005%\u0005A1A\u0005\n\u0005-\u0005\u0002CAJ\u0001\u0001\u0006I!!$\t\u0013\u0005U\u0005A1A\u0005\n\u0005]\u0005\u0002CAX\u0001\u0001\u0006I!!'\t\u000f\u0005E\u0006\u0001\"\u0011\u00024\"9\u0011Q\u0017\u0001\u0005\u0002\u0005M\u0006bBA\\\u0001\u0011\u0005\u00131\u0017\u0005\b\u0003s\u0003A\u0011AA^\u000f\u001d\t\u0019M\nE\u0001\u0003\u000b4a!\n\u0014\t\u0002\u0005\u001d\u0007BB7!\t\u0003\ty\rC\u0005\u0002R\u0002\u0012\r\u0011\"\u0002\u0002T\"A\u00111\u001c\u0011!\u0002\u001b\t)\u000eC\u0005\u0002^\u0002\n\n\u0011\"\u0001\u0002`\nIA+[3s)\u0006\u001c8n\u001d\u0006\u0003O!\nQ\u0001^1tWNT!!\u000b\u0016\u0002\tQLWM\u001d\u0006\u0002W\u0005)1.\u00194lC\u000e\u00011\u0003\u0002\u0001/ii\u0002\"a\f\u001a\u000e\u0003AR!!\r\u0016\u0002\u000bU$\u0018\u000e\\:\n\u0005M\u0002$AE*ikR$wn\u001e8bE2,G\u000b\u001b:fC\u0012\u0004\"!\u000e\u001d\u000e\u0003YR!a\u000e\u0016\u0002\u000f5,GO]5dg&\u0011\u0011H\u000e\u0002\u0012\u0017\u000647.Y'fiJL7m]$s_V\u0004\bCA\u0018<\u0013\ta\u0004GA\u0004M_\u001e<\u0017N\\4\
u0002\r\r|gNZ5h!\ty\u0004)D\u0001'\u0013\t\teEA\bUS\u0016\u0014H+Y:lg\u000e{gNZ5h\u00039\u0011X\r\u001d7jG\u0006l\u0015M\\1hKJ\u0004\"\u0001R$\u000e\u0003\u0015S!A\u0012\u0016\u0002\rM,'O^3s\u0013\tAUI\u0001\bSKBd\u0017nY1NC:\fw-\u001a:\u0002%QLWM\u001d*fa2L7-Y'b]\u0006<WM\u001d\t\u0003\u00172k\u0011\u0001K\u0005\u0003\u001b\"\u0012!\u0003V5feJ+\u0007\u000f\\5dC6\u000bg.Y4fe\u0006\u0001C/[3s\t\u0016dW\r^3e!\u0006\u0014H/\u001b;j_:\u001c8i\\8sI&t\u0017\r^8s!\tY\u0005+\u0003\u0002RQ\t\u0001C+[3s\t\u0016dW\r^3e!\u0006\u0014H/\u001b;j_:\u001c8i\\8sI&t\u0017\r^8s\u0003E!\u0018.\u001a:U_BL7-\u00119qK:$WM\u001d\t\u0003)^k\u0011!\u0016\u0006\u0003-\"\nQ\u0001^8qS\u000eL!\u0001W+\u0003#QKWM\u001d+pa&\u001c\u0017\t\u001d9f]\u0012,'/A\buS\u0016\u0014xJ\u00196fGR\u001cFo\u001c:f!\tYf,D\u0001]\u0015\ti\u0006&A\u0003ti>\u0014X-\u0003\u0002`9\nyA+[3s\u001f\nTWm\u0019;Ti>\u0014X-\u0001\u0003uS6,\u0007C\u00012l\u001b\u0005\u0019'BA\u0019e\u0015\t)g-\u0001\u0004d_6lwN\u001c\u0006\u0003W\u001dT!\u0001[5\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005Q\u0017aA8sO&\u0011An\u0019\u0002\u0005)&lW-\u0001\u0004=S:LGO\u0010\u000b\t_B\f(o\u001d;vmB\u0011q\b\u0001\u0005\u0006{!\u0001\rA\u0010\u0005\u0006\u0005\"\u0001\ra\u0011\u0005\u0006\u0013\"\u0001\rA\u0013\u0005\u0006\u001d\"\u0001\ra\u0014\u0005\u0006%\"\u0001\ra\u0015\u0005\u00063\"\u0001\rA\u0017\u0005\bA\"\u0001\n\u00111\u0001b\u0003)awnZ4fe:\u000bW.Z\u000b\u0002sB\u0019!0a\u0002\u000f\u0007m\f\u0019\u0001\u0005\u0002}\u007f6\tQP\u0003\u0002\u007fY\u00051AH]8pizR!!!\u0001\u0002\u000bM\u001c\u0017\r\\1\n\u0007\u0005\u0015q0\u0001\u0004Qe\u0016$WMZ\u0005\u0005\u0003\u0013\tYA\u0001\u0004TiJLgn\u001a\u0006\u0004\u0003\u000by\u0018A\u00057bgRd\u0015m\u001a)sS:$H+[7f\u001bN,\"!!\u0005\u0011\t\u0005M\u0011QC\u0007\u0002\u007f&\u0019\u0011qC@\u0003\t1{gnZ\u0001\u0017Y\u0006\u001cH\u000fT1h!JLg\u000e\u001e+j[\u0016l5o\u0018\u0013fcR!\u0011QDA\u0012!\u0011\t\u0019\"a\b\n\u0007\u0005\u0005rP\u0001\u0003V]&$\b\"CA\u0013\u0017\u0005\u0005\t\u0019A
A\t\u0003\rAH%M\u0001\u0014Y\u0006\u001cH\u000fT1h!JLg\u000e\u001e+j[\u0016l5\u000fI\u0001\u0004GRDXCAA\u0017!\u0011\ty#!\u000e\u000e\u0005\u0005E\"bAA\u001aQ\u00059a-\u001a;dQ\u0016\u0014\u0018\u0002BA\u001c\u0003c\u00111cQ1oG\u0016dG.\u0019;j_:\u001cuN\u001c;fqR\fAa\u0019;yA\u0005AQ\r_3dkR|'/\u0006\u0002\u0002@A!\u0011\u0011IA(\u001b\t\t\u0019E\u0003\u0003\u0002F\u0005\u001d\u0013AC2p]\u000e,(O]3oi*!\u0011\u0011JA&\u0003\u0011)H/\u001b7\u000b\u0005\u00055\u0013\u0001\u00026bm\u0006LA!!\u0015\u0002D\tyQ\t_3dkR|'oU3sm&\u001cW-A\u0005fq\u0016\u001cW\u000f^8sA\u0005!\u0001o\\8m+\t\tI\u0006\u0005\u0003\u0002\\\u0005}SBAA/\u0015\r\t)e`\u0005\u0005\u0003C\niF\u0001\rFq\u0016\u001cW\u000f^5p]\u000e{g\u000e^3yi\u0016CXmY;u_J\fQ\u0001]8pY\u0002\nA\u0002^5fe\u0006\u00138\r[5wKJ,\"!!\u001b\u0011\t\u0005-\u0014\u0011O\u0007\u0003\u0003[R1!a\u001c'\u0003\u001d\t'o\u00195jm\u0016LA!a\u001d\u0002n\taA+[3s\u0003J\u001c\u0007.\u001b<fe\u0006iA/[3s\u0003J\u001c\u0007.\u001b<fe\u0002\n1\u0003^5fe\u0012+G.\u001a;j_:l\u0015M\\1hKJ,\"!a\u001f\u0011\t\u0005u\u00141Q\u0007\u0003\u0003\u007fR1!!!'\u0003\u0019!W\r\\3uK&!\u0011QQA@\u0005M!\u0016.\u001a:EK2,G/[8o\u001b\u0006t\u0017mZ3s\u0003Q!\u0018.\u001a:EK2,G/[8o\u001b\u0006t\u0017mZ3sA\u0005i1\r[1oO\u0016l\u0015M\\1hKJ,\"!!$\u0011\u0007}\ny)C\u0002\u0002\u0012\u001a\u0012Qb\u00115b]\u001e,W*\u00198bO\u0016\u0014\u0018AD2iC:<W-T1oC\u001e,'\u000fI\u0001\u0010Gf\u001cG.\u001a+j[\u0016lU\r\u001e:jGV\u0011\u0011\u0011\u0014\t\u0005\u00037\u000bY+\u0004\u0002\u0002\u001e*!\u0011qTAQ\u0003\u0011\u0019wN]3\u000b\u0007]\n\u0019K\u0003\u0003\u0002&\u0006\u001d\u0016AB=b[6,'O\u0003\u0002\u0002*\u0006\u00191m\\7\n\t\u00055\u0016Q\u0014\u0002\u0006\u001b\u0016$XM]\u0001\u0011Gf\u001cG.\u001a+j[\u0016lU\r\u001e:jG\u0002\na\u0001Z8X_J\\GCAA\u000f\u0003mi\u0017-\u001f2f\u0019><G+[3s\u0003J\u001c\u0007.\u001b<fe2\u000bw-\u00138g_\u0006A1\u000f[;uI><h.A\tbe\u000eD\u0017N^3s)\u0006\u001c8.U;fk\u0016,\"!!0\u0011\t\u0005-\u0014qX\u0005\u0005\u0003\u0003\fiGA\
tBe\u000eD\u0017N^3s)\u0006\u001c8.U;fk\u0016\f\u0011\u0002V5feR\u000b7o[:\u0011\u0005}\u00023c\u0001\u0011\u0002JB!\u00111CAf\u0013\r\tim \u0002\u0007\u0003:L(+\u001a4\u0015\u0005\u0005\u0015\u0017a\u0005)F%&{E)S\"`\u0019>;u\fT!H?6\u001bVCAAk!\u0011\t\u0019\"a6\n\u0007\u0005ewPA\u0002J]R\fA\u0003U#S\u0013>#\u0015jQ0M\u001f\u001e{F*Q$`\u001bN\u0003\u0013a\u0007\u0013mKN\u001c\u0018N\\5uI\u001d\u0014X-\u0019;fe\u0012\"WMZ1vYR$s'\u0006\u0002\u0002b*\u001a\u0011-a9,\u0005\u0005\u0015\b\u0003BAt\u0003cl!!!;\u000b\t\u0005-\u0018Q^\u0001\nk:\u001c\u0007.Z2lK\u0012T1!a<\u0000\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003g\fIOA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\u0004")
/**
 * Background thread ("tier-tasks") that drives the tiered-storage archiver and
 * deletion manager.
 *
 * <p>Each {@link #doWork()} cycle processes pending changes through the
 * {@code ChangeManager}, asks the {@code TierArchiver} and the
 * {@code TierDeletionManager} to make progress, and then waits on the futures they
 * returned. Both managers share one fixed-size executor sized by
 * {@code config.numThreads()}.
 *
 * <p>NOTE(review): this file is CFR decompiler output of a Scala class. Several
 * constructs (intersection-cast lambdas, field writes before {@code super(...)},
 * anonymous classes with synthetic constructor arguments) mirror the bytecode and are
 * not valid hand-written Java.
 */
public class TierTasks
extends ShutdownableThread
implements KafkaMetricsGroup {
    /** Tunables read by this class: thread count, back-off and update intervals. */
    private final TierTasksConfig config;
    /** Polled for readiness at the top of every work cycle. */
    private final TierTopicAppender tierTopicAppender;
    /** Clock used to rate-limit lag logging. */
    private final Time time;
    /** Clock reading (ms) taken when lag info was last logged, or at construction. */
    private long lastLagPrintTimeMs;
    /** Root cancellation context; sub-contexts are handed to the managers. */
    private final CancellationContext ctx;
    /** Fixed-size pool of "TierTask-N" threads backing {@link #pool}. */
    private final ExecutorService executor;
    /** Scala execution context wrapping {@link #executor}. */
    private final ExecutionContextExecutor pool;
    private final TierArchiver kafka$tier$tasks$TierTasks$$tierArchiver;
    private final TierDeletionManager tierDeletionManager;
    /** Feeds changes into the archiver and deletion task queues. */
    private final ChangeManager changeManager;
    /** Marked once per {@link #doWork()} cycle that gets past the readiness check. */
    private final Meter cycleTimeMetric;

    /** Scala default for the 7th constructor argument ({@code time}); forwards to the companion object. */
    public static Time $lessinit$greater$default$7() {
        return TierTasks$.MODULE$.$lessinit$greater$default$7();
    }

    /** Minimum interval, in milliseconds, between two lag-info log passes; forwards to the companion object. */
    public static int PERIODIC_LOG_LAG_MS() {
        return TierTasks$.MODULE$.PERIODIC_LOG_LAG_MS();
    }

    // ---------------------------------------------------------------------------------
    // KafkaMetricsGroup forwarders. Each method only delegates to the trait's static
    // implementation, passing this instance as the receiver.
    // ---------------------------------------------------------------------------------

    /** Forwards to {@code KafkaMetricsGroup.metricName$}. */
    @Override
    public MetricName metricName(String name, Map<String, String> tags) {
        return KafkaMetricsGroup.metricName$(this, name, tags);
    }

    /** Forwards to {@code KafkaMetricsGroup.explicitMetricName$}. */
    @Override
    public MetricName explicitMetricName(String group, String typeName, String name, Map<String, String> tags) {
        return KafkaMetricsGroup.explicitMetricName$(this, group, typeName, name, tags);
    }

    /** Forwards to {@code KafkaMetricsGroup.newGauge$}. */
    @Override
    public <T> Gauge<T> newGauge(String name, Gauge<T> metric, Map<String, String> tags) {
        return KafkaMetricsGroup.newGauge$(this, name, metric, tags);
    }

    /** Default {@code tags} argument for {@link #newGauge}. */
    @Override
    public <T> Map<String, String> newGauge$default$3() {
        return KafkaMetricsGroup.newGauge$default$3$(this);
    }

    /** Forwards to {@code KafkaMetricsGroup.newMeter$}. */
    @Override
    public Meter newMeter(String name, String eventType, TimeUnit timeUnit, Map<String, String> tags) {
        return KafkaMetricsGroup.newMeter$(this, name, eventType, timeUnit, tags);
    }

    /** Default {@code tags} argument for {@link #newMeter}. */
    @Override
    public Map<String, String> newMeter$default$4() {
        return KafkaMetricsGroup.newMeter$default$4$(this);
    }

    /** Forwards to {@code KafkaMetricsGroup.newHistogram$}. */
    @Override
    public Histogram newHistogram(String name, boolean biased, Map<String, String> tags) {
        return KafkaMetricsGroup.newHistogram$(this, name, biased, tags);
    }

    /** Default {@code biased} argument for {@link #newHistogram}. */
    @Override
    public boolean newHistogram$default$2() {
        return KafkaMetricsGroup.newHistogram$default$2$(this);
    }

    /** Default {@code tags} argument for {@link #newHistogram}. */
    @Override
    public Map<String, String> newHistogram$default$3() {
        return KafkaMetricsGroup.newHistogram$default$3$(this);
    }

    /** Forwards to {@code KafkaMetricsGroup.newTimer$}. */
    @Override
    public Timer newTimer(String name, TimeUnit durationUnit, TimeUnit rateUnit, Map<String, String> tags) {
        return KafkaMetricsGroup.newTimer$(this, name, durationUnit, rateUnit, tags);
    }

    /** Default {@code tags} argument for {@link #newTimer}. */
    @Override
    public Map<String, String> newTimer$default$4() {
        return KafkaMetricsGroup.newTimer$default$4$(this);
    }

    /** Forwards to {@code KafkaMetricsGroup.removeMetric$}. */
    @Override
    public void removeMetric(String name, Map<String, String> tags) {
        KafkaMetricsGroup.removeMetric$(this, name, tags);
    }

    /** Default {@code tags} argument for {@link #removeMetric}. */
    @Override
    public Map<String, String> removeMetric$default$2() {
        return KafkaMetricsGroup.removeMetric$default$2$(this);
    }

    /** @return the fully-qualified name of this class, used as the logger name */
    @Override
    public String loggerName() {
        return TierTasks.class.getName();
    }

    // ---------------------------------------------------------------------------------
    // Scala-generated field accessors.
    // ---------------------------------------------------------------------------------

    private long lastLagPrintTimeMs() {
        return this.lastLagPrintTimeMs;
    }

    private void lastLagPrintTimeMs_$eq(long x$1) {
        this.lastLagPrintTimeMs = x$1;
    }

    private CancellationContext ctx() {
        return this.ctx;
    }

    private ExecutorService executor() {
        return this.executor;
    }

    private ExecutionContextExecutor pool() {
        return this.pool;
    }

    /** Accessor with a name-mangled public name; it is called from the anonymous gauge class. */
    public TierArchiver kafka$tier$tasks$TierTasks$$tierArchiver() {
        return this.kafka$tier$tasks$TierTasks$$tierArchiver;
    }

    private TierDeletionManager tierDeletionManager() {
        return this.tierDeletionManager;
    }

    private ChangeManager changeManager() {
        return this.changeManager;
    }

    private Meter cycleTimeMetric() {
        return this.cycleTimeMetric;
    }

    /**
     * Runs one scheduling cycle.
     *
     * <ol>
     *   <li>If the tier topic appender is not ready, sleep in
     *       {@code config.mainLoopBackoffMs()} steps until it is ready or this thread
     *       stops running; return without doing work if the thread stopped.</li>
     *   <li>Mark the cycle meter, process pending changes, and let the archiver and
     *       the deletion manager each return their in-flight futures.</li>
     *   <li>If both task queues are empty, call
     *       {@code changeManager.processAtLeastOne()} and return.</li>
     *   <li>If there are tasks but no futures, sleep for the back-off interval.</li>
     *   <li>If the number of futures is at least {@code config.numThreads()}, wait
     *       (for up to {@code Integer.MAX_VALUE} seconds) until one completes.</li>
     *   <li>Otherwise wait at most {@code config.updateIntervalMs()} milliseconds for
     *       one to complete; a timeout simply ends the cycle.</li>
     * </ol>
     *
     * <p>NOTE(review): {@code Thread.sleep} and {@code Await.ready} can throw checked
     * exceptions that Scala does not declare; the decompiled signature omits them.
     */
    @Override
    public void doWork() {
        if (!this.tierTopicAppender.isReady()) {
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "TierTopicAppender is not ready. Backing off.");
            while (!this.tierTopicAppender.isReady() && this.isRunning()) {
                Thread.sleep(this.config.mainLoopBackoffMs());
            }
            if (!this.isRunning()) {
                // Shutdown was requested while waiting for the appender.
                return;
            }
        }
        this.cycleTimeMetric().mark();
        this.changeManager().process();
        List<Future<ArchiveTask>> archiverFutures = this.kafka$tier$tasks$TierTasks$$tierArchiver().doWork();
        List<Future<DeletionTask>> deletionFutures = this.tierDeletionManager().doWork();
        this.maybeLogTierArchiverLagInfo();
        List futures = (List)archiverFutures.$plus$plus(deletionFutures, List$.MODULE$.canBuildFrom());
        if (this.kafka$tier$tasks$TierTasks$$tierArchiver().taskQueue().taskCount() == 0 && this.tierDeletionManager().taskQueue().taskCount() == 0) {
            // Nothing is queued at all: hand control to the change manager instead of spinning.
            this.changeManager().processAtLeastOne();
            return;
        }
        if (futures.isEmpty()) {
            // Tasks exist but none produced a future this cycle; back off before retrying.
            Thread.sleep(this.config.mainLoopBackoffMs());
            return;
        }
        if (futures.size() >= this.config.numThreads()) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "working set is full, blocking until a task completes");
            Await$.MODULE$.ready((Awaitable)Future$.MODULE$.firstCompletedOf((TraversableOnce)futures, (ExecutionContext)this.pool()), (Duration)new package.DurationInt(package$.MODULE$.DurationInt(Integer.MAX_VALUE)).seconds());
            return;
        }
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "working set is not full, attempting to complete at least one future");
        try {
            Await$.MODULE$.ready((Awaitable)Future$.MODULE$.firstCompletedOf((TraversableOnce)futures, (ExecutionContext)this.pool()), (Duration)new package.DurationInt(package$.MODULE$.DurationInt(this.config.updateIntervalMs())).milliseconds());
            return;
        }
        catch (TimeoutException ignored) {
            // Deliberately ignored: no future finished within the update interval, so
            // the next cycle gets a chance to schedule more work.
            return;
        }
    }

    /**
     * Logs archiver partition lag if more than {@link #PERIODIC_LOG_LAG_MS()}
     * milliseconds have passed since the last time it was logged, then records the
     * current clock reading as the new "last logged" time.
     */
    public void maybeLogTierArchiverLagInfo() {
        if (this.time.milliseconds() > this.lastLagPrintTimeMs() + (long)TierTasks$.MODULE$.PERIODIC_LOG_LAG_MS()) {
            this.kafka$tier$tasks$TierTasks$$tierArchiver().logPartitionLagInfo();
            this.lastLagPrintTimeMs_$eq(this.time.milliseconds());
        }
    }

    /**
     * Stops this thread and its collaborators.
     *
     * <p>Order: initiate thread shutdown, cancel the root cancellation context, close
     * the change manager, shut down the archiver and the deletion manager, call
     * {@code shutdownNow()} on the executor, and finally wait for this thread's
     * shutdown to complete.
     */
    @Override
    public void shutdown() {
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "shutting down");
        this.initiateShutdown();
        this.ctx().cancel();
        this.changeManager().close();
        this.kafka$tier$tasks$TierTasks$$tierArchiver().shutdown();
        this.tierDeletionManager().shutdown();
        this.executor().shutdownNow();
        this.awaitShutdown();
    }

    /** @return the archiver's task queue */
    public ArchiverTaskQueue archiverTaskQueue() {
        return this.kafka$tier$tasks$TierTasks$$tierArchiver().taskQueue();
    }

    /**
     * Builds the thread, its executor, the archiver, the deletion manager and the
     * change manager, registers metrics, and subscribes the change manager to both
     * listener sources.
     *
     * @param config                           tunables (thread count, intervals, back-off)
     * @param replicaManager                   passed through to the archiver and deletion manager
     * @param tierReplicaManager               the change manager is added to it as a listener
     * @param tierDeletedPartitionsCoordinator the change manager is registered with it as a listener
     * @param tierTopicAppender                readiness-checked in {@link #doWork()} and passed to both managers
     * @param tierObjectStore                  passed through to the archiver and deletion manager
     * @param time                             clock used for lag-log rate limiting and by the managers
     */
    public TierTasks(TierTasksConfig config, ReplicaManager replicaManager, TierReplicaManager tierReplicaManager, TierDeletedPartitionsCoordinator tierDeletedPartitionsCoordinator, TierTopicAppender tierTopicAppender, TierObjectStore tierObjectStore, Time time) {
        // Decompiler artifact: Scala bytecode stores constructor parameters into fields
        // before invoking the superclass constructor.
        this.config = config;
        this.tierTopicAppender = tierTopicAppender;
        this.time = time;
        super("tier-tasks", ShutdownableThread$.MODULE$.$lessinit$greater$default$2());
        KafkaMetricsGroup.$init$(this);
        this.lastLagPrintTimeMs = time.milliseconds();
        this.ctx = CancellationContext.newContext();
        this.executor = Executors.newFixedThreadPool(config.numThreads(), new ThreadFactory(null){
            // Starts at -1 so the first incrementAndGet() yields thread number 0.
            private final AtomicInteger threadNum;

            private AtomicInteger threadNum() {
                return this.threadNum;
            }

            public Thread newThread(Runnable r) {
                int newThreadNum = this.threadNum().incrementAndGet();
                String nonDaemon_name = new StringBuilder(9).append("TierTask-").append(newThreadNum).toString();
                // NOTE(review): the third argument is presumably the daemon flag - confirm against KafkaThread.
                return new KafkaThread(nonDaemon_name, r, false);
            }
            {
                this.threadNum = new AtomicInteger(-1);
            }
        });
        this.pool = ExecutionContext$.MODULE$.fromExecutor((Executor)this.executor());
        this.kafka$tier$tasks$TierTasks$$tierArchiver = new TierArchiver(config, replicaManager, tierTopicAppender, tierObjectStore, this.ctx().subContext(), config.numThreads(), time, (ExecutionContext)this.pool());
        this.tierDeletionManager = new TierDeletionManager(replicaManager, tierTopicAppender, tierObjectStore, this.ctx().subContext(), config.numThreads(), config.logCleanupIntervalMs(), config.maxRetryBackoffMs(), time, (ExecutionContext)this.pool());
        this.changeManager = new ChangeManager(this.ctx().subContext(), (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TierTaskQueue[]{this.kafka$tier$tasks$TierTasks$$tierArchiver().taskQueue(), this.tierDeletionManager().taskQueue()})), time);
        // Clear any metrics left under these names before registering fresh ones.
        this.removeMetric("CyclesPerSec", this.removeMetric$default$2());
        this.removeMetric("PartitionsInError", this.removeMetric$default$2());
        // The gauge below is registered as "NumPartitionsInError", a name the two
        // removals above never match. Remove that name as well so the gauge is cleared
        // the same way as "CyclesPerSec"; the legacy "PartitionsInError" removal is kept.
        this.removeMetric("NumPartitionsInError", this.removeMetric$default$2());
        this.cycleTimeMetric = this.newMeter("CyclesPerSec", "tier tasks cycles per second", TimeUnit.SECONDS, this.newMeter$default$4());
        this.newGauge("NumPartitionsInError", new Gauge<Object>(this){
            private final /* synthetic */ TierTasks $outer;

            // Reports the archiver task queue's error-partition count at read time.
            public int value() {
                return this.$outer.kafka$tier$tasks$TierTasks$$tierArchiver().taskQueue().errorPartitionCount();
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        }, (Map<String, String>)((Map)Predef$.MODULE$.Map().apply((Seq)Nil$.MODULE$)));
        tierReplicaManager.addListener(this.changeManager());
        tierDeletedPartitionsCoordinator.registerListener(this.changeManager());
        // Decompiler artifact retained as-is: null check on the Scala Predef module.
        if (Predef$.MODULE$ == null) {
            throw null;
        }
    }
}

