package org.apache.kylin.engine.spark;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.cuboid.CuboidUtil;
import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.spark.SparkCubingByLayer;
import org.apache.kylin.engine.spark.SparkFunction;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:org/apache/kylin/engine/spark/SparkCubingByLayerForOpt.class */
public class SparkCubingByLayerForOpt extends AbstractApplication implements Serializable {
    protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayerForOpt.class);
    public static final Option OPTION_CUBE_NAME;
    public static final Option OPTION_SEGMENT_ID;
    public static final Option OPTION_META_URL;
    public static final Option OPTION_OUTPUT_PATH;
    public static final Option OPTION_INPUT_PATH;
    public static final Option OPTION_CUBOID_MODE;
    private Options options = new Options();
    private static final Iterable<Tuple2<ByteArray, Object[]>> EMTPY_ITERATOR;

    /* loaded from: input_file:org/apache/kylin/engine/spark/SparkCubingByLayerForOpt$CuboidFlatMap.class */
    public static class CuboidFlatMap extends SparkFunction.PairFlatMapFunctionBase<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
        private String cubeName;
        private String segmentId;
        private String metaUrl;
        private CubeSegment cubeSegment;
        private CubeDesc cubeDesc;
        private NDCuboidBuilder ndCuboidBuilder;
        private RowKeySplitter rowKeySplitter;
        private SerializableConfiguration conf;
        private CuboidScheduler cuboidScheduler;

        public CuboidFlatMap(String str, String str2, String str3, SerializableConfiguration serializableConfiguration, CuboidScheduler cuboidScheduler) {
            this.cubeName = str;
            this.segmentId = str2;
            this.metaUrl = str3;
            this.conf = serializableConfiguration;
            this.cuboidScheduler = cuboidScheduler;
        }

        @Override // org.apache.kylin.engine.spark.SparkFunction.FunctionBase
        protected void doInit() {
            KylinConfig loadKylinConfigFromHdfs = AbstractHadoopJob.loadKylinConfigFromHdfs(this.conf, this.metaUrl);
            KylinConfig.SetAndUnsetThreadLocalConfig andUnsetThreadLocalConfig = KylinConfig.setAndUnsetThreadLocalConfig(loadKylinConfigFromHdfs);
            Throwable th = null;
            try {
                try {
                    CubeInstance cube = CubeManager.getInstance(loadKylinConfigFromHdfs).getCube(this.cubeName);
                    this.cubeSegment = cube.getSegmentById(this.segmentId);
                    this.cubeDesc = cube.getDescriptor();
                    this.rowKeySplitter = new RowKeySplitter(this.cubeSegment);
                    this.ndCuboidBuilder = new NDCuboidBuilder(this.cubeSegment, new RowKeyEncoderProvider(this.cubeSegment));
                    if (andUnsetThreadLocalConfig != null) {
                        if (0 == 0) {
                            andUnsetThreadLocalConfig.close();
                            return;
                        }
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (andUnsetThreadLocalConfig != null) {
                    if (th != null) {
                        try {
                            andUnsetThreadLocalConfig.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        andUnsetThreadLocalConfig.close();
                    }
                }
                throw th4;
            }
        }

        @Override // org.apache.kylin.engine.spark.SparkFunction.PairFlatMapFunctionBase
        public Iterator<Tuple2<ByteArray, Object[]>> doCall(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
            byte[] array = ((ByteArray) tuple2._1()).array();
            long parseCuboid = this.rowKeySplitter.parseCuboid(array);
            List spanningCuboid = this.cuboidScheduler.getSpanningCuboid(parseCuboid);
            if (spanningCuboid == null || spanningCuboid.size() == 0) {
                return SparkCubingByLayerForOpt.EMTPY_ITERATOR.iterator();
            }
            this.rowKeySplitter.split(array);
            Cuboid findForMandatory = Cuboid.findForMandatory(this.cubeDesc, parseCuboid);
            ArrayList arrayList = new ArrayList(spanningCuboid.size());
            Iterator it = spanningCuboid.iterator();
            while (it.hasNext()) {
                arrayList.add(new Tuple2(this.ndCuboidBuilder.buildKey2(findForMandatory, Cuboid.findForMandatory(this.cubeDesc, ((Long) it.next()).longValue()), this.rowKeySplitter.getSplitBuffers()), tuple2._2()));
            }
            return arrayList.iterator();
        }
    }

    public SparkCubingByLayerForOpt() {
        this.options.addOption(OPTION_CUBOID_MODE);
        this.options.addOption(OPTION_INPUT_PATH);
        this.options.addOption(OPTION_CUBE_NAME);
        this.options.addOption(OPTION_SEGMENT_ID);
        this.options.addOption(OPTION_META_URL);
        this.options.addOption(OPTION_OUTPUT_PATH);
    }

    protected Options getOptions() {
        return this.options;
    }

    protected void execute(OptionsHelper optionsHelper) throws Exception {
        String optionValue = optionsHelper.getOptionValue(OPTION_META_URL);
        String optionValue2 = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
        String optionValue3 = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
        String optionValue4 = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
        String optionValue5 = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
        String optionValue6 = optionsHelper.getOptionValue(OPTION_CUBOID_MODE);
        SparkConf kryoSerializerInConf = SparkUtil.setKryoSerializerInConf();
        kryoSerializerInConf.setAppName("Kylin_Cubing_For_Optimize_" + optionValue3 + "_With_Spark");
        KylinSparkJobListener kylinSparkJobListener = new KylinSparkJobListener();
        JavaSparkContext javaSparkContext = new JavaSparkContext(kryoSerializerInConf);
        Throwable th = null;
        try {
            try {
                SparkUtil.modifySparkHadoopConfiguration(javaSparkContext.sc(), AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(javaSparkContext.hadoopConfiguration()), optionValue));
                javaSparkContext.sc().addSparkListener(kylinSparkJobListener);
                SerializableConfiguration serializableConfiguration = new SerializableConfiguration(javaSparkContext.hadoopConfiguration());
                KylinConfig loadKylinConfigFromHdfs = AbstractHadoopJob.loadKylinConfigFromHdfs(serializableConfiguration, optionValue);
                CubeInstance cube = CubeManager.getInstance(loadKylinConfigFromHdfs).getCube(optionValue3);
                CubeDesc cubeDesc = CubeDescManager.getInstance(loadKylinConfigFromHdfs).getCubeDesc(cube.getDescName());
                CubeSegment segmentById = cube.getSegmentById(optionValue4);
                CubeStatsReader cubeStatsReader = new CubeStatsReader(segmentById, loadKylinConfigFromHdfs);
                Job job = Job.getInstance(serializableConfiguration.get());
                SparkUtil.setHadoopConfForCuboid(job, segmentById, optionValue);
                StorageLevel fromString = StorageLevel.fromString(loadKylinConfigFromHdfs.getSparkStorageLevel());
                JavaPairRDD<ByteArray, Object[]> cuboIdRDDFromHdfs = SparkUtil.getCuboIdRDDFromHdfs(javaSparkContext, optionValue, optionValue3, segmentById, optionValue2, cubeDesc.getMeasures().size(), serializableConfiguration);
                int longestDepth = CuboidUtil.getLongestDepth(segmentById.getCubeInstance().getCuboidsByMode(optionValue6));
                logger.info("cuboidMode" + optionValue6);
                logger.info("maxLevel" + longestDepth);
                CuboidScheduler cuboidSchedulerByMode = CuboidSchedulerUtil.getCuboidSchedulerByMode(segmentById, optionValue6);
                JavaPairRDD<ByteArray, Object[]>[] javaPairRDDArr = new JavaPairRDD[longestDepth + 1];
                javaPairRDDArr[0] = cuboIdRDDFromHdfs;
                SparkCubingByLayer.BaseCuboidReducerFunction2 baseCuboidReducerFunction2 = new SparkCubingByLayer.BaseCuboidReducerFunction2(optionValue3, optionValue, serializableConfiguration);
                boolean z = true;
                boolean[] zArr = new boolean[cubeDesc.getMeasures().size()];
                for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
                    zArr[i] = !((MeasureDesc) cubeDesc.getMeasures().get(i)).getFunction().getMeasureType().onlyAggrInBaseCuboid();
                    z = z && zArr[i];
                }
                if (!z) {
                    baseCuboidReducerFunction2 = new SparkCubingByLayer.CuboidReducerFunction2(optionValue3, optionValue, serializableConfiguration, zArr);
                }
                for (int i2 = 1; i2 <= longestDepth; i2++) {
                    javaPairRDDArr[i2] = javaPairRDDArr[i2 - 1].flatMapToPair(new CuboidFlatMap(optionValue3, optionValue4, optionValue, serializableConfiguration, cuboidSchedulerByMode)).reduceByKey(baseCuboidReducerFunction2, SparkUtil.estimateLayerPartitionNum(i2, cubeStatsReader, loadKylinConfigFromHdfs));
                    javaPairRDDArr[i2].persist(fromString);
                    saveToHDFS(javaPairRDDArr[i2], optionValue, optionValue3, segmentById, optionValue5, i2, job);
                    javaPairRDDArr[i2 - 1].unpersist(false);
                }
                javaPairRDDArr[longestDepth].unpersist(false);
                logger.info("Finished on calculating needed cuboids For Optimize.");
                logger.info("HDFS: Number of bytes written=" + kylinSparkJobListener.metrics.getBytesWritten());
                if (javaSparkContext != null) {
                    if (0 == 0) {
                        javaSparkContext.close();
                        return;
                    }
                    try {
                        javaSparkContext.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (javaSparkContext != null) {
                if (th != null) {
                    try {
                        javaSparkContext.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    javaSparkContext.close();
                }
            }
            throw th4;
        }
    }

    protected void saveToHDFS(JavaPairRDD<ByteArray, Object[]> javaPairRDD, final String str, final String str2, CubeSegment cubeSegment, String str3, int i, Job job) throws Exception {
        String cuboidOutputPathsByLevel = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(str3, i);
        final SerializableConfiguration serializableConfiguration = new SerializableConfiguration(job.getConfiguration());
        MRUtil.getBatchCubingOutputSide2(cubeSegment).getOutputFormat().configureJobOutput(job, cuboidOutputPathsByLevel, cubeSegment, cubeSegment.getCuboidScheduler(), i);
        javaPairRDD.mapToPair(new SparkFunction.PairFunctionBase<Tuple2<ByteArray, Object[]>, Text, Text>() { // from class: org.apache.kylin.engine.spark.SparkCubingByLayerForOpt.1
            private BufferedMeasureCodec codec;

            @Override // org.apache.kylin.engine.spark.SparkFunction.FunctionBase
            protected void doInit() {
                KylinConfig loadKylinConfigFromHdfs = AbstractHadoopJob.loadKylinConfigFromHdfs(serializableConfiguration, str);
                KylinConfig.SetAndUnsetThreadLocalConfig andUnsetThreadLocalConfig = KylinConfig.setAndUnsetThreadLocalConfig(loadKylinConfigFromHdfs);
                Throwable th = null;
                try {
                    try {
                        this.codec = new BufferedMeasureCodec(CubeDescManager.getInstance(loadKylinConfigFromHdfs).getCubeDesc(str2).getMeasures());
                        if (andUnsetThreadLocalConfig != null) {
                            if (0 == 0) {
                                andUnsetThreadLocalConfig.close();
                                return;
                            }
                            try {
                                andUnsetThreadLocalConfig.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (andUnsetThreadLocalConfig != null) {
                        if (th != null) {
                            try {
                                andUnsetThreadLocalConfig.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            andUnsetThreadLocalConfig.close();
                        }
                    }
                    throw th4;
                }
            }

            @Override // org.apache.kylin.engine.spark.SparkFunction.PairFunctionBase
            public Tuple2<Text, Text> doCall(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
                ByteBuffer encode = this.codec.encode((Object[]) tuple2._2());
                Text text = new Text();
                text.set(encode.array(), 0, encode.position());
                return new Tuple2<>(new Text(((ByteArray) tuple2._1()).array()), text);
            }
        }).saveAsNewAPIHadoopDataset(job.getConfiguration());
        logger.info("Persisting RDD for level " + i + " into " + cuboidOutputPathsByLevel);
    }

    static {
        OptionBuilder.withArgName("cubename");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("Cube Name");
        OPTION_CUBE_NAME = OptionBuilder.create("cubename");
        OptionBuilder.withArgName("segment");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("Cube Segment Id");
        OPTION_SEGMENT_ID = OptionBuilder.create("segmentId");
        OptionBuilder.withArgName("metaUrl");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("HDFS metadata url");
        OPTION_META_URL = OptionBuilder.create("metaUrl");
        OptionBuilder.withArgName("output");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("Cube output path");
        OPTION_OUTPUT_PATH = OptionBuilder.create("output");
        OptionBuilder.withArgName("input");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("Hive Intermediate Table PATH");
        OPTION_INPUT_PATH = OptionBuilder.create("input");
        OptionBuilder.withArgName("cuboidMode");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(true);
        OptionBuilder.withDescription("CoboId Mode ");
        OPTION_CUBOID_MODE = OptionBuilder.create("cuboidMode");
        EMTPY_ITERATOR = new ArrayList(0);
    }
}
