package com.xiaomi.infra.galaxy.talos.producer;

import com.google.common.base.Preconditions;
import com.xiaomi.infra.galaxy.rpc.thrift.Credential;
import com.xiaomi.infra.galaxy.talos.admin.TalosAdmin;
import com.xiaomi.infra.galaxy.talos.client.Constants;
import com.xiaomi.infra.galaxy.talos.client.TalosClientFactory;
import com.xiaomi.infra.galaxy.talos.client.TopicAbnormalCallback;
import com.xiaomi.infra.galaxy.talos.client.Utils;
import com.xiaomi.infra.galaxy.talos.thrift.DescribeTopicRequest;
import com.xiaomi.infra.galaxy.talos.thrift.Message;
import com.xiaomi.infra.galaxy.talos.thrift.MessageType;
import com.xiaomi.infra.galaxy.talos.thrift.Topic;
import com.xiaomi.infra.galaxy.talos.thrift.TopicTalosResourceName;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import libthrift091.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xiaomi/infra/galaxy/talos/producer/TalosProducer.class */
/**
 * Buffers user messages and hands them to one {@link PartitionSender} per topic partition.
 *
 * <p>A producer starts in the {@code ACTIVE} state. It moves to {@code DISABLED} when the periodic
 * {@link CheckPartitionTask} finds that the topic no longer matches the resource name it was created
 * with, and to {@code SHUTDOWN} when {@link #shutdown()} is called. In both cases every partition
 * sender and both executors are stopped.
 *
 * <p>Messages without a partition key are sent to a "current" partition that is advanced round-robin
 * after a configured time interval or message count; messages with a partition key are routed by the
 * configured {@link Partitioner}.
 */
public class TalosProducer {
    private static final Logger LOG = LoggerFactory.getLogger(TalosProducer.class);

    /** Request id counter handed to every partition sender created by any producer in this JVM. */
    private static final AtomicLong requestId = new AtomicLong(1);

    /**
     * Monitor that {@link #addUserMessage(List)} waits on while the buffer is full. It is also handed
     * to every {@link PartitionSender}; presumably the senders notify on it after draining messages
     * (not visible in this file).
     */
    private static final Object globalLock = new Object();

    /** Inclusive lower bound for the length of a message partition key. */
    private static final int PARTITION_KEY_MIN_LEN = 1;

    /** Inclusive upper bound for the length of a message partition key. */
    private static final int PARTITION_KEY_MAX_LEN = Constants.TALOS_PARTITION_KEY_LENGTH_MAXIMAL;

    private final AtomicReference<PRODUCER_STATE> producerState =
            new AtomicReference<>(PRODUCER_STATE.ACTIVE);
    private final Map<Integer, PartitionSender> partitionSenderMap = new ConcurrentHashMap<>();
    private final Random random = new Random();

    private Partitioner partitioner;
    private TopicAbnormalCallback topicAbnormalCallback;
    private UserMessageCallback userMessageCallback;
    private TalosProducerConfig talosProducerConfig;
    private long updatePartitionIdInterval;
    private long lastUpdatePartitionIdTime;
    private long updatePartitionIdMsgNumber;
    private long lastAddMsgNumber;
    private int maxBufferedMsgNumber;
    private int maxBufferedMsgBytes;
    private BufferedMessageCount bufferedCount;
    private String clientId;
    private TalosClientFactory talosClientFactory;
    private TalosAdmin talosAdmin;
    private String topicName;
    private int partitionNumber;
    private int curPartitionId;
    private TopicTalosResourceName topicTalosResourceName;
    private ExecutorService messageCallbackExecutors;
    private ScheduledExecutorService partitionCheckExecutor;
    private Future<?> partitionCheckFuture;

