package akka.persistence.jdbc.query.scaladsl;

import akka.NotUsed;
import akka.actor.Cancellable;
import akka.actor.ExtendedActorSystem;
import akka.persistence.PersistentRepr;
import akka.persistence.jdbc.config.ReadJournalConfig;
import akka.persistence.jdbc.dao.ReadJournalDao;
import akka.persistence.jdbc.dao.bytea.journal.JournalTables;
import akka.persistence.jdbc.util.SlickDatabase$;
import akka.persistence.jdbc.util.SlickDriver$;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.scaladsl.AllPersistenceIdsQuery;
import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery;
import akka.persistence.query.scaladsl.CurrentEventsByTagQuery;
import akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery;
import akka.persistence.query.scaladsl.EventWriter;
import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery;
import akka.persistence.query.scaladsl.EventsByTagQuery;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension$;
import akka.stream.ActorMaterializer$;
import akka.stream.Materializer;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import com.typesafe.config.Config;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.Map;
import scala.MatchError;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Iterable;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Seq$;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.compat.java8.runtime.LambdaDeserializer$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.package;
import scala.math.Ordering$Long$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.sys.package$;
import scala.util.Failure;
import scala.util.Success;
import slick.driver.JdbcProfile;
import slick.jdbc.JdbcBackend;

