/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener.adapter;

import java.math.BigInteger;
import java.time.Clock;
import java.time.Instant;
import java.util.Optional;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.jspecify.annotations.Nullable;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.KafkaBackoffException;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.TimestampedException;
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;

/**
 * Listener adapter that consults a {@link KafkaConsumerBackoffManager} before
 * handing a record to its delegate.
 *
 * <p>For each record, the adapter looks up the configured back-off timestamp
 * header. When the header is present, its value is decoded to a {@code long}
 * and passed, together with the listener id and the record's topic/partition,
 * to {@link KafkaConsumerBackoffManager#backOffIfNecessary}. Afterwards the
 * record is forwarded to the delegate using the overload matching the
 * delegate's type. Any {@link Exception} thrown by the delegate is wrapped in
 * a {@link TimestampedException} stamped with the current instant of the
 * supplied {@link Clock}.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class KafkaBackoffAwareMessageListenerAdapter<K, V>
        extends AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>>
        implements AcknowledgingConsumerAwareMessageListener<K, V> {

    /** Header name used when the caller does not supply one explicitly. */
    private static final String DEFAULT_BACKOFF_TIMESTAMP_HEADER = "retry_topic-backoff-timestamp";

    private final String listenerId;
    private final String backoffTimestampHeader;
    private final Clock clock;
    private final KafkaConsumerBackoffManager kafkaConsumerBackoffManager;

    /**
     * Creates an adapter that reads the back-off timestamp from the given header.
     *
     * @param delegate the listener that ultimately receives each record
     * @param kafkaConsumerBackoffManager the manager asked to back off when a timestamp header is present
     * @param listenerId the listener id passed to the back-off manager when building its context
     * @param backoffTimestampHeader the name of the record header holding the back-off timestamp
     * @param clock the clock used to timestamp exceptions thrown by the delegate
     */
    public KafkaBackoffAwareMessageListenerAdapter(MessageListener<K, V> delegate, KafkaConsumerBackoffManager kafkaConsumerBackoffManager, String listenerId, String backoffTimestampHeader, Clock clock) {
        super(delegate);
        this.listenerId = listenerId;
        this.kafkaConsumerBackoffManager = kafkaConsumerBackoffManager;
        this.backoffTimestampHeader = backoffTimestampHeader;
        this.clock = clock;
    }

    /**
     * Creates an adapter that reads the back-off timestamp from the default
     * header, {@code retry_topic-backoff-timestamp}.
     *
     * @param adapter the listener that ultimately receives each record
     * @param kafkaConsumerBackoffManager the manager asked to back off when a timestamp header is present
     * @param listenerId the listener id passed to the back-off manager when building its context
     * @param clock the clock used to timestamp exceptions thrown by the delegate
     * @throws KafkaBackoffException declared by this constructor's signature
     */
    public KafkaBackoffAwareMessageListenerAdapter(MessageListener<K, V> adapter, KafkaConsumerBackoffManager kafkaConsumerBackoffManager, String listenerId, Clock clock) throws KafkaBackoffException {
        this(adapter, kafkaConsumerBackoffManager, listenerId, DEFAULT_BACKOFF_TIMESTAMP_HEADER, clock);
    }

    /**
     * Applies the back-off check and then forwards the record to the delegate.
     *
     * <p>The back-off check runs outside the {@code try} block, so anything it
     * throws propagates unchanged; only exceptions raised while invoking the
     * delegate are wrapped.
     *
     * @param consumerRecord the record to process
     * @param acknowledgment the acknowledgment to forward, may be {@code null}
     * @param consumer the consumer to forward, may be {@code null}
     * @throws TimestampedException wrapping any {@link Exception} thrown by the delegate
     */
    @Override
    public void onMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer) {
        maybeGetBackoffTimestamp(consumerRecord)
                .ifPresent(nextExecutionTimestamp -> this.kafkaConsumerBackoffManager
                        .backOffIfNecessary(createContext(consumerRecord, nextExecutionTimestamp, consumer)));
        try {
            invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer);
        }
        catch (Exception ex) {
            // Record when the failure happened, according to the injected clock.
            throw new TimestampedException(ex, Instant.now(this.clock));
        }
    }

    /**
     * Calls the delegate's {@code onMessage} overload that corresponds to
     * {@code delegateType}. A delegate type not listed here results in no call.
     */
    private void invokeDelegateOnMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer) {
        // The delegate is already typed as MessageListener<K, V>; no raw cast is needed.
        switch (this.delegateType) {
            case ACKNOWLEDGING_CONSUMER_AWARE:
                this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
                break;
            case ACKNOWLEDGING:
                this.delegate.onMessage(consumerRecord, acknowledgment);
                break;
            case CONSUMER_AWARE:
                this.delegate.onMessage(consumerRecord, consumer);
                break;
            case SIMPLE:
                this.delegate.onMessage(consumerRecord);
                break;
        }
    }

    /**
     * Builds the back-off manager context for the record's topic and partition.
     */
    private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord<K, V> data, long nextExecutionTimestamp, @Nullable Consumer<?, ?> consumer) {
        TopicPartition topicPartition = new TopicPartition(data.topic(), data.partition());
        return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp, this.listenerId, topicPartition, consumer);
    }

    /**
     * Reads the last occurrence of the back-off timestamp header, if any, and
     * decodes its bytes as a big-endian two's-complement integer narrowed to
     * {@code long}.
     *
     * @return the decoded timestamp, or an empty {@link Optional} when the header is absent
     */
    private Optional<Long> maybeGetBackoffTimestamp(ConsumerRecord<K, V> data) {
        return Optional.ofNullable(data.headers().lastHeader(this.backoffTimestampHeader))
                .map(timestampHeader -> new BigInteger(timestampHeader.value()).longValue());
    }

    /**
     * Processes the record with neither an acknowledgment nor a consumer.
     *
     * @param data the record to process
     */
    @Override
    public void onMessage(ConsumerRecord<K, V> data) {
        onMessage(data, null, null);
    }

    /**
     * Processes the record with an acknowledgment but no consumer.
     *
     * @param data the record to process
     * @param acknowledgment the acknowledgment to forward, may be {@code null}
     */
    @Override
    public void onMessage(ConsumerRecord<K, V> data, @Nullable Acknowledgment acknowledgment) {
        onMessage(data, acknowledgment, null);
    }

    /**
     * Processes the record with a consumer but no acknowledgment.
     *
     * @param data the record to process
     * @param consumer the consumer to forward, may be {@code null}
     */
    @Override
    public void onMessage(ConsumerRecord<K, V> data, @Nullable Consumer<?, ?> consumer) {
        onMessage(data, null, consumer);
    }
}