    /**
     * Periodic task that re-describes the topic and reacts to changes: it disables the producer when
     * the topic's resource name no longer matches, and creates additional partition senders when the
     * partition count has grown.
     *
     * <p>NOTE(review): decompiler output marks this class as originally {@code private}.
     */
    public class CheckPartitionTask implements Runnable {
        private CheckPartitionTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Topic describeTopic = TalosProducer.this.talosAdmin.describeTopic(
                        new DescribeTopicRequest(TalosProducer.this.topicName));
                if (!TalosProducer.this.topicTalosResourceName.equals(
                        describeTopic.getTopicInfo().getTopicTalosResourceName())) {
                    String reason = "The topic: "
                            + TalosProducer.this.topicTalosResourceName.getTopicTalosResourceName()
                            + " not exist. It might have been deleted. The putMessage threads will be cancel.";
                    TalosProducer.LOG.error(reason);
                    TalosProducer.this.disableProducer(new Throwable(reason));
                    return;
                }
                int latestPartitionNumber = describeTopic.getTopicAttribute().getPartitionNumber();
                if (TalosProducer.this.partitionNumber < latestPartitionNumber) {
                    // Create the new senders first so that a partition id is never published
                    // before its sender exists.
                    TalosProducer.this.adjustPartitionSender(latestPartitionNumber);
                    TalosProducer.this.setPartitionNumber(latestPartitionNumber);
                }
            } catch (Throwable th) {
                // Catch everything: an exception escaping a scheduleAtFixedRate task suppresses
                // all subsequent executions. The throwable is passed along to keep its stack trace.
                TalosProducer.LOG.error("Exception in CheckPartitionTask: " + th.toString(), th);
                if (Utils.isTopicNotExist(th)) {
                    TalosProducer.this.disableProducer(th);
                }
            }
        }
    }

    /**
     * Lifecycle states of a producer.
     *
     * <p>NOTE(review): decompiler output marks this enum as originally {@code private}.
     */
    public enum PRODUCER_STATE {
        ACTIVE,
        DISABLED,
        SHUTDOWN
    }

    /**
     * Creates a producer with a default {@link Credential} and a {@link SimplePartitioner}.
     *
     * @throws TException declared by the topic lookup performed during construction
     * @throws IllegalArgumentException if the described topic does not match {@code topicTalosResourceName}
     */
    public TalosProducer(TalosProducerConfig talosProducerConfig, TopicTalosResourceName topicTalosResourceName, TopicAbnormalCallback topicAbnormalCallback, UserMessageCallback userMessageCallback) throws TException {
        this(talosProducerConfig, new Credential(), topicTalosResourceName, new SimplePartitioner(), topicAbnormalCallback, userMessageCallback);
    }

    /**
     * Creates a producer with the given credential and a {@link SimplePartitioner}.
     *
     * @throws TException declared by the topic lookup performed during construction
     * @throws IllegalArgumentException if the described topic does not match {@code topicTalosResourceName}
     */
    public TalosProducer(TalosProducerConfig talosProducerConfig, Credential credential, TopicTalosResourceName topicTalosResourceName, TopicAbnormalCallback topicAbnormalCallback, UserMessageCallback userMessageCallback) throws TException {
        this(talosProducerConfig, credential, topicTalosResourceName, new SimplePartitioner(), topicAbnormalCallback, userMessageCallback);
    }

    /**
     * Creates a producer that builds its own {@link TalosClientFactory} and {@link TalosAdmin} from
     * the given configuration and credential.
     *
     * @param partitioner maps a message partition key to a partition id
     * @throws TException declared by the topic lookup performed during construction
     * @throws IllegalArgumentException if the described topic does not match {@code topicTalosResourceName}
     */
    public TalosProducer(TalosProducerConfig talosProducerConfig, Credential credential, TopicTalosResourceName topicTalosResourceName, Partitioner partitioner, TopicAbnormalCallback topicAbnormalCallback, UserMessageCallback userMessageCallback) throws TException {
        initCommonState(talosProducerConfig, partitioner, topicAbnormalCallback, userMessageCallback);
        this.talosClientFactory = new TalosClientFactory(this.talosProducerConfig, credential);
        this.talosAdmin = new TalosAdmin(this.talosClientFactory);
        startProducer(topicTalosResourceName);
    }

    /**
     * Creates a producer around an externally supplied {@link TalosAdmin} and
     * {@link TalosClientFactory}, using a {@link SimplePartitioner}.
     *
     * @param partitionSender not used by this constructor; kept for signature compatibility
     * @throws TException declared by the topic lookup performed during construction
     * @throws IllegalArgumentException if the described topic does not match {@code topicTalosResourceName}
     */
    public TalosProducer(TalosProducerConfig talosProducerConfig, TopicTalosResourceName topicTalosResourceName, TalosAdmin talosAdmin, TalosClientFactory talosClientFactory, PartitionSender partitionSender, TopicAbnormalCallback topicAbnormalCallback, UserMessageCallback userMessageCallback) throws TException {
        initCommonState(talosProducerConfig, new SimplePartitioner(), topicAbnormalCallback, userMessageCallback);
        this.talosClientFactory = talosClientFactory;
        this.talosAdmin = talosAdmin;
        startProducer(topicTalosResourceName);
    }

    /** Copies the configuration-derived settings and callbacks shared by all constructors. */
    private void initCommonState(TalosProducerConfig config, Partitioner keyPartitioner, TopicAbnormalCallback abnormalCallback, UserMessageCallback messageCallback) {
        this.partitioner = keyPartitioner;
        this.topicAbnormalCallback = abnormalCallback;
        this.userMessageCallback = messageCallback;
        this.talosProducerConfig = config;
        this.updatePartitionIdInterval = config.getUpdatePartitionIdInterval();
        this.lastUpdatePartitionIdTime = System.currentTimeMillis();
        this.updatePartitionIdMsgNumber = config.getUpdatePartitionMsgNum();
        this.lastAddMsgNumber = 0L;
        this.maxBufferedMsgNumber = config.getMaxBufferedMsgNumber();
        this.maxBufferedMsgBytes = config.getMaxBufferedMsgBytes();
        this.bufferedCount = new BufferedMessageCount(this.maxBufferedMsgNumber, this.maxBufferedMsgBytes);
        this.clientId = Utils.generateClientId();
    }

    /**
     * Validates the topic, creates the executors and partition senders, and schedules the periodic
     * partition check. Requires {@code talosAdmin} and {@code talosClientFactory} to be set.
     */
    private void startProducer(TopicTalosResourceName resourceName) throws TException {
        checkAndGetTopicInfo(resourceName);
        this.messageCallbackExecutors = Executors.newFixedThreadPool(this.talosProducerConfig.getThreadPoolsize());
        this.partitionCheckExecutor = Executors.newSingleThreadScheduledExecutor();
        initPartitionSender();
        initCheckPartitionTask();
        LOG.info("Init a producer for topic: " + resourceName.getTopicTalosResourceName() + ", partitions: " + this.partitionNumber);
    }

    /**
     * Validates the given messages, groups them by target partition and passes each group to the
     * matching {@link PartitionSender}. Blocks while the buffered-message counter reports full.
     *
     * <p>Messages carrying a partition key go to the partition chosen by the partitioner; all others
     * go to the current round-robin partition. The sender of the current partition always receives a
     * list, which is empty when every message carried a key that mapped elsewhere.
     *
     * <p>An interrupt received while blocked does not abort the call; the thread's interrupt status
     * is restored once the wait is over.
     *
     * @param list messages to enqueue
     * @throws ProducerNotActiveException if the producer has been disabled or shut down
     * @throws IllegalArgumentException if a partition key has an invalid length
     */
    public synchronized void addUserMessage(List<Message> list) throws ProducerNotActiveException {
        if (!isActive()) {
            throw new ProducerNotActiveException("Producer is not active, current state: " + this.producerState);
        }
        waitWhileBufferFull();

        // The method already holds this object's monitor, so no extra locking is needed here.
        if (shouldUpdatePartition()) {
            this.curPartitionId = (this.curPartitionId + 1) % this.partitionNumber;
            this.lastUpdatePartitionIdTime = System.currentTimeMillis();
            this.lastAddMsgNumber = 0L;
        }
        int defaultPartitionId = this.curPartitionId;
        this.lastAddMsgNumber += list.size();

        Map<Integer, List<UserMessage>> messagesByPartition = new HashMap<Integer, List<UserMessage>>();
        messagesByPartition.put(defaultPartitionId, new ArrayList<UserMessage>());
        for (Message message : list) {
            Utils.updateMessage(message, MessageType.BINARY);
            Utils.checkMessageValidity(message);
            int targetPartitionId = defaultPartitionId;
            if (message.isSetPartitionKey()) {
                checkMessagePartitionKeyValidity(message.getPartitionKey());
                targetPartitionId = getPartitionId(message.getPartitionKey());
            }
            List<UserMessage> bucket = messagesByPartition.get(targetPartitionId);
            if (bucket == null) {
                bucket = new ArrayList<UserMessage>();
                messagesByPartition.put(targetPartitionId, bucket);
            }
            bucket.add(new UserMessage(message));
        }

        for (Map.Entry<Integer, List<UserMessage>> entry : messagesByPartition.entrySet()) {
            Integer partitionId = entry.getKey();
            Preconditions.checkArgument(this.partitionSenderMap.containsKey(partitionId));
            this.partitionSenderMap.get(partitionId).addMessage(entry.getValue());
        }
    }

    /**
     * Blocks on {@code globalLock} until the buffered-message counter no longer reports full.
     *
     * <p>The fullness check is made while holding the monitor so that a notification issued between
     * the check and the call to {@code wait()} cannot be missed.
     */
    private void waitWhileBufferFull() {
        boolean interrupted = false;
        synchronized (globalLock) {
            while (this.bufferedCount.isFull()) {
                try {
                    globalLock.wait();
                    LOG.info("too many buffered messages, globalLock is active. message number: " + this.bufferedCount.getBufferedMsgNumber() + ", message bytes:  " + this.bufferedCount.getBufferedMsgBytes());
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt so it can be re-asserted afterwards.
                    interrupted = true;
                    LOG.error("addUserMessage global lock wait is interrupt.");
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Moves an active producer to {@code DISABLED}, stops all workers and reports {@code th} to the
     * topic-abnormal callback. Does nothing if the producer is not active.
     *
     * <p>NOTE(review): decompiler output marks this method as originally {@code private}.
     *
     * @param th the cause handed to the callback
     */
    public synchronized void disableProducer(Throwable th) {
        if (isActive()) {
            this.producerState.set(PRODUCER_STATE.DISABLED);
            stopAndWait();
            this.topicAbnormalCallback.abnormalHandler(this.topicTalosResourceName, th);
        }
    }

    /** Moves an active producer to {@code SHUTDOWN} and stops all workers; no-op otherwise. */
    public synchronized void shutdown() {
        if (isActive()) {
            this.producerState.set(PRODUCER_STATE.SHUTDOWN);
            stopAndWait();
        }
    }

    /** Shuts down every partition sender, cancels the partition check and stops both executors. */
    private void stopAndWait() {
        for (PartitionSender sender : this.partitionSenderMap.values()) {
            sender.shutdown();
        }
        this.partitionCheckFuture.cancel(false);
        this.partitionCheckExecutor.shutdownNow();
        this.messageCallbackExecutors.shutdownNow();
    }

    /** @return {@code true} while the producer is in the {@code ACTIVE} state */
    public boolean isActive() {
        return this.producerState.get() == PRODUCER_STATE.ACTIVE;
    }

    /** @return {@code true} once the producer is in the {@code DISABLED} state */
    public boolean isDisabled() {
        return this.producerState.get() == PRODUCER_STATE.DISABLED;
    }

    /** @return {@code true} once the producer is in the {@code SHUTDOWN} state */
    public boolean isShutdowned() {
        return this.producerState.get() == PRODUCER_STATE.SHUTDOWN;
    }

    /**
     * @return {@code true} when the configured interval has elapsed since the current partition was
     *     last advanced, or when the messages added since then reach the configured threshold
     */
    private synchronized boolean shouldUpdatePartition() {
        boolean intervalElapsed =
                System.currentTimeMillis() - this.lastUpdatePartitionIdTime >= this.updatePartitionIdInterval;
        return intervalElapsed || this.lastAddMsgNumber >= this.updatePartitionIdMsgNumber;
    }

    /**
     * Describes the topic, verifies that it matches {@code resourceName}, and records its name,
     * partition count and a randomly chosen starting partition.
     *
     * @throws IllegalArgumentException if the described topic has a different resource name
     */
    private synchronized void checkAndGetTopicInfo(TopicTalosResourceName resourceName) throws TException {
        this.topicName = Utils.getTopicNameByResourceName(resourceName.getTopicTalosResourceName());
        Topic describeTopic = this.talosAdmin.describeTopic(new DescribeTopicRequest(this.topicName));
        if (!resourceName.equals(describeTopic.getTopicInfo().getTopicTalosResourceName())) {
            throw new IllegalArgumentException("The topic: " + resourceName.getTopicTalosResourceName() + " not found");
        }
        this.partitionNumber = describeTopic.getTopicAttribute().getPartitionNumber();
        this.curPartitionId = this.random.nextInt(this.partitionNumber);
        this.topicTalosResourceName = resourceName;
    }

    /** Creates one sender for every partition id in {@code [0, partitionNumber)}. */
    private synchronized void initPartitionSender() {
        for (int partitionId = 0; partitionId < this.partitionNumber; partitionId++) {
            createPartitionSender(partitionId);
        }
    }

    /**
     * Creates senders for the partition ids from the current partition count up to, but excluding,
     * {@code i}. Does not update the stored partition count; see {@link #setPartitionNumber(int)}.
     *
     * <p>NOTE(review): decompiler output marks this method as originally {@code private}.
     *
     * @param i the new partition count
     */
    public synchronized void adjustPartitionSender(int i) {
        for (int partitionId = this.partitionNumber; partitionId < i; partitionId++) {
            createPartitionSender(partitionId);
        }
        LOG.info("Adjust partitionSender and partitionNumber from: " + this.partitionNumber + " to: " + i);
    }

    /** Builds a sender for {@code partitionId} and registers it in the sender map. */
    private synchronized void createPartitionSender(int partitionId) {
        PartitionSender sender = new PartitionSender(partitionId, this.topicName, this.topicTalosResourceName, requestId, this.clientId, this.talosProducerConfig, this.talosClientFactory.newMessageClient(), this.userMessageCallback, this.messageCallbackExecutors, globalLock, this);
        this.partitionSenderMap.put(partitionId, sender);
    }

    /** Schedules {@link CheckPartitionTask} at the fixed rate configured in milliseconds. */
    private synchronized void initCheckPartitionTask() {
        this.partitionCheckFuture = this.partitionCheckExecutor.scheduleAtFixedRate(new CheckPartitionTask(), this.talosProducerConfig.getCheckPartitionInterval(), this.talosProducerConfig.getCheckPartitionInterval(), TimeUnit.MILLISECONDS);
    }

    /** @return the partition id the partitioner assigns to {@code partitionKey} */
    private int getPartitionId(String partitionKey) {
        return this.partitioner.partition(partitionKey, this.partitionNumber);
    }

    /** @return a random UUID string */
    private String generatePartitionKey() {
        return UUID.randomUUID().toString();
    }

    /**
     * Records a new partition count.
     *
     * <p>NOTE(review): decompiler output marks this method as originally {@code private}.
     *
     * @param i the new partition count
     */
    public synchronized void setPartitionNumber(int i) {
        this.partitionNumber = i;
    }

    /**
     * Verifies that a partition key is non-null and that its length lies within the allowed bounds.
     *
     * @throws NullPointerException if {@code partitionKey} is {@code null}
     * @throws IllegalArgumentException if the key length is out of range
     */
    private void checkMessagePartitionKeyValidity(String partitionKey) {
        Preconditions.checkNotNull(partitionKey);
        int length = partitionKey.length();
        if (length < PARTITION_KEY_MIN_LEN || length > PARTITION_KEY_MAX_LEN) {
            throw new IllegalArgumentException("Invalid partition key which length must be at least " + PARTITION_KEY_MIN_LEN + " and at most " + PARTITION_KEY_MAX_LEN + ", got " + length);
        }
    }

    /**
     * Forwards both values to {@code BufferedMessageCount.increase}.
     *
     * <p>NOTE(review): decompiler output marks this method as originally {@code protected}; the
     * arguments are presumably a message count and a byte count, which should be confirmed against
     * {@code BufferedMessageCount}.
     */
    public void increaseBufferedCount(int i, int i2) {
        this.bufferedCount.increase(i, i2);
    }

    /**
     * Forwards both values to {@code BufferedMessageCount.descrease}.
     *
     * <p>NOTE(review): decompiler output marks this method as originally {@code protected}; the
     * arguments are presumably a message count and a byte count, which should be confirmed against
     * {@code BufferedMessageCount}.
     */
    public void decreaseBufferedCount(int i, int i2) {
        this.bufferedCount.descrease(i, i2);
    }
}
