/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.kafka010;

import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkEnv$;
import org.apache.spark.internal.Logging;
import org.apache.spark.scheduler.ExecutorCacheTaskLocation;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog;
import org.apache.spark.sql.execution.streaming.SerializedOffset;
import org.apache.spark.sql.kafka010.EarliestOffsetRangeLimit$;
import org.apache.spark.sql.kafka010.JsonUtils$;
import org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition;
import org.apache.spark.sql.kafka010.KafkaOffsetRange;
import org.apache.spark.sql.kafka010.KafkaOffsetRangeCalculator;
import org.apache.spark.sql.kafka010.KafkaOffsetRangeCalculator$;
import org.apache.spark.sql.kafka010.KafkaOffsetRangeLimit;
import org.apache.spark.sql.kafka010.KafkaOffsetReader;
import org.apache.spark.sql.kafka010.KafkaOffsetReader$;
import org.apache.spark.sql.kafka010.KafkaSourceOffset;
import org.apache.spark.sql.kafka010.KafkaSourceOffset$;
import org.apache.spark.sql.kafka010.KafkaSourceProvider$;
import org.apache.spark.sql.kafka010.LatestOffsetRangeLimit$;
import org.apache.spark.sql.kafka010.SpecificOffsetRangeLimit;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BlockManagerId;
import org.apache.spark.util.UninterruptibleThread;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenSet;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SetLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.immutable.StringOps$;
import scala.collection.mutable.ArrayOps;
import scala.math.Numeric;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\tub!\u0002\u0016,\u0001-*\u0004\u0002\u0003)\u0001\u0005\u0003\u0005\u000b\u0011\u0002*\t\u0011Y\u0003!\u0011!Q\u0001\n]C\u0001B\u001b\u0001\u0003\u0002\u0003\u0006Ia\u001b\u0005\t_\u0002\u0011\t\u0011)A\u0005;\"A\u0001\u000f\u0001B\u0001B\u0003%\u0011\u000f\u0003\u0005u\u0001\t\u0005\t\u0015!\u0003v\u0011\u0015I\b\u0001\"\u0001{\u0011-\t)\u0001\u0001a\u0001\u0002\u0004%I!a\u0002\t\u0017\u0005]\u0001\u00011AA\u0002\u0013%\u0011\u0011\u0004\u0005\f\u0003K\u0001\u0001\u0019!A!B\u0013\tI\u0001C\u0006\u0002(\u0001\u0001\r\u00111A\u0005\n\u0005\u001d\u0001bCA\u0015\u0001\u0001\u0007\t\u0019!C\u0005\u0003WA1\"a\f\u0001\u0001\u0004\u0005\t\u0015)\u0003\u0002\n!I\u0011\u0011\u0007\u0001C\u0002\u0013%\u00111\u0007\u0005\t\u0003w\u0001\u0001\u0015!\u0003\u00026!I\u0011Q\b\u0001C\u0002\u0013%\u0011q\b\u0005\t\u0003\u000f\u0002\u0001\u0015!\u0003\u0002B!I\u0011\u0011\n\u0001C\u0002\u0013%\u00111\n\u0005\t\u0003'\u0002\u0001\u0015!\u0003\u0002N!Q\u0011Q\u000b\u0001\t\u0006\u0004%I!a\u0002\t\u000f\u0005]\u0003\u0001\"\u0011\u0002Z!9\u0011q\u000e\u0001\u0005B\u0005E\u0004bBAG\u0001\u0011\u0005\u0013q\u0012\u0005\b\u0003#\u0003A\u0011IAH\u0011\u001d\t\u0019\n\u0001C!\u0003+Cq!a'\u0001\t\u0003\ni\nC\u0004\u0002,\u0002!\t%!,\t\u000f\u0005E\u0006\u0001\"\u0011\u00024\"9\u0011Q\u0017\u0001\u0005B\u0005]\u0006bBA]\u0001\u0011%\u00111\u0018\u0005\b\u0003{\u0003A\u0011BA`\u0011\u001d\ti\r\u0001C\u0005\u0003\u001fDq!a6\u0001\t\u0013\tIN\u0002\u0004\u0002`\u0002\u0001\u0011\u0011\u001d\u0005\u000b\u0003o\u0014#\u0011!Q\u0001\n\u0005e\b\u0002C8#\u0005\u0003\u0005\u000b\u0011B/\t\re\u0014C\u0011\u0001B\u0001\u0011%\u0011YA\tb\u0001\n\u0003\u0011i\u0001\u0003\u0005\u0003\u0016\t\u0002\u000b\u0011\u0002B\b\u0011\u001d\u00119B\tC!\u00053AqAa\f#\t\u0003\u0012\tDA\u000bLC\u001a\\\u0017-T5de>\u0014\u0015\r^2i%\u0016\fG-\u001a:\u000b\u00051j\u0013\u0001C6bM.\f\u0007'\r\u0019\u000b\u00059z\u0013aA:rY*\u0011\u0001'M\u0001\u0006gB\f'o\u001b\u0006\u0003eM\na!\u00199bG
\",'\"\u0001\u001b\u0002\u0007=\u0014xm\u0005\u0003\u0001myR\u0005CA\u001c=\u001b\u0005A$BA\u001d;\u0003\u0011a\u0017M\\4\u000b\u0003m\nAA[1wC&\u0011Q\b\u000f\u0002\u0007\u001f\nTWm\u0019;\u0011\u0005}BU\"\u0001!\u000b\u0005\u0005\u0013\u0015!C:ue\u0016\fW.\u001b8h\u0015\t\u0019E)\u0001\u0004sK\u0006$WM\u001d\u0006\u0003\u000b\u001a\u000b!A\u001e\u001a\u000b\u0005\u001dk\u0013aB:pkJ\u001cWm]\u0005\u0003\u0013\u0002\u0013\u0001#T5de>\u0014\u0015\r^2i%\u0016\fG-\u001a:\u0011\u0005-sU\"\u0001'\u000b\u00055{\u0013\u0001C5oi\u0016\u0014h.\u00197\n\u0005=c%a\u0002'pO\u001eLgnZ\u0001\u0012W\u000647.Y(gMN,GOU3bI\u0016\u00148\u0001\u0001\t\u0003'Rk\u0011aK\u0005\u0003+.\u0012\u0011cS1gW\u0006|eMZ:fiJ+\u0017\rZ3s\u0003M)\u00070Z2vi>\u00148*\u00194lCB\u000b'/Y7t!\u0011A6,\u0018\u001c\u000e\u0003eS!A\u0017\u001e\u0002\tU$\u0018\u000e\\\u0005\u00039f\u00131!T1q!\tqvM\u0004\u0002`KB\u0011\u0001mY\u0007\u0002C*\u0011!-U\u0001\u0007yI|w\u000e\u001e \u000b\u0003\u0011\fQa]2bY\u0006L!AZ2\u0002\rA\u0013X\rZ3g\u0013\tA\u0017N\u0001\u0004TiJLgn\u001a\u0006\u0003M\u000e\fqa\u001c9uS>t7\u000f\u0005\u0002m[6\tA)\u0003\u0002o\t\n\tB)\u0019;b'>,(oY3PaRLwN\\:\u0002\u00195,G/\u00193bi\u0006\u0004\u0016\r\u001e5\u0002\u001fM$\u0018M\u001d;j]\u001e|eMZ:fiN\u0004\"a\u0015:\n\u0005M\\#!F&bM.\fwJ\u001a4tKR\u0014\u0016M\\4f\u0019&l\u0017\u000e^\u0001\u000fM\u0006LGn\u00148ECR\fGj\\:t!\t1x/D\u0001d\u0013\tA8MA\u0004C_>dW-\u00198\u0002\rqJg.\u001b;?)%YH0 
@\u0000\u0003\u0003\t\u0019\u0001\u0005\u0002T\u0001!)\u0001k\u0002a\u0001%\")ak\u0002a\u0001/\")!n\u0002a\u0001W\")qn\u0002a\u0001;\")\u0001o\u0002a\u0001c\")Ao\u0002a\u0001k\u0006)2\u000f^1siB\u000b'\u000f^5uS>twJ\u001a4tKR\u001cXCAA\u0005!\u0011\tY!!\u0005\u000f\u0007M\u000bi!C\u0002\u0002\u0010-\nq\u0001]1dW\u0006<W-\u0003\u0003\u0002\u0014\u0005U!A\u0005)beRLG/[8o\u001f\u001a47/\u001a;NCBT1!a\u0004,\u0003e\u0019H/\u0019:u!\u0006\u0014H/\u001b;j_:|eMZ:fiN|F%Z9\u0015\t\u0005m\u0011\u0011\u0005\t\u0004m\u0006u\u0011bAA\u0010G\n!QK\\5u\u0011%\t\u0019#CA\u0001\u0002\u0004\tI!A\u0002yIE\nac\u001d;beR\u0004\u0016M\u001d;ji&|gn\u00144gg\u0016$8\u000fI\u0001\u0014K:$\u0007+\u0019:uSRLwN\\(gMN,Go]\u0001\u0018K:$\u0007+\u0019:uSRLwN\\(gMN,Go]0%KF$B!a\u0007\u0002.!I\u00111\u0005\u0007\u0002\u0002\u0003\u0007\u0011\u0011B\u0001\u0015K:$\u0007+\u0019:uSRLwN\\(gMN,Go\u001d\u0011\u0002\u001bA|G\u000e\u001c+j[\u0016|W\u000f^'t+\t\t)\u0004E\u0002w\u0003oI1!!\u000fd\u0005\u0011auN\\4\u0002\u001dA|G\u000e\u001c+j[\u0016|W\u000f^'tA\u0005!R.\u0019=PM\u001a\u001cX\r^:QKJ$&/[4hKJ,\"!!\u0011\u0011\u000bY\f\u0019%!\u000e\n\u0007\u0005\u00153M\u0001\u0004PaRLwN\\\u0001\u0016[\u0006DxJ\u001a4tKR\u001c\b+\u001a:Ue&<w-\u001a:!\u0003=\u0011\u0018M\\4f\u0007\u0006d7-\u001e7bi>\u0014XCAA'!\r\u0019\u0016qJ\u0005\u0004\u0003#Z#AG&bM.\fwJ\u001a4tKR\u0014\u0016M\\4f\u0007\u0006d7-\u001e7bi>\u0014\u0018\u0001\u0005:b]\u001e,7)\u00197dk2\fGo\u001c:!\u0003]Ig.\u001b;jC2\u0004\u0016M\u001d;ji&|gn\u00144gg\u0016$8/\u0001\btKR|eMZ:fiJ\u000bgnZ3\u0015\r\u0005m\u00111LA6\u0011\u001d\ti&\u0006a\u0001\u0003?\nQa\u001d;beR\u0004R\u0001WA1\u0003KJ1!a\u0019Z\u0005!y\u0005\u000f^5p]\u0006d\u0007cA 
\u0002h%\u0019\u0011\u0011\u000e!\u0003\r=3gm]3u\u0011\u001d\ti'\u0006a\u0001\u0003?\n1!\u001a8e\u0003M\u0001H.\u00198J]B,H\u000fU1si&$\u0018n\u001c8t)\t\t\u0019\bE\u0003Y\u0003k\nI(C\u0002\u0002xe\u0013A\u0001T5tiB1\u00111PA?\u0003\u0003k\u0011AQ\u0005\u0004\u0003\u007f\u0012%AD%oaV$\b+\u0019:uSRLwN\u001c\t\u0005\u0003\u0007\u000bI)\u0004\u0002\u0002\u0006*\u0019\u0011qQ\u0017\u0002\u0011\r\fG/\u00197zgRLA!a#\u0002\u0006\nY\u0011J\u001c;fe:\fGNU8x\u000399W\r^*uCJ$xJ\u001a4tKR$\"!!\u001a\u0002\u0019\u001d,G/\u00128e\u001f\u001a47/\u001a;\u0002#\u0011,7/\u001a:jC2L'0Z(gMN,G\u000f\u0006\u0003\u0002f\u0005]\u0005BBAM3\u0001\u0007Q,\u0001\u0003kg>t\u0017A\u0003:fC\u0012\u001c6\r[3nCR\u0011\u0011q\u0014\t\u0005\u0003C\u000b9+\u0004\u0002\u0002$*\u0019\u0011QU\u0017\u0002\u000bQL\b/Z:\n\t\u0005%\u00161\u0015\u0002\u000b'R\u0014Xo\u0019;UsB,\u0017AB2p[6LG\u000f\u0006\u0003\u0002\u001c\u0005=\u0006bBA77\u0001\u0007\u0011QM\u0001\u0005gR|\u0007\u000f\u0006\u0002\u0002\u001c\u0005AAo\\*ue&tw\rF\u0001^\u0003\t:W\r^(s\u0007J,\u0017\r^3J]&$\u0018.\u00197QCJ$\u0018\u000e^5p]>3gm]3ugR\u0011\u0011\u0011B\u0001\ne\u0006$X\rT5nSR$\u0002\"!\u0003\u0002B\u0006\u0015\u0017\u0011\u001a\u0005\b\u0003\u0007|\u0002\u0019AA\u001b\u0003\u0015a\u0017.\\5u\u0011\u001d\t9m\ba\u0001\u0003\u0013\tAA\u001a:p[\"9\u00111Z\u0010A\u0002\u0005%\u0011!B;oi&d\u0017!F4fiN{'\u000f^3e\u000bb,7-\u001e;pe2K7\u000f\u001e\u000b\u0003\u0003#\u0004BA^Aj;&\u0019\u0011Q[2\u0003\u000b\u0005\u0013(/Y=\u0002\u001dI,\u0007o\u001c:u\t\u0006$\u0018\rT8tgR!\u00111DAn\u0011\u0019\ti.\ta\u0001;\u00069Q.Z:tC\u001e,'AH&bM.\f7k\\;sG\u0016Le.\u001b;jC2|eMZ:fi^\u0013\u0018\u000e^3s'\r\u0011\u00131\u001d\t\u0007\u0003K\fi/!=\u000e\u0005\u0005\u001d(bA!\u0002j*\u0019\u00111^\u0017\u0002\u0013\u0015DXmY;uS>t\u0017\u0002BAx\u0003O\u0014q\u0002\u0013#G'6+G/\u00193bi\u0006dun\u001a\t\u0004'\u0006M\u0018bAA{W\t\t2*\u00194lCN{WO]2f\u001f\u001a47/\u001a;\u0002\u0019M\u0004\u0018M]6TKN\u001c\u0018n\u001c8\u0011\t\u0005m\u0018Q`\u0007\u0002
[%\u0019\u0011q`\u0017\u0003\u0019M\u0003\u0018M]6TKN\u001c\u0018n\u001c8\u0015\r\t\r!q\u0001B\u0005!\r\u0011)AI\u0007\u0002\u0001!9\u0011q_\u0013A\u0002\u0005e\b\"B8&\u0001\u0004i\u0016a\u0002,F%NKuJT\u000b\u0003\u0005\u001f\u00012A\u001eB\t\u0013\r\u0011\u0019b\u0019\u0002\u0004\u0013:$\u0018\u0001\u0003,F%NKuJ\u0014\u0011\u0002\u0013M,'/[1mSj,GCBA\u000e\u00057\u0011y\u0002C\u0004\u0003\u001e!\u0002\r!!=\u0002\u00115,G/\u00193bi\u0006DqA!\t)\u0001\u0004\u0011\u0019#A\u0002pkR\u0004BA!\n\u0003,5\u0011!q\u0005\u0006\u0004\u0005SQ\u0014AA5p\u0013\u0011\u0011iCa\n\u0003\u0019=+H\u000f];u'R\u0014X-Y7\u0002\u0017\u0011,7/\u001a:jC2L'0\u001a\u000b\u0005\u0003c\u0014\u0019\u0004C\u0004\u00036%\u0002\rAa\u000e\u0002\u0005%t\u0007\u0003\u0002B\u0013\u0005sIAAa\u000f\u0003(\tY\u0011J\u001c9viN#(/Z1n\u0001")
public class KafkaMicroBatchReader
implements MicroBatchReader,
Logging {
    // Lazily computed offsets; assigned once by initialPartitionOffsets$lzycompute() and guarded by bitmap$0.
    private scala.collection.immutable.Map<TopicPartition, Object> initialPartitionOffsets;
    // Used to fetch earliest/latest/specific offsets from Kafka; closed by stop().
    private final KafkaOffsetReader kafkaOffsetReader;
    // Passed unchanged to every KafkaMicroBatchInputPartition created in planInputPartitions().
    private final Map<String, Object> executorKafkaParams;
    // Location handed to KafkaSourceInitialOffsetWriter when resolving the initial offsets.
    private final String metadataPath;
    // Decides how the initial offsets are fetched when the metadata log has no entry for batch 0.
    private final KafkaOffsetRangeLimit startingOffsets;
    // When true reportDataLoss() throws IllegalStateException; when false it only logs a warning.
    private final boolean failOnDataLoss;
    // Start of the current range; assigned by setOffsetRange().
    private scala.collection.immutable.Map<TopicPartition, Object> startPartitionOffsets;
    // End of the current range; assigned by setOffsetRange().
    private scala.collection.immutable.Map<TopicPartition, Object> endPartitionOffsets;
    // Read from option "kafkaConsumer.pollTimeoutMs"; defaults to spark.network.timeout (seconds) * 1000.
    private final long pollTimeoutMs;
    // Option "maxOffsetsPerTrigger" parsed as a long; when defined, setOffsetRange() applies rateLimit().
    private final Option<Object> maxOffsetsPerTrigger;
    // Built from the data source options; turns from/until offsets into KafkaOffsetRange instances.
    private final KafkaOffsetRangeCalculator rangeCalculator;
    // Backing field for the Logging trait's logger accessor pair below.
    private transient Logger org$apache$spark$internal$Logging$$log_;
    // Set to true once initialPartitionOffsets has been computed.
    private volatile boolean bitmap$0;

    /** Returns the logger name by delegating to the static {@code Logging.logName$} forwarder. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Returns the logger by delegating to the static {@code Logging.log$} forwarder. */
    public Logger log() {
        return Logging.log$(this);
    }

    /**
     * Forwards a message supplier to {@code Logging.logInfo$}.
     *
     * @param msg supplier of the message text
     */
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    /**
     * Forwards a message supplier to {@code Logging.logDebug$}.
     *
     * @param msg supplier of the message text
     */
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    /**
     * Forwards a message supplier to {@code Logging.logTrace$}.
     *
     * @param msg supplier of the message text
     */
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    /**
     * Forwards a message supplier to {@code Logging.logWarning$}.
     *
     * @param msg supplier of the message text
     */
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    /**
     * Forwards a message supplier to {@code Logging.logError$}.
     *
     * @param msg supplier of the message text
     */
    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    /**
     * Forwards a message supplier and a throwable to {@code Logging.logInfo$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    /**
     * Forwards a message supplier and a throwable to {@code Logging.logDebug$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    /**
     * Forwards a message supplier and a throwable to {@code Logging.logTrace$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    /**
     * Forwards a message supplier and a throwable to {@code Logging.logWarning$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    /**
     * Forwards a message supplier and a throwable to {@code Logging.logError$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    /** Returns the result of the static {@code Logging.isTraceEnabled$} forwarder. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /**
     * Forwards to the one-flag variant of {@code Logging.initializeLogIfNecessary$}.
     *
     * @param isInterpreter flag passed through unchanged
     */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$(this, isInterpreter);
    }

    /**
     * Forwards to the two-flag variant of {@code Logging.initializeLogIfNecessary$}.
     *
     * @param isInterpreter flag passed through unchanged
     * @param silent        flag passed through unchanged
     * @return whatever the forwarder returns
     */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$(this, isInterpreter, silent);
    }

    /** Returns the default for the second argument of {@code initializeLogIfNecessary}, as supplied by {@code Logging}. */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /** Getter for the logger backing field of the same name. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    /**
     * Setter for the logger backing field.
     *
     * @param x$1 logger to store
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /** Returns the current start offsets per partition, as last stored by {@code setOffsetRange}. */
    private scala.collection.immutable.Map<TopicPartition, Object> startPartitionOffsets() {
        return startPartitionOffsets;
    }

    /**
     * Stores the start offsets per partition.
     *
     * @param x$1 new start offsets
     */
    private void startPartitionOffsets_$eq(scala.collection.immutable.Map<TopicPartition, Object> x$1) {
        startPartitionOffsets = x$1;
    }

    /** Returns the current end offsets per partition, as last stored by {@code setOffsetRange}. */
    private scala.collection.immutable.Map<TopicPartition, Object> endPartitionOffsets() {
        return endPartitionOffsets;
    }

    /**
     * Stores the end offsets per partition.
     *
     * @param x$1 new end offsets
     */
    private void endPartitionOffsets_$eq(scala.collection.immutable.Map<TopicPartition, Object> x$1) {
        endPartitionOffsets = x$1;
    }

    /** Returns the poll timeout in milliseconds that was resolved in the constructor. */
    private long pollTimeoutMs() {
        return pollTimeoutMs;
    }

    /** Returns the optional {@code maxOffsetsPerTrigger} value parsed in the constructor. */
    private Option<Object> maxOffsetsPerTrigger() {
        return maxOffsetsPerTrigger;
    }

    /** Returns the range calculator built from the data source options in the constructor. */
    private KafkaOffsetRangeCalculator rangeCalculator() {
        return rangeCalculator;
    }

    /**
     * Slow path of the lazy {@code initialPartitionOffsets} value.
     *
     * <p>While holding this instance's monitor, computes the offsets via
     * {@code getOrCreateInitialPartitionOffsets()} unless {@code bitmap$0} already
     * marks them as computed, then returns the stored value.
     */
    private scala.collection.immutable.Map<TopicPartition, Object> initialPartitionOffsets$lzycompute() {
        synchronized (this) {
            boolean alreadyComputed = this.bitmap$0;
            if (!alreadyComputed) {
                this.initialPartitionOffsets = this.getOrCreateInitialPartitionOffsets();
                // Flip the flag only after the value has been stored.
                this.bitmap$0 = true;
            }
        }
        return this.initialPartitionOffsets;
    }

    /**
     * Returns the lazily computed initial offsets, taking the synchronized slow
     * path only while {@code bitmap$0} is still false.
     */
    private scala.collection.immutable.Map<TopicPartition, Object> initialPartitionOffsets() {
        if (this.bitmap$0) {
            return this.initialPartitionOffsets;
        }
        return this.initialPartitionOffsets$lzycompute();
    }

    /**
     * Fixes the start and end offsets of the range that will be planned next.
     *
     * <p>Start: the partition offsets of {@code start} when present, otherwise the
     * lazily computed initial offsets. End: the partition offsets of {@code end}
     * when present; otherwise the latest offsets fetched from Kafka, capped by
     * {@code rateLimit} when {@code maxOffsetsPerTrigger} is defined.
     *
     * @param start optional start offset; cast to {@code KafkaSourceOffset} when present
     * @param end   optional end offset; cast to {@code KafkaSourceOffset} when present
     */
    public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
        // Evaluate the lazy initial offsets up front; the result is discarded here.
        // NOTE(review): presumably done so the lookup runs on the calling thread - confirm.
        this.initialPartitionOffsets();
        this.startPartitionOffsets_$eq((scala.collection.immutable.Map<TopicPartition, Object>)((scala.collection.immutable.Map)Option$.MODULE$.apply(start.orElse(null))
            .map((Function1 & Serializable & scala.Serializable)x$2 -> ((KafkaSourceOffset)((Object)x$2)).partitionToOffsets())
            .getOrElse((Function0 & Serializable & scala.Serializable)() -> this.initialPartitionOffsets())));
        // The end range is derived after the start has been stored because the fallback reads startPartitionOffsets().
        this.endPartitionOffsets_$eq((scala.collection.immutable.Map<TopicPartition, Object>)((scala.collection.immutable.Map)Option$.MODULE$.apply(end.orElse(null))
            .map((Function1 & Serializable & scala.Serializable)x$3 -> ((KafkaSourceOffset)((Object)x$3)).partitionToOffsets())
            .getOrElse((Function0 & Serializable & scala.Serializable)() -> {
                // Fixed: the decompiler emitted an undefined "$this" here; inside a lambda the enclosing instance is "this".
                scala.collection.immutable.Map<TopicPartition, Object> latestPartitionOffsets = this.kafkaOffsetReader.fetchLatestOffsets((Option<scala.collection.immutable.Map<TopicPartition, Object>>)new Some(this.startPartitionOffsets()));
                return (scala.collection.immutable.Map)this.maxOffsetsPerTrigger()
                    .map((Function1 & Serializable & scala.Serializable)maxOffsets -> this.rateLimit(BoxesRunTime.unboxToLong((Object)maxOffsets), this.startPartitionOffsets(), (scala.collection.immutable.Map<TopicPartition, Object>)latestPartitionOffsets))
                    .getOrElse((Function0 & Serializable & scala.Serializable)() -> latestPartitionOffsets);
            })));
    }

    /**
     * Builds one input partition per offset range for the current start/end offsets.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Find partitions present in the end offsets but not in the start offsets and
     *       fetch their earliest offsets; report data loss if some cannot be resolved or
     *       if a new partition's earliest offset is not 0.</li>
     *   <li>Report data loss for partitions present in the start offsets but missing from
     *       the end offsets.</li>
     *   <li>Report data loss for any partition whose end offset is smaller than its start
     *       offset.</li>
     *   <li>Ask the range calculator for the offset ranges and wrap each one in a
     *       {@code KafkaMicroBatchInputPartition}.</li>
     * </ol>
     *
     * @return the planned input partitions as a Java list
     * @throws IllegalStateException via {@code reportDataLoss} when {@code failOnDataLoss} is true
     */
    public List<InputPartition<InternalRow>> planInputPartitions() {
        // Partitions that exist at the end of the range but not at its start.
        Set newPartitions = (Set)this.endPartitionOffsets().keySet().diff((GenSet)this.startPartitionOffsets().keySet());
        scala.collection.immutable.Map<TopicPartition, Object> newPartitionInitialOffsets = this.kafkaOffsetReader.fetchEarliestOffsets((Seq<TopicPartition>)newPartitions.toSeq());
        Set resolvedNewPartitions = newPartitionInitialOffsets.keySet();
        boolean allNewPartitionsResolved = resolvedNewPartitions == null ? newPartitions == null : resolvedNewPartitions.equals(newPartitions);
        if (!allNewPartitionsResolved) {
            Set unresolvedPartitions = (Set)newPartitions.diff((GenSet)newPartitionInitialOffsets.keySet());
            this.reportDataLoss(new StringBuilder(64).append("Cannot find earliest offsets of ").append(unresolvedPartitions).append(". Some data may have been missed").toString());
        }
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(18).append("Partitions added: ").append(newPartitionInitialOffsets).toString());
        // A new partition whose earliest offset is not 0 is reported as possible data loss.
        ((IterableLike)newPartitionInitialOffsets.filter((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)KafkaMicroBatchReader.$anonfun$planInputPartitions$2(x$4)))).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            KafkaMicroBatchReader.$anonfun$planInputPartitions$3(this, x0$1);
            return BoxedUnit.UNIT;
        });

        // Partitions that existed at the start of the range but are absent at its end.
        Set deletedPartitions = (Set)this.startPartitionOffsets().keySet().diff((GenSet)this.endPartitionOffsets().keySet());
        if (deletedPartitions.nonEmpty()) {
            this.reportDataLoss(new StringBuilder(41).append(deletedPartitions).append(" are gone. Some data may have been missed").toString());
        }

        // Only used for the debug message below.
        Seq topicPartitions = ((SetLike)this.endPartitionOffsets().keySet().filter((Function1 & Serializable & scala.Serializable)tp -> BoxesRunTime.boxToBoolean((boolean)KafkaMicroBatchReader.$anonfun$planInputPartitions$4(this, newPartitionInitialOffsets, tp)))).toSeq();
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(17).append("TopicPartitions: ").append(topicPartitions.mkString(", ")).toString());

        scala.collection.immutable.Map fromOffsets = this.startPartitionOffsets().$plus$plus(newPartitionInitialOffsets);
        scala.collection.immutable.Map<TopicPartition, Object> untilOffsets = this.endPartitionOffsets();
        // Report every partition whose end offset moved backwards relative to its start offset.
        untilOffsets.foreach((Function1 & Serializable & scala.Serializable)x0$2 -> {
            KafkaMicroBatchReader.$anonfun$planInputPartitions$6(this, fromOffsets, x0$2);
            return BoxedUnit.UNIT;
        });

        Seq<KafkaOffsetRange> offsetRanges = this.rangeCalculator().getRanges((scala.collection.immutable.Map<TopicPartition, Object>)fromOffsets, untilOffsets, (Seq<String>)Predef$.MODULE$.wrapRefArray((Object[])this.getSortedExecutorList()));
        // True only when no topic partition appears in more than one range.
        boolean reuseKafkaConsumer = ((TraversableOnce)offsetRanges.map((Function1 & Serializable & scala.Serializable)x$5 -> x$5.topicPartition(), Seq$.MODULE$.canBuildFrom())).toSet().size() == offsetRanges.size();
        // Fixed: the decompiler emitted an undefined "$this" for the two field reads in this lambda.
        return (List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)offsetRanges.map((Function1 & Serializable & scala.Serializable)range -> new KafkaMicroBatchInputPartition((KafkaOffsetRange)range, this.executorKafkaParams, this.pollTimeoutMs(), this.failOnDataLoss, reuseKafkaConsumer), Seq$.MODULE$.canBuildFrom())).asJava();
    }

    /** Wraps the current start partition offsets in a new {@code KafkaSourceOffset}. */
    public Offset getStartOffset() {
        scala.collection.immutable.Map<TopicPartition, Object> offsets = this.startPartitionOffsets();
        return new KafkaSourceOffset(offsets);
    }

    /** Wraps the current end partition offsets in a new {@code KafkaSourceOffset}. */
    public Offset getEndOffset() {
        scala.collection.immutable.Map<TopicPartition, Object> offsets = this.endPartitionOffsets();
        return new KafkaSourceOffset(offsets);
    }

    /**
     * Parses a JSON string into a {@code KafkaSourceOffset}.
     *
     * @param json text handed to {@code JsonUtils.partitionOffsets}
     * @return a new offset wrapping the parsed partition offsets
     */
    public Offset deserializeOffset(String json) {
        return new KafkaSourceOffset(
            JsonUtils$.MODULE$.partitionOffsets(json)
        );
    }

    /** Returns the schema exposed by {@code KafkaOffsetReader.kafkaSchema()}. */
    public StructType readSchema() {
        StructType schema = KafkaOffsetReader$.MODULE$.kafkaSchema();
        return schema;
    }

    /**
     * Does nothing; this reader performs no work on commit.
     *
     * @param end ignored
     */
    public void commit(Offset end) {
        // No-op.
    }

    /** Closes the underlying {@code KafkaOffsetReader}. */
    public void stop() {
        kafkaOffsetReader.close();
    }

    /** Returns {@code "KafkaV2[" + kafkaOffsetReader + "]"}. */
    public String toString() {
        return "KafkaV2[" + this.kafkaOffsetReader + "]";
    }

    /**
     * Reads the initial offsets from the metadata log, or resolves and stores them.
     *
     * <p>Entry 0 of a {@code KafkaSourceInitialOffsetWriter} rooted at {@code metadataPath}
     * is used when it exists. Otherwise the offsets are fetched according to
     * {@code startingOffsets} (earliest, latest, or specific offsets), written to the log
     * as entry 0, logged, and returned.
     *
     * <p>Asserts that the current thread is an {@code UninterruptibleThread} and that an
     * active {@code SparkSession} exists.
     *
     * @return the partition-to-offset map of the stored or newly resolved offsets
     * @throws MatchError if {@code startingOffsets} is none of the three known limits
     */
    private scala.collection.immutable.Map<TopicPartition, Object> getOrCreateInitialPartitionOffsets() {
        Predef$.MODULE$.assert(Thread.currentThread() instanceof UninterruptibleThread);
        Predef$.MODULE$.assert(SparkSession$.MODULE$.getActiveSession().nonEmpty());
        KafkaSourceInitialOffsetWriter metadataLog = new KafkaSourceInitialOffsetWriter((SparkSession)SparkSession$.MODULE$.getActiveSession().get(), this.metadataPath);
        return ((KafkaSourceOffset)((Object)metadataLog.get(0L).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            // Fixed: the decompiler referenced an undefined "$this" in this lambda; "this" is the enclosing reader.
            KafkaSourceOffset resolved;
            KafkaOffsetRangeLimit limit = this.startingOffsets;
            if (EarliestOffsetRangeLimit$.MODULE$.equals(limit)) {
                resolved = new KafkaSourceOffset(this.kafkaOffsetReader.fetchEarliestOffsets());
            } else if (LatestOffsetRangeLimit$.MODULE$.equals(limit)) {
                resolved = new KafkaSourceOffset(this.kafkaOffsetReader.fetchLatestOffsets((Option<scala.collection.immutable.Map<TopicPartition, Object>>)None$.MODULE$));
            } else if (limit instanceof SpecificOffsetRangeLimit) {
                scala.collection.immutable.Map<TopicPartition, Object> p = ((SpecificOffsetRangeLimit)limit).partitionOffsets();
                // Problems found while resolving specific offsets are routed through reportDataLoss.
                resolved = this.kafkaOffsetReader.fetchSpecificOffsets(p, (Function1<String, BoxedUnit>)(Function1 & Serializable & scala.Serializable)message -> {
                    this.reportDataLoss(message);
                    return BoxedUnit.UNIT;
                });
            } else {
                throw new MatchError((Object)limit);
            }
            final KafkaSourceOffset offsets = resolved;
            metadataLog.add(0L, (Object)offsets);
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(17).append("Initial offsets: ").append((Object)offsets).toString());
            // Fixed: the decompiler returned a never-assigned synthetic variable (var3_6) instead of the offsets.
            return offsets;
        }))).partitionToOffsets();
    }

    /**
     * Caps the end offsets so that roughly {@code limit} offsets are consumed in total,
     * distributed across partitions in proportion to each partition's pending size.
     *
     * <p>A partition's size is {@code end - begin}, where {@code begin} comes from
     * {@code from} or, for partitions missing there, from the earliest offsets fetched
     * from Kafka. Only partitions with a size greater than 0 take part in the proration;
     * all others keep their {@code until} offset.
     *
     * @param limit total number of offsets to distribute
     * @param from  start offsets per partition
     * @param until candidate end offsets per partition
     * @return {@code until} itself when the summed size is below 1, otherwise a new map of capped end offsets
     */
    private scala.collection.immutable.Map<TopicPartition, Object> rateLimit(long limit, scala.collection.immutable.Map<TopicPartition, Object> from, scala.collection.immutable.Map<TopicPartition, Object> until) {
        // Earliest offsets for partitions that appear in `until` but not in `from`.
        scala.collection.immutable.Map<TopicPartition, Object> fromNew = this.kafkaOffsetReader.fetchEarliestOffsets((Seq<TopicPartition>)until.keySet().diff((GenSet)from.keySet()).toSeq());
        // Per-partition pending size; entries with size <= 0 or without a known begin offset are dropped.
        scala.collection.immutable.Map sizes = (scala.collection.immutable.Map)until.flatMap((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            TopicPartition tp = (TopicPartition)tuple2._1();
            long end = tuple2._2$mcJ$sp();
            Iterable iterable = Option$.MODULE$.option2Iterable(from.get((Object)tp).orElse((Function0 & Serializable & scala.Serializable)() -> fromNew.get((Object)tp)).flatMap((Function1 & Serializable & scala.Serializable)begin -> KafkaMicroBatchReader.$anonfun$rateLimit$3(this, end, tp, BoxesRunTime.unboxToLong((Object)begin))));
            return iterable;
        }, Map$.MODULE$.canBuildFrom());
        // Sum of all sizes, widened to double for the proration arithmetic below.
        double total = BoxesRunTime.unboxToLong((Object)sizes.values().sum((Numeric)Numeric.LongIsIntegral$.MODULE$));
        return total < 1.0 ? until : (scala.collection.immutable.Map)until.map((Function1 & Serializable & scala.Serializable)x0$2 -> {
            Tuple2 tuple2 = x0$2;
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            TopicPartition tp = (TopicPartition)tuple2._1();
            long end = tuple2._2$mcJ$sp();
            Tuple2 tuple22 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), sizes.get((Object)tp).map((Function1)(JFunction1.mcJJ.sp & Serializable & scala.Serializable)size -> {
                long begin = BoxesRunTime.unboxToLong((Object)from.get((Object)tp).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> BoxesRunTime.unboxToLong((Object)fromNew.apply((Object)tp))));
                // This partition's share of the limit, proportional to its size.
                double prorate = (double)limit * ((double)size / total);
                // Shares below 1 are rounded up so the partition still advances; larger shares are rounded down.
                long prorateLong = (long)(prorate < 1.0 ? Math.ceil(prorate) : Math.floor(prorate));
                // Saturate at Long.MAX_VALUE instead of overflowing.
                long off = prorateLong > Long.MAX_VALUE - begin ? Long.MAX_VALUE : begin + prorateLong;
                // Never move past the candidate end offset.
                return Math.min(end, off);
            }).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> end));
            return tuple22;
        }, Map$.MODULE$.canBuildFrom());
    }

    /**
     * Lists the block manager's peers as sorted location strings.
     *
     * <p>Each peer is turned into an {@code ExecutorCacheTaskLocation} from its host and
     * executor id, the locations are ordered with {@code compare$1}, and each one is
     * rendered with {@code toString()}.
     *
     * @return the sorted location strings
     */
    private String[] getSortedExecutorList() {
        BlockManager blockManager = SparkEnv$.MODULE$.get().blockManager();
        Object[] peers = (Object[])blockManager.master().getPeers(blockManager.blockManagerId()).toArray(ClassTag$.MODULE$.apply(BlockManagerId.class));
        Object[] locations = (Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(peers)).map((Function1 & Serializable & scala.Serializable)x -> new ExecutorCacheTaskLocation(x.host(), x.executorId()), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(ExecutorCacheTaskLocation.class)));
        Object[] sortedLocations = (Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(locations)).sortWith((Function2 & Serializable & scala.Serializable)(a, b) -> BoxesRunTime.boxToBoolean((boolean)KafkaMicroBatchReader.compare$1(a, b)));
        return (String[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(sortedLocations)).map((Function1 & Serializable & scala.Serializable)x$6 -> x$6.toString(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)));
    }

    /**
     * Reports possible data loss.
     *
     * <p>With {@code failOnDataLoss} set, throws; otherwise logs a warning. In both
     * cases the message is followed by {@code ". "} and the matching instruction text
     * from {@code KafkaSourceProvider}.
     *
     * @param message description of the suspected data loss
     * @throws IllegalStateException when {@code failOnDataLoss} is true
     */
    private void reportDataLoss(String message) {
        if (!this.failOnDataLoss) {
            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> message + ". " + KafkaSourceProvider$.MODULE$.INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE());
            return;
        }
        throw new IllegalStateException(message + ". " + KafkaSourceProvider$.MODULE$.INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE());
    }

    /**
     * Synthetic lambda body: parses the {@code maxOffsetsPerTrigger} option text as a long
     * using Scala's {@code StringOps.toLong}.
     */
    public static final /* synthetic */ long $anonfun$maxOffsetsPerTrigger$1(String x$1) {
        StringOps text = new StringOps(Predef$.MODULE$.augmentString(x$1));
        return text.toLong();
    }

    /** Synthetic lambda body: true when the entry's long value (second tuple element) is not 0. */
    public static final /* synthetic */ boolean $anonfun$planInputPartitions$2(Tuple2 x$4) {
        long offset = x$4._2$mcJ$sp();
        return offset != 0L;
    }

    /**
     * Synthetic lambda body: reports data loss for an added partition, naming the
     * partition and the offset it starts from.
     *
     * @throws MatchError when the tuple is null
     */
    public static final /* synthetic */ void $anonfun$planInputPartitions$3(KafkaMicroBatchReader $this, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError((Object)x0$1);
        }
        TopicPartition partition = (TopicPartition)x0$1._1();
        long offset = x0$1._2$mcJ$sp();
        $this.reportDataLoss("Added partition " + partition + " starts from " + offset + " instead of 0. Some data may have been missed");
    }

    /**
     * Synthetic lambda body: true when the partition is among the newly added partitions
     * or among the reader's start partition offsets.
     */
    public static final /* synthetic */ boolean $anonfun$planInputPartitions$4(KafkaMicroBatchReader $this, scala.collection.immutable.Map newPartitionInitialOffsets$1, TopicPartition tp) {
        if (newPartitionInitialOffsets$1.contains((Object)tp)) {
            return true;
        }
        return $this.startPartitionOffsets().contains((Object)tp);
    }

    /**
     * Synthetic lambda body: for one (partition, until-offset) entry, reports data loss
     * when the partition has a from-offset that is larger than the until-offset.
     *
     * @throws MatchError when the tuple is null
     */
    public static final /* synthetic */ void $anonfun$planInputPartitions$6(KafkaMicroBatchReader $this, scala.collection.immutable.Map fromOffsets$1, Tuple2 x0$2) {
        if (x0$2 == null) {
            throw new MatchError((Object)x0$2);
        }
        TopicPartition tp = (TopicPartition)x0$2._1();
        long untilOffset = x0$2._2$mcJ$sp();
        fromOffsets$1.get((Object)tp).foreach((Function1)(JFunction1.mcVJ.sp & Serializable & scala.Serializable)fromOffset -> {
            if (untilOffset < fromOffset) {
                $this.reportDataLoss("Partition " + tp + "'s offset was changed from " + fromOffset + " to " + untilOffset + ", some data may have been missed");
            }
        });
    }

    /**
     * Synthetic lambda body: computes {@code end - begin} for a partition, logs it at
     * debug level, and returns {@code Some(tp -> size)} when the size is positive,
     * otherwise {@code None}.
     */
    public static final /* synthetic */ Option $anonfun$rateLimit$3(KafkaMicroBatchReader $this, long end$1, TopicPartition tp$2, long begin) {
        long pending = end$1 - begin;
        $this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "rateLimit " + tp$2 + " size is " + pending);
        if (pending <= 0L) {
            return None$.MODULE$;
        }
        return new Some((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp$2), (Object)BoxesRunTime.boxToLong((long)pending)));
    }

    /**
     * Ordering predicate used when sorting executor locations: when both hosts are
     * equal, compares executor ids; otherwise compares hosts. Both comparisons use
     * Scala's {@code StringOps} greater-than operator.
     *
     * @return the result of {@code a > b} on the selected key
     */
    private static final boolean compare$1(ExecutorCacheTaskLocation a, ExecutorCacheTaskLocation b) {
        String hostA = a.host();
        String hostB = b.host();
        // Null-safe equality, mirroring Scala's == on references.
        boolean sameHost = hostA == null ? hostB == null : hostA.equals(hostB);
        if (sameHost) {
            return new StringOps(Predef$.MODULE$.augmentString(a.executorId())).$greater((Object)b.executorId());
        }
        return new StringOps(Predef$.MODULE$.augmentString(a.host())).$greater((Object)b.host());
    }

    public KafkaMicroBatchReader(KafkaOffsetReader kafkaOffsetReader, Map<String, Object> executorKafkaParams, DataSourceOptions options, String metadataPath, KafkaOffsetRangeLimit startingOffsets, boolean failOnDataLoss) {
        this.kafkaOffsetReader = kafkaOffsetReader;
        this.executorKafkaParams = executorKafkaParams;
        this.metadataPath = metadataPath;
        this.startingOffsets = startingOffsets;
        this.failOnDataLoss = failOnDataLoss;
        Logging.$init$((Logging)this);
        this.pollTimeoutMs = options.getLong("kafkaConsumer.pollTimeoutMs", SparkEnv$.MODULE$.get().conf().getTimeAsSeconds("spark.network.timeout", "120s") * 1000L);
        this.maxOffsetsPerTrigger = Option$.MODULE$.apply(options.get("maxOffsetsPerTrigger").orElse(null)).map((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToLong((long)KafkaMicroBatchReader.$anonfun$maxOffsetsPerTrigger$1(x$1)));
        this.rangeCalculator = KafkaOffsetRangeCalculator$.MODULE$.apply(options);
    }

    /**
     * Metadata log for the reader's initial {@code KafkaSourceOffset}.
     *
     * <p>On-disk layout produced by {@link #serialize}: one zero byte, then the UTF-8
     * text {@code "v" + VERSION + "\n"} followed by the offset JSON.
     */
    public class KafkaSourceInitialOffsetWriter
    extends HDFSMetadataLog<KafkaSourceOffset> {
        private final int VERSION;

        /** Returns the format version written in the header line. */
        public int VERSION() {
            return VERSION;
        }

        /**
         * Writes the offset to {@code out}: a single zero byte, a version line, then the JSON.
         *
         * @param metadata offset whose {@code json()} is written
         * @param out      destination stream; flushed but not closed here
         */
        public void serialize(KafkaSourceOffset metadata, OutputStream out) {
            // NOTE(review): leading zero byte is presumably kept for compatibility with an older format - confirm.
            out.write(0);
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
            String versionLine = "v" + this.VERSION() + "\n";
            writer.write(versionLine);
            writer.write(metadata.json());
            writer.flush();
        }

        /**
         * Reads an offset previously written by {@link #serialize}.
         *
         * <p>The first byte is read and discarded. If the remaining text starts with
         * {@code 'v'}, the first line is validated with {@code parseVersion} and the rest
         * is parsed as the offset; otherwise the whole text is parsed as the offset.
         *
         * @param in source stream
         * @return the parsed offset
         * @throws IllegalStateException when a versioned file has no newline after the version
         */
        public KafkaSourceOffset deserialize(InputStream in) {
            in.read();
            String content = IOUtils.toString((Reader)new InputStreamReader(in, StandardCharsets.UTF_8));
            Predef$.MODULE$.assert(content.length() != 0);
            boolean hasVersionLine = StringOps$.MODULE$.apply$extension(Predef$.MODULE$.augmentString(content), 0) == 'v';
            if (!hasVersionLine) {
                return KafkaSourceOffset$.MODULE$.apply(new SerializedOffset(content));
            }
            int newlineAt = content.indexOf("\n");
            if (newlineAt <= 0) {
                throw new IllegalStateException("Log file was malformed: failed to detect the log file version line.");
            }
            // Called for its validation effect; the parsed version number itself is not used.
            this.parseVersion(content.substring(0, newlineAt), this.VERSION());
            return KafkaSourceOffset$.MODULE$.apply(new SerializedOffset(content.substring(newlineAt + 1)));
        }

        /** Synthetic accessor for the enclosing reader instance. */
        public /* synthetic */ KafkaMicroBatchReader org$apache$spark$sql$kafka010$KafkaMicroBatchReader$KafkaSourceInitialOffsetWriter$$$outer() {
            return KafkaMicroBatchReader.this;
        }

        /**
         * @param sparkSession session passed to {@code HDFSMetadataLog}
         * @param metadataPath log location passed to {@code HDFSMetadataLog}
         */
        public KafkaSourceInitialOffsetWriter(SparkSession sparkSession, String metadataPath) {
            // Outer-instance null check emitted by the compiler; kept in its original position.
            if (KafkaMicroBatchReader.this == null) {
                throw null;
            }
            super(sparkSession, metadataPath, ClassTag$.MODULE$.apply(KafkaSourceOffset.class));
            this.VERSION = 1;
        }
    }
}