/* compiled from: JdbcReadJournal.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=x!B\u0001\u0003\u0011\u0003i\u0011a\u0004&eE\u000e\u0014V-\u00193K_V\u0014h.\u00197\u000b\u0005\r!\u0011\u0001C:dC2\fGm\u001d7\u000b\u0005\u00151\u0011!B9vKJL(BA\u0004\t\u0003\u0011QGMY2\u000b\u0005%Q\u0011a\u00039feNL7\u000f^3oG\u0016T\u0011aC\u0001\u0005C.\\\u0017m\u0001\u0001\u0011\u00059yQ\"\u0001\u0002\u0007\u000bA\u0011\u0001\u0012A\t\u0003\u001f)#'m\u0019*fC\u0012Tu.\u001e:oC2\u001c\"a\u0004\n\u0011\u0005M1R\"\u0001\u000b\u000b\u0003U\tQa]2bY\u0006L!a\u0006\u000b\u0003\r\u0005s\u0017PU3g\u0011\u0015Ir\u0002\"\u0001\u001b\u0003\u0019a\u0014N\\5u}Q\tQ\u0002C\u0004\u001d\u001f\t\u0007IQA\u000f\u0002\u0015%#WM\u001c;jM&,'/F\u0001\u001f\u001f\u0005y\u0012%\u0001\u0011\u0002#)$'mY\u0017sK\u0006$WF[8ve:\fG\u000e\u0003\u0004#\u001f\u0001\u0006iAH\u0001\f\u0013\u0012,g\u000e^5gS\u0016\u0014\bE\u0002\u0003\u0011\u0005\u0001!3CC\u0012\u0013K-r\u0013\u0007N\u001c;{A\u0011a%K\u0007\u0002O)\u00111\u0001\u000b\u0006\u0003\u000b!I!AK\u0014\u0003\u0017I+\u0017\r\u001a&pkJt\u0017\r\u001c\t\u0003M1J!!L\u0014\u00035\r+(O]3oiB+'o]5ti\u0016t7-Z%egF+XM]=\u0011\u0005\u0019z\u0013B\u0001\u0019(\u0005Y\tE\u000e\u001c)feNL7\u000f^3oG\u0016LEm])vKJL\bC\u0001\u00143\u0013\t\u0019tEA\u0011DkJ\u0014XM\u001c;Fm\u0016tGo\u001d\"z!\u0016\u00148/[:uK:\u001cW-\u00133Rk\u0016\u0014\u0018\u0010\u0005\u0002'k%\u0011ag\n\u0002\u001b\u000bZ,g\u000e^:CsB+'o]5ti\u0016t7-Z%e#V,'/\u001f\t\u0003MaJ!!O\u0014\u0003/\r+(O]3oi\u00163XM\u001c;t\u0005f$\u0016mZ)vKJL\bC\u0001\u0014<\u0013\tatEA\u0006Fm\u0016tGo\u0016:ji\u0016\u0014\bC\u0001\u0014?\u0013\tytE\u0001\tFm\u0016tGo\u001d\"z)\u0006<\u0017+^3ss\"A\u0011i\tB\u0001B\u0003%!)\u0001\u0004d_:4\u0017n\u001a\t\u0003\u0007&k\u0011\u0001\u0012\u0006\u0003\u0003\u0016S!AR$\u0002\u0011QL\b/Z:bM\u0016T\u0011\u0001S\u0001\u0004G>l\u0017B\u0001&E\u0005\u0019\u0019uN\u001c4jO\"AAj\tBC\u0002\u0013\rQ*\u0001\u0004tsN$X-\\\u000b\u0002\u001dB\u0011qJU\u0007\u0002!*\u0011\u0011KC\u0001\u0006C\u000e$xN]\u0005\u0003'B\u00131#\u00
12=uK:$W\rZ!di>\u00148+_:uK6D\u0001\"V\u0012\u0003\u0002\u0003\u0006IAT\u0001\bgf\u001cH/Z7!\u0011\u0015I2\u0005\"\u0001X)\tA6\f\u0006\u0002Z5B\u0011ab\t\u0005\u0006\u0019Z\u0003\u001dA\u0014\u0005\u0006\u0003Z\u0003\rA\u0011\u0005\b;\u000e\u0012\r\u0011b\u0001_\u0003\t)7-F\u0001`!\t\u00017-D\u0001b\u0015\t\u0011G#\u0001\u0006d_:\u001cWO\u001d:f]RL!\u0001Z1\u0003!\u0015CXmY;uS>t7i\u001c8uKb$\bB\u00024$A\u0003%q,A\u0002fG\u0002Bq\u0001[\u0012C\u0002\u0013\r\u0011.A\u0002nCR,\u0012A\u001b\t\u0003W:l\u0011\u0001\u001c\u0006\u0003[*\taa\u001d;sK\u0006l\u0017BA8m\u00051i\u0015\r^3sS\u0006d\u0017N_3s\u0011\u0019\t8\u0005)A\u0005U\u0006!Q.\u0019;!\u0011\u001d\u00198E1A\u0005\u0002Q\f\u0011C]3bI*{WO\u001d8bY\u000e{gNZ5h+\u0005)\bC\u0001<y\u001b\u00059(BA!\u0007\u0013\tIxOA\tSK\u0006$'j\\;s]\u0006d7i\u001c8gS\u001eDaa_\u0012!\u0002\u0013)\u0018A\u0005:fC\u0012Tu.\u001e:oC2\u001cuN\u001c4jO\u0002Bq!`\u0012C\u0002\u0013\u0005a0\u0001\u0002eEV\tq\u0010\u0005\u0003\u0002\u0002\u0005ma\u0002BA\u0002\u0003+qA!!\u0002\u0002\u00129!\u0011qAA\u0007\u001b\t\tIAC\u0002\u0002\f1\ta\u0001\u0010:p_Rt\u0014BAA\b\u0003\u0015\u0019H.[2l\u0013\r9\u00111\u0003\u0006\u0003\u0003\u001fIA!a\u0006\u0002\u001a\u0005Y!\n\u001a2d\u0005\u0006\u001c7.\u001a8e\u0015\r9\u00111C\u0005\u0005\u0003;\tyB\u0001\u0005ECR\f'-Y:f\u0013\u0011\t\t#!\u0007\u0003\u0017)#'m\u0019\"bG.,g\u000e\u001a\u0005\b\u0003K\u0019\u0003\u0015!\u0003��\u0003\r!'\r\t\u0005\n\u0003S\u0019#\u0019!C\u0001\u0003W\taB]3bI*{WO\u001d8bY\u0012\u000bw.\u0006\u0002\u0002.A!\u0011qFA\u001b\u001b\t\t\tDC\u0002\u00024\u0019\t1\u0001Z1p\u0013\u0011\t9$!\r\u0003\u001dI+\u0017\r\u001a&pkJt\u0017\r\u001c#b_\"A\u00111H\u0012!\u0002\u0013\ti#A\bsK\u0006$'j\\;s]\u0006dG)Y8!\u0011%\tyd\tb\u0001\n\u0013\t\t%A\u0006eK2\f\u0017pU8ve\u000e,WCAA\"!!\t)%!\u0013\u0002N\u0005MSBAA$\u0015\t\u0019A.\u0003\u0003\u0002L\u0005\u001d#AB*pkJ\u001cW\rE\u0002\u0014\u0003\u001fJ1!!\u0015\u0015\u0005\rIe\u000e\u001e\t\u0004\u001f\u0006U\u0013bAA,!\nY1)\u00198dK2d\u0017M
\u00197f\u0011!\tYf\tQ\u0001\n\u0005\r\u0013\u0001\u00043fY\u0006L8k\\;sG\u0016\u0004\u0003bBA0G\u0011\u0005\u0013\u0011M\u0001\u0016GV\u0014(/\u001a8u!\u0016\u00148/[:uK:\u001cW-\u00133t)\t\t\u0019\u0007\u0005\u0005\u0002F\u0005%\u0013QMA:!\u0011\t9'!\u001c\u000f\u0007M\tI'C\u0002\u0002lQ\ta\u0001\u0015:fI\u00164\u0017\u0002BA8\u0003c\u0012aa\u0015;sS:<'bAA6)A!\u0011QOA<\u001b\u0005Q\u0011bAA=\u0015\t9aj\u001c;Vg\u0016$\u0007bBA?G\u0011\u0005\u0013\u0011M\u0001\u0012C2d\u0007+\u001a:tSN$XM\\2f\u0013\u0012\u001c\bbBAAG\u0011\u0005\u00131Q\u0001\u001dGV\u0014(/\u001a8u\u000bZ,g\u000e^:CsB+'o]5ti\u0016t7-Z%e)!\t))a$\u0002\u0014\u0006u\u0005\u0003CA#\u0003\u0013\n9)a\u001d\u0011\t\u0005%\u00151R\u0007\u0002Q%\u0019\u0011Q\u0012\u0015\u0003\u001b\u00153XM\u001c;F]Z,Gn\u001c9f\u0011!\t\t*a A\u0002\u0005\u0015\u0014!\u00049feNL7\u000f^3oG\u0016LE\r\u0003\u0005\u0002\u0016\u0006}\u0004\u0019AAL\u000391'o\\7TKF,XM\\2f\u001dJ\u00042aEAM\u0013\r\tY\n\u0006\u0002\u0005\u0019>tw\r\u0003\u0005\u0002 
\u0006}\u0004\u0019AAL\u00031!xnU3rk\u0016t7-\u001a(s\u0011\u001d\t\u0019k\tC!\u0003K\u000bQ#\u001a<f]R\u001c()\u001f)feNL7\u000f^3oG\u0016LE\r\u0006\u0005\u0002\u0006\u0006\u001d\u0016\u0011VAV\u0011!\t\t*!)A\u0002\u0005\u0015\u0004\u0002CAK\u0003C\u0003\r!a&\t\u0011\u0005}\u0015\u0011\u0015a\u0001\u0003/Cq!a,$\t\u0003\n\t,\u0001\ndkJ\u0014XM\u001c;Fm\u0016tGo\u001d\"z)\u0006<GCBAC\u0003g\u000b9\f\u0003\u0005\u00026\u00065\u0006\u0019AA3\u0003\r!\u0018m\u001a\u0005\t\u0003s\u000bi\u000b1\u0001\u0002\u0018\u00061qN\u001a4tKRDq!!0$\t\u0003\ny,A\u0006fm\u0016tGo\u001d\"z)\u0006<GCBAC\u0003\u0003\f\u0019\r\u0003\u0005\u00026\u0006m\u0006\u0019AA3\u0011!\tI,a/A\u0002\u0005]\u0005bBAdG\u0011\u0005\u0013\u0011Z\u0001\fKZ,g\u000e^,sSR,'/\u0006\u0002\u0002LBQ\u0011QIAg\u0003#\f\t.a\u001d\n\t\u0005=\u0017q\t\u0002\u0005\r2|w\u000f\u0005\u0003\u0002T\u0006%h\u0002BAk\u0003KtA!a6\u0002d:!\u0011\u0011\\Aq\u001d\u0011\tY.a8\u000f\t\u0005\u001d\u0011Q\\\u0005\u0002\u0017%\u0011\u0011BC\u0005\u0003\u000b!I!a\u0001\u0015\n\u0007\u0005\u001dx%A\u0006Fm\u0016tGo\u0016:ji\u0016\u0014\u0018\u0002BAv\u0003[\u0014!b\u0016:ji\u0016,e/\u001a8u\u0015\r\t9o\n")
/* loaded from: input_file:akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.class */
/**
 * JDBC-backed read journal that serves the Akka Persistence Query interfaces listed in the
 * {@code implements} clause by delegating to a {@link ReadJournalDao}.
 *
 * <p>The "current" queries are direct DAO calls. The live queries ({@link #allPersistenceIds()},
 * {@link #eventsByPersistenceId(String, long, long)}, {@link #eventsByTag(String, long)}) are built
 * by repeatedly running the matching "current" query behind {@link #delaySource()}, a single tick
 * scheduled after {@code readJournalConfig().refreshInterval()}.
 *
 * <p>The methods named {@code akka$persistence$...$$anonfun$N}, {@code next$1},
 * {@code nextFromSeqNr$1} and {@code nextFromOffset$1} are compiler-generated lambda bodies and
 * local functions; their names and signatures are kept as-is.
 */
