package org.apache.seatunnel.flink.sink;

import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.KafkaValidator;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.seatunnel.common.PropertiesUtil;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.flink.util.SchemaUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

/* loaded from: input_file:org/apache/seatunnel/flink/sink/KafkaTable.class */
/**
 * Flink streaming sink that writes a {@link Row} stream to a Kafka topic.
 *
 * <p>The incoming stream is converted to a {@link Table}, a temporary table backed by the
 * Table API {@link Kafka} connector descriptor is registered with a JSON format, and the
 * table is inserted into it in append mode.
 */
public class KafkaTable implements FlinkStreamSink<Row, Row> {
    private static final long serialVersionUID = 3980751499724935230L;

    /**
     * Config key holding the target topic. Owned by this class on purpose: it must not be
     * taken from Kafka's consumer-internal {@code ConsumerProtocol.TOPICS_KEY_NAME}, which is
     * an unrelated, non-public constant that merely shares the same value ({@code "topics"}).
     */
    private static final String TOPICS = "topics";

    /** Prefix passed to {@link PropertiesUtil#setProperties} when collecting producer settings. */
    private static final String PRODUCER_PREFIX = "producer.";

    /** Serializer class name forced for both record keys and record values. */
    private static final String BYTE_ARRAY_SERIALIZER =
            "org.apache.kafka.common.serialization.ByteArraySerializer";

    private Config config;
    private final Properties kafkaParams = new Properties();
    private String topic;

    /**
     * Routes the given stream into Kafka through a temporary table.
     *
     * @param flinkEnvironment environment supplying the {@link StreamTableEnvironment}
     * @param dataStream rows to write
     * @return always {@code null}; the write is wired up via the Table API, so no
     *     {@link DataStreamSink} handle is produced here
     */
    @Nullable
    public DataStreamSink<Row> outputStream(FlinkEnvironment flinkEnvironment, DataStream<Row> dataStream) {
        StreamTableEnvironment tableEnv = flinkEnvironment.getStreamTableEnvironment();
        Table rows = tableEnv.fromDataStream(dataStream);
        insert(tableEnv, rows);
        return null;
    }

    /**
     * Registers a temporary Kafka-backed table whose schema mirrors {@code table} and inserts
     * {@code table} into it.
     */
    private void insert(TableEnvironment tableEnvironment, Table table) {
        TypeInformation<?>[] fieldTypes = table.getSchema().getFieldTypes();
        String[] fieldNames = table.getSchema().getFieldNames();
        Schema sinkSchema = getSchema(fieldTypes, fieldNames);
        // A name obtained from SchemaUtil is used for the temporary sink table.
        String sinkTableName = SchemaUtil.getUniqueTableName();

        tableEnvironment.connect(getKafkaConnect())
                .withSchema(sinkSchema)
                .withFormat(setFormat())
                .inAppendMode()
                .createTemporaryTable(sinkTableName);

        table.insertInto(sinkTableName);
    }

    /**
     * Builds a {@link Schema} descriptor by pairing each field type with the field name at the
     * same index.
     *
     * @param fieldTypes field types; its length drives the iteration
     * @param fieldNames field names, indexed in parallel with {@code fieldTypes}
     */
    private Schema getSchema(TypeInformation<?>[] fieldTypes, String[] fieldNames) {
        Schema result = new Schema();
        for (int i = 0; i < fieldTypes.length; i++) {
            result.field(fieldNames[i], fieldTypes[i]);
        }
        return result;
    }

    /**
     * Creates the Kafka connector descriptor using the "universal" connector version, the
     * configured topic and the collected producer properties.
     */
    private Kafka getKafkaConnect() {
        Kafka kafka = new Kafka().version(KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL);
        kafka.topic(this.topic);
        kafka.properties(this.kafkaParams);
        return kafka;
    }

    /** Returns a JSON format descriptor with fail-on-missing-field disabled and a derived schema. */
    private FormatDescriptor setFormat() {
        return new Json().failOnMissingField(false).deriveSchema();
    }

    /** Stores the plugin configuration used by {@link #checkConfig()} and {@link #prepare}. */
    public void setConfig(Config config) {
        this.config = config;
    }

    /** Returns the configuration previously supplied through {@link #setConfig(Config)}. */
    public Config getConfig() {
        return this.config;
    }

    /**
     * Validates the configuration.
     *
     * @return the result of checking that the {@code topics} option is present
     */
    public CheckResult checkConfig() {
        return CheckConfigUtil.checkAllExists(this.config, new String[]{TOPICS});
    }

    /**
     * Reads the target topic from the {@code topics} option and assembles the Kafka producer
     * properties.
     *
     * @param flinkEnvironment not used by this sink
     */
    public void prepare(FlinkEnvironment flinkEnvironment) {
        this.topic = this.config.getString(TOPICS);
        // NOTE(review): presumably copies the "producer."-prefixed config entries into
        // kafkaParams; confirm the exact semantics against PropertiesUtil.
        PropertiesUtil.setProperties(this.config, this.kafkaParams, PRODUCER_PREFIX, false);
        // Written after the call above, so these two entries replace any serializer values
        // already present in kafkaParams under the same keys.
        this.kafkaParams.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, BYTE_ARRAY_SERIALIZER);
        this.kafkaParams.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BYTE_ARRAY_SERIALIZER);
    }
}
