package org.apache.paimon.flink.action.cdc.kafka;

import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.ComputedColumnUtils;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.class */
public class KafkaSyncTableAction extends ActionBase {
    private final Configuration kafkaConfig;
    private final String database;
    private final String table;
    private final List<String> partitionKeys;
    private final List<String> primaryKeys;
    private final List<String> computedColumnArgs;
    private final Map<String, String> paimonConfig;

    public KafkaSyncTableAction(Map<String, String> map, String str, String str2, String str3, List<String> list, List<String> list2, Map<String, String> map2, Map<String, String> map3) {
        this(map, str, str2, str3, list, list2, Collections.emptyList(), map2, map3);
    }

    public KafkaSyncTableAction(Map<String, String> map, String str, String str2, String str3, List<String> list, List<String> list2, List<String> list3, Map<String, String> map2, Map<String, String> map3) {
        super(str, map2);
        this.kafkaConfig = Configuration.fromMap(map);
        this.database = str2;
        this.table = str3;
        this.partitionKeys = list;
        this.primaryKeys = list2;
        this.computedColumnArgs = list3;
        this.paimonConfig = map3;
    }

    public void build(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
        FileStoreTable fileStoreTable;
        KafkaSource<String> buildKafkaSource = KafkaActionUtils.buildKafkaSource(this.kafkaConfig);
        KafkaSchema kafkaSchema = KafkaSchema.getKafkaSchema(this.kafkaConfig, (String) ((List) this.kafkaConfig.get(KafkaConnectorOptions.TOPIC)).get(0));
        this.catalog.createDatabase(this.database, true);
        boolean caseSensitive = this.catalog.caseSensitive();
        Identifier identifier = new Identifier(this.database, this.table);
        List<ComputedColumn> buildComputedColumns = ComputedColumnUtils.buildComputedColumns(this.computedColumnArgs, kafkaSchema.fields());
        Schema buildPaimonSchema = KafkaActionUtils.buildPaimonSchema(kafkaSchema, this.partitionKeys, this.primaryKeys, buildComputedColumns, this.paimonConfig, caseSensitive);
        try {
            fileStoreTable = (FileStoreTable) this.catalog.getTable(identifier);
            KafkaActionUtils.assertSchemaCompatible(fileStoreTable.schema(), buildPaimonSchema);
        } catch (Catalog.TableNotExistException e) {
            this.catalog.createTable(identifier, buildPaimonSchema, false);
            fileStoreTable = (FileStoreTable) this.catalog.getTable(identifier);
        }
        RecordParser createParser = DataFormat.getDataFormat(this.kafkaConfig).createParser(caseSensitive, new TableNameConverter(caseSensitive), buildComputedColumns);
        CdcSinkBuilder withCatalogLoader = new CdcSinkBuilder().withInput(streamExecutionEnvironment.fromSource(buildKafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").flatMap(createParser)).withParserFactory(RichCdcMultiplexRecordEventParser::new).withTable(fileStoreTable).withIdentifier(identifier).withCatalogLoader(catalogLoader());
        String str = this.paimonConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (str != null) {
            withCatalogLoader.withParallelism(Integer.valueOf(Integer.parseInt(str)));
        }
        withCatalogLoader.build();
    }

    @Override // org.apache.paimon.flink.action.Action
    public void run() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        build(executionEnvironment);
        executionEnvironment.execute(String.format("Kafka-Paimon Table Sync: %s.%s", this.database, this.table));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/cdc/EventParser$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/paimon/flink/sink/cdc/EventParser;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return RichCdcMultiplexRecordEventParser::new;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
