package com.bluejeans.common.utils.kafka;

import com.bluejeans.common.utils.EnumCounter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bluejeans/common/utils/kafka/SimpleKafkaConsumer.class */
/**
 * A thread that owns a single {@link KafkaConsumer}, polls it in a loop and hands every
 * polled record to each configured {@link KafkaRecordProcessor}.
 *
 * <p>Typical lifecycle: configure through the setters, call {@link #init()} (which builds the
 * consumer and starts the thread), and finally call {@link #shutdown()} to stop polling.
 *
 * <p>The {@code topic} property is a comma separated list. When {@code specificPartitions} is
 * {@code false} each entry is a topic name to subscribe to; when {@code true} each entry must
 * be of the form {@code topic:partition} and the partitions are assigned explicitly.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class SimpleKafkaConsumer<K, V> extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
    private Deserializer<K> keyDeserializer;
    private Deserializer<V> valueDeserializer;
    private String topic;
    private Map<String, Object> extraProps;
    private KafkaConsumer<K, V> consumer;
    private List<KafkaRecordProcessor<K, V>> recordProcessors;
    private String server = "localhost:9092";
    private String groupId = "local";
    private boolean enableAutoCommit = true;
    private int autoCommitIntervalMillis = 1000;
    private int sessionTimeoutMillis = 30000;
    private boolean specificPartitions = false;
    private int pollTimeout = 1000;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicBoolean updateNeeded = new AtomicBoolean(false);
    private final EnumCounter<Status> statusCounter = new EnumCounter<>(Status.class);

    /** Events tracked by {@link #getStatusCounter()}. */
    public enum Status {
        /** Incremented by the number of records returned from each poll. */
        RECORDS_POLLED,
        /** Incremented once for every processor invocation that throws a RuntimeException. */
        PROCESS_ERROR
    }

    /**
     * Builds the underlying consumer via {@link #preInit()} and starts this thread.
     */
    @PostConstruct
    public void init() {
        preInit();
        start();
    }

    /**
     * Creates the {@link KafkaConsumer} from the current configuration without starting the
     * polling thread.
     *
     * <p>Missing key/value deserializers default to {@code ObjectDeserializer}. Entries in
     * {@code extraProps} are applied last and therefore override the built-in properties.
     */
    public void preInit() {
        if (this.keyDeserializer == null) {
            this.keyDeserializer = new ObjectDeserializer();
        }
        if (this.valueDeserializer == null) {
            this.valueDeserializer = new ObjectDeserializer();
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.server);
        properties.put("group.id", this.groupId);
        properties.put("enable.auto.commit", this.enableAutoCommit);
        properties.put("auto.commit.interval.ms", this.autoCommitIntervalMillis);
        properties.put("session.timeout.ms", this.sessionTimeoutMillis);
        if (this.extraProps != null) {
            // Applied after the defaults so callers can override any of them.
            properties.putAll(this.extraProps);
        }
        this.consumer = new KafkaConsumer<>(properties, this.keyDeserializer, this.valueDeserializer);
    }

    /**
     * Applies the current {@code topic} setting to the consumer, either as a subscription or
     * as an explicit partition assignment depending on {@code specificPartitions}.
     *
     * @throws IllegalStateException if no topic has been configured
     * @throws IllegalArgumentException if {@code specificPartitions} is set and an entry is
     *         not of the form {@code topic:partition} (includes {@link NumberFormatException}
     *         for a non-numeric partition)
     */
    private void update() {
        // Snapshot the field once: reassignTopic() may replace it from another thread.
        String configuredTopic = this.topic;
        if (configuredTopic == null) {
            throw new IllegalStateException("topic must be set before the consumer is started");
        }
        String[] entries = configuredTopic.split(",");
        if (!this.specificPartitions) {
            this.consumer.subscribe(Arrays.asList(entries));
            return;
        }
        List<TopicPartition> partitions = new ArrayList<>(entries.length);
        for (String entry : entries) {
            String[] parts = entry.split(":");
            if (parts.length < 2) {
                throw new IllegalArgumentException(
                        "Expected 'topic:partition' but got '" + entry + "'");
            }
            partitions.add(new TopicPartition(parts[0], Integer.parseInt(parts[1])));
        }
        this.consumer.assign(partitions);
    }

    /**
     * Polling loop. Returns immediately if the running flag is already set; otherwise polls
     * until {@link #shutdown()} clears the flag, and always closes the consumer on exit.
     *
     * <p>A {@link WakeupException} is swallowed only when it arrives after the running flag
     * was cleared (i.e. it was triggered by {@link #shutdown()}); otherwise it propagates.
     */
    @Override
    public void run() {
        // Atomic check-and-set so two concurrent invocations cannot both enter the loop.
        if (!this.running.compareAndSet(false, true)) {
            return;
        }
        try {
            update();
            while (this.running.get()) {
                // Clear the flag BEFORE applying the update: a reassignTopic() call that
                // lands while update() is executing re-raises the flag and is picked up on
                // the next iteration instead of being lost.
                if (this.updateNeeded.compareAndSet(true, false)) {
                    update();
                }
                ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
                this.statusCounter.incrementEventCount(Status.RECORDS_POLLED, records.count());
                dispatchRecords(records);
            }
        } catch (WakeupException e) {
            if (this.running.get()) {
                // Not caused by shutdown(): surface it to the caller / uncaught handler.
                throw e;
            }
        } finally {
            this.consumer.close();
        }
    }

    /**
     * Passes every record of a polled batch to every configured processor. A processor that
     * throws a {@link RuntimeException} is counted and logged; remaining processors and
     * records are still handled.
     */
    private void dispatchRecords(ConsumerRecords<K, V> records) {
        // Snapshot so a concurrent setRecordProcessors(null) cannot fail mid-batch.
        List<KafkaRecordProcessor<K, V>> processors = this.recordProcessors;
        if (processors == null) {
            return;
        }
        for (ConsumerRecord<K, V> record : records) {
            for (KafkaRecordProcessor<K, V> processor : processors) {
                try {
                    processor.processKafkaRecord(record);
                } catch (RuntimeException e) {
                    this.statusCounter.incrementEventCount(Status.PROCESS_ERROR);
                    logger.warn("Failed to process record - {} using processor - {}", record, processor, e);
                }
            }
        }
    }

    /**
     * Convenience entry point: configures explicit partition assignment and starts consuming.
     * Does nothing if the running flag is already set.
     *
     * @param server bootstrap server list
     * @param topicPartitions comma separated {@code topic:partition} entries
     * @param processors processors that receive every polled record
     */
    public void consumeFromPartition(String server, String topicPartitions, KafkaRecordProcessor<K, V>... processors) {
        if (this.running.get()) {
            return;
        }
        this.specificPartitions = true;
        this.server = server;
        this.topic = topicPartitions;
        setRecordProcessors(Arrays.asList(processors));
        init();
    }

    /**
     * Requests the polling loop to stop: clears the running flag and wakes up the consumer so
     * that a blocked poll returns. Safe to call before the consumer has been created.
     */
    @PreDestroy
    public void shutdown() {
        this.running.set(false);
        KafkaConsumer<K, V> current = this.consumer;
        if (current != null) {
            current.wakeup();
        }
    }

    /**
     * Replaces the topic setting and flags the polling loop to re-apply it before its next
     * poll.
     *
     * @param topic new comma separated topic (or {@code topic:partition}) list
     */
    public void reassignTopic(String topic) {
        // Write the topic first; raising the flag afterwards publishes it to the poll thread.
        this.topic = topic;
        this.updateNeeded.set(true);
    }

    /**
     * Returns the names of the topics reported by the consumer's {@code listTopics()}.
     *
     * <p>NOTE(review): this calls into the consumer from the caller's thread; KafkaConsumer is
     * documented as not safe for multi-threaded access, so invoking this while the polling
     * thread is active may fail - confirm intended usage.
     *
     * @return topic names known to the consumer
     */
    public Set<String> listTopicNames() {
        return this.consumer.listTopics().keySet();
    }

    /** @return the bootstrap server list */
    public String getServer() {
        return this.server;
    }

    /** @param server the bootstrap server list; read when {@link #preInit()} runs */
    public void setServer(String server) {
        this.server = server;
    }

    /** @return the consumer group id */
    public String getGroupId() {
        return this.groupId;
    }

    /** @param groupId the consumer group id; read when {@link #preInit()} runs */
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    /** @return whether {@code enable.auto.commit} is set */
    public boolean isEnableAutoCommit() {
        return this.enableAutoCommit;
    }

    /** @param enableAutoCommit value for {@code enable.auto.commit} */
    public void setEnableAutoCommit(boolean enableAutoCommit) {
        this.enableAutoCommit = enableAutoCommit;
    }

    /** @return value used for {@code auto.commit.interval.ms} */
    public int getAutoCommitIntervalMillis() {
        return this.autoCommitIntervalMillis;
    }

    /** @param autoCommitIntervalMillis value for {@code auto.commit.interval.ms} */
    public void setAutoCommitIntervalMillis(int autoCommitIntervalMillis) {
        this.autoCommitIntervalMillis = autoCommitIntervalMillis;
    }

    /** @return value used for {@code session.timeout.ms} */
    public int getSessionTimeoutMillis() {
        return this.sessionTimeoutMillis;
    }

    /** @param sessionTimeoutMillis value for {@code session.timeout.ms} */
    public void setSessionTimeoutMillis(int sessionTimeoutMillis) {
        this.sessionTimeoutMillis = sessionTimeoutMillis;
    }

    /** @return the key deserializer, or {@code null} if none has been set or defaulted yet */
    public Deserializer<K> getKeyDeserializer() {
        return this.keyDeserializer;
    }

    /** @param keyDeserializer deserializer for record keys */
    public void setKeyDeserializer(Deserializer<K> keyDeserializer) {
        this.keyDeserializer = keyDeserializer;
    }

    /** @return the value deserializer, or {@code null} if none has been set or defaulted yet */
    public Deserializer<V> getValueDeserializer() {
        return this.valueDeserializer;
    }

    /** @param valueDeserializer deserializer for record values */
    public void setValueDeserializer(Deserializer<V> valueDeserializer) {
        this.valueDeserializer = valueDeserializer;
    }

    /** @return the comma separated topic (or {@code topic:partition}) list */
    public String getTopic() {
        return this.topic;
    }

    /**
     * Sets the topic list without flagging the polling loop; use
     * {@link #reassignTopic(String)} to change topics on a running consumer.
     *
     * @param topic comma separated topic (or {@code topic:partition}) list
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @return {@code true} if topic entries are treated as {@code topic:partition} pairs */
    public boolean isSpecificPartitions() {
        return this.specificPartitions;
    }

    /** @param specificPartitions {@code true} to assign explicit partitions instead of subscribing */
    public void setSpecificPartitions(boolean specificPartitions) {
        this.specificPartitions = specificPartitions;
    }

    /** @return the timeout value passed to each poll call */
    public int getPollTimeout() {
        return this.pollTimeout;
    }

    /** @param pollTimeout the timeout value passed to each poll call */
    public void setPollTimeout(int pollTimeout) {
        this.pollTimeout = pollTimeout;
    }

    /** @return the underlying consumer, or {@code null} before {@link #preInit()} has run */
    public KafkaConsumer<K, V> getConsumer() {
        return this.consumer;
    }

    /** @return the live flag controlling the polling loop */
    public AtomicBoolean getRunning() {
        return this.running;
    }

    /** @return the processors invoked for each record; may be {@code null} */
    public List<KafkaRecordProcessor<K, V>> getRecordProcessors() {
        return this.recordProcessors;
    }

    /** @param recordProcessors processors to invoke for each record; {@code null} disables dispatch */
    public void setRecordProcessors(List<KafkaRecordProcessor<K, V>> recordProcessors) {
        this.recordProcessors = recordProcessors;
    }

    /** @return additional consumer properties; may be {@code null} */
    public Map<String, Object> getExtraProps() {
        return this.extraProps;
    }

    /** @param extraProps additional consumer properties that override the built-in ones */
    public void setExtraProps(Map<String, Object> extraProps) {
        this.extraProps = extraProps;
    }

    /** @return the live flag that tells the polling loop to re-apply the topic setting */
    public AtomicBoolean getUpdateNeeded() {
        return this.updateNeeded;
    }

    /** @return counters for polled records and processing errors */
    public EnumCounter<Status> getStatusCounter() {
        return this.statusCounter;
    }
}
