package org.apache.spark.sql.kinesis;

import java.io.Serializable;
import java.sql.Timestamp;
import java.util.Locale;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.spark.SparkContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.InternalRow$;
import org.apache.spark.sql.catalyst.util.DateTimeUtils$;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.spark.sql.kinesis.shaded.amazonaws.services.kinesis.model.Shard;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.util.SerializableConfiguration;
import org.apache.spark.util.ThreadUtils$;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.SeqLike;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.collection.parallel.ForkJoinTaskSupport;
import scala.collection.parallel.mutable.ParArray;
import scala.math.Ordering$String$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KinesisSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\t]d!\u0002\u001b6\u0001Uz\u0004\u0002\u0003/\u0001\u0005\u0003\u0005\u000b\u0011\u00020\t\u0011\t\u0004!\u0011!Q\u0001\n\rD\u0001b\u001d\u0001\u0003\u0002\u0003\u0006I\u0001\u001d\u0005\ti\u0002\u0011\t\u0011)A\u0005a\"AQ\u000f\u0001B\u0001B\u0003%a\u000f\u0003\u0005{\u0001\t\u0005\t\u0015!\u0003q\u0011!Y\bA!A!\u0002\u0013a\b\"C@\u0001\u0005\u0003\u0005\u000b\u0011BA\u0001\u0011\u001d\tI\u0001\u0001C\u0001\u0003\u0017Aq!a\b\u0001\t\u0013\t\t\u0003C\u0004\u0002,\u0001!I!!\f\t\u0013\u0005U\u0002\u00011A\u0005\n\u0005]\u0002\"CA#\u0001\u0001\u0007I\u0011BA$\u0011!\t\u0019\u0006\u0001Q!\n\u0005e\u0002\"CA+\u0001\t\u0007I\u0011BA,\u0011!\ty\u0006\u0001Q\u0001\n\u0005e\u0003\"CA1\u0001\t\u0007I\u0011BA2\u0011!\tY\u0007\u0001Q\u0001\n\u0005\u0015\u0004\"CA7\u0001\u0001\u0007I\u0011BA2\u0011%\ty\u0007\u0001a\u0001\n\u0013\t\t\b\u0003\u0005\u0002v\u0001\u0001\u000b\u0015BA3\u0011\u001d\t9\b\u0001C\u0005\u0003sBq!a\"\u0001\t\u0013\tI\tC\u0004\u0002\f\u0002!I!!#\t\u0013\u00055\u0005A1A\u0005\n\u0005=\u0005\u0002CAI\u0001\u0001\u0006I!!\u0001\t\u0013\u0005M\u0005A1A\u0005\n\u0005]\u0003\u0002CAK\u0001\u0001\u0006I!!\u0017\t\u000f\u0005]\u0005\u0001\"\u0001\u0002\u001a\"9\u00111\u0014\u0001\u0005\u0002\u0005u\u0005bBAP\u0001\u0011\u0005\u0011\u0011\u0015\u0005\b\u0003O\u0003A\u0011AAU\u0011\u001d\t)\f\u0001C\u0001\u0003oCq!!4\u0001\t\u0003\ny\rC\u0004\u0002Z\u0002!\t%a7\t\u000f\t\u0005\u0001\u0001\"\u0011\u0003\u0004!9!\u0011\u0003\u0001\u0005B\tM\u0001b\u0002B\u000b\u0001\u0011\u0005#q\u0003\u0005\b\u00057\u0001A\u0011\tB\u000f\u0011\u001d\u0011y\u0002\u0001C\u0005\u0005C9\u0001Ba\n6\u0011\u0003)$\u0011\u0006\u0004\biUB\t!\u000eB\u0016\u0011\u001d\tIA\u000bC\u0001\u0005oA\u0011B!\u000f+\u0005\u0004%\t!a\u0016\t\u0011\tm\"\u0006)A\u0005\u00033B\u0011B!\u0010+\u0001\u0004%IAa\u0010\t\u0013\t5#\u00061A\u0005\n\t=\u0003\u0002\u0003B*U\u0001\u0006KA!\u0011\t\u000f\tU#\u0006\"\u0001\u0003X!I!1\f\u0016\u0012\u0002\u0013\u0005!Q\f\u0005\n\u0005gR\u
0013\u0011!C\u0005\u0005k\u0012QbS5oKNL7oU8ve\u000e,'B\u0001\u001c8\u0003\u001dY\u0017N\\3tSNT!\u0001O\u001d\u0002\u0007M\fHN\u0003\u0002;w\u0005)1\u000f]1sW*\u0011A(P\u0001\u0007CB\f7\r[3\u000b\u0003y\n1a\u001c:h'\u0015\u0001\u0001\t\u0013)W!\t\te)D\u0001C\u0015\t\u0019E)\u0001\u0003mC:<'\"A#\u0002\t)\fg/Y\u0005\u0003\u000f\n\u0013aa\u00142kK\u000e$\bCA%O\u001b\u0005Q%BA&M\u0003%\u0019HO]3b[&twM\u0003\u0002No\u0005IQ\r_3dkRLwN\\\u0005\u0003\u001f*\u0013aaU8ve\u000e,\u0007CA)U\u001b\u0005\u0011&BA*E\u0003\tIw.\u0003\u0002V%\na1+\u001a:jC2L'0\u00192mKB\u0011qKW\u0007\u00021*\u0011\u0011,O\u0001\tS:$XM\u001d8bY&\u00111\f\u0017\u0002\b\u0019><w-\u001b8h\u0003)\u0019\u0018\u000f\\\"p]R,\u0007\u0010^\u0002\u0001!\ty\u0006-D\u00018\u0013\t\twG\u0001\u0006T#2\u001buN\u001c;fqR\fQb]8ve\u000e,w\n\u001d;j_:\u001c\b\u0003\u00023naBt!!Z6\u0011\u0005\u0019LW\"A4\u000b\u0005!l\u0016A\u0002\u001fs_>$hHC\u0001k\u0003\u0015\u00198-\u00197b\u0013\ta\u0017.\u0001\u0004Qe\u0016$WMZ\u0005\u0003]>\u00141!T1q\u0015\ta\u0017\u000e\u0005\u0002ec&\u0011!o\u001c\u0002\u0007'R\u0014\u0018N\\4\u0002\u00195,G/\u00193bi\u0006\u0004\u0016\r\u001e5\u0002\u0015M$(/Z1n\u001d\u0006lW-A\bj]&$\u0018.\u00197Q_NLG/[8o!\t9\b0D\u00016\u0013\tIXG\u0001\fJ]&$\u0018.\u00197LS:,7/[:Q_NLG/[8o\u0003-)g\u000e\u001a)pS:$XK\u0015'\u0002)-Lg.Z:jg\u000e\u0013X\rZ:Qe>4\u0018\u000eZ3s!\t9X0\u0003\u0002\u007fk\t\u00192\u000b]1sW\u0006;6k\u0011:fI\u0016tG/[1mg\u0006qa-Y5m\u001f:$\u0015\r^1M_N\u001c\b\u0003BA\u0002\u0003\u000bi\u0011![\u0005\u0004\u0003\u000fI'a\u0002\"p_2,\u0017M\\\u0001\u0007y%t\u0017\u000e\u001e 
\u0015%\u00055\u0011qBA\t\u0003'\t)\"a\u0006\u0002\u001a\u0005m\u0011Q\u0004\t\u0003o\u0002AQ\u0001X\u0005A\u0002yCQAY\u0005A\u0002\rDQa]\u0005A\u0002ADQ\u0001^\u0005A\u0002ADQ!^\u0005A\u0002YDQA_\u0005A\u0002ADQa_\u0005A\u0002qD\u0001b`\u0005\u0011\u0002\u0003\u0007\u0011\u0011A\u0001\u0003g\u000e,\"!a\t\u0011\t\u0005\u0015\u0012qE\u0007\u0002s%\u0019\u0011\u0011F\u001d\u0003\u0019M\u0003\u0018M]6D_:$X\r\u001f;\u0002\u001b-Lg.Z:jgJ+\u0017\rZ3s+\t\ty\u0003E\u0002x\u0003cI1!a\r6\u00055Y\u0015N\\3tSN\u0014V-\u00193fe\u0006\u00192-\u001e:sK:$8\u000b[1sI>3gm]3ugV\u0011\u0011\u0011\b\t\u0007\u0003\u0007\tY$a\u0010\n\u0007\u0005u\u0012N\u0001\u0004PaRLwN\u001c\t\u0004o\u0006\u0005\u0013bAA\"k\ta1\u000b[1sI>3gm]3ug\u000692-\u001e:sK:$8\u000b[1sI>3gm]3ug~#S-\u001d\u000b\u0005\u0003\u0013\ny\u0005\u0005\u0003\u0002\u0004\u0005-\u0013bAA'S\n!QK\\5u\u0011%\t\t&DA\u0001\u0002\u0004\tI$A\u0002yIE\nAcY;se\u0016tGo\u00155be\u0012|eMZ:fiN\u0004\u0013AE7j]\n\u000bGo\u00195fgR{'+\u001a;bS:,\"!!\u0017\u0011\t\u0005\r\u00111L\u0005\u0004\u0003;J'aA%oi\u0006\u0019R.\u001b8CCR\u001c\u0007.Z:U_J+G/Y5oA\u0005)B-Z:de&\u0014Wm\u00155be\u0012Le\u000e^3sm\u0006dWCAA3!\u0011\t\u0019!a\u001a\n\u0007\u0005%\u0014N\u0001\u0003M_:<\u0017A\u00063fg\u000e\u0014\u0018NY3TQ\u0006\u0014H-\u00138uKJ4\u0018\r\u001c\u0011\u000291\fG/Z:u\t\u0016\u001c8M]5cKNC\u0017M\u001d3US6,7\u000f^1na\u0006\u0001C.\u0019;fgR$Um]2sS\n,7\u000b[1sIRKW.Z:uC6\u0004x\fJ3r)\u0011\tI%a\u001d\t\u0013\u0005EC#!AA\u0002\u0005\u0015\u0014!\b7bi\u0016\u001cH\u000fR3tGJL'-Z*iCJ$G+[7fgR\fW\u000e\u001d\u0011\u0002#5,G/\u00193bi\u0006\u001cu.\\7jiR,'/\u0006\u0002\u0002|A)q/! 
\u0002\u0002&\u0019\u0011qP\u001b\u0003#5+G/\u00193bi\u0006\u001cu.\\7jiR,'\u000fE\u0002x\u0003\u0007K1!!\"6\u0005%\u0019\u0006.\u0019:e\u0013:4w.A\u000bnKR\fG)\u0019;b\u0007>lW.\u001b;uKJ$\u0016\u0010]3\u0016\u0003A\fQ#\\3uC\u0012\u000bG/Y\"p[6LG\u000f^3s!\u0006$\b.A\tbm>LG-R7qif\u0014\u0015\r^2iKN,\"!!\u0001\u0002%\u00054x.\u001b3F[B$\u0018PQ1uG\",7\u000fI\u0001\u0013[\u0006D\b+\u0019:bY2,G\u000e\u00165sK\u0006$7/A\nnCb\u0004\u0016M]1mY\u0016dG\u000b\u001b:fC\u0012\u001c\b%A\u0004paRLwN\\:\u0016\u0003\r\f\u0011cZ3u\r\u0006LGn\u00148ECR\fGj\\:t)\t\t\t!\u0001\u0006iCNtUm\u001e#bi\u0006$B!!\u0001\u0002$\"9\u0011QU\u0010A\u0002\u0005\u0005\u0015!C:iCJ$\u0017J\u001c4p\u0003E\u0019\u0017M\\\"sK\u0006$XMT3x\u0005\u0006$8\r\u001b\u000b\u0005\u0003\u0003\tY\u000bC\u0004\u0002.\u0002\u0002\r!a,\u0002\u0015MD\u0017M\u001d3t\u0013:4w\u000e\u0005\u0004\u0002\u0004\u0005E\u0016\u0011Q\u0005\u0004\u0003gK'!B!se\u0006L\u0018a\u00055bgNC\u0017M\u001d3F]\u0012\f5o\u00144gg\u0016$H\u0003BA\u0001\u0003sCq!!*\"\u0001\u0004\tY\f\u0005\u0004\u0002>\u0006\u001d\u0017\u0011\u0011\b\u0005\u0003\u007f\u000b\u0019MD\u0002g\u0003\u0003L\u0011A[\u0005\u0004\u0003\u000bL\u0017a\u00029bG.\fw-Z\u0005\u0005\u0003\u0013\fYMA\u0002TKFT1!!2j\u0003%9W\r^(gMN,G/\u0006\u0002\u0002RB1\u00111AA\u001e\u0003'\u00042!SAk\u0013\r\t9N\u0013\u0002\u0007\u001f\u001a47/\u001a;\u0002\u0011\u001d,GOQ1uG\"$b!!8\u0002z\u0006u\b\u0003BAp\u0003gtA!!9\u0002r:!\u00111]Ax\u001d\u0011\t)/!<\u000f\t\u0005\u001d\u00181\u001e\b\u0004M\u0006%\u0018\"\u0001 
\n\u0005qj\u0014B\u0001\u001e<\u0013\tA\u0014(C\u0002\u0002F^JA!!>\u0002x\nIA)\u0019;b\rJ\fW.\u001a\u0006\u0004\u0003\u000b<\u0004bBA~G\u0001\u0007\u0011\u0011[\u0001\u0006gR\f'\u000f\u001e\u0005\b\u0003\u007f\u001c\u0003\u0019AAj\u0003\r)g\u000eZ\u0001\u0007g\u000eDW-\\1\u0016\u0005\t\u0015\u0001\u0003\u0002B\u0004\u0005\u001bi!A!\u0003\u000b\u0007\t-q'A\u0003usB,7/\u0003\u0003\u0003\u0010\t%!AC*ueV\u001cG\u000fV=qK\u0006!1\u000f^8q)\t\tI%\u0001\u0004d_6l\u0017\u000e\u001e\u000b\u0005\u0003\u0013\u0012I\u0002C\u0004\u0002��\u001a\u0002\r!a5\u0002\u0011Q|7\u000b\u001e:j]\u001e$\u0012\u0001]\u0001\u0013aJ,gOQ1uG\"\u001c\u0006.\u0019:e\u0013:4w\u000e\u0006\u0003\u0002<\n\r\u0002b\u0002B\u0013Q\u0001\u0007\u0011QM\u0001\bE\u0006$8\r[%e\u00035Y\u0015N\\3tSN\u001cv.\u001e:dKB\u0011qOK\n\u0006U\t5\"1\u0007\t\u0005\u0003\u0007\u0011y#C\u0002\u00032%\u0014a!\u00118z%\u00164\u0007\u0003BA\u0002\u0005kI!!V5\u0015\u0005\t%\u0012a\u0002,F%NKuJT\u0001\t-\u0016\u00136+S(OA\u0005Yq\f[1e_>\u00048i\u001c8g+\t\u0011\t\u0005\u0005\u0003\u0003D\t%SB\u0001B#\u0015\r\u00119%O\u0001\u0005kRLG.\u0003\u0003\u0003L\t\u0015#!G*fe&\fG.\u001b>bE2,7i\u001c8gS\u001e,(/\u0019;j_:\fqb\u00185bI>|\u0007oQ8oM~#S-\u001d\u000b\u0005\u0003\u0013\u0012\t\u0006C\u0005\u0002R=\n\t\u00111\u0001\u0003B\u0005aq\f[1e_>\u00048i\u001c8gA\u0005Q\u0001.\u00193p_B\u001cuN\u001c4\u0015\t\t\u0005#\u0011\f\u0005\u00069F\u0002\rAX\u0001\u001cI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000f\n\u001d\u0016\u0005\t}#\u0006BA\u0001\u0005CZ#Aa\u0019\u0011\t\t\u0015$qN\u0007\u0003\u0005ORAA!\u001b\u0003l\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0005[J\u0017AC1o]>$\u0018\r^5p]&!!\u0011\u000fB4\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001\fe\u0016\fGMU3t_24X\rF\u0001A\u0001")
/* loaded from: input_file:org/apache/spark/sql/kinesis/KinesisSource.class */
/**
 * Structured Streaming {@link Source} that reads one Kinesis stream.
 *
 * <p>Each call to {@link #getOffset()} may advance an in-memory batch counter and publish the
 * per-shard positions for that batch; {@link #getBatch} turns an end offset into a
 * {@link KinesisSourceRDD}-backed DataFrame. Per-batch shard metadata is read back (and purged)
 * through a {@link MetadataCommitter}.
 *
 * <p>This file is decompiled from Scala: the {@code log*}/{@code initialize*} methods and the
 * one-argument {@code initialOffset}/{@code deserializeOffset}/{@code commit(Offset)} methods are
 * plain forwarders to the {@code Logging} and {@code Source} trait implementations.
 *
 * <p>NOTE(review): {@link #kinesisReader()} builds a new {@link KinesisReader} on every call, so
 * callers in this class are responsible for closing the instance they obtained.
 */
