package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.class */
public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState> {
    private final SinkWriter.Context context;
    private String transactionPrefix;
    private long lastCheckpointId = 0;
    private SeaTunnelRowType seaTunnelRowType;
    private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
    private final SeaTunnelRowSerializer<byte[], byte[]> seaTunnelRowSerializer;
    private static final int PREFIX_RANGE = 10000;

    public KafkaSinkWriter(SinkWriter.Context context, SeaTunnelRowType seaTunnelRowType, Config config, List<KafkaSinkState> list) {
        this.context = context;
        this.seaTunnelRowType = seaTunnelRowType;
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS.key())) {
            MessageContentPartitioner.setAssignPartitions(config.getStringList(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS.key()));
        }
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX.key())) {
            this.transactionPrefix = config.getString(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX.key());
        } else {
            this.transactionPrefix = String.format("SeaTunnel%04d", Integer.valueOf(new Random().nextInt(PREFIX_RANGE)));
        }
        restoreState(list);
        this.seaTunnelRowSerializer = getSerializer(config, seaTunnelRowType);
        if (!KafkaSemantics.EXACTLY_ONCE.equals(getKafkaSemantics(config))) {
            this.kafkaProducerSender = new KafkaNoTransactionSender(getKafkaProperties(config));
            return;
        }
        this.kafkaProducerSender = new KafkaTransactionSender(this.transactionPrefix, getKafkaProperties(config));
        if (!list.isEmpty()) {
            this.kafkaProducerSender.abortTransaction(list.get(0).getCheckpointId() + 1);
        }
        this.kafkaProducerSender.beginTransaction(generateTransactionId(this.transactionPrefix, this.lastCheckpointId + 1));
    }

    public void write(SeaTunnelRow seaTunnelRow) {
        this.kafkaProducerSender.send(this.seaTunnelRowSerializer.serializeRow(seaTunnelRow));
    }

    public List<KafkaSinkState> snapshotState(long j) {
        List<KafkaSinkState> snapshotState = this.kafkaProducerSender.snapshotState(j);
        this.lastCheckpointId = j;
        this.kafkaProducerSender.beginTransaction(generateTransactionId(this.transactionPrefix, this.lastCheckpointId + 1));
        return snapshotState;
    }

    public Optional<KafkaCommitInfo> prepareCommit() {
        return this.kafkaProducerSender.prepareCommit();
    }

    public void abortPrepare() {
        this.kafkaProducerSender.abortTransaction();
    }

    public void close() {
        try {
            this.kafkaProducerSender.close();
        } catch (Exception e) {
            throw new KafkaConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, "Close kafka sink writer error", e);
        }
    }

    private Properties getKafkaProperties(Config config) {
        Properties properties = new Properties();
        if (CheckConfigUtil.isValidParam(config, org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG.key())) {
            config.getObject(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG.key()).forEach((str, configValue) -> {
                properties.put(str, configValue.unwrapped());
            });
        }
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS.key())) {
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.seatunnel.connectors.seatunnel.kafka.sink.MessageContentPartitioner");
        }
        properties.put("bootstrap.servers", config.getString("bootstrap.servers"));
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        return properties;
    }

    private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config config, SeaTunnelRowType seaTunnelRowType) {
        MessageFormat messageFormat = (MessageFormat) ReadonlyConfig.fromConfig(config).get(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT);
        String str = org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER.key())) {
            str = config.getString(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER.key());
        }
        String str2 = null;
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC.key())) {
            str2 = config.getString(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC.key());
        }
        return config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION.key()) ? DefaultSeaTunnelRowSerializer.create(str2, Integer.valueOf(config.getInt(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION.key())), seaTunnelRowType, messageFormat, str) : DefaultSeaTunnelRowSerializer.create(str2, getPartitionKeyFields(config, seaTunnelRowType), seaTunnelRowType, messageFormat, str);
    }

    private KafkaSemantics getKafkaSemantics(Config config) {
        return config.hasPath("semantics") ? (KafkaSemantics) config.getEnum(KafkaSemantics.class, "semantics") : KafkaSemantics.NON;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static String generateTransactionId(String str, long j) {
        return str + "-" + j;
    }

    private void restoreState(List<KafkaSinkState> list) {
        if (list.isEmpty()) {
            return;
        }
        this.transactionPrefix = list.get(0).getTransactionIdPrefix();
        this.lastCheckpointId = list.get(0).getCheckpointId();
    }

    private List<String> getPartitionKeyFields(Config config, SeaTunnelRowType seaTunnelRowType) {
        if (!config.hasPath(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS.key())) {
            return Collections.emptyList();
        }
        List<String> stringList = config.getStringList(org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS.key());
        List asList = Arrays.asList(seaTunnelRowType.getFieldNames());
        for (String str : stringList) {
            if (!asList.contains(str)) {
                throw new KafkaConnectorException((SeaTunnelErrorCode) CommonErrorCode.ILLEGAL_ARGUMENT, String.format("Partition key field not found: %s, rowType: %s", str, asList));
            }
        }
        return stringList;
    }
}