public class JdbcReadJournal implements CurrentPersistenceIdsQuery, AllPersistenceIdsQuery, CurrentEventsByPersistenceIdQuery, EventsByPersistenceIdQuery, CurrentEventsByTagQuery, EventWriter, EventsByTagQuery {
    private final ExtendedActorSystem system;
    private final ExecutionContext ec;
    private final Materializer mat;
    private final ReadJournalConfig readJournalConfig;
    private final JdbcBackend.DatabaseDef db;
    private final ReadJournalDao readJournalDao;
    private final Source<Object, Cancellable> delaySource;
    private static /* synthetic */ Map $deserializeLambdaCache$;

    /** Returns the plugin identifier exposed by the companion object. */
    public static String Identifier() {
        return JdbcReadJournal$.MODULE$.Identifier();
    }

    /** Returns the actor system this journal was constructed with. */
    public ExtendedActorSystem system() {
        return this.system;
    }

    /** Returns the execution context (the actor system's dispatcher) used to map futures. */
    public ExecutionContext ec() {
        return this.ec;
    }

    /** Returns the materializer used to run the polling sub-streams. */
    public Materializer mat() {
        return this.mat;
    }

    /** Returns the read-journal settings parsed from the plugin configuration. */
    public ReadJournalConfig readJournalConfig() {
        return this.readJournalConfig;
    }

