package org.apache.spark.sql.redis.stream;

import com.redislabs.provider.redis.RedisConfig;
import com.redislabs.provider.redis.RedisConfig$;
import com.redislabs.provider.redis.util.Logging;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import redis.clients.jedis.Jedis;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.TraitSetter;

/* compiled from: RedisSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001dg\u0001B\u0001\u0003\u0001=\u00111BU3eSN\u001cv.\u001e:dK*\u00111\u0001B\u0001\u0007gR\u0014X-Y7\u000b\u0005\u00151\u0011!\u0002:fI&\u001c(BA\u0004\t\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u0013)\tQa\u001d9be.T!a\u0003\u0007\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005i\u0011aA8sO\u000e\u00011\u0003\u0002\u0001\u00111\u0001\u0002\"!\u0005\f\u000e\u0003IQ!a\u0005\u000b\u0002\t1\fgn\u001a\u0006\u0002+\u0005!!.\u0019<b\u0013\t9\"C\u0001\u0004PE*,7\r\u001e\t\u00033yi\u0011A\u0007\u0006\u00037q\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005u1\u0011!C3yK\u000e,H/[8o\u0013\ty\"D\u0001\u0004T_V\u00148-\u001a\t\u0003C-j\u0011A\t\u0006\u0003G\u0011\nA!\u001e;jY*\u0011Q!\n\u0006\u0003M\u001d\n\u0001\u0002\u001d:pm&$WM\u001d\u0006\u0003Q%\n\u0011B]3eSNd\u0017MY:\u000b\u0003)\n1aY8n\u0013\ta#EA\u0004M_\u001e<\u0017N\\4\t\u00119\u0002!\u0011!Q\u0001\n=\n!b]9m\u0007>tG/\u001a=u!\t\u0001\u0014'D\u0001\u0007\u0013\t\u0011dA\u0001\u0006T#2\u001buN\u001c;fqRD\u0001\u0002\u000e\u0001\u0003\u0002\u0003\u0006I!N\u0001\r[\u0016$\u0018\rZ1uCB\u000bG\u000f\u001b\t\u0003mqr!a\u000e\u001e\u000e\u0003aR\u0011!O\u0001\u0006g\u000e\fG.Y\u0005\u0003wa\na\u0001\u0015:fI\u00164\u0017BA\u001f?\u0005\u0019\u0019FO]5oO*\u00111\b\u000f\u0005\t\u0001\u0002\u0011\t\u0011)A\u0005\u0003\u0006\tRo]3s\t\u00164\u0017N\\3e'\u000eDW-\\1\u0011\u0007]\u0012E)\u0003\u0002Dq\t1q\n\u001d;j_:\u0004\"!\u0012%\u000e\u0003\u0019S!a\u0012\u0004\u0002\u000bQL\b/Z:\n\u0005%3%AC*ueV\u001cG\u000fV=qK\"A1\n\u0001B\u0001B\u0003%A*\u0001\u0006qCJ\fW.\u001a;feN\u0004BAN'6k%\u0011aJ\u0010\u0002\u0004\u001b\u0006\u0004\b\"\u0002)\u0001\t\u0003\t\u0016A\u0002\u001fj]&$h\bF\u0003S)V3v\u000b\u0005\u0002T\u00015\t!\u0001C\u0003/\u001f\u0002\u0007q\u0006C\u00035\u001f\u0002\u0007Q\u0007C\u0003A\u001f\u0002\u0007\u0011\tC\u0003L\u001f\u0002\u0007A\nC\u0004Z\u0001\t\u0007I\u0011\u0002.\u0002\u0005M\u001cW#A.\u0011\u0005qkV\"\u0001\u0005\n\u0005yC!\u0001D*qCJ\\7i\u001c8uKb$\bB\u00021
\u0001A\u0003%1,A\u0002tG\u0002BqA\u0019\u0001C\u0002\u0013-1-A\u0006sK\u0012L7oQ8oM&<W#\u00013\u0011\u0005\u00154W\"\u0001\u0013\n\u0005\u001d$#a\u0003*fI&\u001c8i\u001c8gS\u001eDa!\u001b\u0001!\u0002\u0013!\u0017\u0001\u0004:fI&\u001c8i\u001c8gS\u001e\u0004\u0003bB6\u0001\u0005\u0004%I\u0001\\\u0001\rg>,(oY3D_:4\u0017nZ\u000b\u0002[B\u00111K\\\u0005\u0003_\n\u0011\u0011CU3eSN\u001cv.\u001e:dK\u000e{gNZ5h\u0011\u0019\t\b\u0001)A\u0005[\u0006i1o\\;sG\u0016\u001cuN\u001c4jO\u0002Bqa\u001d\u0001C\u0002\u0013%A/A\u0007dkJ\u0014XM\u001c;TG\",W.Y\u000b\u0002\t\"1a\u000f\u0001Q\u0001\n\u0011\u000babY;se\u0016tGoU2iK6\f\u0007\u0005C\u0003y\u0001\u0011\u0005\u00110A\u0003ti\u0006\u0014H\u000fF\u0001{!\t940\u0003\u0002}q\t!QK\\5u\u0011\u0015q\b\u0001\"\u0011u\u0003\u0019\u00198\r[3nC\"9\u0011\u0011\u0001\u0001\u0005B\u0005\r\u0011!C4fi>3gm]3u+\t\t)\u0001\u0005\u00038\u0005\u0006\u001d\u0001cA\r\u0002\n%\u0019\u00111\u0002\u000e\u0003\r=3gm]3u\u0011\u001d\ty\u0001\u0001C!\u0003#\t\u0001bZ3u\u0005\u0006$8\r\u001b\u000b\u0007\u0003'\t9$!\u000f\u0011\t\u0005U\u0011\u0011\u0007\b\u0005\u0003/\tiC\u0004\u0003\u0002\u001a\u0005-b\u0002BA\u000e\u0003SqA!!\b\u0002(9!\u0011qDA\u0013\u001b\t\t\tCC\u0002\u0002$9\ta\u0001\u0010:p_Rt\u0014\"A\u0007\n\u0005-a\u0011BA\u0005\u000b\u0013\t9\u0001\"C\u0002\u00020\u0019\tq\u0001]1dW\u0006<W-\u0003\u0003\u00024\u0005U\"!\u0003#bi\u00064%/Y7f\u0015\r\tyC\u0002\u0005\bq\u00065\u0001\u0019AA\u0003\u0011!\tY$!\u0004A\u0002\u0005\u001d\u0011aA3oI\"9\u0011q\b\u0001\u0005B\u0005\u0005\u0013AB2p[6LG\u000fF\u0002{\u0003\u0007B\u0001\"a\u000f\u0002>\u0001\u0007\u0011q\u0001\u0005\u0007\u0003\u000f\u0002A\u0011I=\u0002\tM$x\u000e\u001d\u0005\b\u0003\u0017\u0002A\u0011BA'\u0003y\u0011Xm]3u\u0007>t7/^7fe\u001e\u0013x.\u001e9t\u0013\u001aD\u0015m](gMN,G\u000fF\u0002{\u0003\u001fB\u0001\"!\u0015\u0002J\u0001\u0007\u00111K\u0001\r_\u001a47/\u001a;SC:<Wm\u001d\t\u0007\u0003+\ni&a\u0019\u000f\t\u0005]\u00131\f\b\u0005\u0003?\tI&C\u0001:\u0013\r\ty\u0003O\u0005\u0
005\u0003?\n\tGA\u0002TKFT1!a\f9!\r\u0019\u0016QM\u0005\u0004\u0003O\u0012!A\u0006*fI&\u001c8k\\;sG\u0016|eMZ:fiJ\u000bgnZ3\t\u000f\u0005-\u0004\u0001\"\u0003\u0002n\u00051cm\u001c:FC\u000eDwJ\u001a4tKR\u0014\u0016M\\4f/&$\bn\u0015;sK\u0006l7i\u001c8oK\u000e$\u0018n\u001c8\u0015\t\u0005=\u0014Q\u0012\u000b\u0004u\u0006E\u0004\u0002CA:\u0003S\u0002\r!!\u001e\u0002\u0005=\u0004\b\u0003C\u001c\u0002x\u0005m\u00141\r>\n\u0007\u0005e\u0004HA\u0005Gk:\u001cG/[8oeA!\u0011QPAE\u001b\t\tyH\u0003\u0003\u0002\u0002\u0006\r\u0015!\u00026fI&\u001c(\u0002BAC\u0003\u000f\u000bqa\u00197jK:$8OC\u0001\u0006\u0013\u0011\tY)a \u0003\u000b)+G-[:\t\u0011\u0005E\u0013\u0011\u000ea\u0001\u0003':q!!%\u0003\u0011\u0003\t\u0019*A\u0006SK\u0012L7oU8ve\u000e,\u0007cA*\u0002\u0016\u001a1\u0011A\u0001E\u0001\u0003/\u001bB!!&\u0002\u001aB\u0019q'a'\n\u0007\u0005u\u0005H\u0001\u0004B]f\u0014VM\u001a\u0005\b!\u0006UE\u0011AAQ)\t\t\u0019\n\u0003\u0005\u0002&\u0006UE\u0011AAT\u0003=9W\r^(gMN,GOU1oO\u0016\u001cH\u0003CA*\u0003S\u000bY+!,\t\u000fa\f\u0019\u000b1\u0001\u0002\u0006!A\u00111HAR\u0001\u0004\t9\u0001\u0003\u0005\u00020\u0006\r\u0006\u0019AAY\u0003=\u0019wN\\:v[\u0016\u00148i\u001c8gS\u001e\u001c\bCBA+\u0003;\n\u0019\fE\u0002T\u0003kK1!a.\u0003\u0005M\u0011V\rZ5t\u0007>t7/^7fe\u000e{gNZ5h\u0011!\tY,!&\u0005\u0002\u0005u\u0016\u0001D:ue\u0016\fW\u000eT1ti&#G#B\u001b\u0002@\u0006\r\u0007\u0002CAa\u0003s\u0003\r!a\u001f\u0002\t\r|gN\u001c\u0005\b\u0003\u000b\fI\f1\u00016\u0003%\u0019HO]3b[.+\u0017\u0010")
/* loaded from: input_file:org/apache/spark/sql/redis/stream/RedisSource.class */
/*
 * Structured Streaming Source implementation backed by Redis (decompiled from
 * RedisSource.scala).
 *
 * The class wires together:
 *   - a RedisConfig derived from the SparkContext's configuration,
 *   - a RedisSourceConfig parsed from the options map given to the constructor,
 *   - a schema that is either supplied by the caller or produced by a default closure.
 *
 * Most of the real work lives in compiler-generated closure classes
 * (RedisSource$$anonfun$...) and in the companion object RedisSource$, neither of
 * which is visible in this file; comments below describe only what this class
 * itself does and hedge anything that depends on those hidden bodies.
 *
 * Members with names of the form org$apache$...$$name and com$redislabs$...$$_logger
 * are Scala-compiler name manglings; they are referenced by generated code and must
 * keep their exact names.
 */
