package org.apache.spark.sql.kinesis;

import java.io.Serializable;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.spark.SparkContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.model.Shard;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.SerializableConfiguration;
import org.apache.spark.util.ThreadUtils$;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.collection.parallel.ForkJoinTaskSupport;
import scala.collection.parallel.mutable.ParArray;
import scala.concurrent.forkjoin.ForkJoinPool;
import scala.math.Ordering$String$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

/* compiled from: KinesisSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\tUd!B\u0001\u0003\u0001\ta!!D&j]\u0016\u001c\u0018n]*pkJ\u001cWM\u0003\u0002\u0004\t\u000591.\u001b8fg&\u001c(BA\u0003\u0007\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u000f!\tQa\u001d9be.T!!\u0003\u0006\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005Y\u0011aA8sON)\u0001!D\u000b\u001eGA\u0011abE\u0007\u0002\u001f)\u0011\u0001#E\u0001\u0005Y\u0006twMC\u0001\u0013\u0003\u0011Q\u0017M^1\n\u0005Qy!AB(cU\u0016\u001cG\u000f\u0005\u0002\u001775\tqC\u0003\u0002\u00193\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u00035\u0011\t\u0011\"\u001a=fGV$\u0018n\u001c8\n\u0005q9\"AB*pkJ\u001cW\r\u0005\u0002\u001fC5\tqD\u0003\u0002!#\u0005\u0011\u0011n\\\u0005\u0003E}\u0011AbU3sS\u0006d\u0017N_1cY\u0016\u0004\"\u0001J\u0014\u000e\u0003\u0015R!A\n\u0004\u0002\u0011%tG/\u001a:oC2L!\u0001K\u0013\u0003\u000f1{wmZ5oO\"A!\u0006\u0001B\u0001B\u0003%A&\u0001\u0006tc2\u001cuN\u001c;fqR\u001c\u0001\u0001\u0005\u0002.]5\tA!\u0003\u00020\t\tQ1+\u0015'D_:$X\r\u001f;\t\u0011E\u0002!\u0011!Q\u0001\nI\nQb]8ve\u000e,w\n\u001d;j_:\u001c\b\u0003B\u001a:yqr!\u0001N\u001c\u000e\u0003UR\u0011AN\u0001\u0006g\u000e\fG.Y\u0005\u0003qU\na\u0001\u0015:fI\u00164\u0017B\u0001\u001e<\u0005\ri\u0015\r\u001d\u0006\u0003qU\u0002\"aM\u001f\n\u0005yZ$AB*ue&tw\r\u0003\u0005A\u0001\t\u0005\t\u0015!\u0003=\u00031iW\r^1eCR\f\u0007+\u0019;i\u0011!\u0011\u0005A!A!\u0002\u0013a\u0014AC:ue\u0016\fWNT1nK\"AA\t\u0001B\u0001B\u0003%Q)A\bj]&$\u0018.\u00197Q_NLG/[8o!\t1u)D\u0001\u0003\u0013\tA%A\u0001\fJ]&$\u0018.\u00197LS:,7/[:Q_NLG/[8o\u0011!Q\u0005A!A!\u0002\u0013a\u0014aC3oIB{\u0017N\u001c;V%2C\u0001\u0002\u0014\u0001\u0003\u0002\u0003\u0006I!T\u0001\u0015W&tWm]5t\u0007J,Gm\u001d)s_ZLG-\u001a:\u0011\u0005\u0019s\u0015BA(\u0003\u0005M\u0019\u0006/\u0019:l\u0003^\u001b6I]3eK:$\u0018.\u00197t\u0011!\t\u0006A!A!\u0002\u0013\u0011\u0016A\u00044bS2|e\u000eR1uC2{7o\u001d\t\u0003iMK!\u0001V\u001b\u0003\u000f\t{w\u000e\\3b]\")a\u000b\u0001C\u0001/\u00061A(\u001b8jiz\"\u0012\u0002W-[7rkfl\u00181\u0011\u000
5\u0019\u0003\u0001\"\u0002\u0016V\u0001\u0004a\u0003\"B\u0019V\u0001\u0004\u0011\u0004\"\u0002!V\u0001\u0004a\u0004\"\u0002\"V\u0001\u0004a\u0004\"\u0002#V\u0001\u0004)\u0005\"\u0002&V\u0001\u0004a\u0004\"\u0002'V\u0001\u0004i\u0005bB)V!\u0003\u0005\rA\u0015\u0005\u0006E\u0002!IaY\u0001\u0003g\u000e,\u0012\u0001\u001a\t\u0003K\u001al\u0011AB\u0005\u0003O\u001a\u0011Ab\u00159be.\u001cuN\u001c;fqRDQ!\u001b\u0001\u0005\n)\fQb[5oKNL7OU3bI\u0016\u0014X#A6\u0011\u0005\u0019c\u0017BA7\u0003\u00055Y\u0015N\\3tSN\u0014V-\u00193fe\"9q\u000e\u0001a\u0001\n\u0013\u0001\u0018aE2veJ,g\u000e^*iCJ$wJ\u001a4tKR\u001cX#A9\u0011\u0007Q\u0012H/\u0003\u0002tk\t1q\n\u001d;j_:\u0004\"AR;\n\u0005Y\u0014!\u0001D*iCJ$wJ\u001a4tKR\u001c\bb\u0002=\u0001\u0001\u0004%I!_\u0001\u0018GV\u0014(/\u001a8u'\"\f'\u000fZ(gMN,Go]0%KF$\"A_?\u0011\u0005QZ\u0018B\u0001?6\u0005\u0011)f.\u001b;\t\u000fy<\u0018\u0011!a\u0001c\u0006\u0019\u0001\u0010J\u0019\t\u000f\u0005\u0005\u0001\u0001)Q\u0005c\u0006!2-\u001e:sK:$8\u000b[1sI>3gm]3ug\u0002B\u0011\"!\u0002\u0001\u0005\u0004%I!a\u0002\u0002%5LgNQ1uG\",7\u000fV8SKR\f\u0017N\\\u000b\u0003\u0003\u0013\u00012\u0001NA\u0006\u0013\r\ti!\u000e\u0002\u0004\u0013:$\b\u0002CA\t\u0001\u0001\u0006I!!\u0003\u0002'5LgNQ1uG\",7\u000fV8SKR\f\u0017N\u001c\u0011\t\u0013\u0005U\u0001A1A\u0005\n\u0005]\u0011!\u00063fg\u000e\u0014\u0018NY3TQ\u0006\u0014H-\u00138uKJ4\u0018\r\\\u000b\u0003\u00033\u00012\u0001NA\u000e\u0013\r\ti\"\u000e\u0002\u0005\u0019>tw\r\u0003\u0005\u0002\"\u0001\u0001\u000b\u0011BA\r\u0003Y!Wm]2sS\n,7\u000b[1sI&sG/\u001a:wC2\u0004\u0003\"CA\u0013\u0001\u0001\u0007I\u0011BA\f\u0003qa\u0017\r^3ti\u0012+7o\u0019:jE\u0016\u001c\u0006.\u0019:e)&lWm\u001d;b[BD\u0011\"!\u000b\u0001\u0001\u0004%I!a\u000b\u0002A1\fG/Z:u\t\u0016\u001c8M]5cKNC\u0017M\u001d3US6,7\u000f^1na~#S-\u001d\u000b\u0004u\u00065\u0002\"\u0003@\u0002(\u0005\u0005\t\u0019AA\r\u0011!\t\t\u0004\u0001Q!\n\u0005e\u0011!\b7bi\u0016\u001cH\u000fR3tGJL'-Z*iCJ$G+[7fgR\fW\u000e\u001d\u0011\t\u000f\u0005U\u0002
\u0001\"\u0003\u00028\u0005\tR.\u001a;bI\u0006$\u0018mQ8n[&$H/\u001a:\u0016\u0005\u0005e\u0002#\u0002$\u0002<\u0005}\u0012bAA\u001f\u0005\t\tR*\u001a;bI\u0006$\u0018mQ8n[&$H/\u001a:\u0011\u0007\u0019\u000b\t%C\u0002\u0002D\t\u0011\u0011b\u00155be\u0012LeNZ8\t\u000f\u0005\u001d\u0003\u0001\"\u0003\u0002J\u0005)R.\u001a;b\t\u0006$\u0018mQ8n[&$H/\u001a:UsB,W#\u0001\u001f\t\u000f\u00055\u0003\u0001\"\u0003\u0002J\u0005)R.\u001a;b\t\u0006$\u0018mQ8n[&$H/\u001a:QCRD\u0007\"CA)\u0001\t\u0007I\u0011BA*\u0003E\tgo\\5e\u000b6\u0004H/\u001f\"bi\u000eDWm]\u000b\u0002%\"9\u0011q\u000b\u0001!\u0002\u0013\u0011\u0016AE1w_&$W)\u001c9us\n\u000bGo\u00195fg\u0002B\u0011\"a\u0017\u0001\u0005\u0004%I!a\u0002\u0002%5\f\u0007\u0010U1sC2dW\r\u001c+ie\u0016\fGm\u001d\u0005\t\u0003?\u0002\u0001\u0015!\u0003\u0002\n\u0005\u0019R.\u0019=QCJ\fG\u000e\\3m)\"\u0014X-\u00193tA!9\u00111\r\u0001\u0005\u0002\u0005\u0015\u0014aB8qi&|gn]\u000b\u0002e!9\u0011\u0011\u000e\u0001\u0005\u0002\u0005-\u0014!E4fi\u001a\u000b\u0017\u000e\\(o\t\u0006$\u0018\rT8tgR\t!\u000bC\u0004\u0002p\u0001!\t!!\u001d\u0002\u0015!\f7OT3x\t\u0006$\u0018\rF\u0002S\u0003gB\u0001\"!\u001e\u0002n\u0001\u0007\u0011qH\u0001\ng\"\f'\u000fZ%oM>Dq!!\u001f\u0001\t\u0003\tY(A\tdC:\u001c%/Z1uK:+wOQ1uG\"$2AUA?\u0011!\ty(a\u001eA\u0002\u0005\u0005\u0015AC:iCJ$7/\u00138g_B)A'a!\u0002@%\u0019\u0011QQ\u001b\u0003\u000b\u0005\u0013(/Y=\t\u000f\u0005%\u0005\u0001\"\u0001\u0002\f\u0006\u0019\u0002.Y:TQ\u0006\u0014H-\u00128e\u0003N|eMZ:fiR\u0019!+!$\t\u0011\u0005U\u0014q\u0011a\u0001\u0003\u001f\u0003b!!%\u0002\"\u0006}b\u0002BAJ\u0003;sA!!&\u0002\u001c6\u0011\u0011q\u0013\u0006\u0004\u00033[\u0013A\u0002\u001fs_>$h(C\u00017\u0013\r\ty*N\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\t\u0019+!*\u0003\u0007M+\u0017OC\u0002\u0002 
VBq!!+\u0001\t\u0003\nY+A\u0005hKR|eMZ:fiV\u0011\u0011Q\u0016\t\u0005iI\fy\u000bE\u0002\u0017\u0003cK1!a-\u0018\u0005\u0019yeMZ:fi\"9\u0011q\u0017\u0001\u0005B\u0005e\u0016\u0001C4fi\n\u000bGo\u00195\u0015\r\u0005m\u0016q[An!\u0011\ti,!5\u000f\t\u0005}\u0016q\u001a\b\u0005\u0003\u0003\fiM\u0004\u0003\u0002D\u0006-g\u0002BAc\u0003\u0013tA!!&\u0002H&\t1\"\u0003\u0002\n\u0015%\u0011q\u0001C\u0005\u0003\u000b\u0019I1!a(\u0005\u0013\u0011\t\u0019.!6\u0003\u0013\u0011\u000bG/\u0019$sC6,'bAAP\t!A\u0011\u0011\\A[\u0001\u0004\ti+A\u0003ti\u0006\u0014H\u000f\u0003\u0005\u0002^\u0006U\u0006\u0019AAX\u0003\r)g\u000e\u001a\u0005\b\u0003C\u0004A\u0011IAr\u0003\u0019\u00198\r[3nCV\u0011\u0011Q\u001d\t\u0005\u0003O\fi/\u0004\u0002\u0002j*\u0019\u00111\u001e\u0003\u0002\u000bQL\b/Z:\n\t\u0005=\u0018\u0011\u001e\u0002\u000b'R\u0014Xo\u0019;UsB,\u0007bBAz\u0001\u0011\u0005\u0013Q_\u0001\u0005gR|\u0007\u000fF\u0001{\u0011\u001d\tI\u0010\u0001C!\u0003w\faaY8n[&$Hc\u0001>\u0002~\"A\u0011Q\\A|\u0001\u0004\ty\u000bC\u0004\u0003\u0002\u0001!\tEa\u0001\u0002\u0011Q|7\u000b\u001e:j]\u001e$\u0012\u0001\u0010\u0005\b\u0005\u000f\u0001A\u0011\u0002B\u0005\u0003I\u0001(/\u001a<CCR\u001c\u0007n\u00155be\u0012LeNZ8\u0015\t\u0005=%1\u0002\u0005\t\u0005\u001b\u0011)\u00011\u0001\u0002\u001a\u00059!-\u0019;dQ&#w\u0001\u0003B\t\u0005!\u0005!Aa\u0005\u0002\u001b-Kg.Z:jgN{WO]2f!\r1%Q\u0003\u0004\b\u0003\tA\tA\u0001B\f'\u0019\u0011)B!\u0007\u0003 
A\u0019AGa\u0007\n\u0007\tuQG\u0001\u0004B]f\u0014VM\u001a\t\u0004i\t\u0005\u0012B\u0001\u00126\u0011\u001d1&Q\u0003C\u0001\u0005K!\"Aa\u0005\t\u0015\t%\"Q\u0003b\u0001\n\u0003\t9!A\u0004W\u000bJ\u001b\u0016j\u0014(\t\u0013\t5\"Q\u0003Q\u0001\n\u0005%\u0011\u0001\u0003,F%NKuJ\u0014\u0011\t\u0015\tE\"Q\u0003a\u0001\n\u0013\u0011\u0019$A\u0006`Q\u0006$wn\u001c9D_:4WC\u0001B\u001b!\u0011\u00119D!\u0010\u000e\u0005\te\"b\u0001B\u001e\r\u0005!Q\u000f^5m\u0013\u0011\u0011yD!\u000f\u00033M+'/[1mSj\f'\r\\3D_:4\u0017nZ;sCRLwN\u001c\u0005\u000b\u0005\u0007\u0012)\u00021A\u0005\n\t\u0015\u0013aD0iC\u0012|w\u000e]\"p]\u001a|F%Z9\u0015\u0007i\u00149\u0005C\u0005\u007f\u0005\u0003\n\t\u00111\u0001\u00036!I!1\nB\u000bA\u0003&!QG\u0001\r?\"\fGm\\8q\u0007>tg\r\t\u0005\t\u0005\u001f\u0012)\u0002\"\u0001\u0003R\u0005Q\u0001.\u00193p_B\u001cuN\u001c4\u0015\t\tU\"1\u000b\u0005\u0007U\t5\u0003\u0019\u0001\u0017\t\u0015\t]#QCI\u0001\n\u0003\u0011I&A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$H\u0005O\u000b\u0003\u00057R3A\u0015B/W\t\u0011y\u0006\u0005\u0003\u0003b\t-TB\u0001B2\u0015\u0011\u0011)Ga\u001a\u0002\u0013Ut7\r[3dW\u0016$'b\u0001B5k\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\t5$1\r\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0007B\u0003B9\u0005+\t\t\u0011\"\u0003\u0003t\u0005Y!/Z1e%\u0016\u001cx\u000e\u001c<f)\u0005i\u0001")
/* loaded from: input_file:org/apache/spark/sql/kinesis/KinesisSource.class */
public class KinesisSource implements Source, Serializable, Logging {
    // SQL context given to the constructor; used for the SparkContext, the Hadoop conf and DataFrame creation.
    private final SQLContext sqlContext;
    // Option map given to the constructor; handed to the reader, the metadata committer and the RDD,
    // and returned unchanged by options().
    private final Map<String, String> sourceOptions;
    // Metadata path given to the constructor; passed to KinesisSourceRDD in getBatch().
    public final String org$apache$spark$sql$kinesis$KinesisSource$$metadataPath;
    // Stream name given to the constructor; used for readers, ShardOffsets instances and toString().
    public final String org$apache$spark$sql$kinesis$KinesisSource$$streamName;
    // Initial position given to the constructor; passed to ShardSyncer.getLatestShardInfo in getOffset().
    private final InitialKinesisPosition initialPosition;
    // Endpoint URL given to the constructor; passed to KinesisReader and KinesisSourceRDD.
    private final String endPointURL;
    // Credentials provider given to the constructor; passed to KinesisReader and KinesisSourceRDD.
    private final SparkAWSCredentials kinesisCredsProvider;
    // Flag given to the constructor; passed to ShardSyncer.getLatestShardInfo and KinesisSourceRDD.
    private final boolean failOnDataLoss;
    // Most recent shard offsets; starts as None and is set by getOffset() and, when still empty, by getBatch().
    private Option<ShardOffsets> currentShardOffsets;
    // Read from the session conf in the constructor, which requires it to be > 0; used by commit().
    private final int minBatchesToRetain;
    // Parsed with Utils.timeStringAsMs in the constructor, which requires it to be >= 0.
    private final long describeShardInterval;
    // -1 until getOffset() first lists shards; afterwards the System.currentTimeMillis() of the last listing.
    private long latestDescribeShardTimestamp;
    // Parsed as a boolean from the lower-cased "client.avoidEmptyBatches" option in the constructor.
    private final boolean avoidEmptyBatches;
    // Parsed as an int from the lower-cased "client.maxParallelThreads" option; caps the pool size
    // in canCreateNewBatch().
    private final int maxParallelThreads;
    // Transient logger slot read and written by the org$apache$spark$internal$Logging$$log_ accessor pair.
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /**
     * Static forwarder to {@code hadoopConf} on the companion object.
     *
     * @param sQLContext SQL context handed through unchanged
     * @return the value returned by {@code KinesisSource$.MODULE$.hadoopConf(sQLContext)}
     */
    public static SerializableConfiguration hadoopConf(SQLContext sQLContext) {
        final KinesisSource$ companion = KinesisSource$.MODULE$;
        return companion.hadoopConf(sQLContext);
    }