public class KinesisSource implements Source, Serializable, Logging {
    private final SQLContext sqlContext;
    private final Map<String, String> sourceOptions;
    private final String metadataPath;
    private final String streamName;
    private final InitialKinesisPosition initialPosition;
    private final String endPointURL;
    private final SparkAWSCredentials kinesisCredsProvider;
    private final boolean failOnDataLoss;
    /** Offsets of the most recent batch handed out by this source; empty until the first batch. */
    private Option<ShardOffsets> currentShardOffsets;
    private final int minBatchesToRetain;
    /** Minimum time, in milliseconds, between two shard listings in {@link #getOffset()}. */
    private final long describeShardInterval;
    /** Wall-clock time (ms) of the last shard listing, or -1 when none has happened yet. */
    private long latestDescribeShardTimestamp;
    private final boolean avoidEmptyBatches;
    private final int maxParallelThreads;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /**
     * Creates a source for one stream.
     *
     * @param sQLContext             SQL context used for the Spark context, session conf and DataFrame creation
     * @param map                    source options; also consulted for the describe-shard interval,
     *                               {@code client.avoidemptybatches} and {@code client.maxparallelthreads}
     * @param str                    metadata path (default location for the metadata committer)
     * @param str2                   Kinesis stream name
     * @param initialKinesisPosition initial position handed to the shard syncer
     * @param str3                   Kinesis endpoint URL
     * @param sparkAWSCredentials    credentials provider handed to readers and the RDD
     * @param z                      the {@code failOnDataLoss} flag
     * @throws IllegalArgumentException if the session's {@code minBatchesToRetain} is not positive or
     *                                  the describe-shard interval is negative
     */
    public KinesisSource(SQLContext sQLContext, Map<String, String> map, String str, String str2, InitialKinesisPosition initialKinesisPosition, String str3, SparkAWSCredentials sparkAWSCredentials, boolean z) {
        this.sqlContext = sQLContext;
        this.sourceOptions = map;
        this.metadataPath = str;
        this.streamName = str2;
        this.initialPosition = initialKinesisPosition;
        this.endPointURL = str3;
        this.kinesisCredsProvider = sparkAWSCredentials;
        this.failOnDataLoss = z;
        Source.$init$(this);
        Logging.$init$(this);
        this.currentShardOffsets = None$.MODULE$;
        this.minBatchesToRetain = sQLContext.sparkSession().sessionState().conf().minBatchesToRetain();
        Predef$.MODULE$.require(minBatchesToRetain() > 0, () -> {
            return "minBatchesToRetain has to be positive";
        });
        this.describeShardInterval = Utils$.MODULE$.timeStringAsMs((String) map.getOrElse(KinesisSourceProvider$.MODULE$.DESCRIBE_SHARD_INTERVAL(), () -> {
            return "1s";
        }));
        Predef$.MODULE$.require(describeShardInterval() >= 0, () -> {
            return "describeShardInterval cannot be less than 0 sec";
        });
        this.latestDescribeShardTimestamp = -1L;
        // Option keys are looked up in lower case.
        this.avoidEmptyBatches = new StringOps(Predef$.MODULE$.augmentString((String) map.getOrElse("client.avoidEmptyBatches".toLowerCase(Locale.ROOT), () -> {
            return "true";
        }))).toBoolean();
        this.maxParallelThreads = new StringOps(Predef$.MODULE$.augmentString((String) map.getOrElse("client.maxParallelThreads".toLowerCase(Locale.ROOT), () -> {
            return "8";
        }))).toInt();
    }

