package org.apache.spark.sql.redis.stream;

import com.redislabs.provider.redis.RedisConfig;
import com.redislabs.provider.redis.util.ConnectionUtils$;
import com.redislabs.provider.redis.util.Logging;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.math.Ordering$;
import scala.math.Ordering$Implicits$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: RedisStreamReader.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\rb\u0001B\u0004\t\u0001UA\u0001\u0002\f\u0001\u0003\u0002\u0003\u0006I!\f\u0005\u0006c\u0001!\tA\r\u0005\u0006m\u0001!\ta\u000e\u0005\u00069\u0002!I!\u0018\u0005\b\u0003\u0003\u0001A\u0011BA\u0002\u0011\u001d\t)\u0002\u0001C\u0005\u0003/\u0011\u0011CU3eSN\u001cFO]3b[J+\u0017\rZ3s\u0015\tI!\"\u0001\u0004tiJ,\u0017-\u001c\u0006\u0003\u00171\tQA]3eSNT!!\u0004\b\u0002\u0007M\fHN\u0003\u0002\u0010!\u0005)1\u000f]1sW*\u0011\u0011CE\u0001\u0007CB\f7\r[3\u000b\u0003M\t1a\u001c:h\u0007\u0001\u0019B\u0001\u0001\f\u001dSA\u0011qCG\u0007\u00021)\t\u0011$A\u0003tG\u0006d\u0017-\u0003\u0002\u001c1\t1\u0011I\\=SK\u001a\u0004\"!H\u0014\u000e\u0003yQ!a\b\u0011\u0002\tU$\u0018\u000e\u001c\u0006\u0003\u0017\u0005R!AI\u0012\u0002\u0011A\u0014xN^5eKJT!\u0001J\u0013\u0002\u0013I,G-[:mC\n\u001c(\"\u0001\u0014\u0002\u0007\r|W.\u0003\u0002)=\t9Aj\\4hS:<\u0007CA\f+\u0013\tY\u0003D\u0001\u0007TKJL\u0017\r\\5{C\ndW-A\u0006sK\u0012L7oQ8oM&<\u0007C\u0001\u00180\u001b\u0005\u0001\u0013B\u0001\u0019!\u0005-\u0011V\rZ5t\u0007>tg-[4\u0002\rqJg.\u001b;?)\t\u0019T\u0007\u0005\u00025\u00015\t\u0001\u0002C\u0003-\u0005\u0001\u0007Q&A\nv]J,\u0017\rZ*ue\u0016\fW.\u00128ue&,7\u000f\u0006\u00029/B\u0019\u0011(\u0011#\u000f\u0005izdBA\u001e?\u001b\u0005a$BA\u001f\u0015\u0003\u0019a$o\\8u}%\t\u0011$\u0003\u0002A1\u00059\u0001/Y2lC\u001e,\u0017B\u0001\"D\u0005!IE/\u001a:bi>\u0014(B\u0001!\u0019!\t)EK\u0004\u0002G%:\u0011q)\u0015\b\u0003\u0011Bs!!S(\u000f\u0005)seBA&N\u001d\tYD*C\u0001\u0014\u0013\t\t\"#\u0003\u0002\u0010!%\u0011QBD\u0005\u0003\u00171I!!\u0003\u0006\n\u0005MC\u0011\u0001\u0005*fI&\u001c8k\\;sG\u0016$\u0016\u0010]3t\u0013\t)fKA\u0006TiJ,\u0017-\\#oiJL(BA*\t\u0011\u0015A6\u00011\u0001Z\u0003-ygMZ:fiJ\u000bgnZ3\u0011\u0005QR\u0016BA.\t\u0005Y\u0011V\rZ5t'>,(oY3PM\u001a\u001cX\r\u001e*b]\u001e,\u0017A\u0006:fC\u0012\u001cFO]3b[\u0016sGO]=CCR\u001c\u0007.Z:\u0015\u0007y\u000b'\r\u0005\u0002F?&\u0011\u0001M\u0016\u0002\u0013'R\u0014X-Y7F]R\u0014\u0018PQ1uG\
",7\u000fC\u0003Y\t\u0001\u0007\u0011\fC\u0003d\t\u0001\u0007A-\u0001\tti\u0006\u0014H/\u00128uef|eMZ:fiB!Q\r\\8x\u001d\t1'.D\u0001h\u0015\ty\u0002NC\u0001j\u0003\u0011Q\u0017M^1\n\u0005-<\u0017aA'ba&\u0011QN\u001c\u0002\u0006\u000b:$(/\u001f\u0006\u0003W\u001e\u0004\"\u0001\u001d;\u000f\u0005E\u0014\bCA\u001e\u0019\u0013\t\u0019\b$\u0001\u0004Qe\u0016$WMZ\u0005\u0003kZ\u0014aa\u0015;sS:<'BA:\u0019!\tAh0D\u0001z\u0015\tQ80A\u0003kK\u0012L7O\u0003\u0002}{\u000691\r\\5f]R\u001c(\"A\u0006\n\u0005}L(!D*ue\u0016\fW.\u00128uefLE)A\ngS2$XM]*ue\u0016\fW.\u00128ue&,7\u000f\u0006\u0003\u0002\u0006\u0005MAc\u0001\u001d\u0002\b!A\u0011\u0011B\u0003\u0005\u0002\u0004\tY!\u0001\u0007tiJ,\u0017-\\$s_V\u00048\u000fE\u0003\u0018\u0003\u001b\t\t\"C\u0002\u0002\u0010a\u0011\u0001\u0002\u00102z]\u0006lWM\u0010\t\u0004s\u0005s\u0006\"\u0002-\u0006\u0001\u0004I\u0016\u0001\u00064mCR$XM\\*ue\u0016\fW.\u00128ue&,7\u000fF\u00029\u00033Aq!a\u0007\u0007\u0001\u0004\ti\"A\u0003f]R\u0014\u0018\u0010E\u0002F\u0003?I1!!\tW\u0005A\u0019FO]3b[\u0016sGO]=CCR\u001c\u0007\u000e")
/* loaded from: input_file:org/apache/spark/sql/redis/stream/RedisStreamReader.class */
/*
 * Reads entries from a Redis stream on behalf of a consumer group and exposes them as a
 * Scala Iterator of (entry id, field map) pairs bounded by a RedisSourceOffsetRange.
 *
 * This source is decompiled from Scala. The Logging methods below are the trait forwarders
 * the Scala compiler emits; they must delegate to the trait's own implementations rather
 * than to themselves.
 * NOTE(review): assumes Logging is compiled with the Scala 2.12+ trait encoding (interface
 * with default methods), which the static Logging.$init$(this) call in the constructor
 * suggests - confirm against the Logging class file.
 */
public class RedisStreamReader implements Logging, Serializable {
    private final RedisConfig redisConfig;
    private transient Logger com$redislabs$provider$redis$util$Logging$$_logger;

    /**
     * Forwards to the {@link Logging} trait implementation.
     * Calling the unqualified method here would recurse until the stack overflows.
     */
    @Override // com.redislabs.provider.redis.util.Logging
    public String loggerName() {
        return Logging.super.loggerName();
    }

    /** Forwards to the {@link Logging} trait implementation (see {@link #loggerName()}). */
    @Override // com.redislabs.provider.redis.util.Logging
    public Logger logger() {
        return Logging.super.logger();
    }

    /** Forwards to the {@link Logging} trait implementation (see {@link #loggerName()}). */
    @Override // com.redislabs.provider.redis.util.Logging
    public void logInfo(Function0<String> function0) {
        Logging.super.logInfo(function0);
    }

    /** Forwards to the {@link Logging} trait implementation (see {@link #loggerName()}). */
    @Override // com.redislabs.provider.redis.util.Logging
    public void logDebug(Function0<String> function0) {
        Logging.super.logDebug(function0);
    }

    /** Forwards to the {@link Logging} trait implementation (see {@link #loggerName()}). */
    @Override // com.redislabs.provider.redis.util.Logging
    public void logTrace(Function0<String> function0) {
        Logging.super.logTrace(function0);
    }

    /** Accessor for the trait's backing logger field (transient, so not serialized). */
    @Override // com.redislabs.provider.redis.util.Logging
    public Logger com$redislabs$provider$redis$util$Logging$$_logger() {
        return this.com$redislabs$provider$redis$util$Logging$$_logger;
    }

    /** Mutator for the trait's backing logger field. */
    @Override // com.redislabs.provider.redis.util.Logging
    public void com$redislabs$provider$redis$util$Logging$$_logger_$eq(Logger logger) {
        this.com$redislabs$provider$redis$util$Logging$$_logger = logger;
    }

    /**
     * Returns an iterator over the stream entries of the given offset range.
     *
     * <p>Batches are fetched on demand by repeatedly calling
     * {@link #readStreamEntryBatches}; iteration ends at the first null/empty batch or at the
     * first entry whose id is greater than the range end (see {@code filterStreamEntries}).
     *
     * @param redisSourceOffsetRange range and consumer configuration to read with
     * @return iterator of (entry id, entry fields) pairs
     */
    public Iterator<Tuple2<StreamEntryID, Map<String, String>>> unreadStreamEntries(RedisSourceOffsetRange redisSourceOffsetRange) {
        RedisConsumerConfig config = redisSourceOffsetRange.config();
        // The message is built lazily, only if the logger actually evaluates the thunk.
        logInfo(() -> "Reading entries [" + config.streamKey() + ", " + config.groupName() + ", "
                + config.consumerName() + ", start=" + redisSourceOffsetRange.start()
                + " end=" + redisSourceOffsetRange.end() + "]... ");
        return filterStreamEntries(redisSourceOffsetRange, () -> {
            // One (stream key, UNRECEIVED_ENTRY) pair is created per evaluation of this thunk
            // and reused for every batch request issued by the resulting iterator.
            Map.Entry<String, StreamEntryID> streamOffset =
                    new AbstractMap.SimpleEntry<>(config.streamKey(), StreamEntryID.UNRECEIVED_ENTRY);
            return package$.MODULE$.Iterator().continually(
                    () -> this.readStreamEntryBatches(redisSourceOffsetRange, streamOffset));
        });
    }

    /**
     * Issues a single XREADGROUP call for the configured group/consumer and returns its result.
     *
     * <p>Declared public only because the decompiler widened it (it is private in the Scala
     * original); it is kept public here so the class's visible interface does not change.
     *
     * @param redisSourceOffsetRange supplies the consumer configuration (group, consumer,
     *                               batch size, block timeout, stream key)
     * @param entry                  (stream key, start id) pair passed to XREADGROUP
     * @return whatever Jedis returns for the call; callers treat null or empty as "no more data"
     */
    /* JADX INFO: Access modifiers changed from: private */
    @SuppressWarnings("unchecked") // raw Map.Entry[] is unavoidable: Java forbids generic array creation
    public List<Map.Entry<String, List<StreamEntry>>> readStreamEntryBatches(RedisSourceOffsetRange redisSourceOffsetRange, Map.Entry<String, StreamEntryID> entry) {
        RedisConsumerConfig config = redisSourceOffsetRange.config();
        return (List<Map.Entry<String, List<StreamEntry>>>) ConnectionUtils$.MODULE$.withConnection(
                this.redisConfig.connectionForKey(config.streamKey()),
                jedis -> {
                    // NOTE(review): the literal `true` is presumably Jedis's noAck flag - confirm
                    // against the Jedis version in use.
                    List<Map.Entry<String, List<StreamEntry>>> batch = jedis.xreadGroup(
                            config.groupName(), config.consumerName(), config.batchSize(),
                            config.block(), true, new Map.Entry[]{entry});
                    this.logDebug(() -> "Got entries: " + batch);
                    return batch;
                });
    }

    /**
     * Turns an iterator of XREADGROUP results into a bounded iterator of individual entries.
     *
     * <p>Stops at the first null or empty batch, flattens the remaining batches into single
     * entries, and then stops at the first entry whose id is greater than the range end.
     *
     * @param redisSourceOffsetRange range whose {@code end()} is the inclusive upper bound
     * @param function0              by-name supplier of the batch iterator; applied exactly once
     */
    private Iterator<Tuple2<StreamEntryID, Map<String, String>>> filterStreamEntries(RedisSourceOffsetRange redisSourceOffsetRange, Function0<Iterator<List<Map.Entry<String, List<StreamEntry>>>>> function0) {
        StreamEntryID endId = new StreamEntryID(redisSourceOffsetRange.end());
        Iterator<List<Map.Entry<String, List<StreamEntry>>>> nonEmptyBatches =
                function0.apply().takeWhile(
                        batch -> BoxesRunTime.boxToBoolean($anonfun$filterStreamEntries$1(batch)));
        Iterator<Map.Entry<String, List<StreamEntry>>> perStreamResults =
                nonEmptyBatches.flatMap(
                        batch -> JavaConverters$.MODULE$.asScalaBufferConverter(batch).asScala().iterator());
        Iterator<Tuple2<StreamEntryID, Map<String, String>>> entries =
                perStreamResults.flatMap(this::flattenStreamEntries);
        return entries.takeWhile(
                idAndFields -> BoxesRunTime.boxToBoolean($anonfun$filterStreamEntries$4(endId, idAndFields)));
    }

    /**
     * Converts one per-stream XREADGROUP result into (entry id, entry fields) pairs.
     *
     * <p>Declared public only because the decompiler widened it (it is private in the Scala
     * original); it is kept public here so the class's visible interface does not change.
     *
     * @param entry (stream key, entries) pair; only the value is used
     */
    /* JADX INFO: Access modifiers changed from: private */
    public Iterator<Tuple2<StreamEntryID, Map<String, String>>> flattenStreamEntries(Map.Entry<String, List<StreamEntry>> entry) {
        return JavaConverters$.MODULE$.asScalaBufferConverter(entry.getValue()).asScala().iterator()
                .map(streamEntry -> new Tuple2<>(streamEntry.getID(), streamEntry.getFields()));
    }

    /** Lambda body: {@code true} while the batch is neither null nor empty. */
    public static final /* synthetic */ boolean $anonfun$filterStreamEntries$1(List<?> list) {
        return list != null && !list.isEmpty();
    }

    /**
     * Lambda body: {@code true} while the entry id is less than or equal to the range end id.
     *
     * @throws MatchError if the tuple is null (mirrors the Scala pattern match)
     */
    public static final /* synthetic */ boolean $anonfun$filterStreamEntries$4(StreamEntryID streamEntryID, Tuple2<StreamEntryID, ?> tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        // Same comparison the Scala Ordering derived from Comparable performs for `<=`.
        return tuple2._1().compareTo(streamEntryID) <= 0;
    }

    /**
     * @param redisConfig configuration used to obtain the connection for a stream key
     */
    public RedisStreamReader(RedisConfig redisConfig) {
        this.redisConfig = redisConfig;
        // Runs the Logging trait's initializer, as the Scala compiler does for mixed-in traits.
        Logging.$init$(this);
    }
}