    /**
     * Static forwarder to {@code VERSION} on the companion object.
     *
     * @return the value returned by {@code KinesisSource$.MODULE$.VERSION()}
     */
    public static int VERSION() {
        final KinesisSource$ companion = KinesisSource$.MODULE$;
        return companion.VERSION();
    }

    /**
     * Reads the transient logger field.
     *
     * @return the current value of {@code org$apache$spark$internal$Logging$$log_}
     */
    public Logger org$apache$spark$internal$Logging$$log_() {
        final Logger current = this.org$apache$spark$internal$Logging$$log_;
        return current;
    }

    /**
     * Stores {@code logger} in the transient {@code org$apache$spark$internal$Logging$$log_} field.
     *
     * @param logger the value to store
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    /**
     * Forwards to the {@code Logging} implementation of {@code logName} for this instance.
     *
     * @return the name produced by {@code Logging.class.logName(this)}
     */
    public String logName() {
        final String name = Logging.class.logName(this);
        return name;
    }

    /**
     * Forwards to the {@code Logging} implementation of {@code log} for this instance.
     *
     * @return the logger produced by {@code Logging.class.log(this)}
     */
    public Logger log() {
        final Logger logger = Logging.class.log(this);
        return logger;
    }

    /**
     * Forwards {@code function0} to the {@code Logging} implementation of {@code logInfo} for this instance.
     *
     * @param function0 the {@code Function0<String>} passed through unchanged
     */
    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    /**
     * Forwards {@code function0} to the {@code Logging} implementation of {@code logDebug} for this instance.
     *
     * @param function0 the {@code Function0<String>} passed through unchanged
     */
    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    /**
     * Forwards {@code function0} to the {@code Logging} implementation of {@code logTrace} for this instance.
     *
     * @param function0 the {@code Function0<String>} passed through unchanged
     */
    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    /**
     * Forwards {@code function0} to the {@code Logging} implementation of {@code logWarning} for this instance.
     *
     * @param function0 the {@code Function0<String>} passed through unchanged
     */
    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    /**
     * Forwards {@code function0} to the {@code Logging} implementation of {@code logError} for this instance.
     *
     * @param function0 the {@code Function0<String>} passed through unchanged
     */
    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    /**
     * Forwards both arguments to the two-argument {@code Logging} implementation of {@code logInfo}.
     *
     * @param function0 the {@code Function0<String>} passed through unchanged
     * @param th the throwable passed through unchanged
     */
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    /**
     * Forwards both arguments to the two-argument {@code Logging} implementation of {@code logDebug}.
     *
     * @param function0 the {@code Function0<String>} passed through unchanged
     * @param th the throwable passed through unchanged
     */
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    /**
     * Forwards both arguments to the two-argument {@code Logging} implementation of {@code logTrace}.
     *
     * @param function0 the {@code Function0<String>} passed through unchanged
     * @param th the throwable passed through unchanged
     */
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    /**
     * Forwards both arguments to the two-argument {@code Logging} implementation of {@code logWarning}.
     *
     * @param function0 the {@code Function0<String>} passed through unchanged
     * @param th the throwable passed through unchanged
     */
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    /**
     * Forwards both arguments to the two-argument {@code Logging} implementation of {@code logError}.
     *
     * @param function0 the {@code Function0<String>} passed through unchanged
     * @param th the throwable passed through unchanged
     */
    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    /**
     * Forwards to the {@code Logging} implementation of {@code isTraceEnabled} for this instance.
     *
     * @return the value returned by {@code Logging.class.isTraceEnabled(this)}
     */
    public boolean isTraceEnabled() {
        final boolean traceEnabled = Logging.class.isTraceEnabled(this);
        return traceEnabled;
    }