    /** Forwards to the companion object's {@code hadoopConf}. */
    public static SerializableConfiguration hadoopConf(SQLContext sQLContext) {
        return KinesisSource$.MODULE$.hadoopConf(sQLContext);
    }

    /** Forwards to the companion object's {@code VERSION}. */
    public static int VERSION() {
        return KinesisSource$.MODULE$.VERSION();
    }

    // ---------------------------------------------------------------------
    // Logging trait forwarders (compiler generated)
    // ---------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    // ---------------------------------------------------------------------
    // Source trait forwarders (compiler generated)
    // ---------------------------------------------------------------------

    public Offset initialOffset() {
        return Source.initialOffset$(this);
    }

    public Offset deserializeOffset(String str) {
        return Source.deserializeOffset$(this, str);
    }

    public void commit(Offset offset) {
        Source.commit$(this, offset);
    }

    // ---------------------------------------------------------------------
    // Private accessors
    // ---------------------------------------------------------------------

    private SparkContext sc() {
        return this.sqlContext.sparkContext();
    }

    /** Builds a NEW reader on every call; the caller owns it and should close it. */
    private KinesisReader kinesisReader() {
        return new KinesisReader(this.sourceOptions, this.streamName, this.kinesisCredsProvider, this.endPointURL);
    }