public class RedisSource implements Source, Logging {
    /** The SQLContext passed to the constructor; used by getBatch to build the resulting DataFrame. */
    private final SQLContext sqlContext;
    /** SparkContext obtained from sqlContext in the constructor. */
    private final SparkContext sc;
    /** Redis connection configuration, built from the SparkContext's conf in the constructor. */
    private final RedisConfig org$apache$spark$sql$redis$stream$RedisSource$$redisConfig;
    /** Source configuration parsed from the constructor's parameter map; exposes consumerConfigs(). */
    private final RedisSourceConfig org$apache$spark$sql$redis$stream$RedisSource$$sourceConfig;
    /** Schema reported by schema(): the caller-supplied one if present, otherwise the default closure's result. */
    private final StructType currentSchema;
    /** Backing field for the Logging trait's logger; transient, read and written only through the accessors below. */
    private transient Logger com$redislabs$provider$redis$util$Logging$$_logger;

    /**
     * Static forwarder to the companion object's streamLastId.
     *
     * @param jedis Redis connection handed through to the companion
     * @param str   handed through unchanged (presumably the stream key — confirm against RedisSource$)
     * @return whatever RedisSource$.MODULE$.streamLastId returns
     */
    public static String streamLastId(Jedis jedis, String str) {
        return RedisSource$.MODULE$.streamLastId(jedis, str);
    }

