/*
 * Decompiled with CFR 0.152.
 */
package org.apache.skywalking.oap.server.analyzer.agent.kafka;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.KafkaHandler;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.server.pool.CustomThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Owns a single {@link KafkaConsumer} and routes every polled record to the
 * {@link KafkaHandler} registered for the record's topic.
 *
 * <p>Usage as visible in this class: create the register, call
 * {@link #register(KafkaHandler)} once per handler, then call {@link #start()}.
 * {@code start()} creates any missing topics, binds the consumer (explicit
 * partition assignment when sharding is active, topic subscription otherwise)
 * and submits this object to the internal executor, which then runs the poll
 * loop in {@link #run()}. Handler invocations are submitted to that same executor.
 */
public class KafkaFetcherHandlerRegister implements Runnable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaFetcherHandlerRegister.class);

    /** Consumer property that decides whether offsets are committed automatically. */
    private static final String AUTO_COMMIT_KEY = "enable.auto.commit";
    /** Maximum time a single poll blocks waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(500L);

    /** Collects handlers until {@link #start()} freezes them into {@link #handlerMap}. */
    private final ImmutableMap.Builder<String, KafkaHandler> builder = ImmutableMap.builder();
    /** Topic name to handler; assigned in {@link #start()} before the poll loop is submitted. */
    private ImmutableMap<String, KafkaHandler> handlerMap;
    /** Partitions reported by the registered handlers; only used when sharding is active. */
    private final List<TopicPartition> topicPartitions = Lists.newArrayList();
    private final KafkaConsumer<String, Bytes> consumer;
    private final KafkaFetcherConfig config;
    /** True only when sharding is enabled AND a non-empty partition list is configured. */
    private final boolean isSharding;
    private final Properties properties;
    // Defaults below are used unless the configuration supplies a positive value.
    private int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2;
    private int threadPoolQueueSize = 10000;
    private final ThreadPoolExecutor executor;
    private final boolean enableKafkaMessageAutoCommit;

    /**
     * Builds the consumer properties, the Kafka consumer and the worker pool.
     *
     * @param config fetcher configuration providing consumer properties, group id,
     *               bootstrap servers, sharding settings and thread pool sizing
     */
    public KafkaFetcherHandlerRegister(KafkaFetcherConfig config) {
        this.config = config;

        this.properties = new Properties();
        this.properties.putAll((Map<?, ?>) config.getKafkaConsumerConfig());
        // Explicit settings win over anything present in the generic consumer config.
        this.properties.setProperty("group.id", config.getGroupId());
        this.properties.setProperty("bootstrap.servers", config.getBootstrapServers());

        this.isSharding = config.isSharding() && StringUtil.isNotEmpty((String) config.getConsumePartitions());

        int configuredPoolSize = config.getKafkaHandlerThreadPoolSize();
        if (configuredPoolSize > 0) {
            this.threadPoolSize = configuredPoolSize;
        }
        int configuredQueueSize = config.getKafkaHandlerThreadPoolQueueSize();
        if (configuredQueueSize > 0) {
            this.threadPoolQueueSize = configuredQueueSize;
        }

        this.enableKafkaMessageAutoCommit = isAutoCommitEnabled(this.properties);
        this.consumer = new KafkaConsumer<>(this.properties, new StringDeserializer(), new BytesDeserializer());
        // CallerRunsPolicy: when the queue is full the submitting thread runs the task itself.
        this.executor = new ThreadPoolExecutor(
            this.threadPoolSize,
            this.threadPoolSize,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(this.threadPoolQueueSize),
            (ThreadFactory) new CustomThreadFactory("KafkaConsumer"),
            new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Registers a handler under its topic and records the partitions it reports.
     * Has to be called before {@link #start()}, which freezes the handler map.
     *
     * @param handler handler to dispatch records of {@code handler.getTopic()} to
     */
    public void register(KafkaHandler handler) {
        this.builder.put(handler.getTopic(), handler);
        this.topicPartitions.addAll(handler.getTopicPartitions());
    }

    /**
     * Freezes the registered handlers, creates missing topics, binds the consumer
     * and submits the poll loop to the executor.
     *
     * @throws ModuleStartException if missing topics cannot be created or the
     *                              topic check is interrupted
     */
    public void start() throws ModuleStartException {
        this.handlerMap = this.builder.build();
        Set<String> topics = this.handlerMap.keySet();
        this.createTopicIfNeeded(topics, this.properties);

        if (this.isSharding) {
            this.consumer.assign(this.topicPartitions);
        } else {
            this.consumer.subscribe(topics);
        }
        this.consumer.seekToEnd(this.consumer.assignment());
        this.executor.submit(this);
    }

    /**
     * Poll loop: fetches records, submits one handler task per record and, when
     * auto-commit is disabled, commits asynchronously right after submission.
     * Any exception raised by an iteration is logged and the loop continues.
     */
    @Override
    public void run() {
        while (true) {
            try {
                ConsumerRecords<String, Bytes> consumerRecords = this.consumer.poll(POLL_TIMEOUT);
                if (consumerRecords.isEmpty()) {
                    continue;
                }
                for (ConsumerRecord<String, Bytes> record : consumerRecords) {
                    this.executor.submit(() -> this.dispatch(record));
                }
                if (!this.enableKafkaMessageAutoCommit) {
                    // Note: the commit is issued after submission, not after the handlers finish.
                    this.consumer.commitAsync();
                }
            } catch (Exception e) {
                log.error("Kafka handle message error.", (Throwable) e);
            }
        }
    }

    /**
     * Hands one record to the handler registered for its topic. Failures are logged
     * here because the Future returned by the executor is never inspected, so an
     * exception escaping this method would otherwise vanish without a trace.
     */
    private void dispatch(ConsumerRecord<String, Bytes> record) {
        try {
            this.handlerMap.get(record.topic()).handle(record);
        } catch (Exception e) {
            log.error("Kafka handler failed to process a record from topic {}.", record.topic(), e);
        }
    }

    /**
     * Reads the auto-commit flag from the consumer properties, defaulting to
     * {@code true} when the key is absent. Accepts both a {@link Boolean} value and
     * its textual form, since the properties may carry the setting as a String.
     */
    private static boolean isAutoCommitEnabled(Properties properties) {
        Object value = properties.getOrDefault(AUTO_COMMIT_KEY, Boolean.TRUE);
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        return Boolean.parseBoolean(String.valueOf(value).trim());
    }

    /**
     * Creates every topic in {@code topics} that cannot be described, using the
     * configured partition count and replication factor. The admin client is
     * closed before this method returns.
     *
     * @param topics     topic names that have to exist
     * @param properties client properties used to create the admin client
     * @throws ModuleStartException if topic creation fails or the check is interrupted
     */
    private void createTopicIfNeeded(Collection<String> topics, Properties properties) throws ModuleStartException {
        try (AdminClient adminClient = AdminClient.create(properties)) {
            List<String> missedTopics = findMissingTopics(adminClient, topics);
            if (missedTopics.isEmpty()) {
                return;
            }
            log.info("Topics{} not exist.", missedTopics);

            List<NewTopic> newTopicList = missedTopics.stream()
                .map(topic -> new NewTopic(topic, this.config.getPartitions(), (short) this.config.getReplicationFactor()))
                .collect(Collectors.toList());
            try {
                adminClient.createTopics(newTopicList).all().get();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to whoever drives module startup.
                    Thread.currentThread().interrupt();
                }
                throw new ModuleStartException("Failed to create Kafka Topics" + missedTopics + ".", (Throwable) e);
            }
        }
    }

    /**
     * Returns the names of the topics whose describe request did not complete
     * successfully.
     *
     * @throws ModuleStartException if the calling thread is interrupted while waiting
     */
    private static List<String> findMissingTopics(AdminClient adminClient, Collection<String> topics) throws ModuleStartException {
        List<String> missing = Lists.newArrayList();
        for (Map.Entry<String, ? extends KafkaFuture<?>> entry : adminClient.describeTopics(topics).values().entrySet()) {
            try {
                entry.getValue().get();
            } catch (ExecutionException e) {
                // Any describe failure is treated as "topic absent".
                // NOTE(review): this also covers failures unrelated to a missing topic — confirm that is intended.
                missing.add(entry.getKey());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ModuleStartException("Interrupted while checking Kafka topic " + entry.getKey() + ".", (Throwable) e);
            }
        }
        return missing;
    }
}