    private Option<ShardOffsets> currentShardOffsets() {
        return this.currentShardOffsets;
    }

    private void currentShardOffsets_$eq(Option<ShardOffsets> option) {
        this.currentShardOffsets = option;
    }

    private int minBatchesToRetain() {
        return this.minBatchesToRetain;
    }

    private long describeShardInterval() {
        return this.describeShardInterval;
    }

    private long latestDescribeShardTimestamp() {
        return this.latestDescribeShardTimestamp;
    }

    private void latestDescribeShardTimestamp_$eq(long j) {
        this.latestDescribeShardTimestamp = j;
    }

    /**
     * Builds the metadata committer selected by the {@code executor.metadata.committer} option.
     *
     * @throws IllegalArgumentException for any committer type other than {@code hdfs} (case-insensitive)
     */
    private MetadataCommitter<ShardInfo> metadataCommitter() {
        if ("hdfs".equals(metaDataCommitterType().toLowerCase(Locale.ROOT))) {
            return new HDFSMetadataCommitter(metaDataCommitterPath(), KinesisSource$.MODULE$.hadoopConf(this.sqlContext), this.sourceOptions, ClassTag$.MODULE$.apply(ShardInfo.class));
        }
        throw new IllegalArgumentException("only HDFS is supported");
    }