    /**
     * Static forwarder to the companion object's getOffsetRanges.
     *
     * @param option optional start offset
     * @param offset end offset
     * @param seq    consumer configurations
     * @return the offset ranges computed by RedisSource$.MODULE$.getOffsetRanges
     */
    public static Seq<RedisSourceOffsetRange> getOffsetRanges(Option<Offset> option, Offset offset, Seq<RedisConsumerConfig> seq) {
        return RedisSource$.MODULE$.getOffsetRanges(option, offset, seq);
    }

    /** Getter for the Logging trait's backing logger field. */
    @Override // com.redislabs.provider.redis.util.Logging
    public Logger com$redislabs$provider$redis$util$Logging$$_logger() {
        return this.com$redislabs$provider$redis$util$Logging$$_logger;
    }

    /** Setter for the Logging trait's backing logger field (invoked by the trait implementation). */
    @Override // com.redislabs.provider.redis.util.Logging
    @TraitSetter
    public void com$redislabs$provider$redis$util$Logging$$_logger_$eq(Logger logger) {
        this.com$redislabs$provider$redis$util$Logging$$_logger = logger;
    }

    /** Delegates to the Logging trait's static implementation of loggerName. */
    @Override // com.redislabs.provider.redis.util.Logging
    public String loggerName() {
        return Logging.Cclass.loggerName(this);
    }

