package org.apache.spark.sql.execution.streaming;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.UnaryNode;
import org.apache.spark.sql.catalyst.streaming.WriteToStream;
import org.apache.spark.sql.catalyst.trees.TreePattern$;
import org.apache.spark.sql.catalyst.trees.TreePatternBits;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SparkDataStream;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
import org.apache.spark.sql.connector.read.streaming.SupportsTriggerAvailableNow;
import org.apache.spark.sql.errors.QueryExecutionErrors$;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$;
import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec;
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource;
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource$;
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1;
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1$;
import org.apache.spark.sql.internal.SQLConf$;
import org.apache.spark.sql.streaming.StreamingQueryStatus;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.util.Clock;
import org.apache.spark.util.Utils$;
import scala.$less$colon$less$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.MapOps;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Map$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.NonLocalReturnControl;
import scala.runtime.Statics;

/* compiled from: MicroBatchExecution.scala */
@ScalaSignature(bytes = "\u0006\u0005\tUc\u0001\u0002\u00192\u0001yB\u0011B\u0012\u0001\u0003\u0002\u0003\u0006IaR&\t\u00131\u0003!\u0011!Q\u0001\n5\u0013\u0006\"C*\u0001\u0005\u0003\u0005\u000b\u0011\u0002+[\u0011!Y\u0006A!A!\u0002\u0013a\u0006\u0002\u00037\u0001\u0005\u0003\u0005\u000b\u0011B7\t\u000bQ\u0004A\u0011A;\t\u0011q\u0004!\u0019!C\tkuDq!a\u0001\u0001A\u0003%a\u0010C\u0005\u0002\u0006\u0001\u0001\r\u0011\"\u0005\u0002\b!I\u0011Q\u0006\u0001A\u0002\u0013E\u0011q\u0006\u0005\t\u0003{\u0001\u0001\u0015)\u0003\u0002\n!a\u0011q\t\u0001A\u0002\u0003\u0007I\u0011C\u001b\u0002J!a\u0011\u0011\u000b\u0001A\u0002\u0003\u0007I\u0011C\u001b\u0002T!Y\u0011q\u000b\u0001A\u0002\u0003\u0005\u000b\u0015BA&\u0011\u001d\tY\u0006\u0001C\t\u0003;B1\"a\u0018\u0001\u0001\u0004\u0005\r\u0011\"\u0005\u0002b!Y\u0011\u0011\u000e\u0001A\u0002\u0003\u0007I\u0011CA6\u0011-\ty\u0007\u0001a\u0001\u0002\u0003\u0006K!a\u0019\t\u0015\u0005E\u0004\u0001#b\u0001\n\u0003\n\u0019\bC\u0005\u0002\u0006\u0002\u0001\r\u0011\"\u0003\u0002\b\"I\u0011q\u0012\u0001A\u0002\u0013%\u0011\u0011\u0013\u0005\t\u0003+\u0003\u0001\u0015)\u0003\u0002\n\"9\u0011q\u0013\u0001\u0005B\u0005e\u0005\"CAN\u0001\t\u0007I\u0011BAO\u0011!\t)\u000b\u0001Q\u0001\n\u0005}\u0005bBAT\u0001\u0011\u0005\u0013\u0011\u0014\u0005\b\u0003S\u0003A\u0011KAM\u0011\u001d\tY\u000b\u0001C\t\u0003[Cq!a-\u0001\t\u0003\t)\fC\u0004\u0002N\u0002!I!a4\t\u000f\u0005U\u0007\u0001\"\u0003\u0002\b\"9\u0011q\u001b\u0001\u0005\n\u0005e\u0007bBAs\u0001\u0011%\u0011q\u001d\u0005\b\u0003[\u0004A\u0011CAx\u0011\u001d\t)\u0010\u0001C\u0005\u0003oDq!!@\u0001\t#\tI\nC\u0004\u0002��\u0002!\t\"!'\t\u000f\t\u0005\u0001\u0001\"\u0005\u0002\u001a\"9!1\u0001\u0001\u0005\u0012\u0005e\u0005\u0002\u0003B\u0003\u0001\u0011\u0005QGa\u0002\t\u001b\t-\u0002\u0001%A\u0002\u0002\u0003%IA!\fL\u00115\u0011y\u0003\u0001I\u0001\u0004\u0003\u0005I\u0011\u0002B\u00195\u001e9!1G\u0019\t\u0002\tUbA\u0002\u00192\u0011\u0003\u00119\u0004\u0003\u0004uY\u0011\u0005!q\b\u0005\n\u0005
\u0003b#\u0019!C\u0001\u0005\u0007B\u0001Ba\u0015-A\u0003%!Q\t\u0002\u0014\u001b&\u001c'o\u001c\"bi\u000eDW\t_3dkRLwN\u001c\u0006\u0003eM\n\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005Q*\u0014!C3yK\u000e,H/[8o\u0015\t1t'A\u0002tc2T!\u0001O\u001d\u0002\u000bM\u0004\u0018M]6\u000b\u0005iZ\u0014AB1qC\u000eDWMC\u0001=\u0003\ry'oZ\u0002\u0001'\r\u0001qh\u0011\t\u0003\u0001\u0006k\u0011!M\u0005\u0003\u0005F\u0012qb\u0015;sK\u0006lW\t_3dkRLwN\u001c\t\u0003\u0001\u0012K!!R\u0019\u0003\u001b\u0005\u001b\u0018P\\2M_\u001e\u0004VO]4f\u00031\u0019\b/\u0019:l'\u0016\u001c8/[8o!\tA\u0015*D\u00016\u0013\tQUG\u0001\u0007Ta\u0006\u00148nU3tg&|g.\u0003\u0002G\u0003\u00069AO]5hO\u0016\u0014\bC\u0001(Q\u001b\u0005y%B\u0001\u001a6\u0013\t\tvJA\u0004Ue&<w-\u001a:\n\u00051\u000b\u0015\u0001\u0004;sS\u001e<WM]\"m_\u000e\\\u0007CA+Y\u001b\u00051&BA,8\u0003\u0011)H/\u001b7\n\u0005e3&!B\"m_\u000e\\\u0017BA*B\u00031)\u0007\u0010\u001e:b\u001fB$\u0018n\u001c8t!\u0011if-[5\u000f\u0005y#\u0007CA0c\u001b\u0005\u0001'BA1>\u0003\u0019a$o\\8u})\t1-A\u0003tG\u0006d\u0017-\u0003\u0002fE\u00061\u0001K]3eK\u001aL!a\u001a5\u0003\u00075\u000b\u0007O\u0003\u0002fEB\u0011QL[\u0005\u0003W\"\u0014aa\u0015;sS:<\u0017\u0001\u00029mC:\u0004\"A\u001c:\u000e\u0003=T!A\r9\u000b\u0005E,\u0014\u0001C2bi\u0006d\u0017p\u001d;\n\u0005M|'!D,sSR,Gk\\*ue\u0016\fW.\u0001\u0004=S:LGO\u0010\u000b\u0007m^D\u0018P_>\u0011\u0005\u0001\u0003\u0001\"\u0002$\u0007\u0001\u00049\u0005\"\u0002'\u0007\u0001\u0004i\u0005\"B*\u0007\u0001\u0004!\u0006\"B.\u0007\u0001\u0004a\u0006\"\u00027\u0007\u0001\u0004i\u0017!D3se>\u0014hj\u001c;jM&,'/F\u0001\u007f!\t\u0001u0C\u0002\u0002\u0002E\u0012Q\"\u0012:s_Jtu\u000e^5gS\u0016\u0014\u0018AD3se>\u0014hj\u001c;jM&,'\u000fI\u0001\bg>,(oY3t+\t\tI\u0001\u0005\u0004\u0002\f\u0005U\u00111\u0004\b\u0005\u0003\u001b\t\tBD\u0002`\u0003\u001fI\u0011aY\u0005\u0004\u0003'\u0011\u0017a\u00029bG.\fw-Z\u0005\u0005\u0003/\tIBA\u0002TKFT1!a\u0005c!\u0011\ti\"!\u000b\u000e\u0005\u0005}!b\u0001\u001a\u0002\")!\u00111
EA\u0013\u0003\u0011\u0011X-\u00193\u000b\u0007\u0005\u001dR'A\u0005d_:tWm\u0019;pe&!\u00111FA\u0010\u0005=\u0019\u0006/\u0019:l\t\u0006$\u0018m\u0015;sK\u0006l\u0017aC:pkJ\u001cWm]0%KF$B!!\r\u0002:A!\u00111GA\u001b\u001b\u0005\u0011\u0017bAA\u001cE\n!QK\\5u\u0011%\tYDCA\u0001\u0002\u0004\tI!A\u0002yIE\n\u0001b]8ve\u000e,7\u000f\t\u0015\u0004\u0017\u0005\u0005\u0003\u0003BA\u001a\u0003\u0007J1!!\u0012c\u0005!1x\u000e\\1uS2,\u0017a\u0004;sS\u001e<WM]#yK\u000e,Ho\u001c:\u0016\u0005\u0005-\u0003c\u0001!\u0002N%\u0019\u0011qJ\u0019\u0003\u001fQ\u0013\u0018nZ4fe\u0016CXmY;u_J\f1\u0003\u001e:jO\u001e,'/\u0012=fGV$xN]0%KF$B!!\r\u0002V!I\u00111H\u0007\u0002\u0002\u0003\u0007\u00111J\u0001\u0011iJLwmZ3s\u000bb,7-\u001e;pe\u0002B3ADA!\u0003)9W\r\u001e+sS\u001e<WM\u001d\u000b\u0003\u0003\u0017\n\u0001c^1uKJl\u0017M]6Ue\u0006\u001c7.\u001a:\u0016\u0005\u0005\r\u0004c\u0001!\u0002f%\u0019\u0011qM\u0019\u0003!]\u000bG/\u001a:nCJ\\GK]1dW\u0016\u0014\u0018\u0001F<bi\u0016\u0014X.\u0019:l)J\f7m[3s?\u0012*\u0017\u000f\u0006\u0003\u00022\u00055\u0004\"CA\u001e#\u0005\u0005\t\u0019AA2\u0003E9\u0018\r^3s[\u0006\u00148\u000e\u0016:bG.,'\u000fI\u0001\fY><\u0017nY1m!2\fg.\u0006\u0002\u0002vA!\u0011qOAA\u001b\t\tIH\u0003\u0003\u0002|\u0005u\u0014a\u00027pO&\u001c\u0017\r\u001c\u0006\u0004\u0003\u007f\u0002\u0018!\u00029mC:\u001c\u0018\u0002BAB\u0003s\u00121\u0002T8hS\u000e\fG\u000e\u00157b]\u0006I\u0012n]\"veJ,g\u000e\u001e\"bi\u000eD7i\u001c8tiJ,8\r^3e+\t\tI\t\u0005\u0003\u00024\u0005-\u0015bAAGE\n9!i\\8mK\u0006t\u0017!H5t\u0007V\u0014(/\u001a8u\u0005\u0006$8\r[\"p]N$(/^2uK\u0012|F%Z9\u0015\t\u0005E\u00121\u0013\u0005\n\u0003w)\u0012\u0011!a\u0001\u0003\u0013\u000b!$[:DkJ\u0014XM\u001c;CCR\u001c\u0007nQ8ogR\u0014Xo\u0019;fI\u0002\nAa\u001d;paR\u0011\u0011\u0011G\u0001\u0014o\u0006$XM]7be.\u0004&o\u001c9bO\u0006$xN]\u000b\u0003\u0003?\u00032\u0001QAQ\u0013\r\t\u0019+\r\u0002\u0014/\u0006$XM]7be.\u0004&o\u001c9bO\u0006$xN]\u0001\u0015o\u0006$XM]7be.\u0004&o\u001c9bO\u0006$xN\u001d\u0011\u000
2\u000f\rdW-\u00198va\u0006a1\u000f^1siR\u0013\u0018nZ4fe\u0006\u0011\"/\u001e8BGRLg/\u0019;fIN#(/Z1n)\u0011\t\t$a,\t\r\u0005EF\u00041\u0001H\u0003U\u0019\b/\u0019:l'\u0016\u001c8/[8o\r>\u00148\u000b\u001e:fC6\f\u0011E^1mS\u0012\fG/Z(gMN,G\u000fT8h\u0003:$w)\u001a;Qe\u00164xJ\u001a4tKR$B!a.\u0002DB1\u00111GA]\u0003{K1!a/c\u0005\u0019y\u0005\u000f^5p]B\u0019\u0001)a0\n\u0007\u0005\u0005\u0017GA\u0005PM\u001a\u001cX\r^*fc\"9\u0011QY\u000fA\u0002\u0005\u001d\u0017!\u00047bi\u0016\u001cHOQ1uG\"LE\r\u0005\u0003\u00024\u0005%\u0017bAAfE\n!Aj\u001c8h\u0003Q\u0001x\u000e];mCR,7\u000b^1si>3gm]3ugR!\u0011\u0011GAi\u0011\u0019\t\u0019N\ba\u0001\u000f\u0006A2\u000f]1sWN+7o]5p]R{'+\u001e8CCR\u001c\u0007.Z:\u0002%%\u001ch*Z<ECR\f\u0017I^1jY\u0006\u0014G.Z\u0001\u000fO\u0016$8\u000b^1si>3gm]3u)\u0011\tY.!9\u0011\t\u0005u\u0011Q\\\u0005\u0005\u0003?\fyB\u0001\u0004PM\u001a\u001cX\r\u001e\u0005\b\u0003G\u0004\u0003\u0019AA\u000e\u0003)!\u0017\r^1TiJ,\u0017-\\\u0001\u0013G>t7\u000f\u001e:vGRtU\r\u001f;CCR\u001c\u0007\u000e\u0006\u0003\u0002\n\u0006%\bbBAvC\u0001\u0007\u0011\u0011R\u0001\u0015]>$\u0015\r^1CCR\u001c\u0007.Z:F]\u0006\u0014G.\u001a3\u0002\u001b\r|W.\\5u'>,(oY3t)\u0011\t\t$!=\t\u000f\u0005M(\u00051\u0001\u0002>\u0006IqN\u001a4tKR\u001cV-]\u0001\teVt')\u0019;dQR!\u0011\u0011GA}\u0011\u0019\tYp\ta\u0001\u000f\u000612\u000f]1sWN+7o]5p]R{'+\u001e8CCR\u001c\u0007.A\nnCJ\\W*[2s_\n\u000bGo\u00195Ti\u0006\u0014H/\u0001\u000fnCJ\\W*[2s_\n\u000bGo\u00195Fq\u0016\u001cW\u000f^5p]N#\u0018M\u001d;\u0002#5\f'o['jGJ|')\u0019;dQ\u0016sG-A\u000fdY\u0016\fg.\u00169MCN$X\t_3dkR,G-T5de>\u0014\u0015\r^2i\u0003I9\u0018\u000e\u001e5Qe><'/Z:t\u0019>\u001c7.\u001a3\u0016\t\t%!q\u0002\u000b\u0005\u0005\u0017\u0011\t\u0003\u0005\u0003\u0003\u000e\t=A\u0002\u0001\u0003\b\u0005#A#\u0019\u0001B\n\u0005\u0005!\u0016\u0003\u0002B\u000b\u00057\u0001B!a\r\u0003\u0018%\u0019!\u0011\u00042\u0003\u000f9{G\u000f[5oOB!\u00111\u0007B\u000f\u0013\r\u0011yB\u0019\u0002\u0004\u0003:L\b\u0002\u0003B\u0012Q\u0011\u0
005\rA!\n\u0002\u0003\u0019\u0004b!a\r\u0003(\t-\u0011b\u0001B\u0015E\nAAHY=oC6,g(\u0001\ntkB,'\u000fJ:qCJ\\7+Z:tS>tW#A$\u0002%M,\b/\u001a:%iJLwmZ3s\u00072|7m[\u000b\u0002)\u0006\u0019R*[2s_\n\u000bGo\u00195Fq\u0016\u001cW\u000f^5p]B\u0011\u0001\tL\n\u0004Y\te\u0002\u0003BA\u001a\u0005wI1A!\u0010c\u0005\u0019\te.\u001f*fMR\u0011!QG\u0001\r\u0005\u0006#6\tS0J\t~[U)W\u000b\u0003\u0005\u000b\u0002BAa\u0012\u0003R5\u0011!\u0011\n\u0006\u0005\u0005\u0017\u0012i%\u0001\u0003mC:<'B\u0001B(\u0003\u0011Q\u0017M^1\n\u0007-\u0014I%A\u0007C\u0003R\u001b\u0005jX%E?.+\u0015\f\t")
/* loaded from: input_file:org/apache/spark/sql/execution/streaming/MicroBatchExecution.class */
public class MicroBatchExecution extends StreamExecution implements AsyncLogPurge {
    // Cached value behind logicalPlan(); written once by logicalPlan$lzycompute() (flag bit 0x1 of bitmap$0).
    private LogicalPlan logicalPlan;
    // Passed through to the WriteToMicroBatchDataSource / WriteToMicroBatchDataSourceV1 node built for the sink.
    private final Map<String, String> extraOptions;
    // Source of catalogAndIdent() / catalogTable() when the sink write node is built.
    private final WriteToStream plan;
    // Value returned by errorNotifier().
    private final ErrorNotifier errorNotifier;
    // Streaming sources collected from the transformed plan; assigned through sources_$eq.
    private volatile Seq<SparkDataStream> sources;
    // Executor chosen by getTrigger(); assigned through triggerExecutor_$eq.
    private volatile TriggerExecutor triggerExecutor;
    // Assigned through watermarkTracker_$eq.
    private WatermarkTracker watermarkTracker;
    // True while the batch for the current batch id has been constructed but its trigger has not finished.
    private boolean isCurrentBatchConstructed;
    // Value returned by watermarkPropagator().
    private final WatermarkPropagator watermarkPropagator;
    // Backing fields for the AsyncLogPurge mixin; written through the trait's generated _setter_ methods.
    private ThreadPoolExecutor org$apache$spark$sql$execution$streaming$AsyncLogPurge$$asyncPurgeExecutorService;
    private AtomicBoolean org$apache$spark$sql$execution$streaming$AsyncLogPurge$$purgeRunning;
    // Cached value behind useAsyncPurge(); written once by useAsyncPurge$lzycompute() (flag bit 0x2 of bitmap$0).
    private boolean useAsyncPurge;
    // Lazy-initialisation flags: 0x1 = logicalPlan computed, 0x2 = useAsyncPurge computed.
    private volatile byte bitmap$0;

    /**
     * Static forwarder to the companion object's {@code BATCH_ID_KEY} value.
     *
     * @return whatever {@code MicroBatchExecution$.MODULE$.BATCH_ID_KEY()} returns
     */
    public static String BATCH_ID_KEY() {
        final MicroBatchExecution$ companion = MicroBatchExecution$.MODULE$;
        return companion.BATCH_ID_KEY();
    }

    /**
     * Mixin forwarder for {@code AsyncLogPurge.purgeAsync}.
     *
     * <p>Delegates to the trait's own implementation. Calling the unqualified
     * {@code purgeAsync()} here would dispatch back to this override and recurse
     * until the stack overflows.
     */
    @Override // org.apache.spark.sql.execution.streaming.AsyncLogPurge
    public void purgeAsync() {
        AsyncLogPurge.super.purgeAsync();
    }

    /**
     * Mixin forwarder for {@code AsyncLogPurge.asyncLogPurgeShutdown}.
     *
     * <p>Delegates to the trait's own implementation. An unqualified call would
     * re-enter this override and never terminate.
     */
    @Override // org.apache.spark.sql.execution.streaming.AsyncLogPurge
    public void asyncLogPurgeShutdown() {
        AsyncLogPurge.super.asyncLogPurgeShutdown();
    }

    /**
     * Mixin forwarder for {@code AsyncLogPurge.arePendingAsyncPurge}.
     *
     * <p>Delegates to the trait's own implementation. An unqualified call would
     * re-enter this override and never terminate.
     *
     * @return the result of the trait's {@code arePendingAsyncPurge} implementation
     */
    @Override // org.apache.spark.sql.execution.streaming.AsyncLogPurge
    public boolean arePendingAsyncPurge() {
        return AsyncLogPurge.super.arePendingAsyncPurge();
    }

    /**
     * Accessor for the executor-service field that backs the {@code AsyncLogPurge} mixin.
     *
     * @return the current value of the backing field
     */
    @Override // org.apache.spark.sql.execution.streaming.AsyncLogPurge
    public ThreadPoolExecutor org$apache$spark$sql$execution$streaming$AsyncLogPurge$$asyncPurgeExecutorService() {
        final ThreadPoolExecutor purgeExecutor = this.org$apache$spark$sql$execution$streaming$AsyncLogPurge$$asyncPurgeExecutorService;
        return purgeExecutor;
    }

    /**
     * Accessor for the "purge running" flag field that backs the {@code AsyncLogPurge} mixin.
     *
     * @return the current value of the backing field
     */
    @Override // org.apache.spark.sql.execution.streaming.AsyncLogPurge
    public AtomicBoolean org$apache$spark$sql$execution$streaming$AsyncLogPurge$$purgeRunning() {
        final AtomicBoolean purgeFlag = this.org$apache$spark$sql$execution$streaming$AsyncLogPurge$$purgeRunning;
        return purgeFlag;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.spark.sql.execution.streaming.MicroBatchExecution] */
    private boolean useAsyncPurge$lzycompute() {
        boolean useAsyncPurge;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                useAsyncPurge = useAsyncPurge();
                this.useAsyncPurge = useAsyncPurge;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.useAsyncPurge;
    }

    /**
     * Lazy accessor: returns the cached field once flag bit {@code 0x2} of {@code bitmap$0}
     * is set, otherwise takes the synchronized initialisation path.
     *
     * @return the (possibly freshly computed) {@code useAsyncPurge} value
     */
    @Override // org.apache.spark.sql.execution.streaming.AsyncLogPurge
    public boolean useAsyncPurge() {
        final boolean alreadyComputed = ((byte) (this.bitmap$0 & 2)) != 0;
        if (alreadyComputed) {
            return this.useAsyncPurge;
        }
        return useAsyncPurge$lzycompute();
    }

    /**
     * Trait-generated setter: stores the executor service in this class's backing field.
     *
     * @param threadPoolExecutor value to store
     */
    @Override // org.apache.spark.sql.execution.streaming.AsyncLogPurge
    public final void org$apache$spark$sql$execution$streaming$AsyncLogPurge$_setter_$org$apache$spark$sql$execution$streaming$AsyncLogPurge$$asyncPurgeExecutorService_$eq(ThreadPoolExecutor threadPoolExecutor) {
        org$apache$spark$sql$execution$streaming$AsyncLogPurge$$asyncPurgeExecutorService = threadPoolExecutor;
    }

    /**
     * Trait-generated setter: stores the "purge running" flag object in this class's backing field.
     *
     * @param atomicBoolean value to store
     */
    @Override // org.apache.spark.sql.execution.streaming.AsyncLogPurge
    public final void org$apache$spark$sql$execution$streaming$AsyncLogPurge$_setter_$org$apache$spark$sql$execution$streaming$AsyncLogPurge$$purgeRunning_$eq(AtomicBoolean atomicBoolean) {
        org$apache$spark$sql$execution$streaming$AsyncLogPurge$$purgeRunning = atomicBoolean;
    }

    /**
     * Synthetic bridge that exposes the superclass {@code sparkSession()} to lambdas in this class.
     *
     * @return the value of {@code super.sparkSession()}
     */
    public /* synthetic */ SparkSession org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession() {
        final SparkSession inherited = super.sparkSession();
        return inherited;
    }

    /**
     * Synthetic bridge to the superclass {@code triggerClock()}.
     *
     * @return the value of {@code super.triggerClock()}
     */
    private /* synthetic */ Clock super$triggerClock() {
        final Clock inherited = super.triggerClock();
        return inherited;
    }

    /**
     * @return the {@code ErrorNotifier} held by this execution
     */
    @Override // org.apache.spark.sql.execution.streaming.AsyncLogPurge
    public ErrorNotifier errorNotifier() {
        final ErrorNotifier notifier = this.errorNotifier;
        return notifier;
    }

    /**
     * @return the current value of the volatile {@code sources} field
     */
    @Override // org.apache.spark.sql.execution.streaming.ProgressReporter
    public Seq<SparkDataStream> sources() {
        final Seq<SparkDataStream> snapshot = this.sources;
        return snapshot;
    }

    /**
     * Replaces the volatile {@code sources} field.
     *
     * @param seq new sequence of sources
     */
    public void sources_$eq(Seq<SparkDataStream> seq) {
        sources = seq;
    }

    /**
     * @return the current value of the volatile {@code triggerExecutor} field
     */
    public TriggerExecutor triggerExecutor() {
        final TriggerExecutor executor = this.triggerExecutor;
        return executor;
    }

    /**
     * Replaces the volatile {@code triggerExecutor} field.
     *
     * @param triggerExecutor new executor to store
     */
    public void triggerExecutor_$eq(TriggerExecutor triggerExecutor) {
        this.triggerExecutor = triggerExecutor;
    }

    public TriggerExecutor getTrigger() {
        Predef$.MODULE$.assert(sources().nonEmpty(), () -> {
            return "sources should have been retrieved from the plan!";
        });
        ProcessingTimeTrigger trigger = super.trigger();
        if (trigger instanceof ProcessingTimeTrigger) {
            return new ProcessingTimeExecutor(trigger, super.triggerClock());
        }
        if (OneTimeTrigger$.MODULE$.equals(trigger)) {
            return new SingleBatchExecutor();
        }
        if (!AvailableNowTrigger$.MODULE$.equals(trigger)) {
            throw new IllegalStateException("Unknown type of trigger: " + super.trigger());
        }
        if (!BoxesRunTime.unboxToBoolean(super.sparkSession().sqlContext().conf().getConf(SQLConf$.MODULE$.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED()))) {
            return ((IterableOnceOps) sources().distinct()).forall(sparkDataStream -> {
                return BoxesRunTime.boxToBoolean($anonfun$getTrigger$3(this, sparkDataStream));
            }) ? new MultiBatchExecutor() : new SingleBatchExecutor();
        }
        logInfo(() -> {
            return "Configured to use the wrapper of Trigger.AvailableNow for query " + this.prettyIdString() + ".";
        });
        return new MultiBatchExecutor();
    }

    /**
     * @return the current value of the {@code watermarkTracker} field
     */
    public WatermarkTracker watermarkTracker() {
        final WatermarkTracker tracker = this.watermarkTracker;
        return tracker;
    }

    /**
     * Replaces the {@code watermarkTracker} field.
     *
     * @param watermarkTracker new tracker to store
     */
    public void watermarkTracker_$eq(WatermarkTracker watermarkTracker) {
        this.watermarkTracker = watermarkTracker;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Slow path of the lazy {@code logicalPlan} value.
     *
     * <p>Under this object's monitor, and only if flag bit {@code 0x1} of {@code bitmap$0}
     * is still clear, this method:
     * <ol>
     *   <li>asserts that the caller is the query execution thread;</li>
     *   <li>transforms {@code analyzedPlan()} and collects the streaming sources from the result;</li>
     *   <li>selects the trigger executor via {@code getTrigger()};</li>
     *   <li>builds {@code uniqueSources}, a map from each distinct source to its read limit,
     *       with rules that depend on the executor type;</li>
     *   <li>wraps the transformed plan in a write node for the sink, caches it and sets the flag bit.</li>
     * </ol>
     *
     * @return the cached logical plan
     * @throws IllegalArgumentException if the sink is neither a {@code SupportsWrite} nor a {@code Sink}
     */
    private LogicalPlan logicalPlan$lzycompute() {
        LogicalPlan writeToMicroBatchDataSourceV1;
        synchronized (this) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                // NOTE(review): `assert` is a reserved word in Java, so this decompiled call
                // does not compile as written.
                Predef$.MODULE$.assert(queryExecutionThread() == Thread.currentThread(), () -> {
                    return "logicalPlan must be initialized in QueryExecutionThread but the current thread was " + Thread.currentThread();
                });
                // Counter handed to the plan-rewrite function, starting at 0.
                LongRef create = LongRef.create(0L);
                // Rewrite the analyzed plan; the rewrite function receives three fresh mutable maps, the
                // counter above and the configured list of disabled V2 micro-batch readers.
                LogicalPlan transform = analyzedPlan().transform(new MicroBatchExecution$$anonfun$1(this, (scala.collection.mutable.Map) Map$.MODULE$.apply(Nil$.MODULE$), create, Utils$.MODULE$.stringToSeq(super.sparkSession().sqlContext().conf().disabledV2StreamingMicroBatchReaders()), (scala.collection.mutable.Map) Map$.MODULE$.apply(Nil$.MODULE$), (scala.collection.mutable.Map) Map$.MODULE$.apply(Nil$.MODULE$)));
                // Sources must be set before getTrigger(), which requires them to be non-empty.
                sources_$eq(transform.collect(new MicroBatchExecution$$anonfun$logicalPlan$lzycompute$1(null)));
                triggerExecutor_$eq(getTrigger());
                TriggerExecutor triggerExecutor = triggerExecutor();
                // Three cases follow: SingleBatchExecutor, MultiBatchExecutor, anything else.
                uniqueSources_$eq(triggerExecutor instanceof SingleBatchExecutor ? ((IterableOnceOps) ((IterableOps) sources().distinct()).map(sparkDataStream -> {
                    // Single batch: every source is read with ReadLimit.allAvailable().
                    if (!(sparkDataStream instanceof SupportsAdmissionControl)) {
                        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(sparkDataStream), ReadLimit.allAvailable());
                    }
                    SupportsAdmissionControl supportsAdmissionControl = (SupportsAdmissionControl) sparkDataStream;
                    ReadLimit defaultReadLimit = supportsAdmissionControl.getDefaultReadLimit();
                    ReadLimit allAvailable = ReadLimit.allAvailable();
                    // Warn when the source's own default limit differs from allAvailable, since it is overridden.
                    if (defaultReadLimit != null ? !defaultReadLimit.equals(allAvailable) : allAvailable != null) {
                        this.logWarning(() -> {
                            return "The read limit " + defaultReadLimit + " for " + supportsAdmissionControl + " is ignored when Trigger.Once is used.";
                        });
                    }
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(supportsAdmissionControl), ReadLimit.allAvailable());
                })).toMap($less$colon$less$.MODULE$.refl()) : triggerExecutor instanceof MultiBatchExecutor ? ((IterableOnceOps) ((IterableOps) ((IterableOps) sources().distinct()).map(sparkDataStream2 -> {
                    // Multi batch: use the source directly if it supports Trigger.AvailableNow,
                    // otherwise wrap it according to its kind.
                    if (sparkDataStream2 instanceof SupportsTriggerAvailableNow) {
                        return (SupportsTriggerAvailableNow) sparkDataStream2;
                    }
                    if (sparkDataStream2 instanceof Source) {
                        return new AvailableNowSourceWrapper((Source) sparkDataStream2);
                    }
                    if (sparkDataStream2 instanceof MicroBatchStream) {
                        return new AvailableNowMicroBatchStreamWrapper((MicroBatchStream) sparkDataStream2);
                    }
                    throw new MatchError(sparkDataStream2);
                })).map(supportsTriggerAvailableNow -> {
                    // Prepare each source first, then pair it with its default read limit.
                    supportsTriggerAvailableNow.prepareForTriggerAvailableNow();
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(supportsTriggerAvailableNow), supportsTriggerAvailableNow.getDefaultReadLimit());
                })).toMap($less$colon$less$.MODULE$.refl()) : ((IterableOnceOps) ((IterableOps) sources().distinct()).map(sparkDataStream3 -> {
                    // Any other executor: the source's default read limit if it has admission control,
                    // else allAvailable.
                    if (!(sparkDataStream3 instanceof SupportsAdmissionControl)) {
                        return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(sparkDataStream3), ReadLimit.allAvailable());
                    }
                    SupportsAdmissionControl supportsAdmissionControl = (SupportsAdmissionControl) sparkDataStream3;
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(supportsAdmissionControl), supportsAdmissionControl.getDefaultReadLimit());
                })).toMap($less$colon$less$.MODULE$.refl()));
                // Wrap the transformed plan in the write node that matches the sink's API.
                SupportsWrite sink = sink();
                if (sink instanceof SupportsWrite) {
                    SupportsWrite supportsWrite = sink;
                    writeToMicroBatchDataSourceV1 = new WriteToMicroBatchDataSource(this.plan.catalogAndIdent().map(tuple2 -> {
                        if (tuple2 == null) {
                            throw new MatchError(tuple2);
                        }
                        return DataSourceV2Relation$.MODULE$.create(supportsWrite, new Some((TableCatalog) tuple2._1()), new Some((Identifier) tuple2._2()));
                    }), supportsWrite, transform, id().toString(), this.extraOptions, outputMode(), WriteToMicroBatchDataSource$.MODULE$.apply$default$7());
                } else {
                    if (!(sink instanceof Sink)) {
                        throw new IllegalArgumentException("unknown sink type for " + sink());
                    }
                    writeToMicroBatchDataSourceV1 = new WriteToMicroBatchDataSourceV1(this.plan.catalogTable(), (Sink) sink, transform, id().toString(), this.extraOptions, outputMode(), WriteToMicroBatchDataSourceV1$.MODULE$.apply$default$7());
                }
                // Publish the result and mark it initialised.
                this.logicalPlan = writeToMicroBatchDataSourceV1;
                this.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.logicalPlan;
    }

    /**
     * Lazy accessor: returns the cached plan once flag bit {@code 0x1} of {@code bitmap$0}
     * is set, otherwise takes the synchronized initialisation path.
     *
     * @return the (possibly freshly computed) logical plan
     */
    @Override // org.apache.spark.sql.execution.streaming.StreamExecution, org.apache.spark.sql.execution.streaming.ProgressReporter
    public LogicalPlan logicalPlan() {
        final boolean alreadyComputed = ((byte) (this.bitmap$0 & 1)) != 0;
        if (alreadyComputed) {
            return this.logicalPlan;
        }
        return logicalPlan$lzycompute();
    }

    /**
     * @return the current value of the {@code isCurrentBatchConstructed} flag
     */
    private boolean isCurrentBatchConstructed() {
        final boolean constructed = this.isCurrentBatchConstructed;
        return constructed;
    }

    /**
     * Sets the {@code isCurrentBatchConstructed} flag.
     *
     * @param z new flag value
     */
    private void isCurrentBatchConstructed_$eq(boolean z) {
        isCurrentBatchConstructed = z;
    }

    /**
     * Stops the query.
     *
     * <p>Sets the state to {@code TERMINATED} first. If the query execution thread is still
     * alive, cancels the job group keyed by the run id, interrupts the thread and waits for it
     * to terminate, then cancels the job group a second time. Always logs that the query stopped.
     */
    @Override // org.apache.spark.sql.streaming.StreamingQuery
    public void stop() {
        state().set(TERMINATED$.MODULE$);
        final boolean executionThreadAlive = queryExecutionThread().isAlive();
        if (executionThreadAlive) {
            super.sparkSession().sparkContext().cancelJobGroup(runId().toString());
            interruptAndAwaitExecutionThreadTermination();
            // Second cancel after the thread is gone — presumably to catch jobs submitted
            // while it was shutting down; confirm against upstream.
            super.sparkSession().sparkContext().cancelJobGroup(runId().toString());
        }
        logInfo(() -> "Query " + this.prettyIdString() + " was stopped");
    }

    /**
     * @return the {@code WatermarkPropagator} held by this execution
     */
    private WatermarkPropagator watermarkPropagator() {
        final WatermarkPropagator propagator = this.watermarkPropagator;
        return propagator;
    }

    /**
     * Runs the superclass cleanup, then shuts down the async log purge machinery and logs that it did so.
     */
    @Override // org.apache.spark.sql.execution.streaming.StreamExecution
    public void cleanup() {
        super.cleanup();
        asyncLogPurgeShutdown();
        logInfo(() -> "Async log purge executor pool for query " + this.prettyIdString() + " has been shutdown");
    }

    /**
     * Starts a trigger: runs the inherited {@code startTrigger()} and then replaces the current
     * status with a copy whose third field is {@code true} (the first two fields are kept).
     *
     * <p>The inherited implementation is called through {@code super}. An unqualified
     * {@code startTrigger()} call would dispatch back to this override and recurse without end.
     */
    @Override // org.apache.spark.sql.execution.streaming.StreamExecution, org.apache.spark.sql.execution.streaming.ProgressReporter
    public void startTrigger() {
        super.startTrigger();
        StreamingQueryStatus currentStatus = currentStatus();
        currentStatus_$eq(currentStatus.copy(currentStatus.copy$default$1(), currentStatus.copy$default$2(), true));
    }

    /**
     * Drives the micro-batch loop of an activated stream.
     *
     * <p>Reads the "no-data micro-batches enabled" setting from the given session once, then hands
     * {@code triggerExecutor()} a callback. Each invocation of the callback performs at most one
     * trigger and returns {@code isActive()}; how the executor uses that result is not visible here.
     *
     * @param sparkSession session used to read the conf, populate start offsets and run each batch
     */
    @Override // org.apache.spark.sql.execution.streaming.StreamExecution
    public void runActivatedStream(SparkSession sparkSession) {
        boolean streamingNoDataMicroBatchesEnabled = sparkSession.sessionState().conf().streamingNoDataMicroBatchesEnabled();
        triggerExecutor().execute(() -> {
            // Do no trigger work once the query is no longer active.
            if (this.isActive()) {
                this.errorNotifier().throwErrorIfExists();
                // Mutable holder so the timed section below can report whether new data was available.
                BooleanRef create = BooleanRef.create(false);
                this.startTrigger();
                this.reportTimeTaken("triggerExecution", () -> {
                    // A negative batch id means start offsets have not been populated yet.
                    if (this.currentBatchId() < 0) {
                        AcceptsLatestSeenOffsetHandler$.MODULE$.setLatestSeenOffsetOnSources(this.offsetLog().getLatest().map(tuple2 -> {
                            return (OffsetSeq) tuple2._2();
                        }), this.sources());
                        this.populateStartOffsets(sparkSession);
                        this.logInfo(() -> {
                            return "Stream started from " + this.committedOffsets();
                        });
                    }
                    this.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession().sparkContext().setJobDescription(this.getBatchDescriptionString());
                    // Construct the next batch unless one is already pending.
                    if (!this.isCurrentBatchConstructed()) {
                        this.isCurrentBatchConstructed_$eq(this.constructNextBatch(streamingNoDataMicroBatchesEnabled));
                    }
                    this.recordTriggerOffsets(this.committedOffsets(), this.availableOffsets(), this.latestOffsets());
                    create.elem = this.isNewDataAvailable();
                    StreamingQueryStatus currentStatus = this.currentStatus();
                    this.currentStatus_$eq(currentStatus.copy(currentStatus.copy$default$1(), this.isNewDataAvailable(), currentStatus.copy$default$3()));
                    // No batch could be constructed: update the status message and skip running a batch.
                    if (!this.isCurrentBatchConstructed()) {
                        this.updateStatusMessage("Waiting for data to arrive");
                        return;
                    }
                    if (create.elem) {
                        this.updateStatusMessage("Processing new data");
                    } else {
                        this.updateStatusMessage("No new data but cleaning up state");
                    }
                    this.runBatch(sparkSession);
                });
                this.finishTrigger(create.elem, this.isCurrentBatchConstructed());
                // Signal every waiter on the progress-lock condition.
                this.withProgressLocked(() -> {
                    this.awaitProgressLockCondition().signalAll();
                });
                if (this.isCurrentBatchConstructed()) {
                    // A batch was constructed this trigger: advance the batch id and clear the flag.
                    this.currentBatchId_$eq(this.currentBatchId() + 1);
                    this.isCurrentBatchConstructed_$eq(false);
                } else if (this.triggerExecutor() instanceof MultiBatchExecutor) {
                    // No batch and a multi-batch executor: mark the query terminated.
                    this.logInfo(() -> {
                        return "Finished processing all available data for the trigger, terminating this Trigger.AvailableNow query";
                    });
                    this.state().set(TERMINATED$.MODULE$);
                } else {
                    // No batch otherwise: sleep for the polling delay before the next attempt.
                    Thread.sleep(this.pollingDelayMs());
                }
            }
            this.updateStatusMessage("Waiting for next trigger");
            return this.isActive();
        });
    }

    /**
     * Looks up the offset log entry that precedes the latest batch.
     *
     * <p>For batch 0 there is no predecessor and {@code None} is returned. For any other batch id,
     * the entry for {@code j - 1} must exist in the offset log; if it does not, an error is logged
     * and an {@link IllegalStateException} is thrown.
     *
     * @param j id of the latest batch found in the offset log
     * @return {@code Some(offsets of batch j - 1)}, or {@code None} when {@code j == 0}
     * @throws IllegalStateException if the offset log has no entry for batch {@code j - 1}
     */
    public Option<OffsetSeq> validateOffsetLogAndGetPrevOffset(long j) {
        return j != 0 ? new Some(offsetLog().get(j - 1).getOrElse(() -> {
            this.logError(() -> {
                long j2 = j - 1;
                // The missing entry is batch j - 1, but the batch being restarted ("latest batch") is j.
                return "The offset log for batch " + j2 + " doesn't exist, which is required to restart the query from the latest batch " + j + " from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.";
            });
            throw new IllegalStateException("batch " + (j - 1) + " doesn't exist");
        })) : None$.MODULE$;
    }

    private void populateStartOffsets(SparkSession sparkSession) {
        Tuple2 tuple2;
        Tuple2 tuple22;
        sinkCommitProgress_$eq(None$.MODULE$);
        Some latest = offsetLog().getLatest();
        if (!(latest instanceof Some) || (tuple2 = (Tuple2) latest.value()) == null) {
            if (!None$.MODULE$.equals(latest)) {
                throw new MatchError(latest);
            }
            logInfo(() -> {
                return "Starting new streaming query.";
            });
            currentBatchId_$eq(0L);
            watermarkTracker_$eq(WatermarkTracker$.MODULE$.apply(sparkSession.conf()));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        long _1$mcJ$sp = tuple2._1$mcJ$sp();
        OffsetSeq offsetSeq = (OffsetSeq) tuple2._2();
        currentBatchId_$eq(_1$mcJ$sp);
        isCurrentBatchConstructed_$eq(true);
        availableOffsets_$eq(offsetSeq.toStreamProgress(sources()));
        validateOffsetLogAndGetPrevOffset(_1$mcJ$sp).foreach(offsetSeq2 -> {
            $anonfun$populateStartOffsets$1(this, offsetSeq2);
            return BoxedUnit.UNIT;
        });
        offsetSeq.metadata().foreach(offsetSeqMetadata -> {
            $anonfun$populateStartOffsets$2(this, sparkSession, offsetSeqMetadata);
            return BoxedUnit.UNIT;
        });
        Some latest2 = commitLog().getLatest();
        if ((latest2 instanceof Some) && (tuple22 = (Tuple2) latest2.value()) != null) {
            long _1$mcJ$sp2 = tuple22._1$mcJ$sp();
            CommitMetadata commitMetadata = (CommitMetadata) tuple22._2();
            if (_1$mcJ$sp == _1$mcJ$sp2) {
                availableOffsets().foreach(tuple23 -> {
                    if (tuple23 != null) {
                        SparkDataStream sparkDataStream = (SparkDataStream) tuple23._1();
                        org.apache.spark.sql.connector.read.streaming.Offset offset = (org.apache.spark.sql.connector.read.streaming.Offset) tuple23._2();
                        if (sparkDataStream instanceof Source) {
                            Source source = (Source) sparkDataStream;
                            if (offset instanceof Offset) {
                                return source.getBatch(this.committedOffsets().get((SparkDataStream) source).map(offset2 -> {
                                    return (Offset) offset2;
                                }), (Offset) offset);
                            }
                        }
                    }
                    return BoxedUnit.UNIT;
                });
                currentBatchId_$eq(_1$mcJ$sp2 + 1);
                isCurrentBatchConstructed_$eq(false);
                committedOffsets_$eq(committedOffsets().m1615$plus$plus((IterableOnce<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>>) availableOffsets()));
                watermarkTracker().setWatermark(package$.MODULE$.max(watermarkTracker().currentWatermark(), commitMetadata.nextBatchWatermarkMs()));
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            } else if (_1$mcJ$sp2 == _1$mcJ$sp - 1) {
                availableOffsets().foreach(tuple24 -> {
                    if (tuple24 != null) {
                        SparkDataStream sparkDataStream = (SparkDataStream) tuple24._1();
                        org.apache.spark.sql.connector.read.streaming.Offset offset = (org.apache.spark.sql.connector.read.streaming.Offset) tuple24._2();
                        if (sparkDataStream instanceof Source) {
                            Source source = (Source) sparkDataStream;
                            if (offset instanceof Offset) {
                                Offset offset2 = (Offset) offset;
                                Option<Offset> map = this.committedOffsets().get((SparkDataStream) source).map(offset3 -> {
                                    return (Offset) offset3;
                                });
                                return BoxesRunTime.unboxToBoolean(map.map(offset4 -> {
                                    return BoxesRunTime.boxToBoolean($anonfun$populateStartOffsets$7(offset2, offset4));
                                }).getOrElse(() -> {
                                    return true;
                                })) ? source.getBatch(map, offset2) : BoxedUnit.UNIT;
                            }
                        }
                    }
                    return BoxedUnit.UNIT;
                });
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            } else if (_1$mcJ$sp2 < _1$mcJ$sp - 1) {
                logWarning(() -> {
                    return "Batch completion log latest batch id is " + _1$mcJ$sp2 + ", which is not trailing batchid " + _1$mcJ$sp2 + " by one";
                });
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
            }
        } else {
            if (!None$.MODULE$.equals(latest2)) {
                throw new MatchError(latest2);
            }
            logInfo(() -> {
                return "no commit log present";
            });
            BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
        }
        logInfo(() -> {
            long currentBatchId = this.currentBatchId();
            StreamProgress committedOffsets = this.committedOffsets();
            this.availableOffsets();
            return "Resuming at batch " + currentBatchId + " with committed offsets " + currentBatchId + " and available offsets " + committedOffsets;
        });
        BoxedUnit boxedUnit7 = BoxedUnit.UNIT;
    }

    /**
     * Reports whether at least one entry of {@code availableOffsets()} passes the per-entry check
     * in {@code $anonfun$isNewDataAvailable$1}: its offset differs from the committed offset of the
     * same stream, or the stream has no committed offset yet.
     *
     * @return {@code true} if some stream has data beyond what has been committed
     */
    private boolean isNewDataAvailable() {
        return availableOffsets().exists(entry -> BoxesRunTime.boxToBoolean($anonfun$isNewDataAvailable$1(this, entry)));
    }

    /**
     * Resolves the offset from which the given stream should start reading.
     *
     * <p>The lookup is made against {@code availableOffsets()}. For a {@code Source} the recorded
     * offset is handed back unchanged ({@code null} when nothing is recorded). For a
     * {@code MicroBatchStream} the recorded offset is rebuilt from its JSON form through
     * {@code deserializeOffset}; when nothing is recorded the stream's {@code initialOffset()} is
     * used instead.
     *
     * @param sparkDataStream the stream whose start offset is wanted
     * @return the start offset; may be {@code null} for a {@code Source} without a recorded offset
     * @throws MatchError if the stream is neither a {@code Source} nor a {@code MicroBatchStream}
     */
    private org.apache.spark.sql.connector.read.streaming.Offset getStartOffset(SparkDataStream sparkDataStream) {
        Option<org.apache.spark.sql.connector.read.streaming.Offset> recorded = availableOffsets().get(sparkDataStream);
        if (sparkDataStream instanceof Source) {
            return (org.apache.spark.sql.connector.read.streaming.Offset) recorded.orNull($less$colon$less$.MODULE$.refl());
        }
        if (sparkDataStream instanceof MicroBatchStream) {
            MicroBatchStream stream = (MicroBatchStream) sparkDataStream;
            return (org.apache.spark.sql.connector.read.streaming.Offset) recorded
                    .map(stored -> stream.deserializeOffset(stored.json()))
                    .getOrElse(() -> stream.initialOffset());
        }
        throw new MatchError(sparkDataStream);
    }

    /**
     * Tries to construct the next micro-batch while holding the progress lock
     * (the whole body runs inside {@code withProgressLocked}).
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>If {@code isCurrentBatchConstructed()} is already true, return {@code true} at once.</li>
     *   <li>Ask every entry of {@code uniqueSources()} for its next offset and its reported latest
     *       offset, dispatching on the runtime type of the stream.</li>
     *   <li>Merge the non-empty results into {@code availableOffsets} and {@code latestOffsets}.</li>
     *   <li>Refresh {@code offsetSeqMetadata} with the current watermark and trigger-clock time.</li>
     *   <li>Decide whether a batch should be constructed; if so, write the offsets to the log,
     *       clean up the previously executed batch and purge old log entries.</li>
     * </ol>
     *
     * @param z the "noDataBatchesEnabled" flag (that is the label it is logged under below)
     * @return {@code true} if a batch was already constructed or a new one should be constructed,
     *         {@code false} when there is nothing to run
     */
    private boolean constructNextBatch(boolean z) {
        // Key used to recognise the non-local return thrown from inside the lambda below.
        Object obj = new Object();
        try {
            return BoxesRunTime.unboxToBoolean(withProgressLocked(() -> {
                if (this.isCurrentBatchConstructed()) {
                    // Early "return true" out of the lambda, implemented as a control-flow exception.
                    throw new NonLocalReturnControl.mcZ.sp(obj, true);
                }
                // For each (stream, read limit) pair produce ((stream, nextOffset), (stream, latestOffset)),
                // then unzip into two parallel sequences.
                Tuple2 unzip = ((IterableOps) this.uniqueSources().toSeq().map(tuple2 -> {
                    // Case 1: stream wrapped for Trigger.AvailableNow; offsets are keyed by the delegate.
                    if (tuple2 != null) {
                        AvailableNowDataStreamWrapper availableNowDataStreamWrapper = (SparkDataStream) tuple2._1();
                        ReadLimit readLimit = (ReadLimit) tuple2._2();
                        if (availableNowDataStreamWrapper instanceof AvailableNowDataStreamWrapper) {
                            AvailableNowDataStreamWrapper availableNowDataStreamWrapper2 = availableNowDataStreamWrapper;
                            this.updateStatusMessage("Getting offsets from " + availableNowDataStreamWrapper2);
                            SparkDataStream delegate = availableNowDataStreamWrapper2.delegate();
                            return (Tuple2) this.reportTimeTaken("latestOffset", () -> {
                                return new Tuple2(new Tuple2(delegate, Option$.MODULE$.apply(availableNowDataStreamWrapper2.latestOffset(this.getStartOffset(delegate), readLimit))), new Tuple2(delegate, Option$.MODULE$.apply(availableNowDataStreamWrapper2.reportLatestOffset())));
                            });
                        }
                    }
                    // Case 2: stream with admission control; the read limit is passed to latestOffset.
                    if (tuple2 != null) {
                        SupportsAdmissionControl supportsAdmissionControl = (SparkDataStream) tuple2._1();
                        ReadLimit readLimit2 = (ReadLimit) tuple2._2();
                        if (supportsAdmissionControl instanceof SupportsAdmissionControl) {
                            SupportsAdmissionControl supportsAdmissionControl2 = supportsAdmissionControl;
                            this.updateStatusMessage("Getting offsets from " + supportsAdmissionControl2);
                            return (Tuple2) this.reportTimeTaken("latestOffset", () -> {
                                return new Tuple2(new Tuple2(supportsAdmissionControl2, Option$.MODULE$.apply(supportsAdmissionControl2.latestOffset(this.getStartOffset(supportsAdmissionControl2), readLimit2))), new Tuple2(supportsAdmissionControl2, Option$.MODULE$.apply(supportsAdmissionControl2.reportLatestOffset())));
                            });
                        }
                    }
                    // Case 3: Source; the same getOffset() result is used for both tuples.
                    if (tuple2 != null) {
                        SparkDataStream sparkDataStream = (SparkDataStream) tuple2._1();
                        if (sparkDataStream instanceof Source) {
                            Source source = (Source) sparkDataStream;
                            this.updateStatusMessage("Getting offsets from " + source);
                            return (Tuple2) this.reportTimeTaken("getOffset", () -> {
                                Option<Offset> offset = source.getOffset();
                                return new Tuple2(new Tuple2(source, offset), new Tuple2(source, offset));
                            });
                        }
                    }
                    // Case 4: plain MicroBatchStream; latestOffset() is used for both tuples.
                    if (tuple2 != null) {
                        MicroBatchStream microBatchStream = (SparkDataStream) tuple2._1();
                        if (microBatchStream instanceof MicroBatchStream) {
                            MicroBatchStream microBatchStream2 = microBatchStream;
                            this.updateStatusMessage("Getting offsets from " + microBatchStream2);
                            return (Tuple2) this.reportTimeTaken("latestOffset", () -> {
                                org.apache.spark.sql.connector.read.streaming.Offset latestOffset = microBatchStream2.latestOffset();
                                return new Tuple2(new Tuple2(microBatchStream2, Option$.MODULE$.apply(latestOffset)), new Tuple2(microBatchStream2, Option$.MODULE$.apply(latestOffset)));
                            });
                        }
                    }
                    // Any other stream type is rejected.
                    if (tuple2 != null) {
                        throw new IllegalStateException("Unexpected source: " + ((SparkDataStream) tuple2._1()));
                    }
                    throw new MatchError(tuple2);
                })).unzip(Predef$.MODULE$.$conforms());
                if (unzip == null) {
                    throw new MatchError(unzip);
                }
                Tuple2 tuple22 = new Tuple2((Seq) unzip._1(), (Seq) unzip._2());
                // seq: next offsets per stream; seq2: reported latest offsets per stream.
                Seq seq = (Seq) tuple22._1();
                Seq seq2 = (Seq) tuple22._2();
                // Keep only entries whose Option is non-empty, unwrap them, and merge into availableOffsets.
                this.availableOffsets_$eq(this.availableOffsets().m1615$plus$plus((IterableOnce<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>>) ((IterableOnceOps) ((IterableOps) seq.filter(tuple23 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$constructNextBatch$7(tuple23));
                })).map(tuple24 -> {
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tuple24._1()), ((Option) tuple24._2()).get());
                })).toMap($less$colon$less$.MODULE$.refl())));
                // Same filtering/unwrapping for latestOffsets.
                this.latestOffsets_$eq(this.latestOffsets().m1615$plus$plus((IterableOnce<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>>) ((IterableOnceOps) ((IterableOps) seq2.filter(tuple25 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$constructNextBatch$9(tuple25));
                })).map(tuple26 -> {
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tuple26._1()), ((Option) tuple26._2()).get());
                })).toMap($less$colon$less$.MODULE$.refl())));
                // Refresh the metadata with the current watermark and trigger-clock time; the third
                // field is passed through copy$default$3 (presumably its existing value).
                OffsetSeqMetadata offsetSeqMetadata = this.offsetSeqMetadata();
                this.offsetSeqMetadata_$eq(offsetSeqMetadata.copy(this.watermarkTracker().currentWatermark(), this.super$triggerClock().getTimeMillis(), offsetSeqMetadata.copy$default$3()));
                // z2: the last execution (if any) asks for another batch and no-data batches are enabled.
                boolean z2 = z && Option$.MODULE$.apply(this.lastExecution()).exists(incrementalExecution -> {
                    return BoxesRunTime.boxToBoolean($anonfun$constructNextBatch$11(this, incrementalExecution));
                });
                // z3: final decision whether to construct the next batch.
                boolean z3 = this.isNewDataAvailable() || z2;
                this.logTrace(() -> {
                    return "noDataBatchesEnabled = " + z + ", lastExecutionRequiresAnotherBatch = " + z2 + ", isNewDataAvailable = " + this.isNewDataAvailable() + ", shouldConstructNextBatch = " + z3;
                });
                if (z3) {
                    this.updateStatusMessage("Writing offsets to log");
                    this.reportTimeTaken("walCommit", () -> {
                        // Persist this batch's offsets first, then commit the previous batch to the sources.
                        this.markMicroBatchStart();
                        this.cleanUpLastExecutedMicroBatch();
                        // Purge old log entries only once more than minLogEntriesToMaintain batches exist.
                        if (this.minLogEntriesToMaintain() < this.currentBatchId()) {
                            if (this.useAsyncPurge()) {
                                this.purgeAsync();
                            } else {
                                this.purge(this.currentBatchId() - this.minLogEntriesToMaintain());
                            }
                        }
                    });
                    this.noNewData_$eq(false);
                } else {
                    // Nothing to run: flag it and wake up any threads waiting on the progress condition.
                    this.noNewData_$eq(true);
                    this.awaitProgressLockCondition().signalAll();
                }
                return z3;
            }));
        } catch (NonLocalReturnControl e) {
            // Only handle the control exception raised by this invocation; rethrow any other.
            if (e.key() == obj) {
                return e.value$mcZ$sp();
            }
            throw e;
        }
    }

    /**
     * Commits the given offsets to the sources they belong to.
     *
     * <p>The offset sequence is paired with {@code sources()} via {@code toStreamProgress}, and each
     * resulting (stream, offset) entry is handed to {@code $anonfun$commitSources$1}.
     *
     * @param offsetSeq the offsets to commit
     */
    public void commitSources(OffsetSeq offsetSeq) {
        StreamProgress progress = offsetSeq.toStreamProgress(sources());
        progress.foreach(entry -> {
            $anonfun$commitSources$1(entry);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Executes the current micro-batch end to end.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Under "getBatch": for every available offset that differs from the committed one (or
     *       has no committed counterpart), obtain the new data. A {@code Source} yields the logical
     *       plan of {@code getBatch}; a {@code MicroBatchStream} yields an {@code OffsetHolder}
     *       with the start and end offsets.</li>
     *   <li>Rewrite {@code logicalPlan()} using that new-data map, then rewrite expressions in
     *       subtrees that contain the {@code CURRENT_LIKE} pattern.</li>
     *   <li>Stamp the write node with the current batch id, according to the sink type.</li>
     *   <li>Set the batch-id and "is continuous" local properties on the Spark context.</li>
     *   <li>Under "queryPlanning": build a new {@code IncrementalExecution} and force its
     *       executed plan.</li>
     *   <li>Under "addBatch": run the batch against the sink and collect the commit progress.</li>
     *   <li>Under the progress lock: record the sink commit progress and mark the batch ended.</li>
     * </ol>
     *
     * @param sparkSession the session used to plan and run this batch
     * @throws IllegalArgumentException if the sink is neither a {@code Sink} nor a {@code SupportsWrite}
     */
    private void runBatch(SparkSession sparkSession) {
        UnaryNode withNewBatchId;
        logDebug(() -> {
            return "Running batch " + this.currentBatchId();
        });
        // Mutable map: stream -> new data (a logical plan for Source, an OffsetHolder for MicroBatchStream).
        scala.collection.mutable.Map $plus$plus = ((MapOps) Map$.MODULE$.empty()).$plus$plus((IterableOnce) reportTimeTaken("getBatch", () -> {
            return this.availableOffsets().flatMap(tuple2 -> {
                org.apache.spark.sql.connector.read.streaming.Offset offset;
                // Case 1: Source with an Offset of this package.
                if (tuple2 != null) {
                    SparkDataStream sparkDataStream = (SparkDataStream) tuple2._1();
                    org.apache.spark.sql.connector.read.streaming.Offset offset2 = (org.apache.spark.sql.connector.read.streaming.Offset) tuple2._2();
                    if (sparkDataStream instanceof Source) {
                        Source source = (Source) sparkDataStream;
                        if (offset2 instanceof Offset) {
                            Offset offset3 = (Offset) offset2;
                            // Proceed only if the committed offset differs from the available one, or none is committed.
                            if (BoxesRunTime.unboxToBoolean(this.committedOffsets().get((SparkDataStream) source).map(offset4 -> {
                                return BoxesRunTime.boxToBoolean($anonfun$runBatch$4(offset3, offset4));
                            }).getOrElse(() -> {
                                return true;
                            }))) {
                                // Committed offset (if any) is the start of the range; offset3 is the end.
                                Option<Offset> map = this.committedOffsets().get((SparkDataStream) source).map(offset5 -> {
                                    return (Offset) offset5;
                                });
                                Dataset<Row> batch = source.getBatch(map, offset3);
                                // The returned DataFrame must be flagged as streaming.
                                Predef$.MODULE$.assert(batch.isStreaming(), () -> {
                                    return "DataFrame returned by getBatch from " + source + " did not have isStreaming=true\n" + batch.queryExecution().logical();
                                });
                                this.logDebug(() -> {
                                    return "Retrieving data from " + source + ": " + map + " -> " + offset3;
                                });
                                return new Some(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(source), batch.logicalPlan()));
                            }
                        }
                    }
                }
                // Case 2: MicroBatchStream.
                if (tuple2 != null) {
                    MicroBatchStream microBatchStream = (SparkDataStream) tuple2._1();
                    org.apache.spark.sql.connector.read.streaming.Offset offset6 = (org.apache.spark.sql.connector.read.streaming.Offset) tuple2._2();
                    if (microBatchStream instanceof MicroBatchStream) {
                        MicroBatchStream microBatchStream2 = microBatchStream;
                        // Same "differs or nothing committed" gate as for Source.
                        if (BoxesRunTime.unboxToBoolean(this.committedOffsets().get((SparkDataStream) microBatchStream2).map(offset7 -> {
                            return BoxesRunTime.boxToBoolean($anonfun$runBatch$9(offset6, offset7));
                        }).getOrElse(() -> {
                            return true;
                        }))) {
                            // Committed offset re-created through the stream's own deserializer.
                            Option map2 = this.committedOffsets().get((SparkDataStream) microBatchStream2).map(offset8 -> {
                                return microBatchStream2.deserializeOffset(offset8.json());
                            });
                            // A SerializedOffset is deserialized by the stream; any other non-null offset is used as-is.
                            if (offset6 instanceof SerializedOffset) {
                                offset = microBatchStream2.deserializeOffset(((SerializedOffset) offset6).json());
                            } else {
                                if (offset6 == null) {
                                    throw new MatchError(offset6);
                                }
                                offset = offset6;
                            }
                            org.apache.spark.sql.connector.read.streaming.Offset offset9 = offset;
                            // Start of the range: committed offset, or the stream's initial offset when none is committed.
                            org.apache.spark.sql.connector.read.streaming.Offset offset10 = (org.apache.spark.sql.connector.read.streaming.Offset) map2.getOrElse(() -> {
                                return microBatchStream2.initialOffset();
                            });
                            this.logDebug(() -> {
                                return "Retrieving data from " + microBatchStream2 + ": " + map2 + " -> " + offset9;
                            });
                            return new Some(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(microBatchStream2), new OffsetHolder(offset10, offset9)));
                        }
                    }
                }
                // Nothing new for this stream (or an unmatched entry): contribute no mapping.
                return None$.MODULE$;
            });
        }));
        // Rewrite the logical plan with a partial function that receives the new-data map
        // (NOTE(review): the rewrite rule itself lives in MicroBatchExecution$$anonfun$2, not visible here).
        LogicalPlan transform = logicalPlan().transform(new MicroBatchExecution$$anonfun$2(this, $plus$plus));
        // Publish an immutable snapshot of the new-data map.
        newData_$eq($plus$plus.toMap($less$colon$less$.MODULE$.refl()));
        // Rewrite expressions only in subtrees that contain the CURRENT_LIKE tree pattern.
        LogicalPlan transformAllExpressionsWithPruning = transform.transformAllExpressionsWithPruning(treePatternBits -> {
            return BoxesRunTime.boxToBoolean($anonfun$runBatch$14(treePatternBits));
        }, transform.transformAllExpressionsWithPruning$default$2(), new MicroBatchExecution$$anonfun$4(this));
        // Stamp the write node with the current batch id; the node type depends on the sink kind.
        Table sink = sink();
        if (sink instanceof Sink) {
            withNewBatchId = ((WriteToMicroBatchDataSourceV1) transformAllExpressionsWithPruning).withNewBatchId(currentBatchId());
        } else {
            if (!(sink instanceof SupportsWrite)) {
                throw new IllegalArgumentException("unknown sink type for " + sink());
            }
            withNewBatchId = ((WriteToMicroBatchDataSource) transformAllExpressionsWithPruning).withNewBatchId(currentBatchId());
        }
        UnaryNode unaryNode = withNewBatchId;
        // Expose the batch id and the "not continuous processing" flag as Spark local properties.
        sparkSession.sparkContext().setLocalProperty(MicroBatchExecution$.MODULE$.BATCH_ID_KEY(), Long.toString(currentBatchId()));
        sparkSession.sparkContext().setLocalProperty(StreamExecution$.MODULE$.IS_CONTINUOUS_PROCESSING(), Boolean.toString(false));
        reportTimeTaken("queryPlanning", () -> {
            // The last constructor argument is true only when no execution has been created yet.
            this.lastExecution_$eq(new IncrementalExecution(sparkSession, (LogicalPlan) unaryNode, this.outputMode(), this.checkpointFile("state"), this.id(), this.runId(), this.currentBatchId(), this.offsetLog().offsetSeqMetadataForBatchId(this.currentBatchId() - 1), this.offsetSeqMetadata(), this.watermarkPropagator(), this.lastExecution() == null));
            // Accessing executedPlan inside the timed section so planning is attributed to "queryPlanning".
            return this.lastExecution().executedPlan();
        });
        markMicroBatchExecutionStart();
        Dataset dataset = new Dataset(lastExecution(), ExpressionEncoder$.MODULE$.apply(lastExecution().analyzed().schema()));
        Option option = (Option) reportTimeTaken("addBatch", () -> {
            return (Option) SQLExecution$.MODULE$.withNewExecutionId(this.lastExecution(), SQLExecution$.MODULE$.withNewExecutionId$default$2(), () -> {
                Table sink2 = this.sink();
                if (sink2 instanceof Sink) {
                    // Sink: hand over the batch, then refresh the catalog table if the plan names one.
                    ((Sink) sink2).addBatch(this.currentBatchId(), dataset);
                    this.plan.catalogTable().foreach(catalogTable -> {
                        $anonfun$runBatch$18(this, catalogTable);
                        return BoxedUnit.UNIT;
                    });
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    if (!(sink2 instanceof SupportsWrite)) {
                        throw new MatchError(sink2);
                    }
                    // SupportsWrite sink: collect() triggers execution of the plan; the result is discarded.
                    dataset.collect();
                }
                // Commit progress is only available when the executed plan is a WriteToDataSourceV2Exec.
                SparkPlan executedPlan = this.lastExecution().executedPlan();
                return executedPlan instanceof WriteToDataSourceV2Exec ? ((WriteToDataSourceV2Exec) executedPlan).commitProgress() : None$.MODULE$;
            });
        });
        // Record the sink's commit progress and finish the batch while holding the progress lock.
        withProgressLocked(() -> {
            this.sinkCommitProgress_$eq(option);
            this.markMicroBatchEnd();
        });
        logDebug(() -> {
            return "Completed batch " + this.currentBatchId();
        });
    }

    /**
     * Marks the start of the current micro-batch by appending its offsets to the offset log.
     *
     * <p>The entry is built from {@code availableOffsets()} together with {@code sources()} and
     * {@code offsetSeqMetadata()}, and stored under {@code currentBatchId()}. On success an info
     * line with the batch id and the metadata is logged.
     *
     * @throws RuntimeException the error produced by
     *         {@code QueryExecutionErrors.concurrentStreamLogUpdate} when the offset log rejects
     *         the entry (its {@code add} returned {@code false})
     */
    public void markMicroBatchStart() {
        if (!offsetLog().add(currentBatchId(), availableOffsets().toOffsetSeq(sources(), offsetSeqMetadata()))) {
            throw QueryExecutionErrors$.MODULE$.concurrentStreamLogUpdate(currentBatchId());
        }
        logInfo(() -> {
            // Log the metadata itself; the previous message repeated the batch id after
            // "Metadata " and silently discarded the metadata string.
            long batchId = this.currentBatchId();
            String metadata = this.offsetSeqMetadata().toString();
            return "Committed offsets for batch " + batchId + ". Metadata " + metadata;
        });
    }

    /**
     * Called by {@code runBatch} after query planning and before the batch is handed to the sink.
     * The body is empty here; presumably a hook for subclasses to override (TODO confirm).
     */
    public void markMicroBatchExecutionStart() {
    }

    /**
     * Marks the end of the current micro-batch.
     *
     * <p>First updates the watermark tracker from the last executed plan, then (timed as
     * "commitOffsets") appends a {@code CommitMetadata} carrying the current watermark to the commit
     * log under {@code currentBatchId()}, and finally folds {@code availableOffsets()} into
     * {@code committedOffsets}.
     *
     * @throws RuntimeException the error produced by
     *         {@code QueryExecutionErrors.concurrentStreamLogUpdate} when the commit log rejects
     *         the entry (its {@code add} returned {@code false})
     */
    public void markMicroBatchEnd() {
        watermarkTracker().updateWatermark(lastExecution().executedPlan());
        reportTimeTaken("commitOffsets", () -> {
            boolean appended = this.commitLog().add(this.currentBatchId(), new CommitMetadata(this.watermarkTracker().currentWatermark()));
            if (!appended) {
                throw QueryExecutionErrors$.MODULE$.concurrentStreamLogUpdate(this.currentBatchId());
            }
        });
        committedOffsets_$eq(
                committedOffsets().m1615$plus$plus(
                        (IterableOnce<Tuple2<SparkDataStream, org.apache.spark.sql.connector.read.streaming.Offset>>) availableOffsets()));
    }

    /**
     * Cleans up after the previously executed micro-batch.
     *
     * <p>Does nothing for batch 0. Otherwise reads the offset-log entry of batch
     * {@code currentBatchId() - 1}, commits those offsets to the sources, and asks the watermark
     * propagator to purge up to batch {@code currentBatchId() - 2}.
     *
     * @throws IllegalStateException if the previous batch has no entry in the offset log
     */
    public void cleanUpLastExecutedMicroBatch() {
        if (currentBatchId() == 0) {
            return;
        }
        Option<OffsetSeq> previousOffsets = offsetLog().get(currentBatchId() - 1);
        if (previousOffsets.isEmpty()) {
            throw new IllegalStateException("batch " + (currentBatchId() - 1) + " doesn't exist");
        }
        commitSources((OffsetSeq) previousOffsets.get());
        watermarkPropagator().purge(currentBatchId() - 2);
    }

    /**
     * Runs the given function while holding {@code awaitProgressLock()}.
     *
     * <p>The lock is acquired before the call and released in a {@code finally} block, so it is
     * released whether the function returns normally or throws.
     *
     * @param function0 the work to run under the lock
     * @param <T> the function's result type
     * @return whatever {@code function0} returns
     */
    public <T> T withProgressLocked(Function0<T> function0) {
        awaitProgressLock().lock();
        try {
            T result = (T) function0.apply();
            return result;
        } finally {
            awaitProgressLock().unlock();
        }
    }

    /**
     * Tells whether the stream implements {@code SupportsTriggerAvailableNow}; logs a warning
     * through the given execution when it does not.
     *
     * @param microBatchExecution the execution used for logging
     * @param sparkDataStream the stream to inspect
     * @return {@code true} if the stream supports Trigger.AvailableNow
     */
    public static final /* synthetic */ boolean $anonfun$getTrigger$3(MicroBatchExecution microBatchExecution, SparkDataStream sparkDataStream) {
        if (sparkDataStream instanceof SupportsTriggerAvailableNow) {
            return true;
        }
        microBatchExecution.logWarning(() -> "source [" + sparkDataStream + "] does not support Trigger.AvailableNow. Falling back to single batch execution. Note that this may not guarantee processing new data if there is an uncommitted batch. Please consult with data source developer to support Trigger.AvailableNow.");
        return false;
    }

    /**
     * Replaces the execution's committed offsets with the given offset sequence, paired with the
     * execution's sources via {@code toStreamProgress}.
     */
    public static final /* synthetic */ void $anonfun$populateStartOffsets$1(MicroBatchExecution microBatchExecution, OffsetSeq offsetSeq) {
        StreamProgress restored = offsetSeq.toStreamProgress(microBatchExecution.sources());
        microBatchExecution.committedOffsets_$eq(restored);
    }

    /**
     * Restores metadata-derived state from a logged {@code OffsetSeqMetadata}.
     *
     * <p>In order: applies the metadata to the session conf, rebuilds the execution's
     * {@code offsetSeqMetadata} from the logged watermark and timestamp plus the session conf,
     * installs a fresh watermark tracker created from the session conf, and sets that tracker's
     * watermark to the logged batch watermark.
     */
    public static final /* synthetic */ void $anonfun$populateStartOffsets$2(MicroBatchExecution microBatchExecution, SparkSession sparkSession, OffsetSeqMetadata offsetSeqMetadata) {
        OffsetSeqMetadata$.MODULE$.setSessionConf(offsetSeqMetadata, sparkSession.conf());
        OffsetSeqMetadata rebuilt = OffsetSeqMetadata$.MODULE$.apply(
                offsetSeqMetadata.batchWatermarkMs(),
                offsetSeqMetadata.batchTimestampMs(),
                sparkSession.conf());
        microBatchExecution.offsetSeqMetadata_$eq(rebuilt);
        microBatchExecution.watermarkTracker_$eq(WatermarkTracker$.MODULE$.apply(sparkSession.conf()));
        microBatchExecution.watermarkTracker().setWatermark(offsetSeqMetadata.batchWatermarkMs());
    }

    /**
     * Null-safe equality of two offsets: {@code true} when both are {@code null}, otherwise the
     * result of {@code offset2.equals(offset)}.
     */
    public static final /* synthetic */ boolean $anonfun$populateStartOffsets$7(Offset offset, Offset offset2) {
        if (offset2 == null) {
            return offset == null;
        }
        return offset2.equals(offset);
    }

    /**
     * Null-safe inequality of two offsets: {@code false} when both are {@code null}, otherwise the
     * negation of {@code offset2.equals(offset)}.
     */
    public static final /* synthetic */ boolean $anonfun$isNewDataAvailable$2(org.apache.spark.sql.connector.read.streaming.Offset offset, org.apache.spark.sql.connector.read.streaming.Offset offset2) {
        if (offset2 == null) {
            return offset != null;
        }
        return !offset2.equals(offset);
    }

    /**
     * Decides whether a single (stream, available offset) entry represents new data.
     *
     * <p>The entry is new when the stream has no committed offset, or when its committed offset
     * differs from the available one.
     *
     * @throws MatchError if the entry is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$isNewDataAvailable$1(MicroBatchExecution microBatchExecution, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        SparkDataStream stream = (SparkDataStream) tuple2._1();
        org.apache.spark.sql.connector.read.streaming.Offset available = (org.apache.spark.sql.connector.read.streaming.Offset) tuple2._2();
        return BoxesRunTime.unboxToBoolean(
                microBatchExecution.committedOffsets().get(stream)
                        .map(committed -> BoxesRunTime.boxToBoolean($anonfun$isNewDataAvailable$2(available, committed)))
                        .getOrElse(() -> true));
    }

    /**
     * Filter predicate: keeps a (stream, optional offset) pair only when the option is non-empty.
     *
     * @throws MatchError if the pair is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$constructNextBatch$7(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Option<?> maybeOffset = (Option) tuple2._2();
        return maybeOffset.nonEmpty();
    }

    /**
     * Filter predicate: keeps a (stream, optional offset) pair only when the option is non-empty.
     *
     * @throws MatchError if the pair is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$constructNextBatch$9(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Option<?> maybeOffset = (Option) tuple2._2();
        return maybeOffset.nonEmpty();
    }

    /**
     * Asks the given execution whether another batch should run, passing the micro-batch
     * execution's current {@code offsetSeqMetadata()}.
     */
    public static final /* synthetic */ boolean $anonfun$constructNextBatch$11(MicroBatchExecution microBatchExecution, IncrementalExecution incrementalExecution) {
        boolean anotherBatchNeeded = incrementalExecution.shouldRunAnotherBatch(microBatchExecution.offsetSeqMetadata());
        return anotherBatchNeeded;
    }

    /**
     * Commits one (stream, offset) entry to its stream.
     *
     * <p>A {@code Source} paired with an {@code Offset} of this package is committed directly. A
     * {@code MicroBatchStream} is committed with the offset re-created from its JSON form through
     * the stream's own {@code deserializeOffset}.
     *
     * @throws MatchError if the entry is {@code null}
     * @throws IllegalArgumentException if the entry matches neither of the two supported shapes
     */
    public static final /* synthetic */ void $anonfun$commitSources$1(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        SparkDataStream stream = (SparkDataStream) tuple2._1();
        org.apache.spark.sql.connector.read.streaming.Offset committed = (org.apache.spark.sql.connector.read.streaming.Offset) tuple2._2();
        if (stream instanceof Source && committed instanceof Offset) {
            ((Source) stream).commit((Offset) committed);
            return;
        }
        if (stream instanceof MicroBatchStream) {
            MicroBatchStream microBatch = (MicroBatchStream) stream;
            microBatch.commit(microBatch.deserializeOffset(committed.json()));
            return;
        }
        throw new IllegalArgumentException("Unknown source is found at constructNextBatch: " + stream);
    }

    /**
     * Null-safe inequality of a committed offset ({@code offset2}) and an available offset
     * ({@code offset}): {@code false} when both are {@code null}, otherwise the negation of
     * {@code offset2.equals(offset)}.
     */
    public static final /* synthetic */ boolean $anonfun$runBatch$4(Offset offset, org.apache.spark.sql.connector.read.streaming.Offset offset2) {
        if (offset2 == null) {
            return offset != null;
        }
        return !offset2.equals(offset);
    }

    /**
     * Null-safe inequality of a committed offset ({@code offset2}) and an available offset
     * ({@code offset}): {@code false} when both are {@code null}, otherwise the negation of
     * {@code offset2.equals(offset)}.
     */
    public static final /* synthetic */ boolean $anonfun$runBatch$9(org.apache.spark.sql.connector.read.streaming.Offset offset, org.apache.spark.sql.connector.read.streaming.Offset offset2) {
        if (offset2 == null) {
            return offset != null;
        }
        return !offset2.equals(offset);
    }

    /**
     * Pruning predicate: {@code true} when the given pattern bits contain the
     * {@code CURRENT_LIKE} tree pattern.
     */
    public static final /* synthetic */ boolean $anonfun$runBatch$14(TreePatternBits treePatternBits) {
        boolean hasCurrentLike = treePatternBits.containsPattern(TreePattern$.MODULE$.CURRENT_LIKE());
        return hasCurrentLike;
    }

    /**
     * Refreshes the given catalog table in the execution's session catalog, identifying the table
     * by the quoted string form of its identifier.
     */
    public static final /* synthetic */ void $anonfun$runBatch$18(MicroBatchExecution microBatchExecution, CatalogTable catalogTable) {
        microBatchExecution
                .org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession()
                .catalog()
                .refreshTable(catalogTable.identifier().quotedString());
    }

    /**
     * Creates a micro-batch execution for the given write-to-stream plan.
     *
     * <p>The name, resolved checkpoint location, input query, sink, output mode and
     * delete-checkpoint-on-stop flag passed to the superclass are all taken from
     * {@code writeToStream}.
     *
     * @param sparkSession the session the query runs in
     * @param trigger the trigger forwarded to the superclass
     * @param clock the clock forwarded to the superclass
     * @param map stored as {@code extraOptions}
     * @param writeToStream the plan describing the streaming write; stored as {@code plan}
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public MicroBatchExecution(SparkSession sparkSession, Trigger trigger, Clock clock, Map<String, String> map, WriteToStream writeToStream) {
        super(sparkSession, writeToStream.name(), writeToStream.resolvedCheckpointLocation(), writeToStream.inputQuery(), writeToStream.sink(), trigger, clock, writeToStream.outputMode(), writeToStream.deleteCheckpointOnStop());
        this.extraOptions = map;
        this.plan = writeToStream;
        // Runs the AsyncLogPurge initializer on this instance (presumably the Scala trait's field setup).
        AsyncLogPurge.$init$(this);
        this.errorNotifier = new ErrorNotifier();
        // Start with no sources and no constructed batch.
        this.sources = scala.package$.MODULE$.Seq().empty();
        this.isCurrentBatchConstructed = false;
        this.watermarkPropagator = WatermarkPropagator$.MODULE$.apply(super.sparkSession().sessionState().conf());
        // NOTE(review): emitted by the Scala compiler at the end of the constructor; looks like a
        // memory fence for safe publication of the fields set above (confirm).
        Statics.releaseFence();
    }
}
