package org.apache.paimon.flink.action;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/action/SortCompactAction.class */
public class SortCompactAction extends CompactAction {
    private String sortStrategy;
    private List<String> orderColumns;

    public SortCompactAction(String str, String str2, String str3, Map<String, String> map) {
        super(str, str2, str3, map);
        Preconditions.checkArgument(this.table instanceof AppendOnlyFileStoreTable, "Only sort compaction works with append-only table for now.");
        this.table = this.table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
    }

    @Override // org.apache.paimon.flink.action.CompactAction, org.apache.paimon.flink.action.Action
    public void run() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        build(executionEnvironment);
        execute(executionEnvironment, "Sort Compact Job");
    }

    @Override // org.apache.paimon.flink.action.CompactAction
    public void build(StreamExecutionEnvironment streamExecutionEnvironment) {
        if (streamExecutionEnvironment.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) != RuntimeExecutionMode.BATCH) {
            throw new IllegalArgumentException("Only support batch mode yet, please set -Dexecution.runtime-mode=BATCH");
        }
        FileStoreTable fileStoreTable = (FileStoreTable) this.table;
        if (!(fileStoreTable instanceof AppendOnlyFileStoreTable)) {
            throw new IllegalArgumentException("Sort Compact only supports append-only table yet");
        }
        if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) {
            throw new IllegalArgumentException("Sort Compact only supports bucket=-1 yet.");
        }
        Map<String, String> options = fileStoreTable.options();
        FlinkSourceBuilder flinkSourceBuilder = new FlinkSourceBuilder(ObjectIdentifier.of(this.catalogName, this.identifier.getDatabaseName(), this.identifier.getObjectName()), fileStoreTable);
        if (getPartitions() != null) {
            flinkSourceBuilder.withPredicate(PredicateBuilder.or((Predicate[]) getPartitions().stream().map(map -> {
                return PredicateBuilder.partition(map, this.table.rowType());
            }).toArray(i -> {
                return new Predicate[i];
            })));
        }
        String str = options.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (str != null) {
            flinkSourceBuilder.withParallelism(Integer.valueOf(Integer.parseInt(str)));
        }
        DataStream<RowData> sort = TableSorter.getSorter(streamExecutionEnvironment, flinkSourceBuilder.withEnv(streamExecutionEnvironment).withContinuousMode(false).build(), fileStoreTable, this.sortStrategy, this.orderColumns).sort();
        FlinkSinkBuilder flinkSinkBuilder = new FlinkSinkBuilder(fileStoreTable);
        flinkSinkBuilder.withInput(sort).withOverwritePartition(new HashMap());
        String str2 = options.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (str2 != null) {
            flinkSinkBuilder.withParallelism(Integer.valueOf(Integer.parseInt(str2)));
        }
        flinkSinkBuilder.build();
    }

    public void withOrderStrategy(String str) {
        this.sortStrategy = str;
    }

    public void withOrderColumns(List<String> list) {
        this.orderColumns = list;
    }
}