    /** Delegates to the Logging trait's static implementation of logger. */
    @Override // com.redislabs.provider.redis.util.Logging
    public Logger logger() {
        return Logging.Cclass.logger(this);
    }

    /**
     * Delegates to the Logging trait's logInfo.
     *
     * @param function0 supplier of the message; passed on unevaluated
     */
    @Override // com.redislabs.provider.redis.util.Logging
    public void logInfo(Function0<String> function0) {
        Logging.Cclass.logInfo(this, function0);
    }

    /**
     * Delegates to the Logging trait's logDebug.
     *
     * @param function0 supplier of the message; passed on unevaluated
     */
    @Override // com.redislabs.provider.redis.util.Logging
    public void logDebug(Function0<String> function0) {
        Logging.Cclass.logDebug(this, function0);
    }

    /**
     * Delegates to the Logging trait's logTrace.
     *
     * @param function0 supplier of the message; passed on unevaluated
     */
    @Override // com.redislabs.provider.redis.util.Logging
    public void logTrace(Function0<String> function0) {
        Logging.Cclass.logTrace(this, function0);
    }

    /** Accessor for the SparkContext captured at construction. */
    private SparkContext sc() {
        return this.sc;
    }

    /** Accessor for the Redis configuration; public with a mangled name so generated closure classes can call it. */
    public RedisConfig org$apache$spark$sql$redis$stream$RedisSource$$redisConfig() {
        return this.org$apache$spark$sql$redis$stream$RedisSource$$redisConfig;
    }

    /** Accessor for the source configuration; public with a mangled name so generated closure classes can call it. */
    public RedisSourceConfig org$apache$spark$sql$redis$stream$RedisSource$$sourceConfig() {
        return this.org$apache$spark$sql$redis$stream$RedisSource$$sourceConfig;
    }

    /** Accessor for the schema chosen at construction. */
    private StructType currentSchema() {
        return this.currentSchema;
    }

    /**
     * Runs the start closure once for every configured consumer.
     * NOTE(review): the closure body (RedisSource$$anonfun$start$1) is not visible here;
     * presumably it prepares the consumer group for each stream — confirm.
     */
    public void start() {
        org$apache$spark$sql$redis$stream$RedisSource$$sourceConfig().consumerConfigs().foreach(new RedisSource$$anonfun$start$1(this));
    }

    /**
     * @return the schema selected in the constructor
     */
    public StructType schema() {
        return currentSchema();
    }

    /**
     * Builds the current offset by folding over all consumer configs, starting from a
     * RedisSourceOffset that wraps an empty map. How each consumer contributes is defined
     * by RedisSource$$anonfun$6, which is not visible in this file.
     *
     * @return None when the folded offset's offsets() is empty, otherwise Some of the folded offset
     */
    public Option<Offset> getOffset() {
        RedisSourceOffset redisSourceOffset = (RedisSourceOffset) org$apache$spark$sql$redis$stream$RedisSource$$sourceConfig().consumerConfigs().foldLeft(new RedisSourceOffset((Map) Predef$.MODULE$.Map().apply(Nil$.MODULE$)), new RedisSource$$anonfun$6(this));
        return redisSourceOffset.offsets().isEmpty() ? None$.MODULE$ : new Some(redisSourceOffset);
    }

