package org.springframework.kafka.streams.messaging;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-3.0.2.jar:org/springframework/kafka/streams/messaging/MessagingProcessor.class */
public class MessagingProcessor<Kin, Vin, Kout, Vout> extends ContextualProcessor<Kin, Vin, Kout, Vout> {
    private final MessagingFunction function;
    private final MessagingMessageConverter converter;

    public MessagingProcessor(MessagingFunction messagingFunction, MessagingMessageConverter messagingMessageConverter) {
        Assert.notNull(messagingFunction, "'function' cannot be null");
        Assert.notNull(messagingMessageConverter, "'converter' cannot be null");
        this.function = messagingFunction;
        this.converter = messagingMessageConverter;
    }

    public void process(Record<Kin, Vin> record) {
        ProcessorContext context = context();
        RecordMetadata recordMetadata = (RecordMetadata) context.recordMetadata().orElse(null);
        Assert.state(recordMetadata != null, "No record metadata present");
        Headers headers = record.headers();
        Message<?> exchange = this.function.exchange(this.converter.toMessage(new ConsumerRecord<>(recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), record.timestamp(), TimestampType.NO_TIMESTAMP_TYPE, 0, 0, record.key(), record.value(), headers, (Optional<Integer>) Optional.empty()), null, null, null));
        ArrayList arrayList = new ArrayList();
        headers.forEach(header -> {
            arrayList.add(header.key());
        });
        arrayList.forEach(str -> {
            headers.remove(str);
        });
        this.converter.fromMessage(exchange, "dummy").headers().forEach(header2 -> {
            if (header2.key().equals(KafkaHeaders.TOPIC)) {
                return;
            }
            headers.add(header2);
        });
        context.forward(new Record(exchange.getHeaders().get(KafkaHeaders.KEY), exchange.getPayload(), record.timestamp(), headers));
    }

    public void close() {
    }
}
