/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.ComputedColumnUtils;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;

public class KafkaSyncTableAction
extends ActionBase {
    private final Configuration kafkaConfig;
    private final String database;
    private final String table;
    private final List<String> partitionKeys;
    private final List<String> primaryKeys;
    private final List<String> computedColumnArgs;
    private final Map<String, String> paimonConfig;

    public KafkaSyncTableAction(Map<String, String> kafkaConfig, String warehouse, String database, String table, List<String> partitionKeys, List<String> primaryKeys, Map<String, String> catalogConfig, Map<String, String> paimonConfig) {
        this(kafkaConfig, warehouse, database, table, partitionKeys, primaryKeys, Collections.emptyList(), catalogConfig, paimonConfig);
    }

    public KafkaSyncTableAction(Map<String, String> kafkaConfig, String warehouse, String database, String table, List<String> partitionKeys, List<String> primaryKeys, List<String> computedColumnArgs, Map<String, String> catalogConfig, Map<String, String> paimonConfig) {
        super(warehouse, catalogConfig);
        this.kafkaConfig = Configuration.fromMap(kafkaConfig);
        this.database = database;
        this.table = table;
        this.partitionKeys = partitionKeys;
        this.primaryKeys = primaryKeys;
        this.computedColumnArgs = computedColumnArgs;
        this.paimonConfig = paimonConfig;
    }

    public void build(StreamExecutionEnvironment env) throws Exception {
        FileStoreTable table;
        KafkaSource<String> source = KafkaActionUtils.buildKafkaSource(this.kafkaConfig);
        String topic = (String)((List)this.kafkaConfig.get(KafkaConnectorOptions.TOPIC)).get(0);
        KafkaSchema kafkaSchema = KafkaSchema.getKafkaSchema(this.kafkaConfig, topic);
        this.catalog.createDatabase(this.database, true);
        boolean caseSensitive = this.catalog.caseSensitive();
        Identifier identifier = new Identifier(this.database, this.table);
        List<ComputedColumn> computedColumns = ComputedColumnUtils.buildComputedColumns(this.computedColumnArgs, kafkaSchema.fields());
        Schema fromCanal = KafkaActionUtils.buildPaimonSchema(kafkaSchema, this.partitionKeys, this.primaryKeys, computedColumns, this.paimonConfig, caseSensitive);
        try {
            table = (FileStoreTable)this.catalog.getTable(identifier);
            KafkaActionUtils.assertSchemaCompatible(table.schema(), fromCanal);
        }
        catch (Catalog.TableNotExistException e) {
            this.catalog.createTable(identifier, fromCanal, false);
            table = (FileStoreTable)this.catalog.getTable(identifier);
        }
        DataFormat format = DataFormat.getDataFormat(this.kafkaConfig);
        RecordParser recordParser = format.createParser(caseSensitive, new TableNameConverter(caseSensitive), computedColumns);
        EventParser.Factory parserFactory = RichCdcMultiplexRecordEventParser::new;
        CdcSinkBuilder sinkBuilder = new CdcSinkBuilder().withInput(env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").flatMap((FlatMapFunction)recordParser)).withParserFactory(parserFactory).withTable(table).withIdentifier(identifier).withCatalogLoader(this.catalogLoader());
        String sinkParallelism = this.paimonConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (sinkParallelism != null) {
            sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
        }
        sinkBuilder.build();
    }

    @Override
    public void run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.build(env);
        env.execute(String.format("Kafka-Paimon Table Sync: %s.%s", this.database, this.table));
    }
}