    /**
     * Produces the DataFrame for one batch.
     *
     * Steps, in order:
     *   1. log the request (message is built by a closure over the two offsets);
     *   2. compute offset ranges from the start/end offsets and the consumer configs;
     *   3. call resetConsumerGroupsIfHasOffset on those ranges;
     *   4. build a RedisSourceRdd over the ranges (fourth constructor argument left at its
     *      Scala default), map each element to an InternalRow with a closure that captures
     *      the schema, and wrap the result with internalCreateDataFrame.
     *
     * @param option optional start offset
     * @param offset end offset
     * @return DataFrame with this source's schema
     */
    public Dataset<Row> getBatch(Option<Offset> option, Offset offset) {
        logInfo(new RedisSource$$anonfun$getBatch$1(this, option, offset));
        // Local copy handed to the row-mapping closure instead of having it call back into this object.
        StructType currentSchema = currentSchema();
        Seq<RedisSourceOffsetRange> offsetRanges = RedisSource$.MODULE$.getOffsetRanges(option, offset, org$apache$spark$sql$redis$stream$RedisSource$$sourceConfig().consumerConfigs());
        resetConsumerGroupsIfHasOffset(offsetRanges);
        // NOTE(review): the trailing `true` is presumably Spark's isStreaming flag — confirm against SQLContext.internalCreateDataFrame.
        return this.sqlContext.internalCreateDataFrame(new RedisSourceRdd(sc(), org$apache$spark$sql$redis$stream$RedisSource$$redisConfig(), offsetRanges, RedisSourceRdd$.MODULE$.$lessinit$greater$default$4()).map(new RedisSource$$anonfun$7(this, currentSchema), ClassTag$.MODULE$.apply(InternalRow.class)), schema(), true);
    }

    /**
     * Logs the committed offset; nothing else is done in this method.
     *
     * @param offset offset reported as committed
     */
    public void commit(Offset offset) {
        logInfo(new RedisSource$$anonfun$commit$1(this, offset));
    }

    /** No-op: this class holds nothing that it releases on stop. */
    public void stop() {
    }

    /**
     * Applies the reset closure to every offset range, each paired with a Jedis connection,
     * via forEachOffsetRangeWithStreamConnection.
     * NOTE(review): per the method name the closure presumably resets a consumer group only
     * when the range carries a start offset; its body is not visible here — confirm.
     *
     * @param seq offset ranges for the batch being built
     */
    private void resetConsumerGroupsIfHasOffset(Seq<RedisSourceOffsetRange> seq) {
        forEachOffsetRangeWithStreamConnection(seq, new RedisSource$$anonfun$resetConsumerGroupsIfHasOffset$1(this));
    }

    /**
     * Groups the offset ranges by a key computed by the first closure, then hands each group
     * to the second closure together with the caller's operation.
     * NOTE(review): the grouping key and the way a Jedis connection is obtained per group live
     * in the generated closure classes; presumably ranges are grouped by stream key so one
     * connection serves a whole group — confirm.
     *
     * @param seq       offset ranges to process
     * @param function2 operation to run for a (connection, offset range) pair
     */
    private void forEachOffsetRangeWithStreamConnection(Seq<RedisSourceOffsetRange> seq, Function2<Jedis, RedisSourceOffsetRange, BoxedUnit> function2) {
        seq.groupBy((Function1<RedisSourceOffsetRange, K>) new RedisSource$$anonfun$forEachOffsetRangeWithStreamConnection$1(this)).foreach(new RedisSource$$anonfun$forEachOffsetRangeWithStreamConnection$2(this, function2));
    }

    /**
     * Creates the source.
     *
     * Initialisation order matters: sqlContext is stored first, then the Source and Logging
     * trait initialisers run, then the fields that depend on earlier ones are filled in
     * (sc from sqlContext, redisConfig from sc).
     *
     * @param sQLContext SQL context the source runs in
     * @param str        not used by this constructor (named metadataPath in the Scala signature above)
     * @param option     optional user-defined schema; when empty the default closure supplies one
     * @param map        source options, parsed into a RedisSourceConfig
     */
    public RedisSource(SQLContext sQLContext, String str, Option<StructType> option, Map<String, String> map) {
        this.sqlContext = sQLContext;
        Source.class.$init$(this);
        Logging.Cclass.$init$(this);
        this.sc = sQLContext.sparkContext();
        this.org$apache$spark$sql$redis$stream$RedisSource$$redisConfig = RedisConfig$.MODULE$.fromSparkConf(sc().getConf());
        this.org$apache$spark$sql$redis$stream$RedisSource$$sourceConfig = RedisSourceConfig$.MODULE$.fromMap(map);
        this.currentSchema = (StructType) option.getOrElse(new RedisSource$$anonfun$1(this));
    }
}
