package com.paypal.dione.spark.index;

import com.paypal.dione.spark.metrics.StatsReporter;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.Tuple4;
import scala.collection.LinearSeqOptimized;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;

/* compiled from: IndexReader.scala */
/* loaded from: input_file:com/paypal/dione/spark/index/IndexReader$.class */
public final class IndexReader$ implements Serializable {
    /** Singleton instance of this companion object; assigned by the private constructor. */
    public static IndexReader$ MODULE$;
    /** SLF4J logger, created in the constructor from this object's runtime class. */
    private final Logger logger;

    static {
        // Instantiating the class once is what populates MODULE$ (the constructor stores `this`).
        new IndexReader$();
    }

    /** Returns the logger owned by this singleton. */
    private Logger logger() {
        return logger;
    }

    /**
     * Builds the dataset that turns index rows into fetched data rows.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>drop from {@code dataset} every column named like a field of
     *       {@code indexReader.fieldsSchema()};</li>
     *   <li>order the rows: through {@link #preparePartitionsInChunks} when the
     *       {@code indexer.reader.chunks} conf (default {@code "false"}) parses to true, otherwise
     *       by repartitioning on {@code data_filename} and sorting each partition by
     *       {@code data_filename, data_offset, data_sub_offset};</li>
     *   <li>project back to the columns that survived step 1;</li>
     *   <li>pass every partition to {@code indexReader.mapPartitions}.</li>
     * </ol>
     *
     * @param dataset     index rows to resolve
     * @param indexReader reader providing the fetched-field schema and the per-partition mapping
     * @return dataset encoded with the surviving index columns followed by
     *         {@code indexReader.fieldsSchema()}
     */
    public Dataset<Row> read(Dataset<Row> dataset, IndexReader indexReader) {
        Seq fetchedNames = (Seq) indexReader.fieldsSchema().map(field -> field.name(), Seq$.MODULE$.canBuildFrom());
        Dataset<Row> indexOnly = dataset.drop(fetchedNames);

        String chunksConf = dataset.sparkSession().conf().get("indexer.reader.chunks", "false");
        Dataset<Row> ordered;
        if (new StringOps(Predef$.MODULE$.augmentString(chunksConf)).toBoolean()) {
            ordered = preparePartitionsInChunks(indexOnly);
        } else {
            Column[] partitionKey = {functions$.MODULE$.col("data_filename")};
            String[] trailingSortKeys = {"data_offset", "data_sub_offset"};
            ordered = indexOnly
                    .repartition(Predef$.MODULE$.wrapRefArray(partitionKey))
                    .sortWithinPartitions("data_filename", Predef$.MODULE$.wrapRefArray(trailingSortKeys));
        }

        // Re-select the surviving index columns so helper columns added while ordering are removed.
        Object[] keptColumns = (Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(indexOnly.columns()))
                .map(name -> functions$.MODULE$.col(name), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Column.class)));
        Dataset<Row> projected = ordered.select(Predef$.MODULE$.wrapRefArray(keptColumns));

        StructType outputSchema = StructType$.MODULE$.apply(
                (Seq) indexOnly.schema().$plus$plus(indexReader.fieldsSchema(), Seq$.MODULE$.canBuildFrom()));
        return projected.mapPartitions(rows -> indexReader.mapPartitions(rows), RowEncoder$.MODULE$.apply(outputSchema));
    }

    /**
     * Chooses the {@link StatsReporter} according to the {@code indexer.reader.reporter} conf.
     *
     * <ul>
     *   <li>conf absent ({@code null}): a new {@code StatsReporter.SoftReporter};</li>
     *   <li>conf equal to {@code "none"}: a new plain {@code StatsReporter};</li>
     *   <li>anything else: treated as a class name, loaded with {@code Class.forName} and
     *       instantiated through its no-argument constructor.</li>
     * </ul>
     *
     * @param sparkSession session whose runtime conf is consulted
     * @return the selected reporter instance
     */
    public StatsReporter getReporter(SparkSession sparkSession) {
        String reporterSetting = sparkSession.conf().get("indexer.reader.reporter", (String) null);
        if (reporterSetting == null) {
            return new StatsReporter.SoftReporter();
        }
        if ("none".equals(reporterSetting)) {
            return new StatsReporter();
        }
        return (StatsReporter) Class.forName(reporterSetting).getConstructor(new Class[0]).newInstance(new Object[0]);
    }

    /**
     * Re-partitions index rows into byte-bounded chunks.
     *
     * <p>The single chained expression below performs, in order:
     * <ol>
     *   <li>repartition by {@code hash(data_filename)} into
     *       {@code max(spark.sql.shuffle.partitions / indexer.reader.repartitionRatio, 1)}
     *       partitions (the ratio defaults to {@code "100"}; {@code spark.sql.shuffle.partitions}
     *       is read without a default);</li>
     *   <li>sort each partition by {@code data_filename, data_offset, data_sub_offset};</li>
     *   <li>{@link #sumPartitionBytes} to append the running {@code __sum} column;</li>
     *   <li>add {@code __chunk = (__sum / indexer.reader.maxBytes) cast to int}
     *       (max bytes defaults to 20971520);</li>
     *   <li>repartition by {@code hash(spark_partition_id())} and {@code hash(__chunk)};</li>
     *   <li>sort each partition again by the same three columns.</li>
     * </ol>
     *
     * @param dataset index rows; the code reads the columns {@code data_filename},
     *                {@code data_offset}, {@code data_sub_offset} and (via sumPartitionBytes)
     *                {@code data_size}
     * @return the chunked dataset; restricted to the input's columns unless
     *         {@code indexer.reader.debug} parses to true, in which case the helper columns
     *         {@code __sum} and {@code __chunk} are kept and chunk statistics are logged
     */
    public Dataset<Row> preparePartitionsInChunks(Dataset<Row> dataset) {
        SparkSession sparkSession = dataset.sparkSession();
        // Debug flag, default "false".
        boolean z = new StringOps(Predef$.MODULE$.augmentString(sparkSession.conf().get("indexer.reader.debug", "false"))).toBoolean();
        // Whole chunking pipeline (steps 1-6 of the method comment) as one expression.
        Dataset<Row> sortWithinPartitions = sumPartitionBytes(dataset.repartition(package$.MODULE$.max(new StringOps(Predef$.MODULE$.augmentString(sparkSession.conf().get("spark.sql.shuffle.partitions"))).toInt() / new StringOps(Predef$.MODULE$.augmentString(sparkSession.conf().get("indexer.reader.repartitionRatio", "100"))).toInt(), 1), Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.hash(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("data_filename")}))})).sortWithinPartitions("data_filename", Predef$.MODULE$.wrapRefArray(new String[]{"data_offset", "data_sub_offset"}))).withColumn("__chunk", sparkSession.implicits().StringToColumn(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"__sum"}))).$(Nil$.MODULE$).$div(BoxesRunTime.boxToInteger(new StringOps(Predef$.MODULE$.augmentString(sparkSession.conf().get("indexer.reader.maxBytes", BoxesRunTime.boxToInteger(20971520).toString()))).toInt())).cast("int")).repartition(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.hash(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.spark_partition_id()})), functions$.MODULE$.hash(Predef$.MODULE$.wrapRefArray(new Column[]{sparkSession.implicits().StringToColumn(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"__chunk"}))).$(Nil$.MODULE$)}))})).sortWithinPartitions("data_filename", Predef$.MODULE$.wrapRefArray(new String[]{"data_offset", "data_sub_offset"}));
        if (!z) {
            // Normal mode: project back to the caller's columns, dropping __sum and __chunk.
            return sortWithinPartitions.select(Predef$.MODULE$.wrapRefArray((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(dataset.columns())).map(str -> {
                return functions$.MODULE$.col(str);
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Column.class)))));
        }
        // Debug mode: log chunk statistics and return the dataset with the helper columns intact.
        printChunksStats(sortWithinPartitions);
        return sortWithinPartitions;
    }

    /**
     * Appends a per-partition running byte total as a {@code __sum} (long) column.
     *
     * <p>Within each partition the rows are scanned in iteration order. For every row the total
     * grows by the row's {@code data_size} (unboxed as an int) plus, when the row's
     * {@code data_filename} differs from the previous row's, a fixed cost taken from the
     * {@code indexer.reader.newFileCostInBytes} conf (default 1048576). The scan is seeded with
     * a {@code null} file name, so the first row of every partition is charged that cost.
     *
     * @param dataset rows carrying {@code data_filename} and {@code data_size}
     * @return the same rows, each extended with its running total, under the input schema plus
     *         {@code __sum}
     */
    public Dataset<Row> sumPartitionBytes(Dataset<Row> dataset) {
        // Lazily-initialised holder for the companion of the local Accumulator case class.
        LazyRef lazyRef = new LazyRef();
        // Cost added whenever the scan moves on to a different data file.
        int i = new StringOps(Predef$.MODULE$.augmentString(dataset.sparkSession().conf().get("indexer.reader.newFileCostInBytes", BoxesRunTime.boxToInteger(1048576).toString()))).toInt();
        return dataset.mapPartitions(iterator -> {
            // Seed accumulator: empty row, no file yet, zero sum, zero count.
            return iterator.scanLeft(this.Accumulator$3(lazyRef).apply(Row$.MODULE$.apply(Nil$.MODULE$), (String) null, 0L, 0L), (indexReader$Accumulator$1, row) -> {
                // NOTE(review): the two locals declared below reuse the lambda parameter names,
                // which javac rejects; this looks like a decompiler artifact, so confirm before compiling.
                Tuple2 tuple2 = new Tuple2(indexReader$Accumulator$1, row);
                if (tuple2 != null) {
                    IndexReader$Accumulator$1 indexReader$Accumulator$1 = (IndexReader$Accumulator$1) tuple2._1();
                    Row row = (Row) tuple2._2();
                    if (indexReader$Accumulator$1 != null) {
                        String file = indexReader$Accumulator$1.file();
                        long accSum = indexReader$Accumulator$1.accSum();
                        long count = indexReader$Accumulator$1.count();
                        String obj = row.getAs("data_filename").toString();
                        // New total = previous total + this row's size + new-file cost when the file changed.
                        long unboxToInt = accSum + BoxesRunTime.unboxToInt(row.getAs("data_size")) + ((obj != null ? !obj.equals(file) : file != null) ? i : 0);
                        // Emit the row with the total appended as its last value; carry file, total and count forward.
                        return this.Accumulator$3(lazyRef).apply(Row$.MODULE$.fromSeq((Seq) row.toSeq().$plus$plus(Seq$.MODULE$.apply(Predef$.MODULE$.wrapLongArray(new long[]{unboxToInt})), Seq$.MODULE$.canBuildFrom())), obj, unboxToInt, count + 1);
                    }
                }
                throw new MatchError(tuple2);
            // drop(1) discards the seed element; only the extended rows are kept.
            }).drop(1).map(indexReader$Accumulator$12 -> {
                return indexReader$Accumulator$12.row();
            });
        }, RowEncoder$.MODULE$.apply(dataset.schema().add("__sum", LongType$.MODULE$)));
    }

    /**
     * Debug helper: prints the query plan and logs chunk statistics for {@code dataset}.
     *
     * <p>Logs the number of distinct {@code __chunk} values, then, for every partition, the sum
     * of {@code data_size} and the number of rows. The per-partition pairs are gathered with
     * {@code collect()} before being logged.
     *
     * @param dataset chunked dataset carrying the {@code __chunk} and {@code data_size} columns
     */
    private void printChunksStats(Dataset<Row> dataset) {
        SparkSession sparkSession = dataset.sparkSession();
        dataset.explain();
        logger().info(new StringBuilder(18).append("number of chunks: ").append(dataset.select(Predef$.MODULE$.wrapRefArray(new Column[]{sparkSession.implicits().StringToColumn(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"__chunk"}))).$(Nil$.MODULE$)})).distinct().count()).toString());
        logger().info("chunks distribution:");
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Tuple2[]) dataset.rdd().mapPartitions(iterator -> {
            // Materialises the partition as a list, maps each row to its data_size, then folds
            // into a (total bytes: long, row count: int) pair starting from (0L, 0).
            Tuple2 tuple2 = (Tuple2) ((LinearSeqOptimized) iterator.toList().map(row -> {
                return BoxesRunTime.boxToInteger($anonfun$printChunksStats$2(row));
            }, List$.MODULE$.canBuildFrom())).foldLeft(new Tuple2.mcJI.sp(0L, 0), (tuple22, obj) -> {
                return $anonfun$printChunksStats$3(tuple22, BoxesRunTime.unboxToInt(obj));
            });
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Tuple2.mcJI.sp spVar = new Tuple2.mcJI.sp(tuple2._1$mcJ$sp(), tuple2._2$mcI$sp());
            // Each partition yields exactly one (total bytes, row count) element.
            return scala.package$.MODULE$.Iterator().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2.mcJI.sp(spVar._1$mcJ$sp(), spVar._2$mcI$sp())}));
        }, dataset.rdd().mapPartitions$default$2(), ClassTag$.MODULE$.apply(Tuple2.class)).collect())).foreach(tuple2 -> {
            // Log one line per collected partition summary.
            $anonfun$printChunksStats$4(tuple2);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Factory for {@link IndexReader}; forwards all four arguments to its constructor.
     *
     * @param sparkSession session passed to the reader
     * @param sparkIndexer indexer passed to the reader
     * @param structType   schema passed to the reader (exposed by unapply as fieldsSchema())
     * @param z            boolean flag passed as the fourth argument; presumably ignoreFailures,
     *                     since unapply returns ignoreFailures() in that position (confirm
     *                     against the IndexReader constructor)
     * @return a new reader
     */
    public IndexReader apply(SparkSession sparkSession, SparkIndexer sparkIndexer, StructType structType, boolean z) {
        IndexReader reader = new IndexReader(sparkSession, sparkIndexer, structType, z);
        return reader;
    }

    /**
     * Extractor counterpart of {@code apply}.
     *
     * @param indexReader reader to take apart; may be {@code null}
     * @return {@code None} for a {@code null} reader, otherwise {@code Some} of the tuple
     *         {@code (spark, sparkIndexer, fieldsSchema, ignoreFailures)} with the flag boxed
     */
    public Option<Tuple4<SparkSession, SparkIndexer, StructType, Object>> unapply(IndexReader indexReader) {
        if (indexReader == null) {
            return (Option) None$.MODULE$;
        }
        Tuple4 fields = new Tuple4(
                indexReader.spark(),
                indexReader.sparkIndexer(),
                indexReader.fieldsSchema(),
                BoxesRunTime.boxToBoolean(indexReader.ignoreFailures()));
        return new Some(fields);
    }

    /** Serialization hook: always resolves to the singleton stored in {@code MODULE$}. */
    private Object readResolve() {
        return IndexReader$.MODULE$;
    }

    /**
     * Slow path of the lazy Accumulator companion lookup: while holding the monitor of
     * {@code lazyRef}, returns the stored value if one is already present, otherwise creates the
     * companion and stores it through {@code lazyRef.initialize}.
     */
    private static final /* synthetic */ IndexReader$Accumulator$2$ Accumulator$lzycompute$1(LazyRef lazyRef) {
        synchronized (lazyRef) {
            if (lazyRef.initialized()) {
                return (IndexReader$Accumulator$2$) lazyRef.value();
            }
            return (IndexReader$Accumulator$2$) lazyRef.initialize(new IndexReader$Accumulator$2$());
        }
    }

    /**
     * Returns the Accumulator companion held by {@code lazyRef}, delegating to
     * {@code Accumulator$lzycompute$1} when the reference has not been initialized yet.
     */
    private final IndexReader$Accumulator$2$ Accumulator$3(LazyRef lazyRef) {
        if (!lazyRef.initialized()) {
            return Accumulator$lzycompute$1(lazyRef);
        }
        return (IndexReader$Accumulator$2$) lazyRef.value();
    }

    /** Reads the {@code data_size} column of {@code row} and unboxes it as an int. */
    public static final /* synthetic */ int $anonfun$printChunksStats$2(Row row) {
        Object boxedSize = row.getAs("data_size");
        return BoxesRunTime.unboxToInt(boxedSize);
    }

    /**
     * Fold step for the per-partition statistics.
     *
     * @param tuple2 running (total bytes: long, row count: int) pair
     * @param i      size of the next row
     * @return a new pair with {@code i} added to the total and the count incremented by one
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ Tuple2 $anonfun$printChunksStats$3(Tuple2 tuple2, int i) {
        if (tuple2 == null) {
            // Same failure the generated pattern match produces: the (accumulator, size) pair is reported.
            throw new MatchError(new Tuple2(tuple2, BoxesRunTime.boxToInteger(i)));
        }
        long totalBytes = tuple2._1$mcJ$sp() + i;
        int rowCount = tuple2._2$mcI$sp() + 1;
        return new Tuple2.mcJI.sp(totalBytes, rowCount);
    }

    /**
     * Logs one per-partition summary at info level.
     *
     * @param tuple2 (total bytes: long, row count: int) pair for a partition
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ void $anonfun$printChunksStats$4(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        long totalBytes = tuple2._1$mcJ$sp();
        int rowCount = tuple2._2$mcI$sp();
        MODULE$.logger().info("partition size, length: [" + totalBytes + ", " + rowCount + "]");
    }

    /**
     * Private constructor, invoked once from the static initializer: publishes the instance in
     * {@code MODULE$} and then creates the logger for this object's runtime class.
     */
    private IndexReader$() {
        IndexReader$.MODULE$ = this;
        logger = LoggerFactory.getLogger(this.getClass());
    }
}
