package org.apache.iceberg.spark.actions;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFunctionCatalog;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SortOrderUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.distributions.OrderedDistribution;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
import org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$;
import scala.Option;

/* loaded from: input_file:org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.class */
abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
    /** Option key under which callers supply the compression factor; read in {@code compressionFactor(Map)}. */
    public static final String COMPRESSION_FACTOR = "compression-factor";
    /** Default compression factor: 1.0, i.e. the input size is used unscaled. */
    public static final double COMPRESSION_FACTOR_DEFAULT = 1.0d;
    /**
     * Multiplier applied to a file group's input size when estimating how many shuffle partitions
     * to request. Assigned in {@code init(Map)}; always greater than zero once initialised.
     */
    private double compressionFactor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iceberg/spark/actions/SparkShufflingDataRewriter$OrderedWrite.class */
    public static class OrderedWrite implements RequiresDistributionAndOrdering {
        private final OrderedDistribution distribution;
        private final SortOrder[] ordering;
        private final int numShufflePartitions;

        OrderedWrite(SortOrder[] sortOrderArr, int i) {
            this.distribution = Distributions.ordered(sortOrderArr);
            this.ordering = sortOrderArr;
            this.numShufflePartitions = i;
        }

        public Distribution requiredDistribution() {
            return this.distribution;
        }

        public boolean distributionStrictlyRequired() {
            return true;
        }

        public int requiredNumPartitions() {
            return this.numShufflePartitions;
        }

        public SortOrder[] requiredOrdering() {
            return this.ordering;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a shuffling rewriter; both arguments are forwarded unchanged to the parent
     * constructor.
     *
     * @param spark Spark session passed to the parent rewriter
     * @param table table passed to the parent rewriter
     */
    public SparkShufflingDataRewriter(SparkSession spark, Table table) {
        super(spark, table);
    }

    /**
     * Returns the Iceberg sort order that the rewritten output should follow.
     *
     * <p>Consumed by {@code outputSortOrder(List)}, which either returns it as is or passes it to
     * {@code SortOrderUtil.buildSortOrder} when the file group's partition spec differs from the
     * table's current spec.
     */
    protected abstract org.apache.iceberg.SortOrder sortOrder();

    /**
     * Produces the dataset that {@code doRewrite} writes back to the table.
     *
     * @param dataset dataset loaded by {@code doRewrite} for the file group being rewritten
     * @param function transformation built by {@code sortFunction(List)}; it rewrites a dataset's
     *     logical plan through {@code DistributionAndOrderingUtils.prepareQuery} using the output
     *     ordering and shuffle partition count computed for the group
     * @return the dataset to write; presumably {@code function} applied to {@code dataset},
     *     possibly after subclass-specific preparation (confirm against the implementations)
     */
    protected abstract Dataset<Row> sortedDF(Dataset<Row> dataset, Function<Dataset<Row>, Dataset<Row>> function);

    /**
     * Returns the option keys this rewriter accepts: every key accepted by the parent rewriter plus
     * {@link #COMPRESSION_FACTOR}.
     *
     * @return an immutable set of valid option keys
     */
    @Override // org.apache.iceberg.actions.SizeBasedDataRewriter, org.apache.iceberg.actions.SizeBasedFileRewriter, org.apache.iceberg.actions.FileRewriter
    public Set<String> validOptions() {
        // Typed builder: no raw casts needed, and the key is added as a plain String element.
        return ImmutableSet.<String>builder()
                .addAll(super.validOptions())
                .add(COMPRESSION_FACTOR)
                .build();
    }

    /**
     * Initialises the parent rewriter from the options and then reads the compression factor.
     *
     * @param options rewrite options; {@link #COMPRESSION_FACTOR} is optional and must be greater
     *     than zero when present
     */
    @Override // org.apache.iceberg.actions.SizeBasedDataRewriter, org.apache.iceberg.actions.SizeBasedFileRewriter, org.apache.iceberg.actions.FileRewriter
    public void init(Map<String, String> options) {
        super.init(options);
        double factor = compressionFactor(options);
        this.compressionFactor = factor;
    }

    /**
     * Rewrites one file group: loads it, sorts it via {@code sortedDF}, and appends the result.
     *
     * @param groupId identifier used as the scan task set id on read, as the rewritten file scan
     *     task set id on write, and as the load/save location
     * @param group file scan tasks of the group; used to derive the ordering and the number of
     *     shuffle partitions
     */
    @Override // org.apache.iceberg.spark.actions.SparkSizeBasedDataRewriter
    public void doRewrite(String groupId, List<FileScanTask> group) {
        // Read only the tasks registered under this group id.
        Dataset<Row> scanDF = spark()
                .read()
                .format("iceberg")
                .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId)
                .load(groupId);

        Dataset<Row> sortedOutput = sortedDF(scanDF, sortFunction(group));

        // The data is already arranged by the sort function, so the table's own
        // distribution and ordering are switched off for this write.
        sortedOutput
                .write()
                .format("iceberg")
                .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
                .option("target-file-size-bytes", writeMaxFileSize())
                .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
                .mode(DataOperations.APPEND)
                .save(groupId);
    }

    /**
     * Builds the dataset transformation that imposes the output ordering on a file group.
     *
     * <p>The ordering and the shuffle partition count are computed once, here, and captured by the
     * returned function.
     *
     * @param group file scan tasks of the group being rewritten
     * @return a function that rewrites a dataset's logical plan with {@code sortPlan}
     */
    private Function<Dataset<Row>, Dataset<Row>> sortFunction(List<FileScanTask> group) {
        SortOrder[] sparkOrdering = Spark3Util.toOrdering(outputSortOrder(group));
        int partitionCount = numShufflePartitions(group);
        return df -> transformPlan(df, plan -> sortPlan(plan, sparkOrdering, partitionCount));
    }

    /**
     * Asks Spark to prepare {@code plan} for an {@link OrderedWrite} with the given ordering and
     * partition count, resolving functions through the {@code SparkFunctionCatalog} singleton.
     *
     * @param plan logical plan to rewrite
     * @param ordering ordering required of the output
     * @param numShufflePartitions number of shuffle partitions to request
     * @return the plan returned by {@code DistributionAndOrderingUtils.prepareQuery}
     */
    private LogicalPlan sortPlan(LogicalPlan plan, SortOrder[] ordering, int numShufflePartitions) {
        SparkFunctionCatalog functions = SparkFunctionCatalog.get();
        OrderedWrite requirement = new OrderedWrite(ordering, numShufflePartitions);
        return DistributionAndOrderingUtils$.MODULE$.prepareQuery(requirement, plan, Option.apply(functions));
    }

    /**
     * Applies {@code planRewrite} to the logical plan of {@code df} and wraps the result in a new
     * dataset that reuses this rewriter's session and the encoder of {@code df}.
     *
     * @param df dataset whose logical plan is rewritten
     * @param planRewrite transformation applied to the logical plan
     * @return a new dataset over the rewritten plan
     */
    private Dataset<Row> transformPlan(Dataset<Row> df, Function<LogicalPlan, LogicalPlan> planRewrite) {
        SparkSession session = spark();
        LogicalPlan rewrittenPlan = planRewrite.apply(df.logicalPlan());
        return new Dataset<>(session, rewrittenPlan, df.encoder());
    }

    /**
     * Chooses the sort order for a file group's output.
     *
     * <p>Only the first task's partition spec is inspected. The list must not be empty.
     *
     * @param group file scan tasks of the group being rewritten
     * @return {@code sortOrder()} when the first task's spec equals the table's current spec,
     *     otherwise the order built by {@code SortOrderUtil.buildSortOrder(table(), sortOrder())}
     */
    private org.apache.iceberg.SortOrder outputSortOrder(List<FileScanTask> group) {
        boolean writtenWithCurrentSpec = group.get(0).spec().equals(table().spec());
        if (writtenWithCurrentSpec) {
            return sortOrder();
        }
        // Spec differs from the table's current one: derive the order from the table.
        // NOTE(review): presumably this folds the partition fields into the order; confirm
        // against SortOrderUtil.buildSortOrder.
        return SortOrderUtil.buildSortOrder(table(), sortOrder());
    }

    /**
     * Estimates the number of shuffle partitions for a file group.
     *
     * <p>The group's input size is scaled by the compression factor, converted to an output file
     * count with {@code numOutputFiles}, and clamped to at least one.
     *
     * @param group file scan tasks of the group being rewritten
     * @return the shuffle partition count, never less than 1
     */
    private int numShufflePartitions(List<FileScanTask> group) {
        long scaledInputSize = (long) (inputSize(group) * compressionFactor);
        int outputFileCount = (int) numOutputFiles(scaledInputSize);
        return Math.max(1, outputFileCount);
    }

    /**
     * Reads and validates the {@link #COMPRESSION_FACTOR} option.
     *
     * @param options rewrite options
     * @return the configured factor, or {@link #COMPRESSION_FACTOR_DEFAULT} when the option is
     *     absent
     * @throws IllegalArgumentException if the value is not greater than zero
     */
    private double compressionFactor(Map<String, String> options) {
        double factor = PropertyUtil.propertyAsDouble(options, COMPRESSION_FACTOR, COMPRESSION_FACTOR_DEFAULT);
        boolean positive = factor > 0.0d;
        Preconditions.checkArgument(positive, "'%s' is set to %s but must be > 0", COMPRESSION_FACTOR, factor);
        return factor;
    }
}
