/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.support.converter;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Function;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.jspecify.annotations.Nullable;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.support.AbstractKafkaHeaderMapper;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

public class MessagingMessageConverter
implements RecordMessageConverter {
    protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
    private final Function<Message<?>, @Nullable Integer> partitionProvider;
    private boolean generateMessageId = false;
    private boolean generateTimestamp = false;
    private KafkaHeaderMapper headerMapper;
    private boolean rawRecordHeader;
    private @Nullable SmartMessageConverter messagingConverter;

    public MessagingMessageConverter() {
        this(msg -> (Integer)msg.getHeaders().get((Object)"kafka_partitionId", Integer.class));
    }

    public MessagingMessageConverter(Function<Message<?>, @Nullable Integer> partitionProvider) {
        Assert.notNull(partitionProvider, (String)"'partitionProvider' cannot be null");
        this.headerMapper = JacksonPresent.isJackson3Present() ? new JsonKafkaHeaderMapper() : (JacksonPresent.isJackson2Present() ? new DefaultKafkaHeaderMapper() : new SimpleKafkaHeaderMapper());
        this.partitionProvider = partitionProvider;
    }

    public void setGenerateMessageId(boolean generateMessageId) {
        this.generateMessageId = generateMessageId;
    }

    public void setGenerateTimestamp(boolean generateTimestamp) {
        this.generateTimestamp = generateTimestamp;
    }

    public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
        this.headerMapper = headerMapper;
    }

    public void setRawRecordHeader(boolean rawRecordHeader) {
        this.rawRecordHeader = rawRecordHeader;
    }

    protected MessageConverter getMessagingConverter() {
        return this.messagingConverter;
    }

    public void setMessagingConverter(@Nullable SmartMessageConverter messagingConverter) {
        this.messagingConverter = messagingConverter;
        if (messagingConverter != null && this.headerMapper instanceof AbstractKafkaHeaderMapper) {
            ((AbstractKafkaHeaderMapper)this.headerMapper).addRawMappedHeader("contentType", true);
        }
    }

    @Override
    public Message<?> toMessage(ConsumerRecord<?, ?> record, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer, @Nullable Type type) {
        Class clazz;
        Object payload;
        KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp);
        Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
        if (record.headers() != null) {
            this.mapOrAddHeaders(record, rawHeaders);
        }
        String ttName = record.timestampType() != null ? record.timestampType().name() : null;
        this.commonHeaders(acknowledgment, consumer, rawHeaders, record.key(), record.topic(), record.partition(), record.offset(), ttName, record.timestamp());
        if (this.rawRecordHeader) {
            rawHeaders.put("kafka_data", record);
        }
        Message message = MessageBuilder.createMessage((Object)this.extractAndConvertValue(record, type), (MessageHeaders)kafkaMessageHeaders);
        if (this.messagingConverter != null && !message.getPayload().equals(KafkaNull.INSTANCE) && (payload = this.messagingConverter.fromMessage(message, clazz = type instanceof Class ? (Class)type : (type instanceof ParameterizedType ? (Class)((ParameterizedType)type).getRawType() : Object.class), (Object)type)) != null) {
            message = new GenericMessage(payload, message.getHeaders());
        }
        return message;
    }

    private void mapOrAddHeaders(ConsumerRecord<?, ?> record, Map<String, Object> rawHeaders) {
        if (this.headerMapper != null) {
            this.headerMapper.toHeaders(record.headers(), rawHeaders);
        } else {
            this.logger.debug(() -> "No header mapper is available; Jackson is required for the default mapper; headers (if present) are not mapped but provided raw in kafka_nativeHeaders");
            rawHeaders.put("kafka_nativeHeaders", record.headers());
            Header contentType = record.headers().lastHeader("contentType");
            if (contentType != null) {
                rawHeaders.put("contentType", new String(contentType.value(), StandardCharsets.UTF_8));
            }
        }
    }

    @Override
    public ProducerRecord<?, ?> fromMessage(Message<?> messageArg, @Nullable String defaultTopic) {
        Message converted;
        Message message = messageArg;
        if (this.messagingConverter != null && (converted = this.messagingConverter.toMessage(message.getPayload(), message.getHeaders())) != null) {
            message = converted;
        }
        MessageHeaders headers = message.getHeaders();
        Object topicHeader = headers.get((Object)"kafka_topic");
        String topic = null;
        if (topicHeader instanceof byte[]) {
            topic = new String((byte[])topicHeader, StandardCharsets.UTF_8);
        } else if (topicHeader instanceof String) {
            topic = (String)topicHeader;
        } else if (topicHeader == null) {
            Assert.state((defaultTopic != null ? 1 : 0) != 0, (String)"With no topic header, a defaultTopic is required");
        } else {
            throw new IllegalStateException("kafka_topic must be a String or byte[], not " + String.valueOf(topicHeader.getClass()));
        }
        Integer partition = this.partitionProvider.apply(message);
        Object key = headers.get((Object)"kafka_messageKey");
        Object payload = this.convertPayload(message);
        Long timestamp = (Long)headers.get((Object)"kafka_timestamp", Long.class);
        Headers recordHeaders = this.initialRecordHeaders(message);
        if (this.headerMapper != null) {
            this.headerMapper.fromHeaders(headers, recordHeaders);
        }
        return new ProducerRecord(topic == null ? defaultTopic : topic, partition, timestamp, key, payload, (Iterable)recordHeaders);
    }

    protected Headers initialRecordHeaders(Message<?> message) {
        return new RecordHeaders();
    }

    protected @Nullable Object convertPayload(Message<?> message) {
        Object payload = message.getPayload();
        if (payload instanceof KafkaNull) {
            return null;
        }
        return payload;
    }

    protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, @Nullable Type type) {
        return record.value() == null ? KafkaNull.INSTANCE : record.value();
    }
}

