package io.smallrye.reactive.messaging.kafka.fault;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ConfigurationCleaner;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Metadata;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.class */
public class KafkaDeadLetterQueue implements KafkaFailureHandler {
    public static final String DEAD_LETTER_EXCEPTION_CLASS_NAME = "dead-letter-exception-class-name";
    public static final String DEAD_LETTER_CAUSE_CLASS_NAME = "dead-letter-cause-class-name";
    public static final String DEAD_LETTER_REASON = "dead-letter-reason";
    public static final String DEAD_LETTER_CAUSE = "dead-letter-cause";
    public static final String DEAD_LETTER_TOPIC = "dead-letter-topic";
    public static final String DEAD_LETTER_OFFSET = "dead-letter-offset";
    public static final String DEAD_LETTER_PARTITION = "dead-letter-partition";
    private final String channel;
    private final KafkaProducer producer;
    private final String topic;
    private final BiConsumer<Throwable, Boolean> reportFailure;

    @ApplicationScoped
    @Identifier(KafkaFailureHandler.Strategy.DEAD_LETTER_QUEUE)
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue$Factory.class */
    public static class Factory implements KafkaFailureHandler.Factory {

        @Inject
        KafkaCDIEvents kafkaCDIEvents;

        @Override // io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler.Factory
        public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Vertx vertx, KafkaConsumer<?, ?> kafkaConsumer, BiConsumer<Throwable, Boolean> biConsumer) {
            HashMap hashMap = new HashMap();
            Map<String, ?> configuration = kafkaConsumer.configuration();
            configuration.forEach((str, obj) -> {
                hashMap.put(str, (String) obj);
            });
            String str2 = (String) hashMap.remove("key.deserializer");
            String str3 = (String) hashMap.remove("value.deserializer");
            hashMap.remove("interceptor.classes");
            hashMap.put("key.serializer", kafkaConnectorIncomingConfiguration.getDeadLetterQueueKeySerializer().orElse(KafkaDeadLetterQueue.getMirrorSerializer(str2)));
            hashMap.put("value.serializer", kafkaConnectorIncomingConfiguration.getDeadLetterQueueValueSerializer().orElse(KafkaDeadLetterQueue.getMirrorSerializer(str3)));
            hashMap.put("client.id", kafkaConnectorIncomingConfiguration.getDeadLetterQueueProducerClientId().orElse("kafka-dead-letter-topic-producer-" + configuration.get("client.id")));
            ConfigurationCleaner.cleanupProducerConfiguration(hashMap);
            String orElse = kafkaConnectorIncomingConfiguration.getDeadLetterQueueTopic().orElse("dead-letter-topic-" + kafkaConnectorIncomingConfiguration.getChannel());
            KafkaLogging.log.deadLetterConfig(orElse, (String) hashMap.get("key.serializer"), (String) hashMap.get("value.serializer"));
            ReactiveKafkaProducer reactiveKafkaProducer = new ReactiveKafkaProducer(hashMap, orElse, 10000, null, null);
            this.kafkaCDIEvents.producer().fire(reactiveKafkaProducer.unwrap());
            return new KafkaDeadLetterQueue(kafkaConnectorIncomingConfiguration.getChannel(), orElse, reactiveKafkaProducer, biConsumer);
        }
    }

    public KafkaDeadLetterQueue(String str, String str2, KafkaProducer kafkaProducer, BiConsumer<Throwable, Boolean> biConsumer) {
        this.channel = str;
        this.topic = str2;
        this.producer = kafkaProducer;
        this.reportFailure = biConsumer;
    }

    private static String getMirrorSerializer(String str) {
        return str == null ? StringSerializer.class.getName() : str.replace("Deserializer", "Serializer");
    }

    private String getThrowableMessage(Throwable th) {
        String message = th.getMessage();
        if (message == null) {
            message = th.toString();
        }
        return message;
    }

    @Override // io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler
    public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> incomingKafkaRecord, Throwable th, Metadata metadata) {
        OutgoingKafkaRecordMetadata outgoingKafkaRecordMetadata = metadata != null ? (OutgoingKafkaRecordMetadata) metadata.get(OutgoingKafkaRecordMetadata.class).orElse(null) : null;
        String str = this.topic;
        if (outgoingKafkaRecordMetadata != null && outgoingKafkaRecordMetadata.getTopic() != null) {
            str = outgoingKafkaRecordMetadata.getTopic();
        }
        Object key = incomingKafkaRecord.getKey();
        if (outgoingKafkaRecordMetadata != null && outgoingKafkaRecordMetadata.getKey() != null) {
            key = outgoingKafkaRecordMetadata.getKey();
        }
        Integer num = null;
        if (outgoingKafkaRecordMetadata != null && outgoingKafkaRecordMetadata.getPartition() >= 0) {
            num = Integer.valueOf(outgoingKafkaRecordMetadata.getPartition());
        }
        ProducerRecord<?, ?> producerRecord = new ProducerRecord<>(str, num, key, incomingKafkaRecord.getPayload());
        addHeader(producerRecord, DEAD_LETTER_EXCEPTION_CLASS_NAME, th.getClass().getName());
        addHeader(producerRecord, DEAD_LETTER_REASON, getThrowableMessage(th));
        if (th.getCause() != null) {
            addHeader(producerRecord, DEAD_LETTER_CAUSE_CLASS_NAME, th.getCause().getClass().getName());
            addHeader(producerRecord, DEAD_LETTER_CAUSE, getThrowableMessage(th.getCause()));
        }
        addHeader(producerRecord, DEAD_LETTER_TOPIC, incomingKafkaRecord.getTopic());
        addHeader(producerRecord, DEAD_LETTER_PARTITION, Integer.toString(incomingKafkaRecord.getPartition()));
        addHeader(producerRecord, DEAD_LETTER_OFFSET, Long.toString(incomingKafkaRecord.getOffset()));
        incomingKafkaRecord.getHeaders().forEach(header -> {
            producerRecord.headers().add(header);
        });
        if (outgoingKafkaRecordMetadata != null && outgoingKafkaRecordMetadata.getHeaders() != null) {
            outgoingKafkaRecordMetadata.getHeaders().forEach(header2 -> {
                producerRecord.headers().add(header2);
            });
        }
        KafkaLogging.log.messageNackedDeadLetter(this.channel, str);
        Uni chain = this.producer.send(producerRecord).onFailure().invoke(obj -> {
            this.reportFailure.accept((Throwable) obj, true);
        }).onItem().ignore().andContinueWithNull().chain(() -> {
            return Uni.createFrom().completionStage(incomingKafkaRecord.ack());
        });
        Objects.requireNonNull(incomingKafkaRecord);
        return chain.emitOn(incomingKafkaRecord::runOnMessageContext);
    }

    void addHeader(ProducerRecord<?, ?> producerRecord, String str, String str2) {
        producerRecord.headers().add(str, str2.getBytes(StandardCharsets.UTF_8));
    }

    @Override // io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler
    public void terminate() {
        this.producer.close();
    }
}
