package org.springframework.integration.kafka.inbound;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallbackFactory;
import org.springframework.integration.core.Pausable;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.listener.ErrorHandlingUtils;
import org.springframework.kafka.listener.LoggingCommitCallback;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.LogIfLevelEnabled;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageSource.class */
public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> implements Pausable, BeanClassLoaderAware {
    private static final long MIN_ASSIGN_TIMEOUT = 2000;
    private static final int DEFAULT_CLOSE_TIMEOUT = 30;
    public static final String REMAINING_RECORDS = "kafka_remainingRecords";
    private final ConsumerFactory<K, V> consumerFactory;
    private final KafkaAckCallbackFactory<K, V> ackCallbackFactory;
    private final Lock receiveLock;
    private final Lock consumerLock;
    private final Map<TopicPartition, Set<KafkaAckInfo<K, V>>> inflightRecords;
    private final AtomicInteger remainingCount;
    private final ConsumerProperties consumerProperties;
    private final Collection<TopicPartition> assignedPartitions;
    private final Duration commitTimeout;
    private final Duration assignTimeout;
    private final Duration pollTimeout;
    private final AtomicBoolean running;
    private final AtomicBoolean pausing;
    private final AtomicBoolean paused;
    private final AtomicBoolean stopped;
    private RecordMessageConverter messageConverter;
    private Class<?> payloadType;
    private boolean rawMessageHeader;
    private Duration closeTimeout;
    private boolean checkNullKeyForExceptions;
    private boolean checkNullValueForExceptions;
    private volatile Consumer<K, V> consumer;
    private volatile Iterator<ConsumerRecord<K, V>> recordsIterator;
    public volatile boolean newAssignment;
    private ClassLoader classLoader;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.springframework.integration.kafka.inbound.KafkaMessageSource$1, reason: invalid class name */
    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageSource$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        /**
         * Compiler-style switch map indexed by {@code AcknowledgmentCallback.Status.ordinal()}.
         * Slot values: 1 = ACCEPT, 2 = REJECT, 3 = REQUEUE; a slot stays 0 when its constant
         * could not be resolved.
         */
        static final /* synthetic */ int[] $SwitchMap$org$springframework$integration$acks$AcknowledgmentCallback$Status;

        static {
            final int[] caseByOrdinal = new int[AcknowledgmentCallback.Status.values().length];
            // Each constant is looked up in its own try block so that a constant missing at
            // link time (NoSuchFieldError) only leaves its own slot unmapped.
            try {
                caseByOrdinal[AcknowledgmentCallback.Status.ACCEPT.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                caseByOrdinal[AcknowledgmentCallback.Status.REJECT.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                caseByOrdinal[AcknowledgmentCallback.Status.REQUEUE.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
            }
            $SwitchMap$org$springframework$integration$acks$AcknowledgmentCallback$Status = caseByOrdinal;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageSource$IntegrationConsumerRebalanceListener.class */
    /**
     * Rebalance listener registered with the Kafka consumer. It keeps the enclosing source's
     * {@code assignedPartitions} in sync with the broker assignment, re-pauses the consumer
     * after a rebalance when the source is paused, and forwards every event to the optional
     * user-provided listener.
     */
    public class IntegrationConsumerRebalanceListener implements ConsumerRebalanceListener {
        /** User-provided listener to delegate to; may be {@code null}. */
        private final ConsumerRebalanceListener providedRebalanceListener;

        /** Whether the provided listener is a {@link ConsumerAwareRebalanceListener}. */
        private final boolean isConsumerAware;

        /**
         * @param consumerRebalanceListener the user-provided listener, or {@code null} for none
         */
        IntegrationConsumerRebalanceListener(ConsumerRebalanceListener consumerRebalanceListener) {
            this.providedRebalanceListener = consumerRebalanceListener;
            this.isConsumerAware = consumerRebalanceListener instanceof ConsumerAwareRebalanceListener;
        }

        /**
         * Removes the revoked partitions from the tracked assignment and notifies the provided
         * listener, using the consumer-aware variant when available.
         *
         * @param collection the revoked partitions
         */
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            KafkaMessageSource.this.assignedPartitions.removeAll(collection);
            KafkaMessageSource.this.logger.info(() -> "Partitions revoked: " + String.valueOf(collection));
            if (this.providedRebalanceListener == null) {
                return;
            }
            if (this.isConsumerAware) {
                // The consumer-aware callback is declared on the sub-interface only, so the
                // delegate has to be cast before it can be invoked.
                ((ConsumerAwareRebalanceListener) this.providedRebalanceListener)
                        .onPartitionsRevokedAfterCommit(KafkaMessageSource.this.consumer, collection);
            } else {
                this.providedRebalanceListener.onPartitionsRevoked(collection);
            }
        }

        /**
         * Notifies the provided listener that partitions were lost and then treats them as
         * revoked.
         *
         * @param collection the lost partitions
         */
        public void onPartitionsLost(Collection<TopicPartition> collection) {
            if (this.providedRebalanceListener != null) {
                // NOTE(review): unlike the other callbacks this always uses the plain
                // (non consumer-aware) variant - confirm that is intended for
                // ConsumerAwareRebalanceListener delegates.
                this.providedRebalanceListener.onPartitionsLost(collection);
            }
            onPartitionsRevoked(collection);
        }

        /**
         * Adds the new partitions to the tracked assignment, pauses them again if the source is
         * paused, notifies the provided listener, then wakes the consumer up and flags the new
         * assignment on the enclosing source.
         *
         * @param collection the newly assigned partitions
         */
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            KafkaMessageSource.this.assignedPartitions.addAll(collection);
            if (KafkaMessageSource.this.paused.get()) {
                KafkaMessageSource.this.consumer.pause(KafkaMessageSource.this.assignedPartitions);
                KafkaMessageSource.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; consumer paused again, so the initial poll() will never return any records");
            }
            KafkaMessageSource.this.logger.info(() -> "Partitions assigned: " + String.valueOf(collection));
            if (this.providedRebalanceListener != null) {
                if (this.isConsumerAware) {
                    ((ConsumerAwareRebalanceListener) this.providedRebalanceListener)
                            .onPartitionsAssigned(KafkaMessageSource.this.consumer, collection);
                } else {
                    this.providedRebalanceListener.onPartitionsAssigned(collection);
                }
            }
            KafkaMessageSource.this.consumer.wakeup();
            KafkaMessageSource.this.newAssignment = true;
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageSource$KafkaAckCallback.class */
    /**
     * Acknowledgment callback for a single polled record. Accepting or rejecting commits the
     * record's offset (possibly together with later, already acknowledged offsets of the same
     * partition); re-queueing seeks the consumer back to the record so it is fetched again.
     *
     * @param <K> the record key type
     * @param <V> the record value type
     */
    public static class KafkaAckCallback<K, V> implements AcknowledgmentCallback, Acknowledgment {
        private final LogIfLevelEnabled commitLogger;
        private final KafkaAckInfo<K, V> ackInfo;
        private final Duration commitTimeout;
        private final OffsetCommitCallback commitCallback;
        private final boolean isSyncCommits;
        private volatile boolean acknowledged;
        private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));
        private boolean autoAckEnabled = true;

        /**
         * @param kafkaAckInfo the state of the record this callback acknowledges; required
         * @param consumerProperties commit settings; when {@code null}, commits are synchronous
         * without a timeout, a {@link LoggingCommitCallback} is used and commits are logged at
         * DEBUG level
         */
        public KafkaAckCallback(KafkaAckInfo<K, V> kafkaAckInfo, @Nullable ConsumerProperties consumerProperties) {
            Assert.notNull(kafkaAckInfo, "'ackInfo' cannot be null");
            this.ackInfo = kafkaAckInfo;
            this.commitTimeout = consumerProperties != null ? consumerProperties.getSyncCommitTimeout() : null;
            this.isSyncCommits = consumerProperties == null || consumerProperties.isSyncCommits();
            this.commitCallback = (consumerProperties == null || consumerProperties.getCommitCallback() == null) ? new LoggingCommitCallback() : consumerProperties.getCommitCallback();
            this.commitLogger = new LogIfLevelEnabled(this.logger, consumerProperties != null ? consumerProperties.getCommitLogLevel() : LogIfLevelEnabled.Level.DEBUG);
        }

        /**
         * Acknowledges the record with the given status while holding the consumer monitor.
         *
         * @param status ACCEPT or REJECT to commit, REQUEUE to roll back
         * @throws IllegalStateException if already acknowledged, or if the consumer was woken up
         * while committing or seeking
         */
        public void acknowledge(AcknowledgmentCallback.Status status) {
            Assert.notNull(status, "'status' cannot be null");
            if (this.acknowledged) {
                throw new IllegalStateException("Already acknowledged");
            }
            Object monitor = this.ackInfo.getConsumerMonitor();
            if (monitor instanceof Lock) {
                // When the monitor is a java.util.concurrent Lock, an intrinsic synchronized block
                // on it would not exclude threads that acquired it through lock(); use the Lock
                // API so that consumer access is really serialized.
                Lock consumerLock = (Lock) monitor;
                consumerLock.lock();
                try {
                    doAcknowledge(status);
                } finally {
                    consumerLock.unlock();
                }
            } else {
                synchronized (monitor) {
                    doAcknowledge(status);
                }
            }
        }

        /** Performs the commit or rollback; the caller must hold the consumer monitor. */
        private void doAcknowledge(AcknowledgmentCallback.Status status) {
            try {
                ConsumerRecord<K, V> consumerRecord = this.ackInfo.getRecord();
                switch (status) {
                    case ACCEPT:
                    case REJECT:
                        commitIfPossible(consumerRecord);
                        break;
                    case REQUEUE:
                        rollback(consumerRecord);
                        break;
                    default:
                        break;
                }
            } catch (WakeupException e) {
                throw new IllegalStateException(e);
            } finally {
                // Whatever the outcome, this callback is spent; a deferred ack stays in the
                // in-flight set until an earlier record commits it.
                this.acknowledged = true;
                if (!this.ackInfo.isAckDeferred()) {
                    this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition()).remove(this.ackInfo);
                }
            }
        }

        /**
         * Seeks back to the record's offset and marks every later in-flight record of the same
         * partition as rolled back, since those will be fetched again too.
         */
        private void rollback(ConsumerRecord<K, V> consumerRecord) {
            this.ackInfo.getConsumer().seek(this.ackInfo.getTopicPartition(), consumerRecord.offset());
            Set<KafkaAckInfo<K, V>> inflight = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition());
            synchronized (inflight) {
                if (inflight.size() > 1) {
                    List<Long> rewound = new ArrayList<>();
                    for (KafkaAckInfo<K, V> candidate : inflight) {
                        if (candidate.getRecord().offset() > consumerRecord.offset()) {
                            candidate.setRolledBack(true);
                            rewound.add(candidate.getRecord().offset());
                        }
                    }
                    if (!rewound.isEmpty()) {
                        this.logger.warn(() -> "Rolled back " + KafkaUtils.format(consumerRecord) + " later in-flight offsets " + String.valueOf(rewound) + " will also be re-fetched");
                    }
                }
            }
        }

        /**
         * Commits the record's offset when it is the earliest in-flight record of its partition,
         * folding in any directly following records whose acks were deferred; otherwise marks
         * this ack as deferred. Nothing is committed when an earlier offset was rolled back.
         */
        private void commitIfPossible(ConsumerRecord<K, V> consumerRecord) {
            if (this.ackInfo.isRolledBack()) {
                this.logger.warn(() -> "Cannot commit offset for " + KafkaUtils.format(consumerRecord) + "; an earlier offset was rolled back");
                return;
            }
            Set<KafkaAckInfo<K, V>> inflight = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition());
            synchronized (inflight) {
                if (!inflight.iterator().next().equals(this.ackInfo)) {
                    this.ackInfo.setAckDeferred(true);
                    this.logger.debug("Deferring commit offset; earlier messages are in flight.");
                    return;
                }
                // Collect the run of deferred acks that directly follows this record.
                List<KafkaAckInfo<K, V>> deferred = new ArrayList<>();
                for (KafkaAckInfo<K, V> candidate : inflight) {
                    if (!this.ackInfo.equals(candidate)) {
                        if (!candidate.isAckDeferred()) {
                            break;
                        }
                        deferred.add(candidate);
                    }
                }
                final KafkaAckInfo<K, V> toCommit;
                if (!deferred.isEmpty()) {
                    toCommit = deferred.get(deferred.size() - 1);
                    this.commitLogger.log(() -> "Committing pending offsets for " + KafkaUtils.format(consumerRecord) + " and all deferred to " + KafkaUtils.format(toCommit.getRecord()));
                    deferred.forEach(inflight::remove);
                } else {
                    toCommit = this.ackInfo;
                    this.commitLogger.log(() -> "Committing offset for " + KafkaUtils.format(consumerRecord));
                }
                commitOffsetOf(toCommit);
            }
        }

        /** Commits the offset following the given record, honouring sync/async settings. */
        private void commitOffsetOf(KafkaAckInfo<K, V> toCommit) {
            Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                    toCommit.getTopicPartition(), new OffsetAndMetadata(toCommit.getRecord().offset() + 1));
            if (!this.isSyncCommits) {
                toCommit.getConsumer().commitAsync(offsets, this.commitCallback);
            } else if (this.commitTimeout == null) {
                toCommit.getConsumer().commitSync(offsets);
            } else {
                toCommit.getConsumer().commitSync(offsets, this.commitTimeout);
            }
        }

        /** @return {@code true} once {@link #acknowledge(AcknowledgmentCallback.Status)} has run */
        public boolean isAcknowledged() {
            return this.acknowledged;
        }

        /** Acknowledges the record with {@code ACCEPT}. */
        public void acknowledge() {
            acknowledge(AcknowledgmentCallback.Status.ACCEPT);
        }

        /** Disables automatic acknowledgment for this callback. */
        public void noAutoAck() {
            this.autoAckEnabled = false;
        }

        /** @return whether automatic acknowledgment is still enabled */
        public boolean isAutoAck() {
            return this.autoAckEnabled;
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageSource$KafkaAckCallbackFactory.class */
    /**
     * Factory creating a {@link KafkaAckCallback} for each polled record.
     *
     * @param consumerProperties the properties handed to every created callback
     * @param <K> the record key type
     * @param <V> the record value type
     */
    public record KafkaAckCallbackFactory<K, V>(ConsumerProperties consumerProperties)
            implements AcknowledgmentCallbackFactory<KafkaAckInfo<K, V>> {

        /**
         * Creates the callback for one record.
         *
         * @param kafkaAckInfo the acknowledgment state of the record
         * @return a new {@link KafkaAckCallback} bound to {@code kafkaAckInfo}
         */
        public AcknowledgmentCallback createCallback(KafkaAckInfo<K, V> kafkaAckInfo) {
            return new KafkaAckCallback<>(kafkaAckInfo, this.consumerProperties);
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageSource$KafkaAckInfo.class */
    /**
     * Per-record state needed to acknowledge a polled record: the record itself, the consumer
     * it came from, and the shared bookkeeping of records still in flight.
     *
     * @param <K> the record key type
     * @param <V> the record value type
     */
    public interface KafkaAckInfo<K, V> extends Comparable<KafkaAckInfo<K, V>> {
        /** @return the object {@code KafkaAckCallback.acknowledge} synchronizes on before touching the consumer */
        Object getConsumerMonitor();

        /** @return the consumer group id */
        String getGroupId();

        /** @return the consumer used for seek and commit operations */
        Consumer<K, V> getConsumer();

        /** @return the record this info belongs to */
        ConsumerRecord<K, V> getRecord();

        /** @return the topic-partition of the record */
        TopicPartition getTopicPartition();

        /** @return the in-flight ack infos, keyed by topic-partition */
        Map<TopicPartition, Set<KafkaAckInfo<K, V>>> getOffsets();

        /** @return whether this record was marked rolled back; {@code KafkaAckCallback} skips the commit when set */
        boolean isRolledBack();

        /** Marks (or unmarks) this record as rolled back. */
        void setRolledBack(boolean z);

        /** @return whether this record's acknowledgment was deferred because an earlier record was first in flight */
        boolean isAckDeferred();

        /** Marks (or unmarks) this record's acknowledgment as deferred. */
        void setAckDeferred(boolean z);
    }

    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageSource$KafkaAckInfoImpl.class */
    /**
     * {@link KafkaAckInfo} backed by the enclosing message source: consumer, monitor, group id
     * and in-flight bookkeeping are all read from the outer instance.
     */
    public class KafkaAckInfoImpl implements KafkaAckInfo<K, V> {
        private final ConsumerRecord<K, V> record;
        private final TopicPartition topicPartition;
        private volatile boolean rolledBack;
        private volatile boolean ackDeferred;

        KafkaAckInfoImpl(ConsumerRecord<K, V> consumerRecord, TopicPartition topicPartition) {
            this.topicPartition = topicPartition;
            this.record = consumerRecord;
        }

        /** Orders infos by record offset only. */
        @Override
        public int compareTo(KafkaAckInfo<K, V> kafkaAckInfo) {
            final long ownOffset = record.offset();
            final long otherOffset = kafkaAckInfo.getRecord().offset();
            return Long.compare(ownOffset, otherOffset);
        }

        @Override
        public ConsumerRecord<K, V> getRecord() {
            return record;
        }

        @Override
        public TopicPartition getTopicPartition() {
            return topicPartition;
        }

        /** The outer source's consumer lock object. */
        @Override
        public Object getConsumerMonitor() {
            return consumerLock;
        }

        @Override
        public Consumer<K, V> getConsumer() {
            return consumer;
        }

        @Override
        public String getGroupId() {
            // Must stay qualified: an unqualified call would recurse into this method.
            return KafkaMessageSource.this.getGroupId();
        }

        /** The outer source's in-flight record map. */
        @Override
        public Map<TopicPartition, Set<KafkaAckInfo<K, V>>> getOffsets() {
            return inflightRecords;
        }

        @Override
        public boolean isRolledBack() {
            return rolledBack;
        }

        @Override
        public void setRolledBack(boolean z) {
            rolledBack = z;
        }

        @Override
        public boolean isAckDeferred() {
            return ackDeferred;
        }

        @Override
        public void setAckDeferred(boolean z) {
            ackDeferred = z;
        }

        public String toString() {
            return new StringBuilder("KafkaAckInfo [record=")
                    .append(record)
                    .append(", rolledBack=")
                    .append(rolledBack)
                    .append(", ackDeferred=")
                    .append(ackDeferred)
                    .append("]")
                    .toString();
        }
    }

    /**
     * Creates a source with a default {@link KafkaAckCallbackFactory} built from
     * {@code consumerProperties}; the consumer factory must use (or is forced to)
     * {@code max.poll.records = 1}.
     *
     * @param consumerFactory the factory used to create the consumer
     * @param consumerProperties topics and consumer settings
     */
    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties) {
        // Diamond instead of a raw type so the factory is checked as KafkaAckCallbackFactory<K, V>.
        this(consumerFactory, consumerProperties, new KafkaAckCallbackFactory<>(consumerProperties), false);
    }

    /**
     * Creates a source with a default {@link KafkaAckCallbackFactory} built from
     * {@code consumerProperties}.
     *
     * @param consumerFactory the factory used to create the consumer
     * @param consumerProperties topics and consumer settings
     * @param z when {@code true} the consumer factory is used as provided; when {@code false}
     * its {@code max.poll.records} must be 1 or is forced to 1
     */
    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties, boolean z) {
        // Diamond instead of a raw type so the factory is checked as KafkaAckCallbackFactory<K, V>.
        this(consumerFactory, consumerProperties, new KafkaAckCallbackFactory<>(consumerProperties), z);
    }

    /**
     * Creates a source with a caller-supplied acknowledgment callback factory; delegates to the
     * four-argument constructor with {@code false} as the last argument.
     *
     * @param consumerFactory the factory used to create the consumer
     * @param consumerProperties topics and consumer settings
     * @param kafkaAckCallbackFactory the factory creating the per-record acknowledgment callback
     */
    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties, KafkaAckCallbackFactory<K, V> kafkaAckCallbackFactory) {
        this(consumerFactory, consumerProperties, kafkaAckCallbackFactory, false);
    }

    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties, KafkaAckCallbackFactory<K, V> kafkaAckCallbackFactory, boolean z) {
        this.receiveLock = new ReentrantLock();
        this.consumerLock = new ReentrantLock();
        this.inflightRecords = new ConcurrentHashMap();
        this.remainingCount = new AtomicInteger();
        this.assignedPartitions = new LinkedHashSet();
        this.running = new AtomicBoolean();
        this.pausing = new AtomicBoolean();
        this.paused = new AtomicBoolean();
        this.stopped = new AtomicBoolean();
        this.messageConverter = new MessagingMessageConverter();
        this.closeTimeout = Duration.ofSeconds(30L);
        Assert.notNull(consumerFactory, "'consumerFactory' must not be null");
        Assert.notNull(kafkaAckCallbackFactory, "'ackCallbackFactory' must not be null");
        Assert.isTrue((ObjectUtils.isEmpty(consumerProperties.getTopics()) && ObjectUtils.isEmpty(consumerProperties.getTopicPartitions()) && consumerProperties.getTopicPattern() == null) ? false : true, "topics, topicPattern, or topicPartitions must be provided");
        this.consumerProperties = consumerProperties;
        this.consumerFactory = fixOrRejectConsumerFactory(consumerFactory, z);
        this.ackCallbackFactory = kafkaAckCallbackFactory;
        this.pollTimeout = Duration.ofMillis(consumerProperties.getPollTimeout());
        this.assignTimeout = Duration.ofMillis(Math.max(this.pollTimeout.toMillis() * 20, MIN_ASSIGN_TIMEOUT));
        this.commitTimeout = consumerProperties.getSyncCommitTimeout();
        if (JacksonPresent.isJackson2Present()) {
            DefaultKafkaHeaderMapper defaultKafkaHeaderMapper = new DefaultKafkaHeaderMapper();
            defaultKafkaHeaderMapper.addTrustedPackages((String[]) JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
            this.messageConverter.setHeaderMapper(defaultKafkaHeaderMapper);
        }
    }

    /**
     * Returns a read-only view of the partitions currently tracked as assigned.
     *
     * @return an unmodifiable view backed by the live assignment collection
     */
    public Collection<TopicPartition> getAssignedPartitions() {
        final Collection<TopicPartition> readOnlyView = Collections.unmodifiableCollection(assignedPartitions);
        return readOnlyView;
    }

    /**
     * Stores the bean class loader; {@code onInit()} passes it to the deserializer checks.
     *
     * @param classLoader the class loader to remember
     */
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    /**
     * Defaults the client id to the component name when none is configured and determines
     * whether null keys/values have to be checked for deserialization exceptions.
     */
    protected void onInit() {
        final ConsumerProperties props = this.consumerProperties;
        if (!StringUtils.hasText(props.getClientId())) {
            props.setClientId(getComponentName());
        }
        final Properties consumerOverrides = props.getKafkaConsumerProperties();

        // The explicit flag wins; the deserializer is only inspected when the flag is off.
        boolean checkKey = props.isCheckDeserExWhenKeyNull();
        if (!checkKey) {
            checkKey = ErrorHandlingUtils.checkDeserializer(this.consumerFactory, consumerOverrides, false, this.classLoader);
        }
        this.checkNullKeyForExceptions = checkKey;

        boolean checkValue = props.isCheckDeserExWhenValueNull();
        if (!checkValue) {
            checkValue = ErrorHandlingUtils.checkDeserializer(this.consumerFactory, consumerOverrides, true, this.classLoader);
        }
        this.checkNullValueForExceptions = checkValue;
    }

    /**
     * @return the consumer properties this source was created with
     */
    public ConsumerProperties getConsumerProperties() {
        return consumerProperties;
    }

    /**
     * @return the group id taken from the consumer properties
     */
    protected String getGroupId() {
        return consumerProperties.getGroupId();
    }

    /**
     * @return the client id taken from the consumer properties
     */
    protected String getClientId() {
        return consumerProperties.getClientId();
    }

    /**
     * @return the poll timeout in milliseconds
     */
    protected long getPollTimeout() {
        return pollTimeout.toMillis();
    }

    /**
     * @return the converter currently configured on this source
     */
    protected RecordMessageConverter getMessageConverter() {
        return messageConverter;
    }

    /**
     * Replaces the message converter.
     *
     * @param recordMessageConverter the converter to use from now on
     */
    public void setMessageConverter(RecordMessageConverter recordMessageConverter) {
        messageConverter = recordMessageConverter;
    }

    /**
     * @return the configured payload type, or {@code null} when none was set
     */
    protected Class<?> getPayloadType() {
        return payloadType;
    }

    /**
     * Sets the payload type.
     *
     * @param cls the payload type
     */
    public void setPayloadType(Class<?> cls) {
        payloadType = cls;
    }

    /**
     * @return the rebalance listener configured in the consumer properties
     */
    protected ConsumerRebalanceListener getRebalanceListener() {
        return consumerProperties.getConsumerRebalanceListener();
    }

    /**
     * @return the fixed component type identifier of this source
     */
    public String getComponentType() {
        return "kafka:message-source";
    }

    /**
     * @return the current value of the raw-message-header flag
     */
    protected boolean isRawMessageHeader() {
        return rawMessageHeader;
    }

    /**
     * Sets the raw-message-header flag.
     *
     * @param z the new flag value
     */
    public void setRawMessageHeader(boolean z) {
        rawMessageHeader = z;
    }

    /**
     * @return the sync-commit timeout read from the consumer properties at construction time
     */
    protected Duration getCommitTimeout() {
        return commitTimeout;
    }

    /**
     * Sets the close timeout.
     *
     * @param duration the timeout; must not be {@code null}
     */
    public void setCloseTimeout(Duration duration) {
        Assert.notNull(duration, "'closeTimeout' cannot be null");
        closeTimeout = duration;
    }

    /**
     * Ensures the consumer factory fetches a single record per poll unless the caller opted out.
     *
     * @param consumerFactory the factory supplied by the caller
     * @param z when {@code true} the factory is returned untouched
     * @return the supplied factory, or the result of {@code fixConsumerFactory} when
     * {@code max.poll.records} had to be forced to 1
     * @throws IllegalArgumentException if the setting is not 1 and the factory is not exactly
     * a {@link DefaultKafkaConsumerFactory}
     */
    private ConsumerFactory<K, V> fixOrRejectConsumerFactory(ConsumerFactory<K, V> consumerFactory, boolean z) {
        final Object maxPoll = consumerFactory.getConfigurationProperties().get("max.poll.records");
        if (z) {
            return consumerFactory;
        }
        if (maxPoll != null && !maxPollGtrOne(maxPoll)) {
            // Already configured with max.poll.records = 1.
            return consumerFactory;
        }
        if (!consumerFactory.getClass().getName().equals(DefaultKafkaConsumerFactory.class.getName())) {
            throw new IllegalArgumentException("Custom consumer factory is not configured with 'max.poll.records = 1'");
        }
        // The property name is now quoted on both sides; the opening quote was missing.
        this.logger.warn(() -> "'max.poll.records' has been forced from " + String.valueOf(maxPoll == null ? "unspecified" : maxPoll) + " to 1, to avoid having to seek after each record");
        return fixConsumerFactory(consumerFactory);
    }

    /**
     * Tells whether a configured {@code max.poll.records} value, given as a number or a
     * string, is something other than 1. Values of any other type yield {@code false}.
     */
    private boolean maxPollGtrOne(Object obj) {
        if (maxPollNumberGtrOne(obj)) {
            return true;
        }
        return maxPollStringGtr1(obj);
    }

    /**
     * @return {@code true} when {@code obj} is a {@link Number} whose int value is not 1
     */
    private boolean maxPollNumberGtrOne(Object obj) {
        if (!(obj instanceof Number)) {
            return false;
        }
        final int configured = ((Number) obj).intValue();
        return configured != 1;
    }

    /**
     * @return {@code true} when {@code obj} is a {@link String} that parses to an int other
     * than 1
     * @throws NumberFormatException if {@code obj} is a string that is not a valid int
     */
    private boolean maxPollStringGtr1(Object obj) {
        if (!(obj instanceof String)) {
            return false;
        }
        final int configured = Integer.parseInt((String) obj);
        return configured != 1;
    }

    /**
     * @return whether the source has been started and not stopped since
     */
    public boolean isRunning() {
        return running.get();
    }

    /**
     * Marks the source as running and clears the stopped flag; a no-op when already running.
     */
    public void start() {
        final boolean justStarted = this.running.compareAndSet(false, true);
        if (!justStarted) {
            return;
        }
        this.stopped.set(false);
    }

    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            stopConsumer();
            this.stopped.set(true);
        }
    }

    /**
     * Requests a pause. Only the request flag is set here; {@code doReceive()} pauses the
     * consumer on a later call once partitions are assigned.
     */
    public void pause() {
        pausing.set(true);
    }

    /**
     * Withdraws a pause request. Only the request flag is cleared here; {@code doReceive()}
     * resumes a paused consumer on a later call.
     */
    public void resume() {
        pausing.set(false);
    }

    /**
     * @return whether the consumer is currently paused (as opposed to merely asked to pause)
     */
    public boolean isPaused() {
        return paused.get();
    }

    /**
     * Polls for the next record and converts it to a message, all under the receive lock.
     * The consumer is created on first use, and pending pause/resume requests are applied
     * before polling.
     *
     * @return the converted record, or {@code null} when the source is stopped or no record
     * is available
     */
    protected Object doReceive() {
        this.receiveLock.lock();
        try {
            if (this.stopped.get()) {
                this.logger.debug("Message source is stopped; no records will be returned");
                return null;
            }
            if (this.consumer == null) {
                createConsumer();
            }

            // A pause request can only be honoured once there is something assigned to pause.
            final boolean pauseRequested = this.pausing.get() && !this.paused.get() && !this.assignedPartitions.isEmpty();
            if (pauseRequested) {
                this.consumer.pause(this.assignedPartitions);
                this.paused.set(true);
            } else if (this.paused.get() && !this.pausing.get()) {
                this.consumer.resume(this.assignedPartitions);
                this.paused.set(false);
            }

            if (this.paused.get() && this.recordsIterator == null) {
                this.logger.debug("Consumer is paused; no records will be returned");
            }

            // Polling still happens while paused.
            final ConsumerRecord<K, V> next = pollRecord();
            return next == null ? null : recordToMessage(next);
        } finally {
            this.receiveLock.unlock();
        }
    }

    /**
     * Creates the consumer under the consumer lock and attaches it to its source of records:
     * a topic pattern if configured, otherwise explicit topic partitions, otherwise the
     * configured topics. Subscriptions register an {@code IntegrationConsumerRebalanceListener};
     * explicit partitions are assigned and positioned directly.
     */
    protected void createConsumer() {
        this.consumerLock.lock();
        try {
            final ConsumerProperties props = this.consumerProperties;
            this.consumer = this.consumerFactory.createConsumer(
                    props.getGroupId(),
                    props.getClientId(),
                    (String) null,
                    props.getKafkaConsumerProperties());
            final IntegrationConsumerRebalanceListener rebalanceListener =
                    new IntegrationConsumerRebalanceListener(props.getConsumerRebalanceListener());
            final Pattern pattern = props.getTopicPattern();
            final TopicPartitionOffset[] explicitPartitions = props.getTopicPartitions();

            if (pattern == null && explicitPartitions == null) {
                this.consumer.subscribe(Arrays.asList(props.getTopics()), rebalanceListener);
            } else if (pattern != null) {
                this.consumer.subscribe(pattern, rebalanceListener);
            } else {
                assignAndSeekPartitions(explicitPartitions);
            }
        } finally {
            this.consumerLock.unlock();
        }
    }

    /**
     * Manually assigns the given partitions to the consumer and positions each one.
     * <ul>
     * <li>{@code BEGINNING} / {@code END} seek positions go to the respective end of the log;</li>
     * <li>a non-negative offset is absolute, or added to the current position when relative;</li>
     * <li>a negative offset is applied to the current position when relative, otherwise to
     * the end of the partition; the result never goes below 0;</li>
     * <li>no offset and no seek position leaves the partition where the consumer starts.</li>
     * </ul>
     * A failing seek is logged and does not abort the remaining partitions.
     *
     * @param topicPartitionOffsetArr the partitions to assign with their initial offsets
     */
    private void assignAndSeekPartitions(TopicPartitionOffset[] topicPartitionOffsetArr) {
        List<TopicPartition> partitions = Arrays.stream(topicPartitionOffsetArr)
                .map(TopicPartitionOffset::getTopicPartition)
                .collect(Collectors.toList());
        this.consumer.assign(partitions);
        this.assignedPartitions.addAll(partitions);

        for (TopicPartitionOffset requested : topicPartitionOffsetArr) {
            TopicPartition topicPartition = requested.getTopicPartition();
            if (TopicPartitionOffset.SeekPosition.BEGINNING.equals(requested.getPosition())) {
                this.consumer.seekToBeginning(Collections.singleton(topicPartition));
                continue;
            }
            if (TopicPartitionOffset.SeekPosition.END.equals(requested.getPosition())) {
                this.consumer.seekToEnd(Collections.singleton(topicPartition));
                continue;
            }
            Long offset = requested.getOffset();
            if (offset == null) {
                continue;
            }

            long target = offset;
            if (offset < 0) {
                if (!requested.isRelativeToCurrent()) {
                    // A negative absolute offset counts back from the end of the partition, so
                    // move there first and compute the target from that position.
                    this.consumer.seekToEnd(Collections.singleton(topicPartition));
                }
                target = Math.max(0L, this.consumer.position(topicPartition) + offset);
            } else if (requested.isRelativeToCurrent()) {
                target = this.consumer.position(topicPartition) + offset;
            }

            final long seekTarget = target;
            try {
                this.consumer.seek(topicPartition, seekTarget);
            } catch (Exception e) {
                // Report the consumer's actual position, not the partition name a second time.
                this.logger.error(e, () -> "Failed to set initial offset for " + topicPartition
                        + " at " + seekTarget + ". Position is " + this.consumer.position(topicPartition));
            }
        }
    }

    /**
     * Returns the next record to hand out, polling the broker only when the batch
     * from the previous poll has been fully consumed.
     *
     * <p>When a poll is needed it runs under {@code consumerLock}. The poll uses
     * {@code assignTimeout} while {@code assignedPartitions} is empty and
     * {@code pollTimeout} otherwise. A non-empty result resets {@code remainingCount}
     * and becomes the new {@code recordsIterator}.
     *
     * <p>If the poll is interrupted by a {@link WakeupException} and the
     * {@code newAssignment} flag is set, the flag is cleared and the poll is retried;
     * without the flag the wakeup simply yields no record.
     *
     * @return the next record, or {@code null} if none is available
     */
    @Nullable
    private ConsumerRecord<K, V> pollRecord() {
        if (this.recordsIterator != null) {
            // Still draining the batch fetched by an earlier poll.
            return nextRecord();
        }
        this.consumerLock.lock();
        try {
            ConsumerRecords<K, V> records =
                    this.consumer.poll(this.assignedPartitions.isEmpty() ? this.assignTimeout : this.pollTimeout);
            this.logger.debug(() -> records == null ? "Received null" : "Received " + records.count() + " records");
            if (records != null && records.count() != 0) {
                this.remainingCount.set(records.count());
                this.recordsIterator = records.iterator();
                return nextRecord();
            }
            return null;
        } catch (WakeupException e) {
            this.logger.debug("Woken");
            if (this.newAssignment) {
                this.newAssignment = false;
                // Retry once the pending assignment has been acknowledged.
                return pollRecord();
            }
            return null;
        } finally {
            this.consumerLock.unlock();
        }
    }

    /**
     * Takes the next record from the current batch iterator.
     *
     * <p>Clears {@code recordsIterator} once the batch is exhausted, and decrements
     * {@code remainingCount} for the record being returned. Callers must only invoke
     * this while {@code recordsIterator} is non-null.
     *
     * @return the next record of the current batch
     */
    private ConsumerRecord<K, V> nextRecord() {
        Iterator<ConsumerRecord<K, V>> batch = this.recordsIterator;
        ConsumerRecord<K, V> current = batch.next();
        if (!batch.hasNext()) {
            // Batch drained: clear the iterator field.
            this.recordsIterator = null;
        }
        this.remainingCount.decrementAndGet();
        return current;
    }

    /**
     * Converts a consumer record into the object returned from the receive path and
     * registers the record as in flight.
     *
     * <p>Steps:
     * <ol>
     *   <li>If the value (or key) is {@code null} and the corresponding check flag is
     *       set, look for a deserialization exception in the record headers and
     *       rethrow it.</li>
     *   <li>Create the acknowledgment info and callback for the record and add the info
     *       to the per-partition in-flight set.</li>
     *   <li>Convert the record to a message; the callback is handed to the converter
     *       only when it is also an {@code Acknowledgment}.</li>
     *   <li>Add the acknowledgment callback, the remaining-record count and, when
     *       {@code rawMessageHeader} is set, the raw record to the headers.</li>
     * </ol>
     *
     * @param consumerRecord the record to convert
     * @return the converted message when its headers are {@code KafkaMessageHeaders}
     *         (headers updated in place); otherwise a message builder based on the
     *         converted message with the extra headers applied
     */
    private Object recordToMessage(ConsumerRecord<K, V> consumerRecord) {
        if (consumerRecord.value() == null && this.checkNullValueForExceptions) {
            checkDeserializationException(consumerRecord, "springDeserializerExceptionValue");
        }
        if (consumerRecord.key() == null && this.checkNullKeyForExceptions) {
            checkDeserializationException(consumerRecord, "springDeserializerExceptionKey");
        }
        TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        KafkaAckInfoImpl ackInfo = new KafkaAckInfoImpl(consumerRecord, topicPartition);
        // The factory yields the general callback type; it is narrowed to Acknowledgment
        // only for the converter call below.
        AcknowledgmentCallback ackCallback = this.ackCallbackFactory.createCallback((KafkaAckInfo) ackInfo);
        this.inflightRecords
                .computeIfAbsent(topicPartition, tp -> Collections.synchronizedSet(new TreeSet<>()))
                .add(ackInfo);
        Acknowledgment acknowledgment = ackCallback instanceof Acknowledgment ? (Acknowledgment) ackCallback : null;
        Message<?> message =
                this.messageConverter.toMessage(consumerRecord, acknowledgment, this.consumer, this.payloadType);
        if (!(message.getHeaders() instanceof KafkaMessageHeaders)) {
            // Headers are not KafkaMessageHeaders: add the extra headers through a builder.
            AbstractIntegrationMessageBuilder<?> builder = getMessageBuilderFactory().fromMessage(message)
                    .setHeader("acknowledgmentCallback", ackCallback)
                    .setHeader(REMAINING_RECORDS, this.remainingCount.get());
            if (this.rawMessageHeader) {
                builder.setHeader("kafka_data", consumerRecord);
                builder.setHeader("sourceData", consumerRecord);
            }
            return builder;
        }
        // KafkaMessageHeaders: write the extra headers straight into its raw header map.
        Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
        rawHeaders.put("acknowledgmentCallback", ackCallback);
        rawHeaders.put(REMAINING_RECORDS, this.remainingCount.get());
        if (this.rawMessageHeader) {
            rawHeaders.put("kafka_data", consumerRecord);
            rawHeaders.put("sourceData", consumerRecord);
        }
        return message;
    }

    /**
     * Rethrows a deserialization failure that was stored in the record's headers.
     *
     * @param consumerRecord the record whose headers are inspected
     * @param str            the name of the header that may carry the exception
     * @throws DeserializationException if the named header yields one
     */
    private void checkDeserializationException(ConsumerRecord<K, V> consumerRecord, String str) {
        DeserializationException failure =
                SerializationUtils.getExceptionFromHeader(consumerRecord, str, this.logger);
        if (failure == null) {
            return;
        }
        throw failure;
    }

    /**
     * Shuts this source down by stopping the consumer while holding
     * {@code receiveLock}.
     */
    public void destroy() {
        final Lock lock = this.receiveLock;
        lock.lock();
        try {
            stopConsumer();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Closes and discards the current consumer, if there is one, under
     * {@code consumerLock}.
     *
     * <p>The consumer is woken up first, then closed with {@code closeTimeout}; the
     * field is then reset to {@code null} and {@code assignedPartitions} is cleared.
     * Calling this with no consumer present is a no-op.
     */
    private void stopConsumer() {
        this.consumerLock.lock();
        try {
            if (this.consumer == null) {
                return;
            }
            this.consumer.wakeup();
            this.consumer.close(this.closeTimeout);
            this.consumer = null;
            this.assignedPartitions.clear();
        } finally {
            this.consumerLock.unlock();
        }
    }

    /**
     * Builds a new consumer factory from the supplied one with
     * {@code max.poll.records} forced to {@code 1}.
     *
     * <p>The configuration properties are copied, so the supplied factory is not
     * modified. The key and value deserializers are carried over when the supplied
     * factory has them; nothing else is copied from the supplied factory.
     *
     * @param consumerFactory the factory supplied by the caller
     * @param <K>             the record key type
     * @param <V>             the record value type
     * @return a {@code DefaultKafkaConsumerFactory} that polls one record at a time
     */
    private static <K, V> ConsumerFactory<K, V> fixConsumerFactory(ConsumerFactory<K, V> consumerFactory) {
        Map<String, Object> configs = new HashMap<>(consumerFactory.getConfigurationProperties());
        configs.put("max.poll.records", 1);
        DefaultKafkaConsumerFactory<K, V> fixedFactory = new DefaultKafkaConsumerFactory<>(configs);
        if (consumerFactory.getKeyDeserializer() != null) {
            fixedFactory.setKeyDeserializer(consumerFactory.getKeyDeserializer());
        }
        if (consumerFactory.getValueDeserializer() != null) {
            fixedFactory.setValueDeserializer(consumerFactory.getValueDeserializer());
        }
        return fixedFactory;
    }
}