    /**
     * Forwards {@code z} to the one-argument {@code Logging} implementation of
     * {@code initializeLogIfNecessary} for this instance.
     *
     * @param z flag passed through unchanged
     */
    public void initializeLogIfNecessary(boolean z) {
        Logging.class.initializeLogIfNecessary(this, z);
    }

    /**
     * Forwards both flags to the two-argument {@code Logging} implementation of
     * {@code initializeLogIfNecessary} for this instance.
     *
     * @param z first flag, passed through unchanged
     * @param z2 second flag, passed through unchanged
     * @return the value returned by the forwarded call
     */
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        final boolean outcome = Logging.class.initializeLogIfNecessary(this, z, z2);
        return outcome;
    }

    /**
     * Forwards to the {@code Logging} implementation of {@code initalizeLoganIfNecessary} for this instance.
     */
    public void initalizeLoganIfNecessary() {
        Logging.class.initalizeLoganIfNecessary(this);
    }

    /**
     * Forwards to the {@code Logging} implementation of {@code initializeLogIfNecessary$default$2}.
     *
     * @return the value returned by the forwarded call
     */
    public boolean initializeLogIfNecessary$default$2() {
        final boolean defaultValue = Logging.class.initializeLogIfNecessary$default$2(this);
        return defaultValue;
    }

    /**
     * Looks up the Spark context through the SQL context held by this source.
     *
     * @return {@code sqlContext.sparkContext()}
     */
    private SparkContext sc() {
        final SQLContext context = this.sqlContext;
        return context.sparkContext();
    }

    /**
     * Builds a {@link KinesisReader} from this source's options, stream name, credentials and endpoint.
     * A new instance is constructed on every call; nothing is cached here.
     *
     * @return a newly constructed reader
     */
    private KinesisReader kinesisReader() {
        final String streamName = this.org$apache$spark$sql$kinesis$KinesisSource$$streamName;
        return new KinesisReader(
                this.sourceOptions,
                streamName,
                this.kinesisCredsProvider,
                this.endPointURL);
    }

    /**
     * Reads the currently tracked shard offsets.
     *
     * @return the value of the {@code currentShardOffsets} field
     */
    private Option<ShardOffsets> currentShardOffsets() {
        final Option<ShardOffsets> tracked = this.currentShardOffsets;
        return tracked;
    }

    /**
     * Replaces the tracked shard offsets.
     *
     * @param option the new value of the {@code currentShardOffsets} field
     */
    private void currentShardOffsets_$eq(Option<ShardOffsets> option) {
        this.currentShardOffsets = option;
    }

    /**
     * Reads the {@code minBatchesToRetain} field, which the constructor takes from the session conf.
     *
     * @return the configured value
     */
    private int minBatchesToRetain() {
        final int retained = this.minBatchesToRetain;
        return retained;
    }

    /**
     * Reads the {@code describeShardInterval} field, which the constructor parses with
     * {@code Utils.timeStringAsMs}.
     *
     * @return the configured interval
     */
    private long describeShardInterval() {
        final long interval = this.describeShardInterval;
        return interval;
    }

    /**
     * Reads the {@code latestDescribeShardTimestamp} field.
     *
     * @return {@code -1} until a value has been stored, otherwise the last stored timestamp
     */
    private long latestDescribeShardTimestamp() {
        final long timestamp = this.latestDescribeShardTimestamp;
        return timestamp;
    }

    /**
     * Stores a new value in the {@code latestDescribeShardTimestamp} field.
     *
     * @param j the timestamp to store
     */
    private void latestDescribeShardTimestamp_$eq(long j) {
        this.latestDescribeShardTimestamp = j;
    }

    /**
     * Builds the metadata committer selected by the source options.
     * Only the committer type {@code "hdfs"} (compared case-insensitively) is accepted.
     *
     * @return a new {@link HDFSMetadataCommitter} for the configured committer path
     * @throws IllegalArgumentException if the configured committer type is not {@code "hdfs"}
     */
    private MetadataCommitter<ShardInfo> metadataCommitter() {
        final String committerType = metaDataCommitterType().toLowerCase(Locale.ROOT);
        if (!"hdfs".equals(committerType)) {
            throw new IllegalArgumentException("only HDFS is supported");
        }
        return new HDFSMetadataCommitter(
                metaDataCommitterPath(),
                KinesisSource$.MODULE$.hadoopConf(this.sqlContext),
                this.sourceOptions,
                ClassTag$.MODULE$.apply(ShardInfo.class));
    }

    /**
     * Reads the {@code "executor.metadata.committer"} option, falling back to the default
     * supplied by {@code KinesisSource$$anonfun$metaDataCommitterType$1}.
     *
     * @return the configured or default committer type
     */
    private String metaDataCommitterType() {
        final Object configured = this.sourceOptions.getOrElse(
                "executor.metadata.committer",
                new KinesisSource$$anonfun$metaDataCommitterType$1(this));
        return ((String) configured).toString();
    }

    /**
     * Reads the {@code "executor.metadata.path"} option, falling back to the default
     * supplied by {@code KinesisSource$$anonfun$metaDataCommitterPath$1}.
     *
     * @return the configured or default committer path
     */
    private String metaDataCommitterPath() {
        final Object configured = this.sourceOptions.getOrElse(
                "executor.metadata.path",
                new KinesisSource$$anonfun$metaDataCommitterPath$1(this));
        return ((String) configured).toString();
    }

    /**
     * Reads the {@code avoidEmptyBatches} flag parsed by the constructor.
     *
     * @return the configured flag
     */
    private boolean avoidEmptyBatches() {
        final boolean enabled = this.avoidEmptyBatches;
        return enabled;
    }

    /**
     * Reads the {@code maxParallelThreads} value parsed by the constructor.
     *
     * @return the configured thread limit
     */
    private int maxParallelThreads() {
        final int limit = this.maxParallelThreads;
        return limit;
    }

    /**
     * Exposes the option map this source was constructed with.
     *
     * @return the {@code sourceOptions} field, not copied
     */
    public Map<String, String> options() {
        final Map<String, String> configured = this.sourceOptions;
        return configured;
    }

    /**
     * Exposes the {@code failOnDataLoss} flag this source was constructed with.
     *
     * @return the configured flag
     */
    public boolean getFailOnDataLoss() {
        final boolean fail = this.failOnDataLoss;
        return fail;
    }

    /**
     * Probes a single shard for unread data by requesting at most one record from the
     * position described by {@code shardInfo}.
     *
     * @param shardInfo shard id, iterator type and iterator position to probe from
     * @return {@code true} if the probe returned at least one record, or if the response reports
     *         a positive {@code millisBehindLatest}; {@code false} otherwise
     */
    public boolean hasNewData(ShardInfo shardInfo) {
        // kinesisReader() constructs a new KinesisReader on every call, so obtain one instance
        // and use it for both the iterator lookup and the record fetch instead of building two.
        final KinesisReader reader = kinesisReader();
        final GetRecordsResult records = reader.getKinesisRecords(
                reader.getShardIterator(
                        shardInfo.shardId(),
                        shardInfo.iteratorType(),
                        shardInfo.iteratorPosition(),
                        reader.getShardIterator$default$4()),
                1);
        if (records.getRecords().size() > 0) {
            return true;
        }
        // millisBehindLatest is a boxed value; treat an absent value as "not behind"
        // rather than failing with a NullPointerException on unboxing.
        final Long millisBehindLatest = records.getMillisBehindLatest();
        return millisBehindLatest != null && millisBehindLatest.longValue() > 0;
    }

    /**
     * Checks the given shards in parallel and reports whether the per-shard callback
     * ({@code KinesisSource$$anonfun$canCreateNewBatch$1}) set the shared flag.
     *
     * <p>The work runs on a dedicated fork-join pool named {@code "checkCreateNewBatch"} whose
     * parallelism is the smaller of {@code maxParallelThreads()} and the number of shards.
     * The pool is always shut down before this method returns or throws.
     *
     * @param shardInfoArr shards to check; an empty array yields {@code false} without creating a pool
     * @return the final value of the flag shared with the per-shard callback
     */
    public boolean canCreateNewBatch(ShardInfo[] shardInfoArr) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        // With no shards the requested parallelism would be min(max, 0) == 0, which the
        // fork-join pool constructor rejects; there is nothing to check, so skip the pool entirely.
        if (shardInfoArr.length > 0) {
            ParArray par = Predef$.MODULE$.refArrayOps(shardInfoArr).par();
            ForkJoinPool newForkJoinPool = ThreadUtils$.MODULE$.newForkJoinPool("checkCreateNewBatch", Math.min(maxParallelThreads(), par.size()));
            try {
                // Inside the try so the pool is released even if wiring the task support fails.
                par.tasksupport_$eq(new ForkJoinTaskSupport(newForkJoinPool));
                par.foreach(new KinesisSource$$anonfun$canCreateNewBatch$1(this, atomicBoolean));
            } finally {
                newForkJoinPool.shutdown();
            }
        }
        logDebug(new KinesisSource$$anonfun$canCreateNewBatch$2(this, atomicBoolean));
        return atomicBoolean.get();
    }

    /**
     * Tests whether any element of {@code seq} satisfies the predicate
     * {@code KinesisSource$$anonfun$hasShardEndAsOffset$1}.
     *
     * @param seq shard infos to inspect
     * @return {@code true} if at least one element matches the predicate
     */
    public boolean hasShardEndAsOffset(Seq<ShardInfo> seq) {
        final KinesisSource$$anonfun$hasShardEndAsOffset$1 predicate =
                new KinesisSource$$anonfun$hasShardEndAsOffset$1(this);
        return seq.exists(predicate);
    }

    /**
     * Computes the latest available offset for this source.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Take the batch id of the tracked shard offsets, or {@code -1} when none are tracked.</li>
     *   <li>Load the shard info committed for that batch via {@code prevBatchShardInfo}.</li>
     *   <li>List the stream's shards again when there is no previous batch, no listing has
     *       happened yet, or the describe-shard interval has elapsed; otherwise reuse the
     *       previous batch's shard info.</li>
     *   <li>Advance the tracked offsets to {@code batchId + 1} unless empty-batch avoidance is
     *       enabled and none of the advance conditions hold.</li>
     * </ol>
     *
     * @return {@code None} when no offsets are tracked, otherwise a {@code Some} wrapping a
     *         {@link KinesisSourceOffset} built from the tracked offsets
     */
    public synchronized Option<Offset> getOffset() {
        final long batchId = ((ShardOffsets) currentShardOffsets().getOrElse(new KinesisSource$$anonfun$6(this, new ShardOffsets(-1L, this.org$apache$spark$sql$kinesis$KinesisSource$$streamName)))).batchId();
        final Seq<ShardInfo> previousShards = prevBatchShardInfo(batchId);

        final Seq<ShardInfo> latestShardInfo;
        if (batchId < 0 || latestDescribeShardTimestamp() == -1 || latestDescribeShardTimestamp() + describeShardInterval() < System.currentTimeMillis()) {
            Seq<Shard> shards = kinesisReader().getShards();
            latestDescribeShardTimestamp_$eq(System.currentTimeMillis());
            latestShardInfo = ShardSyncer$.MODULE$.getLatestShardInfo(shards, previousShards, this.initialPosition, this.failOnDataLoss);
        } else {
            latestShardInfo = previousShards;
        }

        final ShardInfo[] shardInfoArr = (ShardInfo[]) latestShardInfo.toArray(ClassTag$.MODULE$.apply(ShardInfo.class));
        // One wrapper over the array is enough for all three shard-set checks below.
        final Seq<ShardInfo> latestShards = Predef$.MODULE$.wrapRefArray(shardInfoArr);
        final boolean advance = !avoidEmptyBatches()
                || batchId < 0
                || hasShardEndAsOffset(latestShards)
                || ShardSyncer$.MODULE$.hasNewShards(previousShards, latestShards)
                || ShardSyncer$.MODULE$.hasDeletedShards(previousShards, latestShards)
                || canCreateNewBatch(shardInfoArr);
        if (advance) {
            currentShardOffsets_$eq(new Some<ShardOffsets>(new ShardOffsets(batchId + 1, this.org$apache$spark$sql$kinesis$KinesisSource$$streamName, shardInfoArr)));
        } else {
            log().info("Offsets are unchanged since `kinesis.client.avoidEmptyBatches` is enabled");
        }

        // The field only ever holds None or a Some built by this class, so isEmpty()/get()
        // covers the same two cases the original pattern match handled.
        final Option<ShardOffsets> tracked = currentShardOffsets();
        if (tracked.isEmpty()) {
            return Option.<Offset>empty();
        }
        return new Some<Offset>(new KinesisSourceOffset(tracked.get()));
    }

    public Dataset<Row> getBatch(Option<Offset> option, Offset offset) {
        long j;
        logInfo(new KinesisSource$$anonfun$getBatch$1(this, offset));
        ShardOffsets shardOffsets = KinesisSourceOffset$.MODULE$.getShardOffsets(offset);
        long batchId = shardOffsets.batchId();
        if (option instanceof Some) {
            j = KinesisSourceOffset$.MODULE$.getShardOffsets((Offset) ((Some) option).x()).batchId();
        } else {
            if (!None$.MODULE$.equals(option)) {
                throw new MatchError(option);
            }
            j = -1;
        }
        Predef$.MODULE$.assert(j <= batchId);
        Seq seq = (Seq) ((SeqLike) shardOffsets.shardInfoMap().values().toSeq().filter(new KinesisSource$$anonfun$7(this))).sortBy(new KinesisSource$$anonfun$8(this), Ordering$String$.MODULE$);
        logInfo(new KinesisSource$$anonfun$getBatch$2(this, seq));
        RDD map = new KinesisSourceRDD(sc(), this.sourceOptions, this.org$apache$spark$sql$kinesis$KinesisSource$$streamName, batchId, seq, this.kinesisCredsProvider, this.endPointURL, KinesisSource$.MODULE$.hadoopConf(this.sqlContext), this.org$apache$spark$sql$kinesis$KinesisSource$$metadataPath, this.failOnDataLoss).map(new KinesisSource$$anonfun$9(this), ClassTag$.MODULE$.apply(InternalRow.class));
        if (currentShardOffsets().isEmpty()) {
            currentShardOffsets_$eq(new Some(shardOffsets));
        }
        logInfo(new KinesisSource$$anonfun$getBatch$3(this, seq));
        return this.sqlContext.internalCreateDataFrame(map, schema(), true);
    }

    /**
     * Reports the schema of the rows produced by this source.
     *
     * @return the value returned by {@code KinesisReader$.MODULE$.kinesisSchema()}
     */
    public StructType schema() {
        final StructType kinesisSchema = KinesisReader$.MODULE$.kinesisSchema();
        return kinesisSchema;
    }

    /**
     * Obtains a reader from {@code kinesisReader()} and closes it.
     */
    public synchronized void stop() {
        // NOTE(review): kinesisReader() constructs a new KinesisReader on each call, so this closes
        // a fresh instance rather than any reader used earlier - confirm that is intended.
        final KinesisReader reader = kinesisReader();
        reader.close();
    }

    /**
     * Purges committed metadata that is older than the retention window.
     *
     * <p>The purge threshold is the tracked batch id (or {@code -1} when nothing is tracked)
     * minus {@code minBatchesToRetain()}. Nothing happens while the threshold is negative.
     *
     * @param offset not read by this implementation
     */
    public void commit(Offset offset) {
        final Option<ShardOffsets> tracked = currentShardOffsets();
        final ShardOffsets fallback = new ShardOffsets(-1L, this.org$apache$spark$sql$kinesis$KinesisSource$$streamName);
        final ShardOffsets effective = (ShardOffsets) tracked.getOrElse(new KinesisSource$$anonfun$10(this, fallback));
        final long purgeThreshold = effective.batchId() - minBatchesToRetain();
        if (purgeThreshold < 0) {
            return;
        }
        logInfo(new KinesisSource$$anonfun$commit$1(this, purgeThreshold));
        metadataCommitter().purge(purgeThreshold);
    }

    /**
     * Renders this source as {@code KinesisSource[<streamName>]}.
     *
     * @return the display string for this source
     */
    public String toString() {
        final String streamName = this.org$apache$spark$sql$kinesis$KinesisSource$$streamName;
        return "KinesisSource[" + streamName + "]";
    }

    /**
     * Loads the shard info committed for batch {@code j}.
     *
     * @param j batch id; a negative value means there is no previous batch
     * @return an empty sequence for a negative {@code j}, otherwise the non-empty sequence
     *         returned by the metadata committer for that batch
     * @throws IllegalStateException if the committer returns nothing for a non-negative {@code j}
     */
    private Seq<ShardInfo> prevBatchShardInfo(long j) {
        final Seq<ShardInfo> previous;
        if (j >= 0) {
            logDebug(new KinesisSource$$anonfun$12(this, j));
            previous = metadataCommitter().get(j);
            if (previous.isEmpty()) {
                throw new IllegalStateException("Unable to fetch " + "committed metadata from previous batch. Some data may have been missed");
            }
        } else {
            logInfo(new KinesisSource$$anonfun$11(this));
            previous = (Seq) Seq$.MODULE$.empty();
        }
        logDebug(new KinesisSource$$anonfun$prevBatchShardInfo$1(this, previous));
        return previous;
    }

    /**
     * Creates a source for one Kinesis stream.
     *
     * <p>Statement order matters: each {@code require} call reads, through its accessor, the
     * field assigned on the line just before it.
     *
     * @param sQLContext SQL context; also the origin of the {@code minBatchesToRetain} setting
     * @param map source options
     * @param str metadata path, later passed to {@code KinesisSourceRDD}
     * @param str2 stream name
     * @param initialKinesisPosition initial position, later passed to {@code ShardSyncer}
     * @param str3 endpoint URL
     * @param sparkAWSCredentials credentials provider
     * @param z fail-on-data-loss flag
     */
    public KinesisSource(SQLContext sQLContext, Map<String, String> map, String str, String str2, InitialKinesisPosition initialKinesisPosition, String str3, SparkAWSCredentials sparkAWSCredentials, boolean z) {
        this.sqlContext = sQLContext;
        this.sourceOptions = map;
        this.org$apache$spark$sql$kinesis$KinesisSource$$metadataPath = str;
        this.org$apache$spark$sql$kinesis$KinesisSource$$streamName = str2;
        this.initialPosition = initialKinesisPosition;
        this.endPointURL = str3;
        this.kinesisCredsProvider = sparkAWSCredentials;
        this.failOnDataLoss = z;
        // Initialisers of the Source and Logging mix-ins, as emitted by the decompiler.
        Source.class.$init$(this);
        Logging.class.$init$(this);
        // No offsets are tracked until getOffset() or getBatch() stores some.
        this.currentShardOffsets = None$.MODULE$;
        this.minBatchesToRetain = sQLContext.sparkSession().sessionState().conf().minBatchesToRetain();
        // Reject a non-positive retention count; the failure message comes from KinesisSource$$anonfun$1.
        Predef$.MODULE$.require(minBatchesToRetain() > 0, new KinesisSource$$anonfun$1(this));
        // Option value (or the default from KinesisSource$$anonfun$2) parsed as a time string into milliseconds.
        this.describeShardInterval = Utils$.MODULE$.timeStringAsMs((String) map.getOrElse(KinesisSourceProvider$.MODULE$.DESCRIBE_SHARD_INTERVAL(), new KinesisSource$$anonfun$2(this)));
        // Reject a negative interval; the failure message comes from KinesisSource$$anonfun$3.
        Predef$.MODULE$.require(describeShardInterval() >= 0, new KinesisSource$$anonfun$3(this));
        // -1 marks "shards never listed"; getOffset() tests for this value.
        this.latestDescribeShardTimestamp = -1L;
        // Both option keys are looked up in lower-cased form; defaults come from the anonfun$4 / anonfun$5 closures.
        this.avoidEmptyBatches = new StringOps(Predef$.MODULE$.augmentString((String) map.getOrElse("client.avoidEmptyBatches".toLowerCase(Locale.ROOT), new KinesisSource$$anonfun$4(this)))).toBoolean();
        this.maxParallelThreads = new StringOps(Predef$.MODULE$.augmentString((String) map.getOrElse("client.maxParallelThreads".toLowerCase(Locale.ROOT), new KinesisSource$$anonfun$5(this)))).toInt();
    }
}
