package org.apache.paimon.flink.action.cdc.mongodb;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.class */
public class MongoDBSyncTableAction extends ActionBase {
    public final Configuration mongodbConfig;
    public final String database;
    public final String table;
    public final List<String> partitionKeys;
    public final Map<String, String> tableConfig;

    public MongoDBSyncTableAction(Map<String, String> map, String str, String str2, String str3, List<String> list, Map<String, String> map2, Map<String, String> map3) {
        super(str, map2);
        this.mongodbConfig = Configuration.fromMap(map);
        this.database = str2;
        this.table = str3;
        this.partitionKeys = list;
        this.tableConfig = map3;
    }

    public void build(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
        FileStoreTable fileStoreTable;
        Preconditions.checkArgument(this.mongodbConfig.contains(MongoDBSourceOptions.COLLECTION), String.format("mongodb-conf [%s] must be specified.", MongoDBSourceOptions.COLLECTION.key()));
        MongoDBSource<String> buildMongodbSource = MongoDBActionUtils.buildMongodbSource(this.mongodbConfig, ((String) this.mongodbConfig.get(MongoDBSourceOptions.DATABASE)) + "\\." + ((String) this.mongodbConfig.get(MongoDBSourceOptions.COLLECTION)));
        boolean caseSensitive = this.catalog.caseSensitive();
        if (!caseSensitive) {
            validateCaseInsensitive();
        }
        MongodbSchema mongodbSchema = MongodbSchema.getMongodbSchema(this.mongodbConfig);
        this.catalog.createDatabase(this.database, true);
        Identifier identifier = new Identifier(this.database, this.table);
        EventParser.Factory factory = RichCdcMultiplexRecordEventParser::new;
        Schema buildPaimonSchema = MongoDBActionUtils.buildPaimonSchema(mongodbSchema, this.partitionKeys, this.tableConfig, caseSensitive);
        try {
            fileStoreTable = (FileStoreTable) this.catalog.getTable(identifier);
        } catch (Exception e) {
            this.catalog.createTable(identifier, buildPaimonSchema, false);
            fileStoreTable = (FileStoreTable) this.catalog.getTable(identifier);
        }
        CdcSinkBuilder withCatalogLoader = new CdcSinkBuilder().withInput(streamExecutionEnvironment.fromSource(buildMongodbSource, WatermarkStrategy.noWatermarks(), "MongoDB Source").flatMap(new MongoDBRecordParser(caseSensitive, this.mongodbConfig))).withParserFactory(factory).withTable(fileStoreTable).withIdentifier(identifier).withCatalogLoader(catalogLoader());
        String str = this.tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (str != null) {
            withCatalogLoader.withParallelism(Integer.valueOf(Integer.parseInt(str)));
        }
        withCatalogLoader.build();
    }

    private void validateCaseInsensitive() {
        Preconditions.checkArgument(this.database.equals(this.database.toLowerCase()), String.format("Database name [%s] cannot contain upper case in case-insensitive catalog.", this.database));
        Preconditions.checkArgument(this.table.equals(this.table.toLowerCase()), String.format("Collection prefix [%s] cannot contain upper case in case-insensitive catalog.", this.table));
        for (String str : this.partitionKeys) {
            Preconditions.checkArgument(str.equals(str.toLowerCase()), String.format("Partition keys [%s] cannot contain upper case in case-insensitive catalog.", this.partitionKeys));
        }
    }

    @Override // org.apache.paimon.flink.action.Action
    public void run() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        build(executionEnvironment);
        execute(executionEnvironment, String.format("MongoDB-Paimon Database Sync: %s", this.database));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/cdc/EventParser$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/paimon/flink/sink/cdc/EventParser;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return RichCdcMultiplexRecordEventParser::new;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
