package org.apache.paimon.flink.action.cdc.mysql;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import java.lang.invoke.SerializedLambda;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.ComputedColumnUtils;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.mergetree.compact.aggregate.FieldListaggAgg;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.class */
/**
 * An action that synchronizes the MySQL tables matched by the configured table-name pattern into
 * one Paimon table.
 *
 * <p>{@link #build(StreamExecutionEnvironment)} validates the configuration, merges the schemas of
 * all matched MySQL tables, creates the target database/table when missing (or checks schema
 * compatibility when the table already exists) and wires a MySQL CDC source into a Paimon CDC
 * sink. {@link #run()} builds the pipeline on a fresh execution environment and executes it.
 *
 * <p>Optional settings are supplied through the fluent {@code with*} methods before building.
 */
public class MySqlSyncTableAction extends ActionBase {
    /** Name of the target Paimon database. */
    private final String database;

    /** Name of the target Paimon table. */
    private final String table;

    /** MySQL CDC source configuration; must contain {@link MySqlSourceOptions#TABLE_NAME}. */
    private final Configuration mySqlConfig;

    /** Partition keys used when deriving the Paimon schema. */
    private final List<String> partitionKeys;

    /** Primary keys used when deriving the Paimon schema. */
    private final List<String> primaryKeys;

    /** Table options used when deriving the Paimon schema; also read for the sink parallelism. */
    private Map<String, String> tableConfig;

    /** Raw computed-column definitions, parsed by {@link ComputedColumnUtils}. */
    private List<String> computedColumnArgs;

    /** Type mapping rules applied when reading MySQL schemas and parsing change events. */
    private TypeMapping typeMapping;

    /**
     * Creates an action with an empty catalog configuration.
     *
     * @param str first argument forwarded to the {@link ActionBase} constructor
     * @param str2 name of the target Paimon database
     * @param str3 name of the target Paimon table
     * @param map MySQL CDC source configuration
     */
    public MySqlSyncTableAction(String str, String str2, String str3, Map<String, String> map) {
        this(str, str2, str3, Collections.emptyMap(), map);
    }

    /**
     * Creates an action.
     *
     * @param str first argument forwarded to the {@link ActionBase} constructor
     * @param str2 name of the target Paimon database
     * @param str3 name of the target Paimon table
     * @param map second argument forwarded to the {@link ActionBase} constructor
     * @param map2 MySQL CDC source configuration
     */
    public MySqlSyncTableAction(String str, String str2, String str3, Map<String, String> map, Map<String, String> map2) {
        super(str, map);
        this.partitionKeys = new ArrayList<>();
        this.primaryKeys = new ArrayList<>();
        this.tableConfig = new HashMap<>();
        this.computedColumnArgs = new ArrayList<>();
        this.typeMapping = TypeMapping.defaultMapping();
        this.database = str2;
        this.table = str3;
        this.mySqlConfig = Configuration.fromMap(map2);
    }

    /**
     * Appends partition keys.
     *
     * @param strArr partition key names to add
     * @return this action, for chaining
     */
    public MySqlSyncTableAction withPartitionKeys(String... strArr) {
        return withPartitionKeys(Arrays.asList(strArr));
    }

    /**
     * Appends partition keys.
     *
     * @param list partition key names to add
     * @return this action, for chaining
     */
    public MySqlSyncTableAction withPartitionKeys(List<String> list) {
        this.partitionKeys.addAll(list);
        return this;
    }

    /**
     * Appends primary keys.
     *
     * @param strArr primary key names to add
     * @return this action, for chaining
     */
    public MySqlSyncTableAction withPrimaryKeys(String... strArr) {
        return withPrimaryKeys(Arrays.asList(strArr));
    }

    /**
     * Appends primary keys.
     *
     * @param list primary key names to add
     * @return this action, for chaining
     */
    public MySqlSyncTableAction withPrimaryKeys(List<String> list) {
        this.primaryKeys.addAll(list);
        return this;
    }

