package org.apache.camel.processor.resume.kafka;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.Resumable;
import org.apache.camel.ResumeCache;
import org.apache.camel.Service;
import org.apache.camel.UpdatableConsumerResumeStrategy;
import org.apache.camel.util.StringHelper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.class */
/**
 * Base class for resume strategies that persist offsets in a Kafka topic.
 *
 * <p>Every call to {@link #updateLastOffset(Resumable)} publishes the resumable's key and offset to the
 * configured topic and mirrors the pair into the local {@link ResumeCache}. On {@link #start()} the
 * records currently readable from the topic are loaded back into that cache.
 *
 * <p>Subclasses decide how to subscribe by implementing {@link #subscribe()}, typically by delegating to
 * one of the {@code checkAndSubscribe} overloads.
 *
 * <p>NOTE(review): neither {@link #stop()} nor {@link #close()} closes the Kafka consumer/producer created
 * by {@link #init()} in this class - confirm who owns the lifecycle of those clients.
 *
 * @param <K> type of the record key (the resumable's addressable)
 * @param <V> type of the record value (the offset)
 */
public abstract class AbstractKafkaResumeStrategy<K, V>
        implements UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>>, Service {

    /** Sentinel value {@code -1}. Not referenced in this class; presumably means "no limit" to callers. */
    public static final int UNLIMITED = -1;

    private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaResumeStrategy.class);

    /** Number of poll attempts made by {@link #consume()} before giving up. */
    private static final int DEFAULT_CONSUME_RETRIES = 10;

    private final String topic;
    private final ResumeCache<K, V> resumeCache;
    private final Properties producerConfig;
    private final Properties consumerConfig;

    /** Futures of every record sent through {@link #produce(Object, Object)}; never pruned by this class. */
    private final List<Future<RecordMetadata>> sentItems = new ArrayList<>();

    /**
     * Failures reported by the send callback since the last {@link #produce(Object, Object)} call. Atomic
     * because the callback is not guaranteed to run on the calling thread (Kafka documents that it
     * generally runs on the producer's I/O thread).
     */
    private final AtomicLong errorCount = new AtomicLong();

    private Consumer<K, V> consumer;
    private Producer<K, V> producer;
    private Duration pollDuration = Duration.ofSeconds(1);
    private boolean subscribed;

    /**
     * Creates a strategy using the default String-based producer and consumer configurations.
     *
     * @param bootstrapServers Kafka bootstrap servers; must not be empty
     * @param topic            topic used to store the offsets
     * @param resumeCache      cache mirroring the offsets stored in the topic
     */
    public AbstractKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache) {
        this.topic = topic;
        this.producerConfig = createProducer(bootstrapServers);
        this.consumerConfig = createConsumer(bootstrapServers);
        this.resumeCache = resumeCache;
        // Kept for compatibility: the Kafka clients are created eagerly at construction time.
        init();
    }

    /**
     * Creates a strategy using caller-supplied producer and consumer configurations.
     *
     * @param topic          topic used to store the offsets
     * @param resumeCache    cache mirroring the offsets stored in the topic
     * @param producerConfig properties passed to the {@link KafkaProducer}
     * @param consumerConfig properties passed to the {@link KafkaConsumer}
     */
    public AbstractKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, Properties producerConfig,
                                       Properties consumerConfig) {
        this.topic = topic;
        this.resumeCache = resumeCache;
        this.producerConfig = producerConfig;
        this.consumerConfig = consumerConfig;
        // Kept for compatibility: the Kafka clients are created eagerly at construction time.
        init();
    }

    /**
     * Builds the default producer configuration: String key/value serializers and the given servers.
     *
     * @param bootstrapServers Kafka bootstrap servers; validated with {@link StringHelper#notEmpty}
     * @return a new, mutable {@link Properties} instance
     */
    public static Properties createProducer(String bootstrapServers) {
        StringHelper.notEmpty(bootstrapServers, "bootstrapServers");

        Properties config = new Properties();
        config.put("key.serializer", StringSerializer.class.getName());
        config.put("value.serializer", StringSerializer.class.getName());
        config.put("bootstrap.servers", bootstrapServers);
        return config;
    }

    /**
     * Builds the default consumer configuration: String key/value deserializers, the given servers, a
     * freshly generated random {@code group.id} and auto-commit enabled.
     *
     * @param bootstrapServers Kafka bootstrap servers; validated with {@link StringHelper#notEmpty}
     * @return a new, mutable {@link Properties} instance
     */
    public static Properties createConsumer(String bootstrapServers) {
        StringHelper.notEmpty(bootstrapServers, "bootstrapServers");

        Properties config = new Properties();
        config.put("key.deserializer", StringDeserializer.class.getName());
        config.put("value.deserializer", StringDeserializer.class.getName());
        config.put("bootstrap.servers", bootstrapServers);

        String groupId = UUID.randomUUID().toString();
        LOG.debug("Creating consumer with {}[{}]", "group.id", groupId);
        config.put("group.id", groupId);
        config.put("enable.auto.commit", Boolean.TRUE.toString());
        return config;
    }

    /**
     * Sends a key/value record to the offsets topic without waiting for the result.
     *
     * <p>The error counter is reset before sending, and the returned future is recorded and exposed via
     * {@link #getSentItems()}. NOTE(review): the list of futures grows with every call and is never
     * trimmed here.
     *
     * @param k record key
     * @param v record value
     * @throws ExecutionException   declared for API compatibility; not thrown by this implementation
     * @throws InterruptedException declared for API compatibility; not thrown by this implementation
     */
    public void produce(K k, V v) throws ExecutionException, InterruptedException {
        ProducerRecord<K, V> record = new ProducerRecord<>(topic, k, v);

        errorCount.set(0L);
        Future<RecordMetadata> pending = producer.send(record, (metadata, failure) -> {
            if (failure != null) {
                LOG.error("Failed to send message {}", failure.getMessage(), failure);
                errorCount.incrementAndGet();
            }
        });
        sentItems.add(pending);
    }

    /**
     * Publishes the resumable's last offset to Kafka and then stores it in the local cache.
     *
     * @param resumable source of the key ({@code getAddressable()}) and offset ({@code getLastOffset()})
     * @throws Exception if producing the record fails
     */
    @Override
    public void updateLastOffset(Resumable<K, V> resumable) throws Exception {
        K key = resumable.getAddressable();
        V offsetValue = resumable.getLastOffset().offset();

        LOG.debug("Updating offset on Kafka with key {} to {}", key, offsetValue);
        produce(key, offsetValue);
        resumeCache.add(key, offsetValue);
    }

    /**
     * Subscribes, reads records from the topic into the cache until a {@link #consume()} call returns no
     * records, then unsubscribes.
     *
     * @throws Exception if subscribing fails
     */
    protected void loadCache() throws Exception {
        subscribe();
        LOG.debug("Loading records from topic {}", topic);

        while (true) {
            ConsumerRecords<K, V> batch = consume();
            if (batch.isEmpty()) {
                break;
            }

            for (ConsumerRecord<K, V> record : batch) {
                LOG.trace("Read from Kafka: {}", record.value());
                resumeCache.add(record.key(), record.value());

                // NOTE(review): this only abandons the remainder of the current batch; polling continues
                // with the next batch. Behavior preserved as-is - confirm whether loading should stop
                // entirely once the cache is full.
                if (resumeCache.isFull()) {
                    break;
                }
            }
        }

        unsubscribe();
    }

    /**
     * Subscribes the consumer to the given topic unless a subscription is already active.
     *
     * @param topic topic to subscribe to
     */
    public void checkAndSubscribe(String topic) {
        if (subscribed) {
            return;
        }

        consumer.subscribe(Collections.singletonList(topic));
        subscribed = true;
    }

    /**
     * Subscribes the consumer to the given topic unless a subscription is already active, rewinding each
     * assigned partition so that reading starts {@code remaining} records before its end.
     *
     * @param topic     topic to subscribe to
     * @param remaining number of records before the end offset at which to start reading
     */
    public void checkAndSubscribe(String topic, final long remaining) {
        if (subscribed) {
            return;
        }

        consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // nothing to do when partitions are revoked
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                consumer.seekToEnd(partitions);
                for (TopicPartition partition : partitions) {
                    long startOffset = consumer.position(partition) - remaining;
                    if (startOffset >= 0) {
                        consumer.seek(partition, startOffset);
                    } else {
                        LOG.info("Ignoring the seek command because the initial offset is negative (the topic is likely empty)");
                    }
                }
            }
        });
        subscribed = true;
    }

    /**
     * Subscribes the consumer to the offsets topic.
     *
     * @throws Exception if the subscription cannot be established
     */
    public abstract void subscribe() throws Exception;

    /**
     * Unsubscribes the consumer, logging (never propagating) any failure, and clears the subscription
     * flag so that a later {@code checkAndSubscribe} call subscribes again instead of being skipped.
     */
    public void unsubscribe() {
        try {
            consumer.unsubscribe();
        } catch (IllegalStateException e) {
            LOG.warn("The consumer is likely already closed. Skipping unsubscribing from {}", topic);
        } catch (Exception e) {
            LOG.error("Error unsubscribing from the Kafka topic {}: {}", topic, e.getMessage(), e);
        } finally {
            subscribed = false;
        }
    }

    /**
     * Polls for records, making up to {@value #DEFAULT_CONSUME_RETRIES} attempts.
     *
     * @return the first non-empty batch, or an empty batch if every attempt came back empty
     */
    public ConsumerRecords<K, V> consume() {
        return consume(DEFAULT_CONSUME_RETRIES);
    }

    /**
     * Polls for records, waiting up to the configured poll duration on each attempt.
     *
     * @param retries maximum number of poll attempts; zero or negative yields an empty batch immediately
     * @return the first non-empty batch, or an empty batch if every attempt came back empty
     */
    public ConsumerRecords<K, V> consume(int retries) {
        for (int attemptsLeft = retries; attemptsLeft > 0; attemptsLeft--) {
            ConsumerRecords<K, V> batch = consumer.poll(pollDuration);
            if (!batch.isEmpty()) {
                return batch;
            }
        }
        return ConsumerRecords.empty();
    }

    /**
     * @return number of send failures reported since the most recent {@link #produce(Object, Object)} call
     */
    public long getErrorCount() {
        return errorCount.get();
    }

    /**
     * @return a read-only view of the futures of all records sent so far
     */
    public List<Future<RecordMetadata>> getSentItems() {
        return Collections.unmodifiableList(sentItems);
    }

    /** Delegates to the default {@link Service} implementation. */
    @Override
    public void build() {
        Service.super.build();
    }

    /**
     * Runs the default {@link Service} initialization and creates the Kafka consumer and producer from
     * their configurations if they have not been created yet.
     */
    @Override
    public void init() {
        Service.super.init();

        LOG.debug("Initializing the Kafka resume strategy");
        if (consumer == null) {
            consumer = new KafkaConsumer<>(consumerConfig);
        }
        if (producer == null) {
            producer = new KafkaProducer<>(producerConfig);
        }
    }

    /** Intentionally a no-op in this class. */
    @Override
    public void stop() {
        // nothing to stop here
    }

    /**
     * Delegates to the default {@link Service} implementation.
     *
     * @throws IOException if the default implementation fails
     */
    @Override
    public void close() throws IOException {
        Service.super.close();
    }

    /**
     * Loads the previously stored offsets into the cache. A failure to load is logged and otherwise
     * ignored, so this method does not propagate it.
     */
    @Override
    public void start() {
        LOG.info("Starting the kafka resume strategy");

        try {
            loadCache();
        } catch (Exception e) {
            LOG.error("Failed to load already processed items: {}", e.getMessage(), e);
        }
    }

    /**
     * @return maximum time each poll attempt waits for records
     */
    public Duration getPollDuration() {
        return pollDuration;
    }

    /**
     * @param duration maximum time each poll attempt waits for records; must not be {@code null}
     * @throws NullPointerException if {@code duration} is {@code null}
     */
    public void setPollDuration(Duration duration) {
        this.pollDuration = Objects.requireNonNull(duration, "The poll duration cannot be null");
    }

    /**
     * @return the Kafka consumer used to read the offsets topic
     */
    protected Consumer<K, V> getConsumer() {
        return consumer;
    }

    /**
     * @return the Kafka producer used to write to the offsets topic
     */
    protected Producer<K, V> getProducer() {
        return producer;
    }
}