    /** Returns the Slick database handle created in the constructor. */
    public JdbcBackend.DatabaseDef db() {
        return this.db;
    }

    /** Returns the DAO instance that executes the actual journal queries. */
    public ReadJournalDao readJournalDao() {
        return this.readJournalDao;
    }

    /** Returns the one-element tick source that paces every live-query poll. */
    private Source<Object, Cancellable> delaySource() {
        return this.delaySource;
    }

    /**
     * Streams the persistence ids reported by the DAO, requesting up to {@code Long.MAX_VALUE}.
     *
     * @return a source of persistence ids
     */
    public Source<String, NotUsed> currentPersistenceIds() {
        return readJournalDao().allPersistenceIdsSource(Long.MAX_VALUE);
    }

    /**
     * Live variant of {@link #currentPersistenceIds()}: re-runs that query after every delay tick
     * and emits each persistence id only the first time it is seen.
     *
     * @return a source of distinct persistence ids, fed by repeated polling
     */
    public Source<String, NotUsed> allPersistenceIds() {
        return Source$.MODULE$.repeat(BoxesRunTime.boxToInteger(0)).flatMapConcat(tick -> {
            return akka$persistence$jdbc$query$scaladsl$JdbcReadJournal$$$anonfun$2(BoxesRunTime.unboxToInt(tick));
        }).statefulMapConcat(() -> {
            // Per-materialization state: the set of ids that have already been emitted.
            ObjectRef seenIds = ObjectRef.create(Set$.MODULE$.empty());
            return id -> {
                return next$1(id, seenIds);
            };
        });
    }

