package org.apache.seatunnel.spark.batch;

import java.util.Iterator;
import java.util.List;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.env.Execution;
import org.apache.seatunnel.plugin.Plugin;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.spark.BaseSparkSink;
import org.apache.seatunnel.spark.BaseSparkSource;
import org.apache.seatunnel.spark.BaseSparkTransform;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/* loaded from: input_file:org/apache/seatunnel/spark/batch/SparkBatchExecution.class */
/**
 * Batch {@link Execution} that wires Spark sources, transforms and sinks together.
 *
 * <p>Every source is registered as a temporary view, the dataset of the first source is
 * passed through the transforms in order, and the resulting dataset is handed to every sink.
 * Transforms and sinks may read from a named table instead of the piped dataset by setting
 * {@code Plugin.SOURCE_TABLE_NAME} in their own config.
 */
public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSparkTransform, SparkBatchSink> {
    private final SparkEnvironment environment;
    private Config config = ConfigFactory.empty();

    /**
     * Creates an execution bound to the given Spark environment.
     *
     * @param sparkEnvironment environment passed to every source, transform and sink
     */
    public SparkBatchExecution(SparkEnvironment sparkEnvironment) {
        this.environment = sparkEnvironment;
    }

    /**
     * Registers {@code dataset} as a temporary view, replacing any view with the same name.
     *
     * @param str     name of the temporary view
     * @param dataset dataset to expose under that name
     */
    public static void registerTempView(String str, Dataset<Row> dataset) {
        dataset.createOrReplaceTempView(str);
    }

    /**
     * Reads the data of a source and registers it as a temporary view named by the source's
     * {@code Plugin.RESULT_TABLE_NAME} setting.
     *
     * @param baseSparkSource  source whose data is registered
     * @param sparkEnvironment environment handed to the source when reading its data
     * @throws ConfigRuntimeException if the source config has no {@code Plugin.RESULT_TABLE_NAME}
     */
    public static void registerInputTempView(BaseSparkSource<Dataset<Row>> baseSparkSource, SparkEnvironment sparkEnvironment) {
        Config sourceConfig = baseSparkSource.getConfig();
        if (sourceConfig.hasPath(Plugin.RESULT_TABLE_NAME)) {
            // Resolve the view name before reading the data, as the two calls are independent
            // but their order is kept stable.
            String viewName = sourceConfig.getString(Plugin.RESULT_TABLE_NAME);
            Dataset<Row> sourceData = baseSparkSource.getData(sparkEnvironment);
            registerTempView(viewName, sourceData);
            return;
        }
        throw new ConfigRuntimeException("Plugin[" + baseSparkSource.getClass().getName() + "] must be registered as dataset/table, please set \"" + Plugin.RESULT_TABLE_NAME + "\" config");
    }

    /**
     * Runs a single transform.
     *
     * <p>The input is the table named by {@code Plugin.SOURCE_TABLE_NAME} when the transform
     * config defines it; otherwise {@code dataset} is used.
     *
     * @param sparkEnvironment   environment used to look up the table and passed to the transform
     * @param baseSparkTransform transform to apply
     * @param dataset            fallback input when no source table is configured
     * @return the dataset returned by the transform
     */
    public static Dataset<Row> transformProcess(SparkEnvironment sparkEnvironment, BaseSparkTransform baseSparkTransform, Dataset<Row> dataset) {
        Dataset<Row> input = resolveInput(sparkEnvironment, baseSparkTransform.getConfig(), dataset);
        return baseSparkTransform.process(input, sparkEnvironment);
    }

    /**
     * Registers the output of a transform as a temporary view when the transform config
     * defines {@code Plugin.RESULT_TABLE_NAME}; does nothing otherwise.
     *
     * @param baseSparkTransform transform whose config is inspected
     * @param dataset            transform output to register
     */
    public static void registerTransformTempView(BaseSparkTransform baseSparkTransform, Dataset<Row> dataset) {
        Config transformConfig = baseSparkTransform.getConfig();
        if (!transformConfig.hasPath(Plugin.RESULT_TABLE_NAME)) {
            return;
        }
        registerTempView(transformConfig.getString(Plugin.RESULT_TABLE_NAME), dataset);
    }

    /**
     * Writes a dataset to a single sink.
     *
     * <p>The input is the table named by {@code Plugin.SOURCE_TABLE_NAME} when the sink config
     * defines it; otherwise {@code dataset} is used.
     *
     * @param sparkEnvironment environment used to look up the table and passed to the sink
     * @param baseSparkSink    sink to write to
     * @param dataset          fallback input when no source table is configured
     */
    public static void sinkProcess(SparkEnvironment sparkEnvironment, BaseSparkSink<?> baseSparkSink, Dataset<Row> dataset) {
        Dataset<Row> input = resolveInput(sparkEnvironment, baseSparkSink.getConfig(), dataset);
        baseSparkSink.output(input, sparkEnvironment);
    }

    /**
     * Picks the input of a plugin: the table named by {@code Plugin.SOURCE_TABLE_NAME} if the
     * plugin config sets it, else the dataset supplied by the caller.
     */
    private static Dataset<Row> resolveInput(SparkEnvironment sparkEnvironment, Config pluginConfig, Dataset<Row> fallback) {
        return pluginConfig.hasPath(Plugin.SOURCE_TABLE_NAME)
                ? sparkEnvironment.getSparkSession().read().table(pluginConfig.getString(Plugin.SOURCE_TABLE_NAME))
                : fallback;
    }

    /**
     * Executes the batch pipeline.
     *
     * <p>All sources are registered as temporary views first. If there is no source, nothing
     * else happens. Otherwise the data of the first source is fed through the transforms in
     * order and the final dataset is passed to every sink.
     *
     * @param list  sources
     * @param list2 transforms, applied in list order
     * @param list3 sinks
     */
    @Override // org.apache.seatunnel.env.Execution
    public void start(List<SparkBatchSource> list, List<BaseSparkTransform> list2, List<SparkBatchSink> list3) {
        for (SparkBatchSource source : list) {
            registerInputTempView(source, this.environment);
        }
        if (list.isEmpty()) {
            return;
        }
        Dataset<Row> current = list.get(0).getData(this.environment);
        for (BaseSparkTransform transform : list2) {
            // A transform is skipped when the dataset it would receive has no rows.
            boolean hasRows = !current.takeAsList(1).isEmpty();
            if (hasRows) {
                current = transformProcess(this.environment, transform, current);
                registerTransformTempView(transform, current);
            }
        }
        for (SparkBatchSink sink : list3) {
            sinkProcess(this.environment, sink, current);
        }
    }

    /**
     * Stores the config of this execution.
     *
     * @param config config to keep
     */
    @Override // org.apache.seatunnel.plugin.Plugin
    public void setConfig(Config config) {
        this.config = config;
    }

    /**
     * Returns the config last set via {@link #setConfig(Config)}, or an empty config if none
     * was set.
     */
    @Override // org.apache.seatunnel.plugin.Plugin
    public Config getConfig() {
        return this.config;
    }

    /**
     * Always reports success; this execution performs no config validation.
     */
    @Override // org.apache.seatunnel.plugin.Plugin
    public CheckResult checkConfig() {
        return CheckResult.success();
    }

    /**
     * No-op; this execution has nothing to prepare.
     */
    @Override // org.apache.seatunnel.plugin.Plugin
    public void prepare(Void r2) {
    }
}