    /** Committer type from the options; defaults to {@code "hdfs"}. */
    private String metaDataCommitterType() {
        return ((String) this.sourceOptions.getOrElse("executor.metadata.committer", () -> {
            return "hdfs";
        })).toString();
    }

    /** Committer path from the options; defaults to this source's metadata path. */
    private String metaDataCommitterPath() {
        return ((String) this.sourceOptions.getOrElse("executor.metadata.path", () -> {
            return this.metadataPath;
        })).toString();
    }

    private boolean avoidEmptyBatches() {
        return this.avoidEmptyBatches;
    }

    private int maxParallelThreads() {
        return this.maxParallelThreads;
    }

    // ---------------------------------------------------------------------
    // Public API
    // ---------------------------------------------------------------------

    /** Returns the source options exactly as passed to the constructor. */
    public Map<String, String> options() {
        return this.sourceOptions;
    }

    public boolean getFailOnDataLoss() {
        return this.failOnDataLoss;
    }

    /**
     * Probes one shard for unread data by fetching at most one record from the shard's
     * current iterator position.
     *
     * @param shardInfo shard and iterator position to probe
     * @return {@code true} when the probe returned a record or reports being behind the latest record
     */
    public boolean hasNewData(ShardInfo shardInfo) {
        // One reader serves both the iterator lookup and the fetch. It is closed afterwards
        // because kinesisReader() hands out a fresh instance per call and nothing else owns it.
        final KinesisReader reader = kinesisReader();
        try {
            final GetRecordsResult probe = reader.getKinesisRecords(
                    reader.getShardIterator(
                            shardInfo.shardId(),
                            shardInfo.iteratorType(),
                            shardInfo.iteratorPosition(),
                            reader.getShardIterator$default$4()),
                    1);
            return probe.getRecords().size() > 0 || probe.getMillisBehindLatest().longValue() > 0;
        } finally {
            reader.close();
        }
    }

