package org.apache.camel.component.kafka.consumer.support;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.class */
public class KafkaRecordProcessorFacade {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordProcessorFacade.class);
    private final KafkaConsumer camelKafkaConsumer;
    private final Map<String, Long> lastProcessedOffset;
    private final String threadId;
    private final KafkaRecordProcessor kafkaRecordProcessor;
    private final CommitManager commitManager;

    public KafkaRecordProcessorFacade(KafkaConsumer kafkaConsumer, Map<String, Long> map, String str, CommitManager commitManager) {
        this.camelKafkaConsumer = kafkaConsumer;
        this.lastProcessedOffset = map;
        this.threadId = str;
        this.commitManager = commitManager;
        this.kafkaRecordProcessor = buildKafkaRecordProcessor(commitManager);
    }

    private boolean isStopping() {
        return this.camelKafkaConsumer.isStopping();
    }

    public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> consumerRecords) {
        logRecords(consumerRecords);
        Iterator it = consumerRecords.partitions().iterator();
        ProcessingResult newUnprocessed = ProcessingResult.newUnprocessed();
        while (it.hasNext() && !isStopping()) {
            newUnprocessed = ProcessingResult.newUnprocessed();
            TopicPartition topicPartition = (TopicPartition) it.next();
            List<ConsumerRecord<Object, Object>> records = consumerRecords.records(topicPartition);
            Iterator<ConsumerRecord<Object, Object>> it2 = records.iterator();
            logRecordsInPartition(records, topicPartition);
            while (!newUnprocessed.isBreakOnErrorHit() && it2.hasNext() && !isStopping()) {
                newUnprocessed = processRecord(topicPartition, it.hasNext(), it2.hasNext(), newUnprocessed, this.kafkaRecordProcessor, it2.next());
            }
            if (!newUnprocessed.isBreakOnErrorHit()) {
                LOG.debug("Committing offset on successful execution");
                this.commitManager.commitOffset(topicPartition, newUnprocessed.getPartitionLastOffset());
            }
        }
        return newUnprocessed;
    }

    private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> list, TopicPartition topicPartition) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Records count {} received for partition {}", Integer.valueOf(list.size()), topicPartition);
        }
    }

    private void logRecords(ConsumerRecords<Object, Object> consumerRecords) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Last poll on thread {} resulted on {} records to process", this.threadId, Integer.valueOf(consumerRecords.count()));
        }
    }

    private ProcessingResult processRecord(TopicPartition topicPartition, boolean z, boolean z2, ProcessingResult processingResult, KafkaRecordProcessor kafkaRecordProcessor, ConsumerRecord<Object, Object> consumerRecord) {
        logRecord(consumerRecord);
        Exchange createExchange = this.camelKafkaConsumer.createExchange(false);
        ProcessingResult processExchange = kafkaRecordProcessor.processExchange(createExchange, topicPartition, z, z2, consumerRecord, processingResult, this.camelKafkaConsumer.getExceptionHandler());
        if (!processExchange.isBreakOnErrorHit()) {
            this.lastProcessedOffset.put(KafkaRecordProcessor.serializeOffsetKey(topicPartition), Long.valueOf(processExchange.getPartitionLastOffset()));
        }
        this.camelKafkaConsumer.releaseExchange(createExchange, false);
        return processExchange;
    }

    private void logRecord(ConsumerRecord<Object, Object> consumerRecord) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", new Object[]{Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()});
        }
    }

    private KafkaRecordProcessor buildKafkaRecordProcessor(CommitManager commitManager) {
        return new KafkaRecordProcessor(this.camelKafkaConsumer.m2getEndpoint().getConfiguration(), this.camelKafkaConsumer.getProcessor(), commitManager);
    }
}