    /**
     * Replaces the table options. The given map is stored by reference, not copied.
     *
     * @param map table options
     * @return this action, for chaining
     */
    public MySqlSyncTableAction withTableConfig(Map<String, String> map) {
        this.tableConfig = map;
        return this;
    }

    /**
     * Replaces the computed-column definitions. The given list is stored by reference, not copied.
     *
     * @param list computed-column definitions
     * @return this action, for chaining
     */
    public MySqlSyncTableAction withComputedColumnArgs(List<String> list) {
        this.computedColumnArgs = list;
        return this;
    }

    /**
     * Replaces the type mapping.
     *
     * @param typeMapping type mapping to use
     * @return this action, for chaining
     */
    public MySqlSyncTableAction withTypeMapping(TypeMapping typeMapping) {
        this.typeMapping = typeMapping;
        return this;
    }

    /**
     * Validates the configuration, prepares the target Paimon table and attaches the
     * MySQL-source-to-Paimon-sink pipeline to the given environment.
     *
     * @param streamExecutionEnvironment environment the source and sink are added to
     * @throws Exception if validation fails or a catalog operation fails
     */
    public void build(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
        Preconditions.checkArgument(
                this.mySqlConfig.contains(MySqlSourceOptions.TABLE_NAME),
                String.format("mysql-conf [%s] must be specified.", MySqlSourceOptions.TABLE_NAME.key()));

        boolean caseSensitive = this.catalog.caseSensitive();
        if (!caseSensitive) {
            validateCaseInsensitive();
        }

        MySqlSchemasInfo schemasInfo =
                MySqlActionUtils.getMySqlTableInfos(
                        this.mySqlConfig, monitorTablePredication(), new ArrayList<>(), this.typeMapping);
        validateMySqlTableInfos(schemasInfo);

        this.catalog.createDatabase(this.database, true);

        MySqlTableInfo mergedTableInfo = schemasInfo.mergeAll();
        Identifier identifier = new Identifier(this.database, this.table);
        List<ComputedColumn> computedColumns =
                ComputedColumnUtils.buildComputedColumns(
                        this.computedColumnArgs, mergedTableInfo.schema().typeMapping());
        Schema schemaFromMySql =
                MySqlActionUtils.buildPaimonSchema(
                        mergedTableInfo,
                        this.partitionKeys,
                        this.primaryKeys,
                        computedColumns,
                        this.tableConfig,
                        caseSensitive);

        FileStoreTable fileStoreTable;
        try {
            fileStoreTable = (FileStoreTable) this.catalog.getTable(identifier);
            if (!computedColumns.isEmpty()) {
                List<String> computedColumnNames =
                        computedColumns.stream()
                                .map(ComputedColumn::columnName)
                                .collect(Collectors.toList());
                List<String> fieldNames = fileStoreTable.schema().fieldNames();
                Preconditions.checkArgument(
                        new HashSet<>(fieldNames).containsAll(computedColumnNames),
                        " Exists Table should contain all computed columns %s, but are %s.",
                        computedColumnNames,
                        fieldNames);
            }
            MySqlActionUtils.assertSchemaCompatible(fileStoreTable.schema(), schemaFromMySql);
        } catch (Catalog.TableNotExistException e) {
            // A missing target table is expected on first run: create it from the MySQL schema.
            this.catalog.createTable(identifier, schemaFromMySql, false);
            fileStoreTable = (FileStoreTable) this.catalog.getTable(identifier);
        }

        // Build a regex alternation of "db\.table" entries covering every matched table.
        String monitoredTables =
                schemasInfo.pkTables().stream()
                        .map(id -> id.getDatabaseName() + "\\." + id.getObjectName())
                        .collect(Collectors.joining("|"));
        MySqlSource<String> source = MySqlActionUtils.buildMySqlSource(this.mySqlConfig, monitoredTables);

        String serverTimeZone = this.mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
        ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : ZoneId.of(serverTimeZone);

        // Copy the field into a local so the serializable factory lambda below captures only this
        // value and not the enclosing action instance. The compiler generates the matching
        // $deserializeLambda$ support method automatically.
        TypeMapping typeMapping = this.typeMapping;

        CdcSinkBuilder<String> sinkBuilder =
                new CdcSinkBuilder<String>()
                        .withInput(
                                streamExecutionEnvironment.fromSource(
                                        source, WatermarkStrategy.noWatermarks(), "MySQL Source"))
                        .withParserFactory(
                                () ->
                                        new MySqlDebeziumJsonEventParser(
                                                zoneId, caseSensitive, computedColumns, typeMapping))
                        .withTable(fileStoreTable)
                        .withIdentifier(identifier)
                        .withCatalogLoader(catalogLoader());

        String sinkParallelism = this.tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (sinkParallelism != null) {
            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
        }
        sinkBuilder.build();
    }

