package org.apache.camel.processor.resume.kafka;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.camel.resume.Deserializable;
import org.apache.camel.resume.Resumable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.class */
/**
 * Kafka-backed resume strategy that, in addition to what
 * {@link SingleNodeKafkaResumeStrategy} does when loading the cache, schedules a
 * background task that re-reads the resume topic with a dedicated consumer and
 * feeds every record into the adapter.
 *
 * @param <K> the resumable type handled by this strategy
 */
public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNodeKafkaResumeStrategy<K> {
    private static final Logger LOG = LoggerFactory.getLogger(MultiNodeKafkaResumeStrategy.class);

    /** Executor on which the background cache refresh task is run. */
    private final ExecutorService executorService;

    /**
     * Creates the strategy with a single-threaded executor for the background refresh.
     *
     * @param kafkaResumeStrategyConfiguration the configuration passed on to the parent strategy
     */
    public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration kafkaResumeStrategyConfiguration) {
        this(kafkaResumeStrategyConfiguration, Executors.newSingleThreadExecutor());
    }

    /**
     * Creates the strategy with a caller-supplied executor for the background refresh.
     * Note that {@link #stop()} shuts this executor down.
     *
     * @param kafkaResumeStrategyConfiguration the configuration passed on to the parent strategy
     * @param executorService                  the executor that runs the refresh task
     */
    public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration kafkaResumeStrategyConfiguration, ExecutorService executorService) {
        super(kafkaResumeStrategyConfiguration);
        this.executorService = executorService;
    }

    /**
     * Polls the resume topic using the consumer provided by the parent strategy.
     */
    @Override // org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy
    protected void poll() {
        poll(getConsumer());
    }

    /**
     * Repeatedly consumes from the given consumer and hands each record's key and value
     * to the adapter, returning as soon as a consume call yields no records.
     *
     * @param consumer the consumer to read the records from
     */
    protected void poll(Consumer<byte[], byte[]> consumer) {
        Deserializable adapter = getAdapter();
        while (true) {
            // NOTE(review): the meaning of the literal 10 is defined by the parent's consume(); confirm there.
            ConsumerRecords<byte[], byte[]> batch = consume(10, consumer);
            if (batch.isEmpty()) {
                return;
            }
            for (ConsumerRecord<byte[], byte[]> entry : batch) {
                byte[] value = entry.value();
                LOG.trace("Read from Kafka: {}", value);
                adapter.deserialize(ByteBuffer.wrap(entry.key()), ByteBuffer.wrap(value));
            }
        }
    }

    /**
     * Loads the cache through the parent strategy and then submits a refresh task
     * to the executor.
     *
     * @throws Exception if the parent strategy fails to load the cache
     */
    @Override // org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy
    public void loadCache() throws Exception {
        super.loadCache();
        this.executorService.submit(() -> refresh());
    }

    /**
     * Reads the resume topic with a freshly created consumer and feeds the records to the
     * adapter. Any failure is logged rather than propagated.
     */
    private void refresh() {
        LOG.trace("Creating a offset cache refresher");
        try {
            // Work on a copy so the shared consumer properties are not modified.
            Properties properties = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone();
            // A random group id gives this consumer a consumer group of its own.
            properties.setProperty("group.id", UUID.randomUUID().toString());

            // try-with-resources guarantees the consumer is closed even when subscribe/poll throws.
            try (KafkaConsumer<byte[], byte[]> refreshConsumer = new KafkaConsumer<>(properties)) {
                refreshConsumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
                poll(refreshConsumer);
            }
        } catch (Exception e) {
            LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
        }
    }

    /**
     * Shuts down the refresh executor and then stops the parent strategy; the parent is
     * stopped even if the executor shutdown throws.
     */
    @Override // org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy
    public void stop() {
        try {
            this.executorService.shutdown();
        } finally {
            super.stop();
        }
    }
}
