package org.opensearch.spark.sql.streaming;

import java.util.Set;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.streaming.MetadataLog;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.types.StructType;
import org.opensearch.hadoop.cfg.Settings;
import scala.Predef$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: OpenSearchSparkSqlStreamingSink.scala */
@ScalaSignature(bytes = "\u0006\u0005U4A!\u0003\u0006\u0001+!A\u0011\u0006\u0001B\u0001B\u0003%!\u0006\u0003\u0005/\u0001\t\u0005\t\u0015!\u00030\u0011\u00159\u0004\u0001\"\u00019\u0011\u001di\u0004A1A\u0005\nyBaa\u0012\u0001!\u0002\u0013y\u0004b\u0002%\u0001\u0005\u0004%I!\u0013\u0005\u0007-\u0002\u0001\u000b\u0011\u0002&\t\u000b]\u0003A\u0011\t-\u0003?=\u0003XM\\*fCJ\u001c\u0007n\u00159be.\u001c\u0016\u000f\\*ue\u0016\fW.\u001b8h'&t7N\u0003\u0002\f\u0019\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u001b9\t1a]9m\u0015\ty\u0001#A\u0003ta\u0006\u00148N\u0003\u0002\u0012%\u0005Qq\u000e]3og\u0016\f'o\u00195\u000b\u0003M\t1a\u001c:h\u0007\u0001\u00192\u0001\u0001\f\u001f!\t9B$D\u0001\u0019\u0015\tI\"$\u0001\u0003mC:<'\"A\u000e\u0002\t)\fg/Y\u0005\u0003;a\u0011aa\u00142kK\u000e$\bCA\u0010(\u001b\u0005\u0001#BA\u0006\"\u0015\t\u00113%A\u0005fq\u0016\u001cW\u000f^5p]*\u0011Q\u0002\n\u0006\u0003\u001f\u0015R!A\n\n\u0002\r\u0005\u0004\u0018m\u00195f\u0013\tA\u0003E\u0001\u0003TS:\\\u0017\u0001D:qCJ\\7+Z:tS>t\u0007CA\u0016-\u001b\u0005\u0019\u0013BA\u0017$\u00051\u0019\u0006/\u0019:l'\u0016\u001c8/[8o\u0003!\u0019X\r\u001e;j]\u001e\u001c\bC\u0001\u00196\u001b\u0005\t$B\u0001\u001a4\u0003\r\u0019gm\u001a\u0006\u0003iA\ta\u0001[1e_>\u0004\u0018B\u0001\u001c2\u0005!\u0019V\r\u001e;j]\u001e\u001c\u0018A\u0002\u001fj]&$h\bF\u0002:wq\u0002\"A\u000f\u0001\u000e\u0003)AQ!K\u0002A\u0002)BQAL\u0002A\u0002=\na\u0001\\8hO\u0016\u0014X#A 
\u0011\u0005\u0001+U\"A!\u000b\u0005\t\u001b\u0015a\u00027pO\u001eLgn\u001a\u0006\u0003\t\u0016\nqaY8n[>t7/\u0003\u0002G\u0003\n\u0019Aj\\4\u0002\u000f1|wmZ3sA\u0005AqO]5uK2{w-F\u0001K!\ry2*T\u0005\u0003\u0019\u0002\u00121\"T3uC\u0012\fG/\u0019'pOB\u0019a*U*\u000e\u0003=S\u0011\u0001U\u0001\u0006g\u000e\fG.Y\u0005\u0003%>\u0013Q!\u0011:sCf\u0004\"A\u000f+\n\u0005US!\u0001F(qK:\u001cV-\u0019:dQNKgn[*uCR,8/A\u0005xe&$X\rT8hA\u0005A\u0011\r\u001a3CCR\u001c\u0007\u000eF\u0002Z9\u0006\u0004\"A\u0014.\n\u0005m{%\u0001B+oSRDQ!\u0018\u0005A\u0002y\u000bqAY1uG\"LE\r\u0005\u0002O?&\u0011\u0001m\u0014\u0002\u0005\u0019>tw\rC\u0003c\u0011\u0001\u00071-\u0001\u0003eCR\f\u0007C\u00013s\u001d\t)\u0007O\u0004\u0002g_:\u0011qM\u001c\b\u0003Q6t!!\u001b7\u000e\u0003)T!a\u001b\u000b\u0002\rq\u0012xn\u001c;?\u0013\u0005\u0019\u0012B\u0001\u0014\u0013\u0013\tyQ%\u0003\u0002\u000eI%\u0011\u0011oI\u0001\ba\u0006\u001c7.Y4f\u0013\t\u0019HOA\u0005ECR\fgI]1nK*\u0011\u0011o\t")
/* loaded from: input_file:org/opensearch/spark/sql/streaming/OpenSearchSparkSqlStreamingSink.class */
/*
 * Structured Streaming sink that writes each micro-batch through an
 * OpenSearchStreamQueryWriter running as a Spark job, coordinated by an
 * OpenSearchCommitProtocol. A MetadataLog of OpenSearchSinkStatus entries is
 * consulted so that a batch whose id is not greater than the latest logged id
 * is skipped instead of being written again.
 */
