/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.spark.sql.streaming;

import java.io.Serializable;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.streaming.MetadataLog;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.types.StructType;
import org.opensearch.hadoop.cfg.Settings;
import org.opensearch.spark.sql.streaming.JobState;
import org.opensearch.spark.sql.streaming.NullMetadataLog;
import org.opensearch.spark.sql.streaming.OpenSearchCommitProtocol;
import org.opensearch.spark.sql.streaming.OpenSearchSinkMetadataLog;
import org.opensearch.spark.sql.streaming.OpenSearchSinkStatus;
import org.opensearch.spark.sql.streaming.OpenSearchStreamQueryWriter;
import org.opensearch.spark.sql.streaming.SparkSqlStreamingConfigs$;
import org.opensearch.spark.sql.streaming.TaskCommit;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Predef$;
import scala.collection.Iterator;
import scala.collection.immutable.Seq;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005U4A!\u0003\u0006\u0001+!A\u0011\u0006\u0001B\u0001B\u0003%!\u0006\u0003\u0005/\u0001\t\u0005\t\u0015!\u00030\u0011\u00159\u0004\u0001\"\u00019\u0011\u001di\u0004A1A\u0005\nyBaa\u0012\u0001!\u0002\u0013y\u0004b\u0002%\u0001\u0005\u0004%I!\u0013\u0005\u0007-\u0002\u0001\u000b\u0011\u0002&\t\u000b]\u0003A\u0011\t-\u0003?=\u0003XM\\*fCJ\u001c\u0007n\u00159be.\u001c\u0016\u000f\\*ue\u0016\fW.\u001b8h'&t7N\u0003\u0002\f\u0019\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u001b9\t1a]9m\u0015\ty\u0001#A\u0003ta\u0006\u00148N\u0003\u0002\u0012%\u0005Qq\u000e]3og\u0016\f'o\u00195\u000b\u0003M\t1a\u001c:h\u0007\u0001\u00192\u0001\u0001\f\u001f!\t9B$D\u0001\u0019\u0015\tI\"$\u0001\u0003mC:<'\"A\u000e\u0002\t)\fg/Y\u0005\u0003;a\u0011aa\u00142kK\u000e$\bCA\u0010(\u001b\u0005\u0001#BA\u0006\"\u0015\t\u00113%A\u0005fq\u0016\u001cW\u000f^5p]*\u0011Q\u0002\n\u0006\u0003\u001f\u0015R!A\n\n\u0002\r\u0005\u0004\u0018m\u00195f\u0013\tA\u0003E\u0001\u0003TS:\\\u0017\u0001D:qCJ\\7+Z:tS>t\u0007CA\u0016-\u001b\u0005\u0019\u0013BA\u0017$\u00051\u0019\u0006/\u0019:l'\u0016\u001c8/[8o\u0003!\u0019X\r\u001e;j]\u001e\u001c\bC\u0001\u00196\u001b\u0005\t$B\u0001\u001a4\u0003\r\u0019gm\u001a\u0006\u0003iA\ta\u0001[1e_>\u0004\u0018B\u0001\u001c2\u0005!\u0019V\r\u001e;j]\u001e\u001c\u0018A\u0002\u001fj]&$h\bF\u0002:wq\u0002\"A\u000f\u0001\u000e\u0003)AQ!K\u0002A\u0002)BQAL\u0002A\u0002=\na\u0001\\8hO\u0016\u0014X#A \u0011\u0005\u0001+U\"A!\u000b\u0005\t\u001b\u0015a\u00027pO\u001eLgn\u001a\u0006\u0003\t\u0016\nqaY8n[>t7/\u0003\u0002G\u0003\n\u0019Aj\\4\u0002\u000f1|wmZ3sA\u0005AqO]5uK2{w-F\u0001K!\ry2*T\u0005\u0003\u0019\u0002\u00121\"T3uC\u0012\fG/\u0019'pOB\u0019a*U*\u000e\u0003=S\u0011\u0001U\u0001\u0006g\u000e\fG.Y\u0005\u0003%>\u0013Q!\u0011:sCf\u0004\"A\u000f+\n\u0005US!\u0001F(qK:\u001cV-\u0019:dQNKgn[*uCR,8/A\u0005xe&$X\rT8hA\u0005A\u0011\r\u001a3CCR\u001c\u0007\u000eF\u0002Z9\u0006\u0004\"A\u0014.\n\u0005m{%\u0001B+oSRDQ!\u0018\u0005A\u0002y\u000bqAY1uG\"LE\r\u0005\u0002O?&\u0011\u0001m\u0014\u0002\u0005\u0019>tw\rC\u0003c\u0011\u0001\u00071-\u0001\u0003eCR\f\u0007C\u00013s\u001d\t)\u0007O\u0004\u0002g_:\u0011qM\u001c\b\u0003Q6t!!\u001b7\u000e\u0003)T!a\u001b\u000b\u0002\rq\u0012xn\u001c;?\u0013\u0005\u0019\u0012B\u0001\u0014\u0013\u0013\tyQ%\u0003\u0002\u000eI%\u0011\u0011oI\u0001\ba\u0006\u001c7.Y4f\u0013\t\u0019HOA\u0005ECR\fgI]1nK*\u0011\u0011o\t")
public class OpenSearchSparkSqlStreamingSink
implements Sink {
    private final Settings settings;
    private final Log logger;
    private final MetadataLog<OpenSearchSinkStatus[]> writeLog;

    public String name() {
        return Sink.name$((Sink)this);
    }

    public StructType schema() {
        return Sink.schema$((Sink)this);
    }

    public Set<TableCapability> capabilities() {
        return Sink.capabilities$((Sink)this);
    }

    private Log logger() {
        return this.logger;
    }

    private MetadataLog<OpenSearchSinkStatus[]> writeLog() {
        return this.writeLog;
    }

    public void addBatch(long batchId, Dataset<Row> data) {
        if (batchId <= BoxesRunTime.unboxToLong((Object)this.writeLog().getLatest().map((Function1 & Serializable)x$1 -> BoxesRunTime.boxToLong((long)x$1._1$mcJ$sp())).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable)() -> -1L))) {
            this.logger().info((Object)new StringBuilder(35).append("Skipping already committed batch [").append(batchId).append("]").toString());
        } else {
            OpenSearchCommitProtocol commitProtocol = new OpenSearchCommitProtocol(this.writeLog());
            QueryExecution queryExecution = data.queryExecution();
            StructType schema = data.schema();
            SQLExecution$.MODULE$.withNewExecutionId(queryExecution, SQLExecution$.MODULE$.withNewExecutionId$default$2(), (Function0)(JFunction0.mcV.sp & Serializable)() -> {
                SparkSession sparkSession = queryExecution.sparkSession();
                String queryName = (String)SparkSqlStreamingConfigs$.MODULE$.getQueryName($this.settings).getOrElse((Function0 & Serializable)() -> UUID.randomUUID().toString());
                JobState jobState = new JobState(queryName, batchId);
                commitProtocol.initJob(jobState);
                try {
                    String serializedSettings = $this.settings.save();
                    TaskCommit[] taskCommits = (TaskCommit[])sparkSession.sparkContext().runJob(queryExecution.toRdd(), (Function2 & Serializable)(taskContext, iter) -> new OpenSearchStreamQueryWriter(serializedSettings, schema, commitProtocol).run((TaskContext)taskContext, (Iterator<InternalRow>)iter), ClassTag$.MODULE$.apply(TaskCommit.class));
                    commitProtocol.commitJob(jobState, (Seq<TaskCommit>)Predef$.MODULE$.copyArrayToImmutableIndexedSeq((Object)taskCommits));
                }
                catch (Throwable t) {
                    commitProtocol.abortJob(jobState);
                    throw t;
                }
            });
        }
    }

    public OpenSearchSparkSqlStreamingSink(SparkSession sparkSession, Settings settings) {
        Object object;
        this.settings = settings;
        Sink.$init$((Sink)this);
        this.logger = LogFactory.getLog(OpenSearchSparkSqlStreamingSink.class);
        if (SparkSqlStreamingConfigs$.MODULE$.getSinkLogEnabled(settings)) {
            String logPath = SparkSqlStreamingConfigs$.MODULE$.constructCommitLogPath(settings);
            this.logger().info((Object)new StringBuilder(20).append("Using log path of [").append(logPath).append("]").toString());
            object = new OpenSearchSinkMetadataLog(settings, sparkSession, logPath);
        } else {
            this.logger().warn((Object)"OpenSearchSparkSqlStreamingSink is continuing without write commit log. Be advised that data may be duplicated!");
            object = new NullMetadataLog<OpenSearchSinkStatus[]>();
        }
        this.writeLog = object;
    }
}

