/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.mongodb;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils;
import org.apache.paimon.flink.action.cdc.mongodb.MongoDBRecordParser;
import org.apache.paimon.flink.action.cdc.mongodb.MongodbSchema;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

public class MongoDBSyncTableAction
extends ActionBase {
    public final Configuration mongodbConfig;
    public final String database;
    public final String table;
    public final List<String> partitionKeys;
    public final Map<String, String> tableConfig;

    public MongoDBSyncTableAction(Map<String, String> mongodbConfig, String warehouse, String database, String table, List<String> partitionKeys, Map<String, String> catalogConfig, Map<String, String> tableConfig) {
        super(warehouse, catalogConfig);
        this.mongodbConfig = Configuration.fromMap(mongodbConfig);
        this.database = database;
        this.table = table;
        this.partitionKeys = partitionKeys;
        this.tableConfig = tableConfig;
    }

    public void build(StreamExecutionEnvironment env) throws Exception {
        FileStoreTable table;
        Preconditions.checkArgument(this.mongodbConfig.contains(MongoDBSourceOptions.COLLECTION), String.format("mongodb-conf [%s] must be specified.", MongoDBSourceOptions.COLLECTION.key()));
        String tableList = (String)this.mongodbConfig.get(MongoDBSourceOptions.DATABASE) + "\\." + (String)this.mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
        MongoDBSource<String> source = MongoDBActionUtils.buildMongodbSource(this.mongodbConfig, tableList);
        boolean caseSensitive = this.catalog.caseSensitive();
        if (!caseSensitive) {
            this.validateCaseInsensitive();
        }
        MongodbSchema mongodbSchema = MongodbSchema.getMongodbSchema(this.mongodbConfig);
        this.catalog.createDatabase(this.database, true);
        Identifier identifier = new Identifier(this.database, this.table);
        EventParser.Factory parserFactory = RichCdcMultiplexRecordEventParser::new;
        Schema fromMongodb = MongoDBActionUtils.buildPaimonSchema(mongodbSchema, this.partitionKeys, this.tableConfig, caseSensitive);
        try {
            table = (FileStoreTable)this.catalog.getTable(identifier);
        }
        catch (Exception e) {
            this.catalog.createTable(identifier, fromMongodb, false);
            table = (FileStoreTable)this.catalog.getTable(identifier);
        }
        CdcSinkBuilder sinkBuilder = new CdcSinkBuilder().withInput(env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB Source").flatMap((FlatMapFunction)new MongoDBRecordParser(caseSensitive, this.mongodbConfig))).withParserFactory(parserFactory).withTable(table).withIdentifier(identifier).withCatalogLoader(this.catalogLoader());
        String sinkParallelism = this.tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (sinkParallelism != null) {
            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
        }
        sinkBuilder.build();
    }

    private void validateCaseInsensitive() {
        Preconditions.checkArgument(this.database.equals(this.database.toLowerCase()), String.format("Database name [%s] cannot contain upper case in case-insensitive catalog.", this.database));
        Preconditions.checkArgument(this.table.equals(this.table.toLowerCase()), String.format("Collection prefix [%s] cannot contain upper case in case-insensitive catalog.", this.table));
        for (String part : this.partitionKeys) {
            Preconditions.checkArgument(part.equals(part.toLowerCase()), String.format("Partition keys [%s] cannot contain upper case in case-insensitive catalog.", this.partitionKeys));
        }
    }

    @Override
    public void run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.build(env);
        this.execute(env, String.format("MongoDB-Paimon Database Sync: %s", this.database));
    }
}