    /**
     * Checks the given shards in parallel and reports whether at least one has new data.
     *
     * @param shardInfoArr shards to probe; may be empty
     * @return {@code true} if {@link #hasNewData} succeeded for any shard, {@code false} otherwise
     *         (including for an empty array)
     */
    public boolean canCreateNewBatch(ShardInfo[] shardInfoArr) {
        if (shardInfoArr.length == 0) {
            // Nothing to probe; also avoids asking for a pool with zero threads.
            return false;
        }
        final ParArray par = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(shardInfoArr)).par();
        // A fork-join pool rejects non-positive parallelism, so never request fewer than one thread.
        final int parallelism = Math.max(1, Math.min(maxParallelThreads(), par.size()));
        final ForkJoinPool pool = ThreadUtils$.MODULE$.newForkJoinPool("checkCreateNewBatch", parallelism);
        par.tasksupport_$eq(new ForkJoinTaskSupport(pool));
        final AtomicBoolean foundNewData = new AtomicBoolean(false);
        try {
            par.foreach(shardInfo -> {
                $anonfun$canCreateNewBatch$1(this, foundNewData, shardInfo);
                return BoxedUnit.UNIT;
            });
        } finally {
            pool.shutdown();
        }
        logDebug(() -> {
            return "Can create new batch = " + foundNewData.get();
        });
        return foundNewData.get();
    }

    /** Returns whether any shard's iterator type contains the {@link ShardEnd} iterator type. */
    public boolean hasShardEndAsOffset(Seq<ShardInfo> seq) {
        return seq.exists(shardInfo -> {
            return BoxesRunTime.boxToBoolean($anonfun$hasShardEndAsOffset$1(shardInfo));
        });
    }

    /**
     * Computes the latest offset of this source.
     *
     * <p>The shard list is refreshed when no batch exists yet, when no listing has happened, or
     * when the describe-shard interval has elapsed; otherwise the previous batch's shard info is
     * reused. The batch id advances unless empty-batch avoidance is on and nothing changed
     * (no shard-end offsets, no new or deleted shards, and no shard with new data).
     *
     * @return the current offset wrapped in a {@link KinesisSourceOffset}, or empty if no batch
     *         has been produced
     */
    public synchronized Option<org.apache.spark.sql.execution.streaming.Offset> getOffset() {
        final ShardOffsets noBatchYet = new ShardOffsets(-1L, this.streamName);
        final long prevBatchId = ((ShardOffsets) currentShardOffsets().getOrElse(() -> {
            return noBatchYet;
        })).batchId();
        final Seq<ShardInfo> prevBatchShardInfo = prevBatchShardInfo(prevBatchId);

        final boolean refreshShards = prevBatchId < 0
                || latestDescribeShardTimestamp() == -1
                || latestDescribeShardTimestamp() + describeShardInterval() < System.currentTimeMillis();
        final Seq<ShardInfo> latestShardInfo;
        if (refreshShards) {
            // The reader is created for this listing only, so release it once the list is in hand.
            final KinesisReader reader = kinesisReader();
            final Seq<Shard> shards;
            try {
                shards = reader.getShards();
            } finally {
                reader.close();
            }
            latestDescribeShardTimestamp_$eq(System.currentTimeMillis());
            latestShardInfo = ShardSyncer$.MODULE$.getLatestShardInfo(shards, prevBatchShardInfo, this.initialPosition, this.failOnDataLoss);
        } else {
            latestShardInfo = prevBatchShardInfo;
        }

        final ShardInfo[] shardInfoArr = (ShardInfo[]) latestShardInfo.toArray(ClassTag$.MODULE$.apply(ShardInfo.class));
        // Evaluation order matters: the per-shard probe in canCreateNewBatch runs last, only
        // when none of the earlier conditions already forces a new batch.
        final boolean advance = !avoidEmptyBatches()
                || prevBatchId < 0
                || hasShardEndAsOffset(Predef$.MODULE$.wrapRefArray(shardInfoArr))
                || ShardSyncer$.MODULE$.hasNewShards(prevBatchShardInfo, Predef$.MODULE$.wrapRefArray(shardInfoArr))
                || ShardSyncer$.MODULE$.hasDeletedShards(prevBatchShardInfo, Predef$.MODULE$.wrapRefArray(shardInfoArr))
                || canCreateNewBatch(shardInfoArr);
        if (advance) {
            currentShardOffsets_$eq(new Some<>(new ShardOffsets(prevBatchId + 1, this.streamName, shardInfoArr)));
        } else {
            log().info("Offsets are unchanged since `kinesis.client.avoidEmptyBatches` is enabled");
        }

        final Option<ShardOffsets> current = currentShardOffsets();
        if (current.isEmpty()) {
            return Option.empty();
        }
        return new Some<>(new KinesisSourceOffset(current.get()));
    }

    /**
     * Builds the DataFrame for the batch identified by {@code offset}.
     *
     * <p>Shards whose iterator type contains the {@link ShardEnd} type are skipped; the remaining
     * shards are sorted by shard id. Each record becomes a row of (data bytes, stream name,
     * partition key, sequence number, approximate arrival timestamp).
     *
     * @param option start offset, or empty for the first batch (treated as batch id -1)
     * @param offset end offset; its batch id must not be smaller than the start's
     * @return a streaming DataFrame with the schema from {@link #schema()}
     */
    public Dataset<Row> getBatch(Option<org.apache.spark.sql.execution.streaming.Offset> option, org.apache.spark.sql.execution.streaming.Offset offset) {
        logInfo(() -> {
            return "End Offset is " + offset.toString();
        });
        final ShardOffsets endShardOffsets = KinesisSourceOffset$.MODULE$.getShardOffsets(offset);
        final long endBatchId = endShardOffsets.batchId();

        final long startBatchId;
        if (option instanceof Some) {
            startBatchId = KinesisSourceOffset$.MODULE$.getShardOffsets((org.apache.spark.sql.execution.streaming.Offset) ((Some) option).value()).batchId();
        } else if (None$.MODULE$.equals(option)) {
            startBatchId = -1;
        } else {
            throw new MatchError(option);
        }
        Predef$.MODULE$.assert(startBatchId <= endBatchId);

        final Seq shardsToRead = (Seq) ((SeqLike) endShardOffsets.shardInfoMap().values().toSeq().filter(shardInfo -> {
            return BoxesRunTime.boxToBoolean($anonfun$getBatch$2(shardInfo));
        })).sortBy(shardInfo2 -> {
            return shardInfo2.shardId().toString();
        }, Ordering$String$.MODULE$);
        logInfo(() -> {
            return "Processing " + shardsToRead.length() + " shards from " + shardsToRead;
        });

        final RDD rows = new KinesisSourceRDD(sc(), this.sourceOptions, this.streamName, endBatchId, shardsToRead, this.kinesisCredsProvider, this.endPointURL, KinesisSource$.MODULE$.hadoopConf(this.sqlContext), this.metadataPath, this.failOnDataLoss).map(record -> {
            return InternalRow$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{record.getData().array(), UTF8String.fromString(this.streamName), UTF8String.fromString(record.getPartitionKey()), UTF8String.fromString(record.getSequenceNumber()), BoxesRunTime.boxToLong(DateTimeUtils$.MODULE$.fromJavaTimestamp(new Timestamp(record.getApproximateArrivalTimestamp().getTime())))}));
        }, ClassTag$.MODULE$.apply(InternalRow.class));

        // If getBatch is reached before getOffset ever set an offset, adopt the end offset
        // as the current one.
        if (currentShardOffsets().isEmpty()) {
            currentShardOffsets_$eq(new Some<>(endShardOffsets));
        }
        logInfo(() -> {
            return "GetBatch generating RDD of offset range: " + shardsToRead.mkString(", ");
        });
        return this.sqlContext.internalCreateDataFrame(rows, schema(), true);
    }

    /** Returns the fixed Kinesis schema defined by {@link KinesisReader}'s companion. */
    public StructType schema() {
        return KinesisReader$.MODULE$.kinesisSchema();
    }

    /**
     * Closes a Kinesis reader.
     *
     * <p>NOTE(review): {@link #kinesisReader()} returns a new instance per call, so this closes a
     * reader created just for this call rather than one used earlier — confirm whether that is
     * intended.
     */
    public synchronized void stop() {
        kinesisReader().close();
    }

    /**
     * Purges committed metadata older than the retention window.
     *
     * <p>The threshold is the current batch id minus {@code minBatchesToRetain}; nothing is purged
     * while that is negative. The {@code offset} argument is not read.
     */
    public void commit(org.apache.spark.sql.execution.streaming.Offset offset) {
        final ShardOffsets noBatchYet = new ShardOffsets(-1L, this.streamName);
        final long thresholdBatchId = ((ShardOffsets) currentShardOffsets().getOrElse(() -> {
            return noBatchYet;
        })).batchId() - minBatchesToRetain();
        if (thresholdBatchId >= 0) {
            logInfo(() -> {
                return "Purging Committed Entries. ThresholdBatchId = " + thresholdBatchId;
            });
            metadataCommitter().purge(thresholdBatchId);
        }
    }

    @Override
    public String toString() {
        return "KinesisSource[" + this.streamName + "]";
    }

    /**
     * Loads the shard info committed for batch {@code j}.
     *
     * @param j batch id of the previously executed batch; negative means no batch has run
     * @return an empty sequence for a negative id, otherwise the committed shard info
     * @throws IllegalStateException if the committer returns nothing for a non-negative id
     */
    private Seq<ShardInfo> prevBatchShardInfo(long j) {
        final Seq<ShardInfo> shardInfo;
        if (j < 0) {
            logInfo(() -> {
                return "This is the first batch. Returning Empty sequence";
            });
            shardInfo = (Seq) Nil$.MODULE$;
        } else {
            logDebug(() -> {
                return "BatchId of previously executed batch is " + j;
            });
            final Seq<ShardInfo> committed = metadataCommitter().get(j);
            if (committed.isEmpty()) {
                throw new IllegalStateException("Unable to fetch " + "committed metadata from previous batch. Some data may have been missed");
            }
            shardInfo = committed;
        }
        logDebug(() -> {
            return "Shard Info is " + shardInfo.mkString(", ");
        });
        return shardInfo;
    }

    // ---------------------------------------------------------------------
    // Synthetic lambda bodies (kept public/static as emitted by the Scala compiler)
    // ---------------------------------------------------------------------

    /** Sets the flag when the shard has new data; skips the probe once the flag is already set. */
    public static final /* synthetic */ void $anonfun$canCreateNewBatch$1(KinesisSource kinesisSource, AtomicBoolean atomicBoolean, ShardInfo shardInfo) {
        if (!atomicBoolean.get() && kinesisSource.hasNewData(shardInfo)) {
            atomicBoolean.set(true);
        }
    }

    public static final /* synthetic */ boolean $anonfun$hasShardEndAsOffset$1(ShardInfo shardInfo) {
        return shardInfo.iteratorType().contains(new ShardEnd().iteratorType());
    }

    public static final /* synthetic */ boolean $anonfun$getBatch$2(ShardInfo shardInfo) {
        return !shardInfo.iteratorType().contains(new ShardEnd().iteratorType());
    }
}
