package org.apache.camel.component.kafka.consumer.support.batching;

import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.StopWatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.class */
final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordBatchingProcessor.class);
    private final KafkaConfiguration configuration;
    private final Processor processor;
    private final CommitManager commitManager;
    private final StopWatch timeoutWatch = new StopWatch();
    private final StopWatch intervalWatch = new StopWatch();
    private final Queue<Exchange> exchangeList;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor$CommitSynchronization.class */
    public final class CommitSynchronization implements Synchronization {
        private final ExceptionHandler exceptionHandler;
        private final int size;

        public CommitSynchronization(ExceptionHandler exceptionHandler, int i) {
            this.exceptionHandler = exceptionHandler;
            this.size = i;
        }

        public void onComplete(Exchange exchange) {
            KafkaRecordBatchingProcessor.LOG.debug("Calling commit on {} exchanges using {}", Integer.valueOf(this.size), KafkaRecordBatchingProcessor.this.commitManager.getClass().getSimpleName());
            KafkaRecordBatchingProcessor.this.commitManager.commit();
        }

        public void onFailure(Exchange exchange) {
            Exception exception = exchange.getException();
            if (exception != null) {
                this.exceptionHandler.handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, exception);
            } else {
                KafkaRecordBatchingProcessor.LOG.warn("Skipping auto-commit on the batch because processing the exchanged has failed and the error was not correctly handled");
            }
        }
    }

    public KafkaRecordBatchingProcessor(KafkaConfiguration kafkaConfiguration, Processor processor, CommitManager commitManager) {
        this.configuration = kafkaConfiguration;
        this.processor = processor;
        this.commitManager = commitManager;
        this.exchangeList = new ArrayBlockingQueue(kafkaConfiguration.getMaxPollRecords().intValue());
    }

    public Exchange toExchange(KafkaConsumer kafkaConsumer, TopicPartition topicPartition, ConsumerRecord<Object, Object> consumerRecord) {
        Exchange createExchange = kafkaConsumer.createExchange(false);
        Message message = createExchange.getMessage();
        setupExchangeMessage(message, consumerRecord);
        propagateHeaders(this.configuration, consumerRecord, createExchange);
        if (this.configuration.isAllowManualCommit()) {
            message.setHeader(KafkaConstants.MANUAL_COMMIT, this.commitManager.getManualCommit(createExchange, topicPartition, consumerRecord));
        }
        return createExchange;
    }

    public ProcessingResult processExchange(KafkaConsumer kafkaConsumer, ConsumerRecords<Object, Object> consumerRecords) {
        LOG.debug("There's {} records to process ... max poll is set to {}", Integer.valueOf(consumerRecords.count()), this.configuration.getMaxPollRecords());
        if (this.exchangeList.isEmpty()) {
            this.timeoutWatch.takenAndRestart();
        }
        if (hasExpiredRecords(consumerRecords)) {
            LOG.debug("The polling timeout has expired with {} records in cache. Dispatching the incomplete batch for processing", Integer.valueOf(this.exchangeList.size()));
            processBatch(kafkaConsumer);
            this.exchangeList.clear();
            return ProcessingResult.newUnprocessed();
        }
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord<Object, Object> consumerRecord = (ConsumerRecord) it.next();
            this.exchangeList.add(toExchange(kafkaConsumer, new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord));
            if (this.exchangeList.size() >= this.configuration.getMaxPollRecords().intValue()) {
                processBatch(kafkaConsumer);
                this.exchangeList.clear();
            }
        }
        return ProcessingResult.newUnprocessed();
    }

    private boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords) {
        if (this.exchangeList.isEmpty()) {
            return false;
        }
        return (consumerRecords.isEmpty() && (this.timeoutWatch.taken() > this.configuration.getPollTimeoutMs().longValue() ? 1 : (this.timeoutWatch.taken() == this.configuration.getPollTimeoutMs().longValue() ? 0 : -1)) >= 0) || (this.configuration.getBatchingIntervalMs() != null && (this.intervalWatch.taken() > ((long) this.configuration.getBatchingIntervalMs().intValue()) ? 1 : (this.intervalWatch.taken() == ((long) this.configuration.getBatchingIntervalMs().intValue()) ? 0 : -1)) >= 0);
    }

    private void processBatch(KafkaConsumer kafkaConsumer) {
        this.intervalWatch.restart();
        Exchange createExchange = kafkaConsumer.createExchange(false);
        Message message = createExchange.getMessage();
        List<Exchange> list = this.exchangeList.stream().toList();
        message.setBody(list);
        try {
            if (this.configuration.isAllowManualCommit()) {
                Exchange exchange = list.isEmpty() ? null : list.get(list.size() - 1);
                if (exchange != null) {
                    message.setHeader(KafkaConstants.MANUAL_COMMIT, exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT));
                }
                manualCommitResultProcessing(kafkaConsumer, createExchange);
            } else {
                autoCommitResultProcessing(kafkaConsumer, createExchange, list.size());
            }
        } finally {
            kafkaConsumer.releaseExchange(createExchange, false);
        }
    }

    private void autoCommitResultProcessing(KafkaConsumer kafkaConsumer, Exchange exchange, int i) {
        ExceptionHandler exceptionHandler = kafkaConsumer.getExceptionHandler();
        exchange.getExchangeExtension().addOnCompletion(new CommitSynchronization(exceptionHandler, i));
        try {
            this.processor.process(exchange);
        } catch (Exception e) {
            exchange.setException(e);
        }
        if (exchange.getException() != null) {
            processException(exchange, exceptionHandler);
        }
    }

    private void manualCommitResultProcessing(KafkaConsumer kafkaConsumer, Exchange exchange) {
        try {
            this.processor.process(exchange);
        } catch (Exception e) {
            exchange.setException(e);
        }
        if (exchange.getException() != null) {
            processException(exchange, kafkaConsumer.getExceptionHandler());
        }
    }

    private void processException(Exchange exchange, ExceptionHandler exceptionHandler) {
        exceptionHandler.handleException("Error during processing", exchange, exchange.getException());
    }
}