    /**
     * Streams the events the DAO returns for one persistence id and wraps each in an
     * {@link EventEnvelope}. A failed element ({@code Try}) fails the stream via
     * {@code Future.fromTry}.
     *
     * @param str the persistence id
     * @param j   lower sequence-number bound handed to the DAO (fromSequenceNr in the Akka query
     *            contract)
     * @param j2  upper sequence-number bound handed to the DAO (toSequenceNr in the Akka query
     *            contract)
     * @return a source of envelopes whose offset and sequence number are both the event's
     *         sequence number
     */
    public Source<EventEnvelope, NotUsed> currentEventsByPersistenceId(String str, long j, long j2) {
        return readJournalDao().messages(str, j, j2, Long.MAX_VALUE).mapAsync(1, attempt -> {
            return Future$.MODULE$.fromTry(attempt);
        }).map(repr -> {
            return new EventEnvelope(repr.sequenceNr(), repr.persistenceId(), repr.sequenceNr(), repr.payload());
        });
    }

    /**
     * Live variant of {@link #currentEventsByPersistenceId(String, long, long)}. Starting from
     * {@code max(1, j)}, it polls in batches and advances the cursor past the highest sequence
     * number seen. The stream completes once the cursor has moved beyond {@code j2}.
     *
     * @param str the persistence id
     * @param j   first sequence number to request (values below 1 are raised to 1)
     * @param j2  upper sequence-number bound; use {@code Long.MAX_VALUE} for an unbounded stream
     * @return a source of event envelopes, fed by repeated polling
     */
    public Source<EventEnvelope, NotUsed> eventsByPersistenceId(String str, long j, long j2) {
        return Source$.MODULE$.unfoldAsync(BoxesRunTime.boxToLong(Math.max(1L, j)), cursor -> {
            return akka$persistence$jdbc$query$scaladsl$JdbcReadJournal$$$anonfun$8(str, j2, BoxesRunTime.unboxToLong(cursor));
        }).mapConcat(batch -> {
            return (Seq) Predef$.MODULE$.identity(batch);
        });
    }

    /**
     * Streams the events the DAO returns for a tag and wraps each in an {@link EventEnvelope}
     * whose offset is the {@code ordering} of the accompanying journal row.
     *
     * @param str the tag
     * @param j   the offset handed to the DAO
     * @return a source of event envelopes
     */
    public Source<EventEnvelope, NotUsed> currentEventsByTag(String str, long j) {
        return readJournalDao().eventsByTag(str, j, Long.MAX_VALUE).mapAsync(1, attempt -> {
            return Future$.MODULE$.fromTry(attempt);
        }).map(triple -> {
            if (triple == null) {
                throw new MatchError(triple);
            }
            // Element 1 is the deserialized event, element 3 the raw journal row.
            PersistentRepr repr = (PersistentRepr) triple._1();
            JournalTables.JournalRow row = (JournalTables.JournalRow) triple._3();
            return new EventEnvelope(row.ordering(), repr.persistenceId(), repr.sequenceNr(), repr.payload());
        });
    }

    /**
     * Live variant of {@link #currentEventsByTag(String, long)}: polls in batches and advances
     * the offset cursor past the highest offset seen. This stream has no upper bound.
     *
     * @param str the tag
     * @param j   the initial offset
     * @return a source of event envelopes, fed by repeated polling
     */
    public Source<EventEnvelope, NotUsed> eventsByTag(String str, long j) {
        return Source$.MODULE$.unfoldAsync(BoxesRunTime.boxToLong(j), cursor -> {
            return akka$persistence$jdbc$query$scaladsl$JdbcReadJournal$$$anonfun$15(str, BoxesRunTime.unboxToLong(cursor));
        }).mapConcat(batch -> {
            return (Seq) Predef$.MODULE$.identity(batch);
        });
    }

    /** Returns a flow that routes write events through the DAO's {@code writeEvents()} flow. */
    public Flow<EventWriter.WriteEvent, EventWriter.WriteEvent, NotUsed> eventWriter() {
        return Flow$.MODULE$.apply().via(readJournalDao().writeEvents());
    }

    /** Lambda body for {@code allPersistenceIds}: ignores the tick and runs the current-ids query. */
    public final /* synthetic */ Source akka$persistence$jdbc$query$scaladsl$JdbcReadJournal$$$anonfun$3(int i) {
        return currentPersistenceIds();
    }