public class OpenSearchSparkSqlStreamingSink implements Sink {
    /** Connector settings; serialized via {@code save()} and handed to every writer task. */
    private final Settings settings;
    private final Log logger;
    /** Log consulted for the latest batch id and passed to the commit protocol. */
    private final MetadataLog<OpenSearchSinkStatus[]> writeLog;

    /** Delegates to the static forwarder generated for the {@code Sink} trait. */
    public String name() {
        return Sink.name$(this);
    }

    /** Delegates to the static forwarder generated for the {@code Sink} trait. */
    public StructType schema() {
        return Sink.schema$(this);
    }

    /** Delegates to the static forwarder generated for the {@code Sink} trait. */
    public Set<TableCapability> capabilities() {
        return Sink.capabilities$(this);
    }

    private Log logger() {
        return this.logger;
    }

    private MetadataLog<OpenSearchSinkStatus[]> writeLog() {
        return this.writeLog;
    }

    /**
     * Returns the batch id of the latest entry in the write log, or {@code -1}
     * when the log reports no latest entry.
     */
    private long latestLoggedBatchId() {
        return BoxesRunTime.unboxToLong(writeLog().getLatest().map(entry -> {
            return BoxesRunTime.boxToLong(entry._1$mcJ$sp());
        }).getOrElse(() -> {
            return -1L;
        }));
    }

    /**
     * Writes one micro-batch.
     *
     * <p>If {@code j} is not greater than the latest batch id found in the write
     * log, the batch is logged as already committed and nothing is written.
     * Otherwise a job is initialised on the commit protocol, the batch's RDD is
     * run through {@code OpenSearchStreamQueryWriter} tasks, and the collected
     * task commits are handed to {@code commitJob}.
     *
     * <p>On any failure after {@code initJob}, {@code abortJob} is invoked and the
     * original failure is rethrown. Should {@code abortJob} itself fail, that
     * secondary failure is attached to the original as a suppressed exception so
     * the root cause is never replaced by the clean-up error.
     *
     * @param j       id of the batch being added
     * @param dataset rows of the batch
     */
    public void addBatch(long j, Dataset<Row> dataset) {
        final long batchId = j;
        if (batchId <= latestLoggedBatchId()) {
            logger().info("Skipping already committed batch [" + batchId + "]");
            return;
        }
        OpenSearchCommitProtocol commitProtocol = new OpenSearchCommitProtocol(writeLog());
        QueryExecution queryExecution = dataset.queryExecution();
        StructType schema = dataset.schema();
        SQLExecution$.MODULE$.withNewExecutionId(queryExecution, SQLExecution$.MODULE$.withNewExecutionId$default$2(), () -> {
            SparkSession sparkSession = queryExecution.sparkSession();
            // Fall back to a random UUID when no query name is configured.
            String queryName = (String) SparkSqlStreamingConfigs$.MODULE$.getQueryName(this.settings).getOrElse(() -> {
                return UUID.randomUUID().toString();
            });
            JobState jobState = new JobState(queryName, batchId);
            commitProtocol.initJob(jobState);
            try {
                // Settings are captured in serialized (String) form by the task closure.
                String serializedSettings = this.settings.save();
                TaskCommit[] taskCommits = (TaskCommit[]) sparkSession.sparkContext().runJob(queryExecution.toRdd(), (taskContext, iterator) -> {
                    return new OpenSearchStreamQueryWriter(serializedSettings, schema, commitProtocol).run(taskContext, iterator);
                }, ClassTag$.MODULE$.apply(TaskCommit.class));
                commitProtocol.commitJob(jobState, Predef$.MODULE$.copyArrayToImmutableIndexedSeq(taskCommits));
            } catch (Throwable failure) {
                try {
                    commitProtocol.abortJob(jobState);
                } catch (Throwable abortFailure) {
                    // Keep the root cause: an abort error must not mask the write/commit failure.
                    // addSuppressed rejects self-suppression, hence the identity guard.
                    if (abortFailure != failure) {
                        failure.addSuppressed(abortFailure);
                    }
                }
                throw failure;
            }
        });
    }

    /**
     * Creates the sink and selects its write log.
     *
     * <p>When the sink log is enabled in {@code settings}, the commit log path is
     * derived from the settings and an {@code OpenSearchSinkMetadataLog} is used.
     * Otherwise a warning is logged and a {@code NullMetadataLog} is used instead.
     *
     * @param sparkSession session passed to the metadata log when the sink log is enabled
     * @param settings     connector settings for this sink
     */
    public OpenSearchSparkSqlStreamingSink(SparkSession sparkSession, Settings settings) {
        this.settings = settings;
        Sink.$init$(this);
        this.logger = LogFactory.getLog(OpenSearchSparkSqlStreamingSink.class);
        MetadataLog<OpenSearchSinkStatus[]> selectedLog;
        if (SparkSqlStreamingConfigs$.MODULE$.getSinkLogEnabled(settings)) {
            String commitLogPath = SparkSqlStreamingConfigs$.MODULE$.constructCommitLogPath(settings);
            logger().info("Using log path of [" + commitLogPath + "]");
            selectedLog = new OpenSearchSinkMetadataLog(settings, sparkSession, commitLogPath);
        } else {
            logger().warn("OpenSearchSparkSqlStreamingSink is continuing without write commit log. Be advised that data may be duplicated!");
            selectedLog = new NullMetadataLog<>();
        }
        this.writeLog = selectedLog;
    }
}
