package org.apache.camel.component.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.CommitManagers;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.util.IOHelper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/camel/component/kafka/KafkaFetchRecords.class */
public class KafkaFetchRecords implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaFetchRecords.class);
    private final KafkaConsumer kafkaConsumer;
    private Consumer consumer;
    private final String topicName;
    private final Pattern topicPattern;
    private final String threadId;
    private final Properties kafkaProps;
    private final PollExceptionStrategy pollExceptionStrategy;
    private final BridgeExceptionHandlerToErrorHandler bridge;
    private CommitManager commitManager;
    private boolean reconnect;
    private boolean connected;
    private final Map<String, Long> lastProcessedOffset = new HashMap();
    private final ReentrantLock lock = new ReentrantLock();
    private boolean retry = true;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaFetchRecords(KafkaConsumer kafkaConsumer, PollExceptionStrategy pollExceptionStrategy, BridgeExceptionHandlerToErrorHandler bridgeExceptionHandlerToErrorHandler, String str, Pattern pattern, String str2, Properties properties) {
        this.kafkaConsumer = kafkaConsumer;
        this.pollExceptionStrategy = pollExceptionStrategy;
        this.bridge = bridgeExceptionHandlerToErrorHandler;
        this.topicName = str;
        this.topicPattern = pattern;
        this.threadId = str + "-Thread " + str2;
        this.kafkaProps = properties;
    }

    @Override // java.lang.Runnable
    public void run() {
        if (!isKafkaConsumerRunnable()) {
            return;
        }
        do {
            try {
                if (!isConnected()) {
                    createConsumer();
                    this.commitManager = CommitManagers.createCommitManager(this.consumer, this.kafkaConsumer, this.threadId, getPrintableTopic());
                    initializeConsumer();
                    setConnected(true);
                }
                startPolling();
            } catch (Exception e) {
                setConnected(false);
                LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due {}", e.getMessage(), e);
            }
            if (!isRetrying() && !isReconnect()) {
                break;
            }
        } while (isKafkaConsumerRunnable());
        if (LOG.isInfoEnabled()) {
            LOG.info("Terminating KafkaConsumer thread: {} receiving from {}", this.threadId, getPrintableTopic());
        }
        safeUnsubscribe();
        IOHelper.close(this.consumer);
    }

    protected void createConsumer() {
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
            LOG.info("{} Kafka consumer thread ID {} with poll timeout of {} ms", new Object[]{this.consumer == null ? "Connecting" : "Reconnecting", this.threadId, Long.valueOf(this.kafkaConsumer.m2getEndpoint().getConfiguration().getPollTimeoutMs().longValue())});
            this.consumer = this.kafkaConsumer.m2getEndpoint().getKafkaClientFactory().getConsumer(this.kafkaProps);
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    private void initializeConsumer() {
        subscribe();
        setConnected(false);
        setRetry(true);
    }

    private void subscribe() {
        PartitionAssignmentListener partitionAssignmentListener = new PartitionAssignmentListener(this.threadId, this.kafkaConsumer.m2getEndpoint().getConfiguration(), this.consumer, this.lastProcessedOffset, this::isRunnable, this.commitManager);
        if (LOG.isInfoEnabled()) {
            LOG.info("Subscribing {} to {}", this.threadId, getPrintableTopic());
        }
        if (this.topicPattern != null) {
            this.consumer.subscribe(this.topicPattern, partitionAssignmentListener);
        } else {
            this.consumer.subscribe(Arrays.asList(this.topicName.split(",")), partitionAssignmentListener);
        }
    }

    protected void startPolling() {
        try {
            try {
                try {
                    this.lock.lock();
                    long longValue = this.kafkaConsumer.m2getEndpoint().getConfiguration().getPollTimeoutMs().longValue();
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Polling {} from {} with timeout: {}", new Object[]{this.threadId, getPrintableTopic(), Long.valueOf(longValue)});
                    }
                    KafkaRecordProcessorFacade kafkaRecordProcessorFacade = new KafkaRecordProcessorFacade(this.kafkaConsumer, this.lastProcessedOffset, this.threadId, this.commitManager);
                    Duration ofMillis = Duration.ofMillis(longValue);
                    while (isKafkaConsumerRunnable() && isRetrying() && isConnected()) {
                        ConsumerRecords<Object, Object> poll = this.consumer.poll(ofMillis);
                        this.commitManager.processAsyncCommits();
                        if (kafkaRecordProcessorFacade.processPolledRecords(poll).isBreakOnErrorHit()) {
                            LOG.debug("We hit an error ... setting flags to force reconnect");
                            setReconnect(true);
                            setConnected(false);
                            setRetry(false);
                        }
                    }
                    if (!isConnected()) {
                        LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
                        this.commitManager.commit();
                    }
                    safeUnsubscribe();
                    this.lock.unlock();
                    if (isRetrying()) {
                        return;
                    }
                    LOG.debug("Closing consumer {}", this.threadId);
                    safeUnsubscribe();
                    IOHelper.close(this.consumer);
                } catch (WakeupException e) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("The kafka consumer was woken up while polling on thread {} for {}", this.threadId, getPrintableTopic());
                    }
                    safeUnsubscribe();
                    this.lock.unlock();
                    if (isRetrying()) {
                        return;
                    }
                    LOG.debug("Closing consumer {}", this.threadId);
                    safeUnsubscribe();
                    IOHelper.close(this.consumer);
                }
            } catch (InterruptException e2) {
                this.kafkaConsumer.getExceptionHandler().handleException("Interrupted while consuming " + this.threadId + " from kafka topic", e2);
                this.commitManager.commit();
                LOG.info("Unsubscribing {} from {}", this.threadId, getPrintableTopic());
                safeUnsubscribe();
                Thread.currentThread().interrupt();
                this.lock.unlock();
                if (isRetrying()) {
                    return;
                }
                LOG.debug("Closing consumer {}", this.threadId);
                safeUnsubscribe();
                IOHelper.close(this.consumer);
            } catch (Exception e3) {
                if (LOG.isDebugEnabled()) {
                    LOG.warn("Exception {} caught while polling {} from kafka {} at offset {}: {}", new Object[]{e3.getClass().getName(), this.threadId, getPrintableTopic(), this.lastProcessedOffset, e3.getMessage(), e3});
                } else {
                    LOG.warn("Exception {} caught while polling {} from kafka {} at offset {}: {}", new Object[]{e3.getClass().getName(), this.threadId, getPrintableTopic(), this.lastProcessedOffset, e3.getMessage()});
                }
                handleAccordingToStrategy(-1L, e3);
                this.lock.unlock();
                if (isRetrying()) {
                    return;
                }
                LOG.debug("Closing consumer {}", this.threadId);
                safeUnsubscribe();
                IOHelper.close(this.consumer);
            }
        } catch (Throwable th) {
            this.lock.unlock();
            if (!isRetrying()) {
                LOG.debug("Closing consumer {}", this.threadId);
                safeUnsubscribe();
                IOHelper.close(this.consumer);
            }
            throw th;
        }
    }

    private void handleAccordingToStrategy(long j, Exception exc) {
        PollOnError handleException = this.pollExceptionStrategy.handleException(exc);
        if (PollOnError.RETRY == handleException) {
            handlePollRetry();
            return;
        }
        if (PollOnError.RECONNECT == handleException) {
            handlePollReconnect();
            return;
        }
        if (PollOnError.ERROR_HANDLER == handleException) {
            handlePollErrorHandler(j, exc);
        } else if (PollOnError.DISCARD == handleException) {
            handlePollDiscard(j);
        } else if (PollOnError.STOP == handleException) {
            handlePollStop();
        }
    }

    private void safeUnsubscribe() {
        String printableTopic = getPrintableTopic();
        try {
            this.consumer.unsubscribe();
        } catch (IllegalStateException e) {
            LOG.warn("The consumer is likely already closed. Skipping the unsubscription from {}", printableTopic);
        } catch (Exception e2) {
            this.kafkaConsumer.getExceptionHandler().handleException("Error unsubscribing thread " + this.threadId + " from kafka " + printableTopic, e2);
        }
    }

    private String getPrintableTopic() {
        return this.topicPattern != null ? "topic pattern " + this.topicPattern : "topic " + this.topicName;
    }

    private void handlePollStop() {
        LOG.warn("Requesting the consumer to stop based on polling exception strategy");
        setRetry(false);
        setConnected(false);
    }

    private void handlePollDiscard(long j) {
        LOG.warn("Requesting the consumer to discard the message and continue to the next based on polling exception strategy");
        seekToNextOffset(j);
    }

    private void handlePollErrorHandler(long j, Exception exc) {
        LOG.warn("Deferring processing to the exception handler based on polling exception strategy");
        this.bridge.handleException(exc);
        seekToNextOffset(j);
    }

    private void handlePollReconnect() {
        LOG.warn("Requesting the consumer to re-connect on the next run based on polling exception strategy");
        setReconnect(true);
        setConnected(false);
        setRetry(false);
    }

    private void handlePollRetry() {
        LOG.warn("Requesting the consumer to retry polling the same message based on polling exception strategy");
        setRetry(true);
    }

    private boolean isKafkaConsumerRunnable() {
        return (!this.kafkaConsumer.isRunAllowed() || this.kafkaConsumer.isStoppingOrStopped() || this.kafkaConsumer.isSuspendingOrSuspended()) ? false : true;
    }

    private boolean isRunnable() {
        return this.kafkaConsumer.m2getEndpoint().getCamelContext().isStopping() && !this.kafkaConsumer.isRunAllowed();
    }

    private void seekToNextOffset(long j) {
        boolean z = false;
        Set<TopicPartition> assignment = this.consumer.assignment();
        if (assignment != null && j != -1) {
            long j2 = j + 1;
            if (LOG.isInfoEnabled()) {
                LOG.info("Consumer seeking to next offset {} to continue polling next message from {}", Long.valueOf(j2), getPrintableTopic());
            }
            Iterator it = assignment.iterator();
            while (it.hasNext()) {
                this.consumer.seek((TopicPartition) it.next(), j2);
            }
            return;
        }
        if (assignment != null) {
            for (TopicPartition topicPartition : assignment) {
                long position = this.consumer.position(topicPartition) + 1;
                if (!z) {
                    LOG.info("Consumer seeking to next offset {} to continue polling next message from {}", Long.valueOf(position), getPrintableTopic());
                    z = true;
                }
                this.consumer.seek(topicPartition, position);
            }
        }
    }

    private boolean isRetrying() {
        return this.retry;
    }

    private void setRetry(boolean z) {
        this.retry = z;
    }

    private boolean isReconnect() {
        return this.reconnect;
    }

    private void setReconnect(boolean z) {
        this.reconnect = z;
    }

    private void safeStop() {
        long shutdownTimeout = this.kafkaConsumer.m2getEndpoint().getConfiguration().getShutdownTimeout();
        try {
            try {
                LOG.info("Waiting up to {} milliseconds for the processing to finish", Long.valueOf(shutdownTimeout));
                if (!this.lock.tryLock(shutdownTimeout, TimeUnit.MILLISECONDS)) {
                    LOG.warn("The processing of the current record did not finish within {} seconds", Long.valueOf(shutdownTimeout));
                }
                this.consumer.wakeup();
                this.lock.unlock();
            } catch (InterruptedException e) {
                this.consumer.wakeup();
                Thread.currentThread().interrupt();
                this.lock.unlock();
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        safeStop();
    }

    public boolean isConnected() {
        return this.connected;
    }

    public void setConnected(boolean z) {
        this.connected = z;
    }
}