    /** Lambda body for {@code allPersistenceIds}: waits for one delay tick, then queries the ids. */
    public final /* synthetic */ Source akka$persistence$jdbc$query$scaladsl$JdbcReadJournal$$$anonfun$2(int i) {
        return delaySource().flatMapConcat(tick -> {
            return akka$persistence$jdbc$query$scaladsl$JdbcReadJournal$$$anonfun$3(BoxesRunTime.unboxToInt(tick));
        });
    }

    /**
     * De-duplication step for {@code allPersistenceIds}.
     *
     * @param str       the persistence id just received
     * @param objectRef mutable holder of the set of ids seen so far; updated to include
     *                  {@code str}
     * @return a set containing {@code str} if it was not seen before, otherwise an empty set
     */
    private final Iterable next$1(String str, ObjectRef objectRef) {
        Set unseen = Set$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{str})).diff((Set) objectRef.elem);
        objectRef.elem = ((Set) objectRef.elem).$plus(str);
        return unseen;
    }

    /**
     * Computes the next cursor for {@code eventsByPersistenceId}.
     *
     * @param seq the batch of envelopes just fetched
     * @param j   the cursor used for that batch
     * @return the highest sequence number in the batch plus one, or {@code j} if the batch is empty
     */
    private final long nextFromSeqNr$1(Seq seq, long j) {
        if (seq.isEmpty()) {
            return j;
        }
        TraversableOnce sequenceNrs = (TraversableOnce) seq.map(envelope -> {
            return BoxesRunTime.boxToLong(envelope.sequenceNr());
        }, Seq$.MODULE$.canBuildFrom());
        return BoxesRunTime.unboxToLong(sequenceNrs.max(Ordering$Long$.MODULE$)) + 1;
    }

    /**
     * Lambda body for one {@code eventsByPersistenceId} poll: runs the current query from cursor
     * {@code j2} up to bound {@code j}, capped at {@code readJournalConfig().maxBufferSize()}
     * elements. The tick value {@code i} is ignored.
     */
    public final /* synthetic */ Source akka$persistence$jdbc$query$scaladsl$JdbcReadJournal$$$anonfun$10(String str, long j, long j2, int i) {
        return currentEventsByPersistenceId(str, j2, j).take(readJournalConfig().maxBufferSize());
    }

    /**
     * Unfold step for {@code eventsByPersistenceId}.
     *
     * @param str the persistence id
     * @param j   the upper sequence-number bound (toSequenceNr)
     * @param j2  the current cursor (next sequence number to request)
     * @return a future of {@code None} when the cursor has passed the bound, which completes the
     *         unfold; otherwise a future of {@code Some((nextCursor, batch))}
     */
    public final /* synthetic */ Future akka$persistence$jdbc$query$scaladsl$JdbcReadJournal$$$anonfun$8(String str, long j, long j2) {
        // Once the cursor is beyond the requested upper bound no further event can match, so
        // finish the stream instead of polling the database forever. With the usual unbounded
        // value Long.MAX_VALUE this condition is never true.
        if (j2 > j) {
            return Future$.MODULE$.successful(scala.None$.MODULE$);
        }
        return ((Future) delaySource().flatMapConcat(tick -> {
            return akka$persistence$jdbc$query$scaladsl$JdbcReadJournal$$$anonfun$10(str, j, j2, BoxesRunTime.unboxToInt(tick));
        }).runWith(Sink$.MODULE$.seq(), mat())).map(batch -> {
            return new Some(new Tuple2(BoxesRunTime.boxToLong(nextFromSeqNr$1(batch, j2)), batch));
        }, ec());
    }

    /**
     * Computes the next cursor for {@code eventsByTag}.
     *
     * @param seq the batch of envelopes just fetched
     * @param j   the cursor used for that batch
     * @return the highest offset in the batch plus one, or {@code j} if the batch is empty
     */
    private final long nextFromOffset$1(Seq seq, long j) {
        if (seq.isEmpty()) {
            return j;
        }
        TraversableOnce offsets = (TraversableOnce) seq.map(envelope -> {
            return BoxesRunTime.boxToLong(envelope.offset());
        }, Seq$.MODULE$.canBuildFrom());
        return BoxesRunTime.unboxToLong(offsets.max(Ordering$Long$.MODULE$)) + 1;
    }

    /**
     * Lambda body for one {@code eventsByTag} poll: runs the current query from offset {@code j},
     * capped at {@code readJournalConfig().maxBufferSize()} elements. The tick value {@code i} is
     * ignored.
     */
    public final /* synthetic */ Source akka$persistence$jdbc$query$scaladsl$JdbcReadJournal$$$anonfun$17(String str, long j, int i) {
        return currentEventsByTag(str, j).take(readJournalConfig().maxBufferSize());
    }

    /**
     * Unfold step for {@code eventsByTag}: waits for one delay tick, collects a batch and pairs
     * it with the next offset cursor.
     *
     * @param str the tag
     * @param j   the current offset cursor
     * @return a future of {@code Some((nextCursor, batch))}
     */
    public final /* synthetic */ Future akka$persistence$jdbc$query$scaladsl$JdbcReadJournal$$$anonfun$15(String str, long j) {
        return ((Future) delaySource().flatMapConcat(tick -> {
            return akka$persistence$jdbc$query$scaladsl$JdbcReadJournal$$$anonfun$17(str, j, BoxesRunTime.unboxToInt(tick));
        }).runWith(Sink$.MODULE$.seq(), mat())).map(batch -> {
            return new Some(new Tuple2(BoxesRunTime.boxToLong(nextFromOffset$1(batch, j)), batch));
        }, ec());
    }

    /**
     * Wires up the journal: materializer, parsed configuration, database handle (closed from a
     * JVM shutdown hook), the reflectively created DAO and the polling delay source.
     *
     * @param config              the plugin configuration
     * @param extendedActorSystem the actor system that supplies the dispatcher, dynamic access and
     *                            serialization extension
     * @throws Throwable whatever exception caused the reflective DAO instantiation to fail
     */
    public JdbcReadJournal(Config config, ExtendedActorSystem extendedActorSystem) {
        this.system = extendedActorSystem;
        this.ec = extendedActorSystem.dispatcher();
        this.mat = ActorMaterializer$.MODULE$.apply(ActorMaterializer$.MODULE$.apply$default$1(), ActorMaterializer$.MODULE$.apply$default$2(), extendedActorSystem);
        this.readJournalConfig = new ReadJournalConfig(config);
        this.db = SlickDatabase$.MODULE$.forConfig(config, readJournalConfig().slickConfiguration());
        package$.MODULE$.addShutdownHook(() -> {
            db().close();
        });
        // The DAO class name comes from configuration; the tuples are its constructor arguments
        // keyed by declared parameter type.
        // The result is a Try (either Success or Failure), so it is held under the common
        // supertype and narrowed in each branch below.
        scala.util.Try<?> daoAttempt = extendedActorSystem.dynamicAccess().createInstanceFor(readJournalConfig().pluginConfig().dao(), Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(JdbcBackend.DatabaseDef.class, db()), new Tuple2(JdbcProfile.class, SlickDriver$.MODULE$.forDriverName(config)), new Tuple2(ReadJournalConfig.class, readJournalConfig()), new Tuple2(Serialization.class, SerializationExtension$.MODULE$.apply(extendedActorSystem)), new Tuple2(ExecutionContext.class, ec()), new Tuple2(Materializer.class, mat())})), ClassTag$.MODULE$.apply(ReadJournalDao.class));
        if (daoAttempt instanceof Success) {
            this.readJournalDao = (ReadJournalDao) ((Success<?>) daoAttempt).value();
            // A single tick delivered after the refresh interval; take(1) ends the source after it.
            this.delaySource = Source$.MODULE$.tick(readJournalConfig().refreshInterval(), new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(0)).seconds(), BoxesRunTime.boxToInteger(0)).take(1L);
        } else {
            if (!(daoAttempt instanceof Failure)) {
                throw new MatchError(daoAttempt);
            }
            throw ((Failure<?>) daoAttempt).exception();
        }
    }

    /**
     * Compiler-generated hook for deserializing lambdas: creates the cache map on first use and
     * delegates to {@code LambdaDeserializer}.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        Map cache = $deserializeLambdaCache$;
        if (cache == null) {
            cache = new HashMap();
            $deserializeLambdaCache$ = cache;
        }
        return LambdaDeserializer$.MODULE$.deserializeLambda(MethodHandles.lookup(), cache, serializedLambda);
    }
}
