package org.apache.camel.component.kafka.consumer.support.batching;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.Synchronization;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.class */
/**
 * Record processor that wraps every record of a {@link ConsumerRecords} instance into its own
 * {@link Exchange}, places the resulting list as the body of one enclosing "batch" exchange, and
 * hands that batch exchange to the configured {@link Processor}.
 *
 * <p>Depending on {@link KafkaConfiguration#isAllowManualCommit()}, the offsets are either left to
 * the route (manual commit) or committed through the {@link CommitManager} from an on-completion
 * callback (auto commit).
 */
final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordBatchingProcessor.class);

    private final KafkaConfiguration configuration;
    private final Processor processor;
    private final CommitManager commitManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor$CommitSynchronization.class */
    /**
     * On-completion callback registered on the batch exchange in the auto-commit path. It commits
     * through the enclosing processor's {@link CommitManager} when the exchange completes and
     * records the outcome in {@link #result} so that the enclosing processor can return it.
     */
    public final class CommitSynchronization implements Synchronization {
        private final ExceptionHandler exceptionHandler;

        /**
         * Outcome recorded by {@link #onComplete(Exchange)} or {@link #onFailure(Exchange)}.
         * Remains {@code null} until one of them stores a value; note that {@code onComplete}
         * does not store one when the body is missing or empty.
         */
        private ProcessingResult result;

        /**
         * @param exceptionHandler handler that receives the exchange's exception on failure
         */
        public CommitSynchronization(ExceptionHandler exceptionHandler) {
            this.exceptionHandler = exceptionHandler;
        }

        /**
         * Commits the batch if the exchange body is a non-empty {@link List}; otherwise logs a
         * warning and returns without committing and without recording a result.
         *
         * @param exchange the completed batch exchange
         */
        public void onComplete(Exchange exchange) {
            final List<?> exchanges = exchange.getMessage().getBody(List.class);

            // Make sure the body still is the list of exchanges that was handed to the route.
            if (exchanges == null || exchanges.isEmpty()) {
                LOG.warn("The exchange is {}", exchanges == null ? "not of the expected type (null)" : "empty");
                return;
            }

            LOG.debug("Calling commit on {} exchanges using {}", exchanges.size(),
                    commitManager.getClass().getSimpleName());
            commitManager.commit();
            this.result = new ProcessingResult(false, false);
        }

        /**
         * Forwards the exchange's exception to the exception handler (or logs a warning when the
         * exchange carries none) and records a result whose second flag is {@code true}. Nothing
         * is committed here.
         *
         * @param exchange the failed batch exchange
         */
        public void onFailure(Exchange exchange) {
            final Exception cause = exchange.getException();
            if (cause != null) {
                this.exceptionHandler.handleException(
                        "Error during processing exchange. Will attempt to process the message on next poll.",
                        exchange, cause);
            } else {
                LOG.warn("Skipping auto-commit on the batch because processing the exchanged has failed and the error was not correctly handled");
            }
            this.result = new ProcessingResult(false, true);
        }
    }

    /**
     * @param kafkaConfiguration configuration consulted for header propagation and manual commit
     * @param processor          processor that receives the batch exchange
     * @param commitManager      commit manager used for auto commits and manual-commit handles
     */
    public KafkaRecordBatchingProcessor(KafkaConfiguration kafkaConfiguration, Processor processor, CommitManager commitManager) {
        this.configuration = kafkaConfiguration;
        this.processor = processor;
        this.commitManager = commitManager;
    }

    /**
     * Creates an exchange for a single record: populates the message through
     * {@code setupExchangeMessage}, propagates headers through {@code propagateHeaders}
     * (both helpers are not defined in this class; presumably inherited from
     * {@link KafkaRecordProcessor}), and, when manual commit is allowed, stores the manual-commit
     * handle obtained from the commit manager under {@link KafkaConstants#MANUAL_COMMIT}.
     *
     * @param kafkaConsumer  consumer used to create the exchange
     * @param topicPartition topic/partition the record belongs to
     * @param consumerRecord the record to wrap
     * @return the newly created exchange
     */
    public Exchange toExchange(KafkaConsumer kafkaConsumer, TopicPartition topicPartition, ConsumerRecord<Object, Object> consumerRecord) {
        final Exchange exchange = kafkaConsumer.createExchange(false);
        final Message message = exchange.getMessage();

        setupExchangeMessage(message, consumerRecord);
        propagateHeaders(this.configuration, consumerRecord, exchange);

        if (this.configuration.isAllowManualCommit()) {
            message.setHeader(KafkaConstants.MANUAL_COMMIT,
                    this.commitManager.getManualCommit(exchange, topicPartition, consumerRecord));
        }
        return exchange;
    }

    /**
     * Converts every record into an exchange, sets the list of those exchanges as the body of a
     * new batch exchange, and processes it using the manual- or auto-commit strategy selected by
     * {@link KafkaConfiguration#isAllowManualCommit()}. The batch exchange is always released
     * afterwards, whether processing returns normally or throws.
     *
     * @param kafkaConsumer   consumer used to create and release exchanges
     * @param consumerRecords records to process as one batch
     * @return the outcome reported by the selected strategy
     */
    public ProcessingResult processExchange(KafkaConsumer kafkaConsumer, ConsumerRecords<Object, Object> consumerRecords) {
        final List<Exchange> exchanges = new ArrayList<>(consumerRecords.count());
        for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
            final TopicPartition topicPartition =
                    new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
            exchanges.add(toExchange(kafkaConsumer, topicPartition, consumerRecord));
        }

        final Exchange batchExchange = kafkaConsumer.createExchange(false);
        batchExchange.getMessage().setBody(exchanges);
        try {
            if (this.configuration.isAllowManualCommit()) {
                return manualCommitResultProcessing(kafkaConsumer, batchExchange);
            }
            return autoCommitResultProcessing(kafkaConsumer, batchExchange);
        } finally {
            // Single release point for both the normal and the exceptional exit.
            kafkaConsumer.releaseExchange(batchExchange, false);
        }
    }

    /**
     * Processes the batch exchange with a {@link CommitSynchronization} attached and returns the
     * outcome that callback recorded.
     *
     * <p>If no outcome was recorded by the time processing returns (for instance because
     * {@code onComplete} found a missing or empty body), a result is derived from the exchange
     * instead of returning {@code null}: its second flag is {@code true} exactly when the exchange
     * carries an exception, mirroring the values the callbacks themselves use.
     */
    private ProcessingResult autoCommitResultProcessing(KafkaConsumer kafkaConsumer, Exchange exchange) {
        final CommitSynchronization synchronization = new CommitSynchronization(kafkaConsumer.getExceptionHandler());
        exchange.getExchangeExtension().addOnCompletion(synchronization);
        try {
            this.processor.process(exchange);
        } catch (Exception e) {
            exchange.setException(e);
        }

        final ProcessingResult recorded = synchronization.result;
        if (recorded != null) {
            return recorded;
        }
        // Never hand a null result back to the caller.
        return new ProcessingResult(false, exchange.getException() != null);
    }

    /**
     * Processes the batch exchange without committing anything. If the exchange ends up carrying
     * an exception it is passed to the consumer's exception handler and the returned result has
     * its second flag set to {@code true}; otherwise both flags are {@code false}.
     */
    private ProcessingResult manualCommitResultProcessing(KafkaConsumer kafkaConsumer, Exchange exchange) {
        try {
            this.processor.process(exchange);
        } catch (Exception e) {
            exchange.setException(e);
        }

        if (exchange.getException() == null) {
            return new ProcessingResult(false, false);
        }

        LOG.debug("An exception was thrown for batch records");
        return new ProcessingResult(false, processException(exchange, kafkaConsumer.getExceptionHandler()));
    }

    /**
     * Hands the exchange's exception to the given handler.
     *
     * @return always {@code true}
     */
    private boolean processException(Exchange exchange, ExceptionHandler exceptionHandler) {
        exceptionHandler.handleException("Error during processing", exchange, exchange.getException());
        return true;
    }
}
