package org.apache.paimon.flink.action;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/action/CompactDatabaseAction.class */
public class CompactDatabaseAction extends ActionBase {
    private static final Logger LOG = LoggerFactory.getLogger(CompactDatabaseAction.class);
    private final Pattern includingPattern;
    private final Pattern excludingPattern;
    private final String database;
    private final Map<String, Table> tableMap;

    public CompactDatabaseAction(String str, String str2, @Nullable String str3, @Nullable String str4, Map<String, String> map) {
        super(str, map);
        this.tableMap = new HashMap();
        this.database = str2;
        this.includingPattern = Pattern.compile(str3 == null ? ".*" : str3);
        this.excludingPattern = str4 == null ? null : Pattern.compile(str4);
    }

    private boolean shouldCompactionTable(String str) {
        boolean matches = this.includingPattern.matcher(str).matches();
        if (this.excludingPattern != null) {
            matches = matches && !this.excludingPattern.matcher(str).matches();
        }
        if (!matches) {
            LOG.debug("Source table '{}' is excluded.", str);
        }
        return matches;
    }

    public void build(StreamExecutionEnvironment streamExecutionEnvironment) {
        try {
            Pattern compile = Pattern.compile(this.database);
            for (String str : this.catalog.listDatabases()) {
                if (compile.matcher(str).matches()) {
                    for (String str2 : this.catalog.listTables(str)) {
                        String format = String.format("%s.%s", str, str2);
                        if (shouldCompactionTable(format)) {
                            Table table = this.catalog.getTable(Identifier.create(str, str2));
                            if (table instanceof FileStoreTable) {
                                this.tableMap.put(format, table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false")));
                            } else {
                                LOG.error(String.format("Only FileStoreTable supports compact action. The table type is '%s'.", table.getClass().getName()));
                            }
                        } else {
                            LOG.debug("The table {} is excluded.", format);
                        }
                    }
                }
            }
            boolean z = StreamExecutionEnvironmentUtils.getConfiguration(streamExecutionEnvironment).get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
            for (Map.Entry<String, Table> entry : this.tableMap.entrySet()) {
                FileStoreTable fileStoreTable = (FileStoreTable) entry.getValue();
                switch (fileStoreTable.bucketMode()) {
                    case UNAWARE:
                        buildForUnawareBucketCompaction(streamExecutionEnvironment, entry.getKey(), (AppendOnlyFileStoreTable) entry.getValue(), z);
                        break;
                    case FIXED:
                    case DYNAMIC:
                    default:
                        buildForTraditionalCompaction(streamExecutionEnvironment, entry.getKey(), fileStoreTable, z);
                        break;
                }
            }
        } catch (Catalog.DatabaseNotExistException | Catalog.TableNotExistException e) {
            throw new RuntimeException(e);
        }
    }

    private void buildForTraditionalCompaction(StreamExecutionEnvironment streamExecutionEnvironment, String str, FileStoreTable fileStoreTable, boolean z) {
        new CompactorSinkBuilder(fileStoreTable).withInput(new CompactorSourceBuilder(str, fileStoreTable).withEnv(streamExecutionEnvironment).withContinuousMode(z).build()).build();
    }

    private void buildForUnawareBucketCompaction(StreamExecutionEnvironment streamExecutionEnvironment, String str, AppendOnlyFileStoreTable appendOnlyFileStoreTable, boolean z) {
        UnawareBucketCompactionTopoBuilder unawareBucketCompactionTopoBuilder = new UnawareBucketCompactionTopoBuilder(streamExecutionEnvironment, str, appendOnlyFileStoreTable);
        unawareBucketCompactionTopoBuilder.withContinuousMode(z);
        unawareBucketCompactionTopoBuilder.build();
    }

    @Override // org.apache.paimon.flink.action.Action
    public void run() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        build(executionEnvironment);
        execute(executionEnvironment, "Compact database job");
    }
}