    /**
     * Checks that the database name, table name, partition keys and primary keys contain no upper
     * case characters, as required when the catalog is case-insensitive.
     *
     * @throws IllegalArgumentException presumably, via {@link Preconditions#checkArgument} — TODO
     *     confirm the exact exception type
     */
    private void validateCaseInsensitive() {
        Preconditions.checkArgument(
                this.database.equals(this.database.toLowerCase()),
                String.format("Database name [%s] cannot contain upper case in case-insensitive catalog.", this.database));
        Preconditions.checkArgument(
                this.table.equals(this.table.toLowerCase()),
                String.format("Table name [%s] cannot contain upper case in case-insensitive catalog.", this.table));
        for (String partitionKey : this.partitionKeys) {
            Preconditions.checkArgument(
                    partitionKey.equals(partitionKey.toLowerCase()),
                    String.format("Partition keys [%s] cannot contain upper case in case-insensitive catalog.", this.partitionKeys));
        }
        for (String primaryKey : this.primaryKeys) {
            Preconditions.checkArgument(
                    primaryKey.equals(primaryKey.toLowerCase()),
                    String.format("Primary keys [%s] cannot contain upper case in case-insensitive catalog.", this.primaryKeys));
        }
    }

    /**
     * Checks that every matched MySQL table has primary keys and that at least one table matched.
     *
     * @param mySqlSchemasInfo schemas of the MySQL tables matched by the table-name pattern
     */
    private void validateMySqlTableInfos(MySqlSchemasInfo mySqlSchemasInfo) {
        List<Identifier> nonPkTables = mySqlSchemasInfo.nonPkTables();
        // NOTE(review): the delimiter is borrowed from an unrelated aggregate-function class;
        // this looks like a decompiler constant substitution — confirm and consider a literal.
        Preconditions.checkArgument(
                nonPkTables.isEmpty(),
                "Source tables of MySQL table synchronization job cannot contain table which doesn't have primary keys.\nThey are: %s",
                nonPkTables.stream()
                        .map(Identifier::getFullName)
                        .collect(Collectors.joining(FieldListaggAgg.DELIMITER)));
        Preconditions.checkArgument(
                !mySqlSchemasInfo.pkTables().isEmpty(),
                "No table satisfies the given database name and table name.");
    }

    /**
     * Creates a predicate that accepts table names fully matching the regular expression configured
     * under {@link MySqlSourceOptions#TABLE_NAME}.
     *
     * @return predicate over MySQL table names
     */
    private Predicate<String> monitorTablePredication() {
        // Compile once here instead of on every predicate invocation.
        Pattern tableNamePattern = Pattern.compile(this.mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
        return tableName -> tableNamePattern.matcher(tableName).matches();
    }

    /**
     * Builds the synchronization pipeline on the default execution environment and executes it.
     *
     * @throws Exception if building or executing the job fails
     */
    @Override // org.apache.paimon.flink.action.Action
    public void run() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        build(executionEnvironment);
        execute(executionEnvironment, String.format("MySQL-Paimon Table Sync: %s.%s", this.database, this.table));
    }
}
