/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.scheduler;

import java.io.Serializable;
import org.apache.spark.ExecutorAllocationClient;
import org.apache.spark.SparkConf;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.config.ConfigEntry;
import org.apache.spark.internal.config.Streaming$;
import org.apache.spark.resource.ResourceProfile$;
import org.apache.spark.scheduler.ExecutorDecommissionInfo;
import org.apache.spark.streaming.scheduler.ExecutorAllocationManager$;
import org.apache.spark.streaming.scheduler.OutputOperationInfo;
import org.apache.spark.streaming.scheduler.ReceiverTracker;
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
import org.apache.spark.streaming.scheduler.StreamingListenerStreamingStarted;
import org.apache.spark.streaming.util.RecurringTimer;
import org.apache.spark.util.Clock;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;
import scala.util.Random$;

@ScalaSignature(bytes="\u0006\u0005\u0005Ud!\u0002\u0014(\u0001%\n\u0004\u0002\u0003\"\u0001\u0005\u0003\u0005\u000b\u0011\u0002#\t\u0011!\u0003!\u0011!Q\u0001\n%C\u0001\u0002\u0014\u0001\u0003\u0002\u0003\u0006I!\u0014\u0005\t!\u0002\u0011\t\u0011)A\u0005#\"AA\u000b\u0001B\u0001B\u0003%Q\u000bC\u0003\\\u0001\u0011\u0005A\fC\u0004d\u0001\t\u0007I\u0011\u00023\t\r\u0015\u0004\u0001\u0015!\u0003R\u0011\u001d1\u0007A1A\u0005\n\u001dDaa\u001b\u0001!\u0002\u0013A\u0007b\u00027\u0001\u0005\u0004%Ia\u001a\u0005\u0007[\u0002\u0001\u000b\u0011\u00025\t\u000f9\u0004!\u0019!C\u0005_\"11\u000f\u0001Q\u0001\nADq\u0001\u001e\u0001C\u0002\u0013%q\u000e\u0003\u0004v\u0001\u0001\u0006I\u0001\u001d\u0005\bm\u0002\u0011\r\u0011\"\u0003x\u0011\u0019i\b\u0001)A\u0005q\"9a\u0010\u0001a\u0001\n\u0013!\u0007\u0002C@\u0001\u0001\u0004%I!!\u0001\t\u000f\u00055\u0001\u0001)Q\u0005#\"A\u0011q\u0003\u0001A\u0002\u0013%q\u000eC\u0005\u0002\u001a\u0001\u0001\r\u0011\"\u0003\u0002\u001c!9\u0011q\u0004\u0001!B\u0013\u0001\bbBA\u0012\u0001\u0011\u0005\u0011Q\u0005\u0005\b\u0003O\u0001A\u0011AA\u0013\u0011\u001d\tI\u0003\u0001C\u0005\u0003KAq!a\u000b\u0001\t\u0013\ti\u0003C\u0004\u00024\u0001!I!!\n\t\u000f\u0005U\u0002\u0001\"\u0003\u00028!9\u0011Q\b\u0001\u0005\n\u0005\u0015\u0002bBA 
\u0001\u0011\u0005\u0013\u0011I\u0004\t\u0003\u001b:\u0003\u0012A\u0015\u0002P\u00199ae\nE\u0001S\u0005E\u0003BB.#\t\u0003\t\u0019\u0006C\u0004\u0002V\t\"\t!a\u0016\t\u000f\u0005\u0005$\u0005\"\u0001\u0002d\tIR\t_3dkR|'/\u00117m_\u000e\fG/[8o\u001b\u0006t\u0017mZ3s\u0015\tA\u0013&A\u0005tG\",G-\u001e7fe*\u0011!fK\u0001\ngR\u0014X-Y7j]\u001eT!\u0001L\u0017\u0002\u000bM\u0004\u0018M]6\u000b\u00059z\u0013AB1qC\u000eDWMC\u00011\u0003\ry'oZ\n\u0005\u0001IBD\b\u0005\u00024m5\tAGC\u00016\u0003\u0015\u00198-\u00197b\u0013\t9DG\u0001\u0004B]f\u0014VM\u001a\t\u0003sij\u0011aJ\u0005\u0003w\u001d\u0012\u0011c\u0015;sK\u0006l\u0017N\\4MSN$XM\\3s!\ti\u0004)D\u0001?\u0015\ty4&\u0001\u0005j]R,'O\\1m\u0013\t\teHA\u0004M_\u001e<\u0017N\\4\u0002\r\rd\u0017.\u001a8u\u0007\u0001\u0001\"!\u0012$\u000e\u0003-J!aR\u0016\u00031\u0015CXmY;u_J\fE\u000e\\8dCRLwN\\\"mS\u0016tG/A\bsK\u000e,\u0017N^3s)J\f7m[3s!\tI$*\u0003\u0002LO\ty!+Z2fSZ,'\u000f\u0016:bG.,'/\u0001\u0003d_:4\u0007CA#O\u0013\ty5FA\u0005Ta\u0006\u00148nQ8oM\u0006y!-\u0019;dQ\u0012+(/\u0019;j_:l5\u000f\u0005\u00024%&\u00111\u000b\u000e\u0002\u0005\u0019>tw-A\u0003dY>\u001c7\u000e\u0005\u0002W36\tqK\u0003\u0002YW\u0005!Q\u000f^5m\u0013\tQvKA\u0003DY>\u001c7.\u0001\u0004=S:LGO\u0010\u000b\u0007;z{\u0006-\u00192\u0011\u0005e\u0002\u0001\"\u0002\"\u0007\u0001\u0004!\u0005\"\u0002%\u0007\u0001\u0004I\u0005\"\u0002'\u0007\u0001\u0004i\u0005\"\u0002)\u0007\u0001\u0004\t\u0006\"\u0002+\u0007\u0001\u0004)\u0016aE:dC2LgnZ%oi\u0016\u0014h/\u00197TK\u000e\u001cX#A)\u0002)M\u001c\u0017\r\\5oO&sG/\u001a:wC2\u001cVmY:!\u00039\u00198-\u00197j]\u001e,\u0006OU1uS>,\u0012\u0001\u001b\t\u0003g%L!A\u001b\u001b\u0003\r\u0011{WO\u00197f\u0003=\u00198-\u00197j]\u001e,\u0006OU1uS>\u0004\u0013\u0001E:dC2Lgn\u001a#po:\u0014\u0016\r^5p\u0003E\u00198-\u00197j]\u001e$un\u001e8SCRLw\u000eI\u0001\u0010[&tg*^7Fq\u0016\u001cW\u000f^8sgV\t\u0001\u000f\u0005\u00024c&\u0011!\u000f\u000e\u0002\u0004\u0013:$\u0018\u0001E7j]:+X.\u0012=fGV$xN]:!\u0003=i\u0017\r\u001f(v[\u
0016CXmY;u_J\u001c\u0018\u0001E7bq:+X.\u0012=fGV$xN]:!\u0003\u0015!\u0018.\\3s+\u0005A\bCA=|\u001b\u0005Q(B\u0001-*\u0013\ta(P\u0001\bSK\u000e,(O]5oORKW.\u001a:\u0002\rQLW.\u001a:!\u0003A\u0011\u0017\r^2i!J|7\rV5nKN+X.\u0001\u000bcCR\u001c\u0007\u000e\u0015:pGRKW.Z*v[~#S-\u001d\u000b\u0005\u0003\u0007\tI\u0001E\u00024\u0003\u000bI1!a\u00025\u0005\u0011)f.\u001b;\t\u0011\u0005-A#!AA\u0002E\u000b1\u0001\u001f\u00132\u0003E\u0011\u0017\r^2i!J|7\rV5nKN+X\u000e\t\u0015\u0004+\u0005E\u0001cA\u001a\u0002\u0014%\u0019\u0011Q\u0003\u001b\u0003\u0011Y|G.\u0019;jY\u0016\f!CY1uG\"\u0004&o\\2US6,7i\\;oi\u00061\"-\u0019;dQB\u0013xn\u0019+j[\u0016\u001cu.\u001e8u?\u0012*\u0017\u000f\u0006\u0003\u0002\u0004\u0005u\u0001\u0002CA\u0006/\u0005\u0005\t\u0019\u00019\u0002'\t\fGo\u00195Qe>\u001cG+[7f\u0007>,h\u000e\u001e\u0011)\u0007a\t\t\"A\u0003ti\u0006\u0014H\u000f\u0006\u0002\u0002\u0004\u0005!1\u000f^8q\u0003Ai\u0017M\\1hK\u0006cGn\\2bi&|g.\u0001\tsKF,Xm\u001d;Fq\u0016\u001cW\u000f^8sgR!\u00111AA\u0018\u0011\u0019\t\t\u0004\ba\u0001a\u0006ya.^7OK^,\u00050Z2vi>\u00148/\u0001\u0007lS2dW\t_3dkR|'/\u0001\tbI\u0012\u0014\u0015\r^2i!J|7\rV5nKR!\u00111AA\u001d\u0011\u0019\tYD\ba\u0001#\u00061A/[7f\u001bN\f\u0001C^1mS\u0012\fG/Z*fiRLgnZ:\u0002!=t')\u0019;dQ\u000e{W\u000e\u001d7fi\u0016$G\u0003BA\u0002\u0003\u0007Bq!!\u0012!\u0001\u0004\t9%\u0001\bcCR\u001c\u0007nQ8na2,G/\u001a3\u0011\u0007e\nI%C\u0002\u0002L\u001d\u0012qd\u0015;sK\u0006l\u0017N\\4MSN$XM\\3s\u0005\u0006$8\r[\"p[BdW\r^3e\u0003e)\u00050Z2vi>\u0014\u0018\t\u001c7pG\u0006$\u0018n\u001c8NC:\fw-\u001a:\u0011\u0005e\u00123c\u0001\u00123yQ\u0011\u0011qJ\u0001\u001bSN$\u0015P\\1nS\u000e\fE\u000e\\8dCRLwN\\#oC\ndW\r\u001a\u000b\u0005\u00033\ny\u0006E\u00024\u00037J1!!\u00185\u0005\u001d\u0011un\u001c7fC:DQ\u0001\u0014\u0013A\u00025\u000bqb\u0019:fCR,\u0017JZ#oC\ndW\r\u001a\u000b\r\u0003K\nY'!\u001c\u0002p\u0005E\u00141\u000f\t\u0005g\u0005\u001dT,C\u0002\u0002jQ\u0012aa\u00149uS>t\u0007\"\u0002\"&\u0001\u0004!\u0005\"\u0002%&\u0001\u0
004I\u0005\"\u0002'&\u0001\u0004i\u0005\"\u0002)&\u0001\u0004\t\u0006\"\u0002+&\u0001\u0004)\u0006")
/*
 * Adjusts the number of executors of a streaming application.
 *
 * Processing times of completed batches are accumulated through
 * onBatchCompleted(...). A RecurringTimer periodically invokes manageAllocation(),
 * which compares the average processing time against the batch duration and either
 * requests more executors or removes one executor that hosts no receiver.
 *
 * This file is decompiler output of a Scala class: the Logging and
 * StreamingListener methods below merely forward to the static trait
 * implementations (Logging.xxx$ / StreamingListener.xxx$).
 */
public class ExecutorAllocationManager
implements StreamingListener,
Logging {
    /** Client used to list executors, request a total executor count and kill/decommission executors. */
    private final ExecutorAllocationClient client;
    /** Source of the receiver count and of the executors that currently host receivers. */
    private final ReceiverTracker receiverTracker;
    private final SparkConf conf;
    /** Batch duration in milliseconds; denominator of the processing-time ratio. */
    private final long batchDurationMs;
    /** Timer period in seconds (multiplied by 1000 when the timer is built). */
    private final long scalingIntervalSecs;
    /** Scale up when ratio >= this value. */
    private final double scalingUpRatio;
    /** Scale down when ratio <= this value. */
    private final double scalingDownRatio;
    private final int minNumExecutors;
    private final int maxNumExecutors;
    private final RecurringTimer timer;
    /** Sum of batch processing times since the last manageAllocation() run. */
    private volatile long batchProcTimeSum;
    /** Number of batches accumulated since the last manageAllocation() run. */
    private volatile int batchProcTimeCount;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /**
     * Static forwarder to the companion object's {@code createIfEnabled}.
     *
     * @return whatever the companion returns (an {@code Option} of a manager)
     */
    public static Option<ExecutorAllocationManager> createIfEnabled(ExecutorAllocationClient client, ReceiverTracker receiverTracker, SparkConf conf, long batchDurationMs, Clock clock) {
        return ExecutorAllocationManager$.MODULE$.createIfEnabled(client, receiverTracker, conf, batchDurationMs, clock);
    }

    /** Static forwarder to the companion object's {@code isDynamicAllocationEnabled}. */
    public static boolean isDynamicAllocationEnabled(SparkConf conf) {
        return ExecutorAllocationManager$.MODULE$.isDynamicAllocationEnabled(conf);
    }

    // ---------------------------------------------------------------------
    // Logging trait forwarders: each delegates to the matching static
    // Logging.<name>$ implementation, passing this instance.
    // ---------------------------------------------------------------------

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    // ---------------------------------------------------------------------
    // StreamingListener callbacks this class does not customise: each
    // delegates to the trait's static default implementation.
    // ---------------------------------------------------------------------

    @Override
    public void onStreamingStarted(StreamingListenerStreamingStarted streamingStarted) {
        StreamingListener.onStreamingStarted$(this, streamingStarted);
    }

    @Override
    public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {
        StreamingListener.onReceiverStarted$(this, receiverStarted);
    }

    @Override
    public void onReceiverError(StreamingListenerReceiverError receiverError) {
        StreamingListener.onReceiverError$(this, receiverError);
    }

    @Override
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
        StreamingListener.onReceiverStopped$(this, receiverStopped);
    }

    @Override
    public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
        StreamingListener.onBatchSubmitted$(this, batchSubmitted);
    }

    @Override
    public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {
        StreamingListener.onBatchStarted$(this, batchStarted);
    }

    @Override
    public void onOutputOperationStarted(StreamingListenerOutputOperationStarted outputOperationStarted) {
        StreamingListener.onOutputOperationStarted$(this, outputOperationStarted);
    }

    @Override
    public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted outputOperationCompleted) {
        StreamingListener.onOutputOperationCompleted$(this, outputOperationCompleted);
    }

    /** Accessor for the logger slot backing the Logging trait. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Mutator for the logger slot backing the Logging trait. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    // ---------------------------------------------------------------------
    // Private field accessors (Scala val/var getters and setters).
    // ---------------------------------------------------------------------

    private long scalingIntervalSecs() {
        return this.scalingIntervalSecs;
    }

    private double scalingUpRatio() {
        return this.scalingUpRatio;
    }

    private double scalingDownRatio() {
        return this.scalingDownRatio;
    }

    private int minNumExecutors() {
        return this.minNumExecutors;
    }

    private int maxNumExecutors() {
        return this.maxNumExecutors;
    }

    private RecurringTimer timer() {
        return this.timer;
    }

    private long batchProcTimeSum() {
        return this.batchProcTimeSum;
    }

    private void batchProcTimeSum_$eq(long x$1) {
        this.batchProcTimeSum = x$1;
    }

    private int batchProcTimeCount() {
        return this.batchProcTimeCount;
    }

    private void batchProcTimeCount_$eq(int x$1) {
        this.batchProcTimeCount = x$1;
    }

    /** Starts the recurring timer and logs the configured ratios and interval. */
    public void start() {
        this.timer().start();
        this.logInfo((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(0).append("ExecutorAllocationManager started with ").append(new StringBuilder(33).append("ratios = [").append(this.scalingUpRatio()).append(", ").append(this.scalingDownRatio()).append("] and interval = ").append(this.scalingIntervalSecs()).append(" sec").toString()).toString());
    }

    /**
     * Stops the recurring timer and logs that the manager stopped.
     * NOTE(review): the meaning of the {@code true} flag is defined by
     * RecurringTimer.stop, which is not visible here.
     */
    public void stop() {
        this.timer().stop(true);
        this.logInfo((Function0<String>)(Function0 & Serializable)() -> "ExecutorAllocationManager stopped");
    }

    /**
     * Timer callback: decides whether to scale up, scale down or do nothing.
     *
     * When at least one batch time was recorded, the (integer-divided) average
     * processing time is divided by the batch duration. A ratio at or above the
     * scaling-up ratio requests max(round(ratio), 1) additional executors; a
     * ratio at or below the scaling-down ratio removes one executor. The
     * accumulated sum and count are reset on every invocation, whether or not
     * any batch was recorded.
     *
     * Synchronized on this instance, like addBatchProcTime, so the counters are
     * read and reset without interleaving with an update.
     */
    private synchronized void manageAllocation() {
        this.logInfo((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(47).append("Managing executor allocation with ratios = [").append(this.scalingUpRatio()).append(", ").append(this.scalingDownRatio()).append("]").toString());
        if (this.batchProcTimeCount() > 0) {
            long averageBatchProcTime = this.batchProcTimeSum() / (long)this.batchProcTimeCount();
            double ratio = (double)averageBatchProcTime / (double)this.batchDurationMs;
            this.logInfo((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(19).append("Average: ").append(averageBatchProcTime).append(", ratio = ").append(ratio).toString());
            if (ratio >= this.scalingUpRatio()) {
                this.logDebug((Function0<String>)(Function0 & Serializable)() -> "Requesting executors");
                // Always ask for at least one new executor, even when the ratio rounds to 0.
                int numNewExecutors = package$.MODULE$.max((int)package$.MODULE$.round(ratio), 1);
                this.requestExecutors(numNewExecutors);
            } else if (ratio <= this.scalingDownRatio()) {
                this.logDebug((Function0<String>)(Function0 & Serializable)() -> "Killing executors");
                this.killExecutor();
            }
        }
        this.batchProcTimeSum_$eq(0L);
        this.batchProcTimeCount_$eq(0);
    }

    /**
     * Requests a new total executor count from the client.
     *
     * The target is (current executor count + numNewExecutors), capped at
     * maxNumExecutors and then raised to at least minNumExecutors. It is
     * submitted for the default resource profile id only.
     *
     * @param numNewExecutors number of additional executors wanted; must be >= 1
     *                        (enforced with {@code require})
     */
    private void requestExecutors(int numNewExecutors) {
        Predef$.MODULE$.require(numNewExecutors >= 1);
        scala.collection.immutable.Seq allExecIds = this.client.getExecutorIds();
        this.logDebug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(15).append("Executors (").append(allExecIds.size()).append(") = ").append(allExecIds).toString());
        int targetTotalExecutors = package$.MODULE$.max(package$.MODULE$.min(this.maxNumExecutors(), allExecIds.size() + numNewExecutors), this.minNumExecutors());
        // Arguments: {defaultProfileId -> targetTotalExecutors}, {defaultProfileId -> 0}, empty map.
        this.client.requestTotalExecutors((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)ResourceProfile$.MODULE$.DEFAULT_RESOURCE_PROFILE_ID())), (Object)BoxesRunTime.boxToInteger((int)targetTotalExecutors))})), (Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)ResourceProfile$.MODULE$.DEFAULT_RESOURCE_PROFILE_ID())), (Object)BoxesRunTime.boxToInteger((int)0))})), Predef$.MODULE$.Map().empty());
        this.logInfo((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(26).append("Requested total ").append(targetTotalExecutors).append(" executors").toString());
    }

    /**
     * Removes one randomly chosen executor that hosts no receiver.
     *
     * Nothing is removed when there are no executors, when the executor count
     * is not above minNumExecutors, or when every executor hosts a receiver.
     * The executor is decommissioned when DECOMMISSION_ENABLED is set in the
     * configuration and killed otherwise; the boolean returned by the client is
     * intentionally not inspected.
     */
    private void killExecutor() {
        scala.collection.immutable.Seq allExecIds = this.client.getExecutorIds();
        this.logDebug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(15).append("Executors (").append(allExecIds.size()).append(") = ").append(allExecIds).toString());
        if (allExecIds.nonEmpty() && allExecIds.size() > this.minNumExecutors()) {
            scala.collection.immutable.Seq execIdsWithReceivers = ((IterableOnceOps)this.receiverTracker.allocatedExecutors().values().flatten(Predef$.MODULE$.$conforms())).toSeq();
            this.logInfo((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(29).append("Executors with receivers (").append(execIdsWithReceivers.size()).append("): ").append(execIdsWithReceivers).toString());
            scala.collection.immutable.Seq removableExecIds = (scala.collection.immutable.Seq)allExecIds.diff((Seq)execIdsWithReceivers);
            this.logDebug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(24).append("Removable executors (").append(removableExecIds.size()).append("): ").append(removableExecIds).toString());
            if (removableExecIds.nonEmpty()) {
                String execIdToRemove = (String)removableExecIds.apply(Random$.MODULE$.nextInt(removableExecIds.size()));
                if (BoxesRunTime.unboxToBoolean((Object)this.conf.get(org.apache.spark.internal.config.package$.MODULE$.DECOMMISSION_ENABLED()))) {
                    this.client.decommissionExecutor(execIdToRemove, new ExecutorDecommissionInfo("spark scale down", (Option)None$.MODULE$), true, this.client.decommissionExecutor$default$4());
                } else {
                    this.client.killExecutor(execIdToRemove);
                }
                this.logInfo((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(27).append("Requested to kill executor ").append(execIdToRemove).toString());
            } else {
                this.logInfo((Function0<String>)(Function0 & Serializable)() -> "No non-receiver executors to kill");
            }
        } else {
            this.logInfo((Function0<String>)(Function0 & Serializable)() -> "No available executor to kill");
        }
    }

    /**
     * Adds one batch processing time to the running sum and bumps the count.
     * Synchronized on this instance, like manageAllocation.
     *
     * @param timeMs processing time of the completed batch
     */
    private synchronized void addBatchProcTime(long timeMs) {
        this.batchProcTimeSum_$eq(this.batchProcTimeSum() + timeMs);
        this.batchProcTimeCount_$eq(this.batchProcTimeCount() + 1);
        this.logDebug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(46).append("Added batch processing time ").append(timeMs).append(", sum = ").append(this.batchProcTimeSum()).append(", count = ").append(this.batchProcTimeCount()).toString());
    }

    /**
     * Validates the configured ratios and executor bounds via {@code require}.
     *
     * The scaling-up ratio must be strictly greater than the scaling-down
     * ratio. The max/min executor check runs only when BOTH keys are present in
     * the configuration. Note that this second check accepts max == min even
     * though its message reads "must be more than".
     */
    private void validateSettings() {
        Predef$.MODULE$.require(this.scalingUpRatio() > this.scalingDownRatio(), (Function0 & Serializable)() -> new StringBuilder(0).append(new StringBuilder(33).append("Config ").append(Streaming$.MODULE$.STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO().key()).append(" must be more than config ").toString()).append(String.valueOf(Streaming$.MODULE$.STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO().key())).toString());
        if (this.conf.contains(Streaming$.MODULE$.STREAMING_DYN_ALLOCATION_MIN_EXECUTORS().key()) && this.conf.contains(Streaming$.MODULE$.STREAMING_DYN_ALLOCATION_MAX_EXECUTORS().key())) {
            Predef$.MODULE$.require(this.maxNumExecutors() >= this.minNumExecutors(), (Function0 & Serializable)() -> new StringBuilder(0).append(new StringBuilder(33).append("Config ").append(Streaming$.MODULE$.STREAMING_DYN_ALLOCATION_MAX_EXECUTORS().key()).append(" must be more than config ").toString()).append(String.valueOf(Streaming$.MODULE$.STREAMING_DYN_ALLOCATION_MIN_EXECUTORS().key())).toString());
        }
    }

    /**
     * Records the processing delay of a completed batch, if it has one.
     * Batches in which any output operation reports a failure reason are
     * ignored and do not contribute to the average.
     */
    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        this.logDebug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(25).append("onBatchCompleted called: ").append(batchCompleted).toString());
        boolean anyOutputOpFailed = batchCompleted.batchInfo().outputOperationInfos().values().exists((Function1 & Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)ExecutorAllocationManager.$anonfun$onBatchCompleted$2(x$2)));
        if (!anyOutputOpFailed) {
            batchCompleted.batchInfo().processingDelay().foreach((Function1)(JFunction1.mcVJ.sp & Serializable)timeMs -> this.addBatchProcTime(timeMs));
        }
    }

    /** Lambda body for onBatchCompleted: true when the output operation has a failure reason. */
    public static final /* synthetic */ boolean $anonfun$onBatchCompleted$2(OutputOperationInfo x$2) {
        return x$2.failureReason().nonEmpty();
    }

    /**
     * Reads the scaling configuration, builds the recurring timer (not yet
     * started) and validates the settings.
     *
     * @param client          client used to query, request and remove executors
     * @param receiverTracker tracker queried for receiver count and receiver executors
     * @param conf            configuration holding the dynamic-allocation entries
     * @param batchDurationMs batch duration in milliseconds
     * @param clock           clock handed to the RecurringTimer
     */
    public ExecutorAllocationManager(ExecutorAllocationClient client, ReceiverTracker receiverTracker, SparkConf conf, long batchDurationMs, Clock clock) {
        this.client = client;
        this.receiverTracker = receiverTracker;
        this.conf = conf;
        this.batchDurationMs = batchDurationMs;
        StreamingListener.$init$(this);
        Logging.$init$((Logging)this);
        this.scalingIntervalSecs = BoxesRunTime.unboxToLong((Object)conf.get(Streaming$.MODULE$.STREAMING_DYN_ALLOCATION_SCALING_INTERVAL()));
        this.scalingUpRatio = BoxesRunTime.unboxToDouble((Object)conf.get(Streaming$.MODULE$.STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO()));
        this.scalingDownRatio = BoxesRunTime.unboxToDouble((Object)conf.get(Streaming$.MODULE$.STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO()));
        // When the min-executors entry is unset, default to max(1, number of receivers).
        // The constructor parameter is used here; the decompiler emitted an undeclared "$this".
        this.minNumExecutors = BoxesRunTime.unboxToInt((Object)((Option)conf.get((ConfigEntry)Streaming$.MODULE$.STREAMING_DYN_ALLOCATION_MIN_EXECUTORS())).getOrElse((Function0)(JFunction0.mcI.sp & Serializable)() -> package$.MODULE$.max(1, receiverTracker.numReceivers())));
        this.maxNumExecutors = BoxesRunTime.unboxToInt((Object)conf.get(Streaming$.MODULE$.STREAMING_DYN_ALLOCATION_MAX_EXECUTORS()));
        // Timer period is the scaling interval converted from seconds to milliseconds.
        this.timer = new RecurringTimer(clock, this.scalingIntervalSecs() * 1000L, (Function1<Object, BoxedUnit>)(JFunction1.mcVJ.sp & Serializable)x$1 -> this.manageAllocation(), "streaming-executor-allocation-manager");
        this.batchProcTimeSum = 0L;
        this.batchProcTimeCount = 0;
        this.validateSettings();
    }
}

