package com.xiaomi.infra.galaxy.talos.consumer;

import com.google.common.base.Preconditions;
import com.xiaomi.infra.galaxy.rpc.thrift.Credential;
import com.xiaomi.infra.galaxy.talos.admin.TalosAdmin;
import com.xiaomi.infra.galaxy.talos.client.TalosClientFactory;
import com.xiaomi.infra.galaxy.talos.client.TopicAbnormalCallback;
import com.xiaomi.infra.galaxy.talos.client.Utils;
import com.xiaomi.infra.galaxy.talos.thrift.ConsumeUnit;
import com.xiaomi.infra.galaxy.talos.thrift.ConsumerService;
import com.xiaomi.infra.galaxy.talos.thrift.DescribeTopicRequest;
import com.xiaomi.infra.galaxy.talos.thrift.LockWorkerRequest;
import com.xiaomi.infra.galaxy.talos.thrift.QueryWorkerRequest;
import com.xiaomi.infra.galaxy.talos.thrift.QueryWorkerResponse;
import com.xiaomi.infra.galaxy.talos.thrift.RenewRequest;
import com.xiaomi.infra.galaxy.talos.thrift.RenewResponse;
import com.xiaomi.infra.galaxy.talos.thrift.Topic;
import com.xiaomi.infra.galaxy.talos.thrift.TopicTalosResourceName;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import libthrift091.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xiaomi/infra/galaxy/talos/consumer/TalosConsumer.class */
public class TalosConsumer {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(TalosConsumer.class);
    /** Identifier of this worker; sent in lock/renew requests and used as the key into {@link #workerInfoMap}. */
    private String workerId;
    /** Used by {@code makeBalance()} to pick idle partitions at random. */
    private Random random;
    /** Consumer group this worker belongs to. */
    private String consumerGroup;
    /** Supplies a processor for each newly created {@link PartitionFetcher}; not set by the injection constructor. */
    private MessageProcessorFactory messageProcessorFactory;
    /** Supplies a message reader for each newly created {@link PartitionFetcher}; not set by the injection constructor. */
    private MessageReaderFactory messageReaderFactory;
    /** Fetchers created so far, keyed by partition id. */
    private Map<Integer, PartitionFetcher> partitionFetcherMap;
    private TalosConsumerConfig talosConsumerConfig;
    /** Creates the consumer/message clients; not set by the injection constructor. */
    private TalosClientFactory talosClientFactory;
    /** Used to describe the topic at start-up and in the periodic partition check. */
    private TalosAdmin talosAdmin;
    /** Client used for lockWorker / queryWorker / renew calls. */
    private ConsumerService.Iface consumerClient;
    /** Notified when the topic is detected as missing or replaced. */
    private TopicAbnormalCallback topicAbnormalCallback;
    /** Taken around updates of {@link #partitionNumber} and {@link #workerInfoMap} and around scans of {@link #partitionFetcherMap}. */
    private ReadWriteLock readWriteLock;
    /** Runs {@code CheckPartitionTask} periodically. */
    private ScheduledExecutorService partitionScheduledExecutor;
    /** Runs {@code CheckWorkerInfoTask} periodically. */
    private ScheduledExecutorService workerScheduleExecutor;
    /** Runs {@code ReNewTask} periodically. */
    private ScheduledExecutorService renewScheduleExecutor;
    /** Single thread on which {@code ReBalanceTask} instances are executed. */
    private ExecutorService reBalanceExecutor;
    /** Topic name derived from the topic resource name. */
    private String topicName;
    /** Last partition count obtained from describeTopic. */
    private int partitionNumber;
    private TopicTalosResourceName topicTalosResourceName;
    /** Last accepted queryWorker result: worker id -> partitions listed for that worker; null until the first accepted result. */
    private Map<String, List<Integer>> workerInfoMap;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/xiaomi/infra/galaxy/talos/consumer/TalosConsumer$CheckPartitionTask.class */
    public class CheckPartitionTask implements Runnable {
        private CheckPartitionTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Topic describeTopic = TalosConsumer.this.talosAdmin.describeTopic(new DescribeTopicRequest(TalosConsumer.this.topicName));
                if (!TalosConsumer.this.topicTalosResourceName.equals(describeTopic.getTopicInfo().getTopicTalosResourceName())) {
                    String str = "The topic: " + TalosConsumer.this.topicTalosResourceName.getTopicTalosResourceName() + " not exist. It might have been deleted. The getMessage threads will be cancel.";
                    TalosConsumer.LOG.error(str);
                    TalosConsumer.this.cancelAllConsumingTask();
                    TalosConsumer.this.topicAbnormalCallback.abnormalHandler(TalosConsumer.this.topicTalosResourceName, new Throwable(str));
                    return;
                }
                int partitionNumber = describeTopic.getTopicAttribute().getPartitionNumber();
                if (TalosConsumer.this.partitionNumber < partitionNumber) {
                    TalosConsumer.LOG.info("partitionNumber changed from " + TalosConsumer.this.partitionNumber + " to " + partitionNumber + ", execute a re-balance task.");
                    TalosConsumer.this.setPartitionNumber(partitionNumber);
                    TalosConsumer.this.reBalanceExecutor.execute(new ReBalanceTask());
                }
            } catch (Throwable th) {
                TalosConsumer.LOG.error("Exception in CheckPartitionTask: " + th.toString());
                if (Utils.isTopicNotExist(th)) {
                    TalosConsumer.this.cancelAllConsumingTask();
                    TalosConsumer.this.topicAbnormalCallback.abnormalHandler(TalosConsumer.this.topicTalosResourceName, th);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/xiaomi/infra/galaxy/talos/consumer/TalosConsumer$CheckWorkerInfoTask.class */
    public class CheckWorkerInfoTask implements Runnable {
        private CheckWorkerInfoTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                TalosConsumer.this.getWorkerInfo();
            } catch (Throwable th) {
                TalosConsumer.LOG.error("Get worker info error: " + th.toString());
            }
            TalosConsumer.this.reBalanceExecutor.execute(new ReBalanceTask());
        }
    }

    /* loaded from: input_file:com/xiaomi/infra/galaxy/talos/consumer/TalosConsumer$ReBalanceTask.class */
    private class ReBalanceTask implements Runnable {
        private ReBalanceTask() {
        }

        /** Runs one balancing pass of the enclosing consumer. */
        @Override // java.lang.Runnable
        public void run() {
            makeBalance();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/xiaomi/infra/galaxy/talos/consumer/TalosConsumer$ReNewTask.class */
    public class ReNewTask implements Runnable {
        private ReNewTask() {
        }

        /**
         * Collects the ids of all partitions whose fetcher reports {@code isHoldingLock()}.
         *
         * @return a new list of partition ids to renew (possibly empty)
         */
        private List<Integer> getRenewPartitionList() {
            List<Integer> heldPartitions = new ArrayList<Integer>();
            TalosConsumer.this.readWriteLock.readLock().lock();
            try {
                for (Map.Entry<Integer, PartitionFetcher> entry : TalosConsumer.this.partitionFetcherMap.entrySet()) {
                    if (entry.getValue().isHoldingLock()) {
                        heldPartitions.add(entry.getKey());
                    }
                }
            } finally {
                // Release in finally so a failing fetcher cannot leave the read lock held.
                TalosConsumer.this.readWriteLock.readLock().unlock();
            }
            return heldPartitions;
        }

        /**
         * Sends a renew request for the held partitions, trying up to
         * {@code getReNewMaxRetry() + 1} times. Returns as soon as one response reports both a
         * successful heartbeat and no failed partitions. After the attempts are exhausted, the
         * last response received (if any) decides the follow-up: a failed heartbeat cancels all
         * consuming tasks, and any partitions listed as failed are released.
         */
        @Override // java.lang.Runnable
        public void run() {
            List<Integer> renewPartitionList = getRenewPartitionList();
            RenewRequest renewRequest = new RenewRequest(new ConsumeUnit(TalosConsumer.this.consumerGroup, TalosConsumer.this.topicTalosResourceName, renewPartitionList, TalosConsumer.this.workerId));
            RenewResponse renewResponse = null;

            // "+ 1" accounts for the initial attempt in addition to the configured retries.
            for (int attemptsLeft = TalosConsumer.this.talosConsumerConfig.getReNewMaxRetry() + 1; attemptsLeft > 0; attemptsLeft--) {
                try {
                    renewResponse = TalosConsumer.this.consumerClient.renew(renewRequest);
                    if (renewResponse.isHeartbeatSuccess() && renewResponse.getFailedPartitionListSize() == 0) {
                        if (TalosConsumer.LOG.isDebugEnabled()) {
                            TalosConsumer.LOG.debug("The worker: " + TalosConsumer.this.workerId + " success heartbeat and renew partitions: " + renewPartitionList);
                        }
                        return;
                    }
                } catch (Throwable th) {
                    // A failed attempt keeps the response of an earlier attempt, if there was one.
                    TalosConsumer.LOG.error("Worker: " + TalosConsumer.this.workerId + " renew error: " + th.toString());
                }
            }

            if (renewResponse == null) {
                // Every attempt threw; there is no response to act on.
                return;
            }
            if (!renewResponse.isHeartbeatSuccess()) {
                TalosConsumer.LOG.error("The worker: " + TalosConsumer.this.workerId + " failed to make heartbeat, cancel all consumer task");
                TalosConsumer.this.cancelAllConsumingTask();
            }
            if (renewResponse.getFailedPartitionListSize() > 0) {
                List<Integer> failedPartitionList = renewResponse.getFailedPartitionList();
                TalosConsumer.LOG.error("The worker: " + TalosConsumer.this.workerId + " failed to renew partitions: " + failedPartitionList);
                TalosConsumer.this.releasePartitionLock(failedPartitionList);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/xiaomi/infra/galaxy/talos/consumer/TalosConsumer$WorkerPair.class */
    public class WorkerPair implements Comparable<WorkerPair> {
        private final String workerId;
        private final int hasPartitionNum;

        /**
         * @param str worker id
         * @param i   number of partitions associated with that worker
         */
        private WorkerPair(String str, int i) {
            this.workerId = str;
            this.hasPartitionNum = i;
        }

        /**
         * Orders pairs by partition count, larger counts first; ties are broken by worker id
         * in reverse natural string order.
         *
         * @param workerPair the pair to compare against
         * @return negative, zero or positive per the {@link Comparable} contract
         */
        @Override // java.lang.Comparable
        public int compareTo(WorkerPair workerPair) {
            // Integer.compare avoids the overflow a plain subtraction can produce.
            int byCount = Integer.compare(workerPair.hasPartitionNum, this.hasPartitionNum);
            return byCount != 0 ? byCount : workerPair.workerId.compareTo(this.workerId);
        }

        @Override
        public String toString() {
            return "{'" + this.workerId + "', " + this.hasPartitionNum + '}';
        }
    }

    /**
     * Full constructor: builds the Talos clients from the config and credential, then runs
     * the start-up sequence (verify topic, register this worker, fetch the worker map, one
     * balancing pass) and schedules the periodic partition-check, worker-info and renew tasks.
     *
     * @param str                     consumer group name; validated with {@code Utils.checkNameValidity}
     * @param talosConsumerConfig     consumer configuration
     * @param credential              credential handed to the {@link TalosClientFactory}
     * @param topicTalosResourceName  resource name of the topic to consume
     * @param messageReaderFactory    creates the message reader of each partition fetcher
     * @param messageProcessorFactory creates the message processor of each partition fetcher
     * @param str2                    passed to {@code Utils.generateClientId} to build the worker id
     *                                (presumably a prefix -- confirm against Utils)
     * @param topicAbnormalCallback   notified when the topic is later detected as missing
     * @throws TException               if a remote call made during start-up fails
     * @throws IllegalArgumentException if the described topic does not match the resource name
     * @throws RuntimeException         if self-registration fails after all retries
     */
    private TalosConsumer(String str, TalosConsumerConfig talosConsumerConfig, Credential credential, TopicTalosResourceName topicTalosResourceName, MessageReaderFactory messageReaderFactory, MessageProcessorFactory messageProcessorFactory, String str2, TopicAbnormalCallback topicAbnormalCallback) throws TException {
        this.workerId = Utils.generateClientId(str2);
        this.random = new Random();
        Utils.checkNameValidity(str);
        this.consumerGroup = str;
        this.messageProcessorFactory = messageProcessorFactory;
        this.messageReaderFactory = messageReaderFactory;
        this.partitionFetcherMap = new ConcurrentHashMap();
        this.talosConsumerConfig = talosConsumerConfig;
        this.talosClientFactory = new TalosClientFactory(this.talosConsumerConfig, credential);
        this.talosAdmin = new TalosAdmin(this.talosClientFactory);
        this.consumerClient = this.talosClientFactory.newConsumerClient();
        this.topicAbnormalCallback = topicAbnormalCallback;
        this.readWriteLock = new ReentrantReadWriteLock();
        this.partitionScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        this.workerScheduleExecutor = Executors.newSingleThreadScheduledExecutor();
        this.renewScheduleExecutor = Executors.newSingleThreadScheduledExecutor();
        this.reBalanceExecutor = Executors.newSingleThreadExecutor();
        LOG.info("The worker: " + this.workerId + " is initializing...");
        // Start-up sequence; each step relies on state set by the previous one.
        checkAndGetTopicInfo(topicTalosResourceName);
        registerSelf();
        getWorkerInfo();
        makeBalance();
        // Periodic background tasks start only after the initial balance pass.
        initCheckPartitionTask();
        initCheckWorkerInfoTask();
        initRenewTask();
    }

    /**
     * Creates a consumer that reads messages through a {@link TalosMessageReaderFactory}.
     *
     * @param str                     consumer group name
     * @param talosConsumerConfig     consumer configuration
     * @param credential              credential used to build the clients
     * @param topicTalosResourceName  resource name of the topic to consume
     * @param messageProcessorFactory creates the message processor of each partition fetcher
     * @param str2                    input from which the worker id is generated
     * @param topicAbnormalCallback   notified when the topic is detected as missing
     * @throws TException if a remote call made during start-up fails
     */
    public TalosConsumer(String str, TalosConsumerConfig talosConsumerConfig, Credential credential, TopicTalosResourceName topicTalosResourceName, MessageProcessorFactory messageProcessorFactory, String str2, TopicAbnormalCallback topicAbnormalCallback) throws TException {
        this(str, talosConsumerConfig, credential, topicTalosResourceName, new TalosMessageReaderFactory(), messageProcessorFactory, str2, topicAbnormalCallback);
    }

    /**
     * Creates a consumer using a default-constructed {@link Credential}.
     *
     * @param str                     consumer group name
     * @param talosConsumerConfig     consumer configuration
     * @param topicTalosResourceName  resource name of the topic to consume
     * @param messageProcessorFactory creates the message processor of each partition fetcher
     * @param topicAbnormalCallback   notified when the topic is detected as missing
     * @throws TException if a remote call made during start-up fails
     */
    public TalosConsumer(String str, TalosConsumerConfig talosConsumerConfig, TopicTalosResourceName topicTalosResourceName, MessageProcessorFactory messageProcessorFactory, TopicAbnormalCallback topicAbnormalCallback) throws TException {
        this(str, talosConsumerConfig, new Credential(), topicTalosResourceName, messageProcessorFactory, topicAbnormalCallback);
    }

    /**
     * Creates a consumer whose worker id is generated from an empty string.
     *
     * @param str                     consumer group name
     * @param talosConsumerConfig     consumer configuration
     * @param credential              credential used to build the clients
     * @param topicTalosResourceName  resource name of the topic to consume
     * @param messageProcessorFactory creates the message processor of each partition fetcher
     * @param topicAbnormalCallback   notified when the topic is detected as missing
     * @throws TException if a remote call made during start-up fails
     */
    public TalosConsumer(String str, TalosConsumerConfig talosConsumerConfig, Credential credential, TopicTalosResourceName topicTalosResourceName, MessageProcessorFactory messageProcessorFactory, TopicAbnormalCallback topicAbnormalCallback) throws TException {
        this(str, talosConsumerConfig, credential, topicTalosResourceName, messageProcessorFactory, "", topicAbnormalCallback);
    }

    /**
     * Constructor that takes ready-made collaborators (client, admin, fetcher map) instead of
     * building them, then runs the same start-up sequence as the main constructor.
     * NOTE(review): presumably intended for tests -- confirm.
     *
     * <p>This constructor does not set {@code talosClientFactory}, {@code messageProcessorFactory}
     * or {@code messageReaderFactory}; {@code stealPartitionLock} dereferences those fields when
     * it has to create a fetcher for a partition that is not already present in {@code map}.
     * Unlike the main constructor, the group name is not passed through
     * {@code Utils.checkNameValidity} and the worker id is used as given.
     *
     * @param str                    consumer group name
     * @param talosConsumerConfig    consumer configuration
     * @param topicTalosResourceName resource name of the topic to consume
     * @param str2                   worker id, used verbatim
     * @param topicAbnormalCallback  notified when the topic is detected as missing
     * @param iface                  consumer service client to use
     * @param talosAdmin             admin client to use
     * @param map                    partition id -> fetcher map to use
     * @throws Exception if any start-up step fails
     */
    public TalosConsumer(String str, TalosConsumerConfig talosConsumerConfig, TopicTalosResourceName topicTalosResourceName, String str2, TopicAbnormalCallback topicAbnormalCallback, ConsumerService.Iface iface, TalosAdmin talosAdmin, Map<Integer, PartitionFetcher> map) throws Exception {
        this.workerId = str2;
        this.random = new Random();
        this.consumerGroup = str;
        this.partitionFetcherMap = map;
        this.talosConsumerConfig = talosConsumerConfig;
        this.talosAdmin = talosAdmin;
        this.consumerClient = iface;
        this.topicAbnormalCallback = topicAbnormalCallback;
        this.readWriteLock = new ReentrantReadWriteLock();
        this.partitionScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        this.workerScheduleExecutor = Executors.newSingleThreadScheduledExecutor();
        this.renewScheduleExecutor = Executors.newSingleThreadScheduledExecutor();
        this.reBalanceExecutor = Executors.newSingleThreadExecutor();
        LOG.info("The worker: " + str2 + " is initializing...");
        // Same start-up sequence as the main constructor.
        checkAndGetTopicInfo(topicTalosResourceName);
        registerSelf();
        getWorkerInfo();
        makeBalance();
        initCheckPartitionTask();
        initCheckWorkerInfoTask();
        initRenewTask();
    }

    /**
     * Resolves the topic name from the resource name, describes the topic and, when the
     * described topic carries the same resource name, records its partition count and the
     * resource name on this consumer.
     *
     * @param topicTalosResourceName resource name of the topic to consume
     * @throws TException               if describing the topic fails
     * @throws IllegalArgumentException if the described topic has a different resource name
     */
    private void checkAndGetTopicInfo(TopicTalosResourceName topicTalosResourceName) throws TException {
        this.topicName = Utils.getTopicNameByResourceName(topicTalosResourceName.getTopicTalosResourceName());
        DescribeTopicRequest request = new DescribeTopicRequest(this.topicName);
        Topic topic = this.talosAdmin.describeTopic(request);

        boolean sameTopic = topicTalosResourceName.equals(topic.getTopicInfo().getTopicTalosResourceName());
        if (sameTopic) {
            setPartitionNumber(topic.getTopicAttribute().getPartitionNumber());
            this.topicTalosResourceName = topicTalosResourceName;
            LOG.info("The worker: " + this.workerId + " check and get topic info done");
            return;
        }

        LOG.info("The consumer initialize failed by topic not found");
        throw new IllegalArgumentException("The topic: " + topicTalosResourceName.getTopicTalosResourceName() + " not found");
    }

    /**
     * Registers this worker through {@code lockWorker} with an empty partition list, trying
     * up to {@code getSelfRegisterMaxRetry() + 1} times.
     *
     * @throws TException       if the remote call fails
     * @throws RuntimeException if no attempt reports a successful registration
     */
    private void registerSelf() throws TException {
        ConsumeUnit selfUnit = new ConsumeUnit(this.consumerGroup, this.topicTalosResourceName, new ArrayList(), this.workerId);
        LockWorkerRequest request = new LockWorkerRequest(selfUnit);

        // "+ 1" covers the first attempt in addition to the configured retries.
        int attemptsLeft = this.talosConsumerConfig.getSelfRegisterMaxRetry() + 1;
        while (attemptsLeft > 0) {
            attemptsLeft--;
            if (this.consumerClient.lockWorker(request).isRegisterSuccess()) {
                LOG.info("The worker: " + this.workerId + " register self success");
                return;
            }
            LOG.warn("The worker: " + this.workerId + " register self failed, make " + attemptsLeft + " retry");
        }

        LOG.error("The worker: " + this.workerId + " register self failed");
        throw new RuntimeException(this.workerId + " register self failed");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Queries the worker map for this group/topic and stores it in {@code workerInfoMap}.
     * A response that is empty or does not list this worker is ignored, leaving the previous
     * map in place.
     *
     * @throws TException if the query fails
     */
    public void getWorkerInfo() throws TException {
        QueryWorkerRequest request = new QueryWorkerRequest(this.consumerGroup, this.topicTalosResourceName);
        QueryWorkerResponse response = this.consumerClient.queryWorker(request);

        boolean listsThisWorker = response.getWorkerMapSize() != 0 && response.getWorkerMap().containsKey(this.workerId);
        if (listsThisWorker) {
            this.readWriteLock.writeLock().lock();
            try {
                this.workerInfoMap = response.getWorkerMap();
            } finally {
                this.readWriteLock.writeLock().unlock();
            }
        }
    }

    /**
     * Appends to {@code list} the per-worker partition quotas for {@code i} partitions spread
     * over {@code i2} workers, then sorts the list in descending order.
     * <ul>
     *   <li>one worker: a single quota of {@code i};</li>
     *   <li>fewer partitions than workers: {@code i} quotas of 1;</li>
     *   <li>otherwise: {@code i % i2} quotas of {@code i / i2 + 1} followed by quotas of
     *       {@code i / i2} for the remaining workers.</li>
     * </ul>
     *
     * @param i    number of partitions
     * @param i2   number of workers
     * @param list output list receiving the quotas
     */
    private void calculateTargetList(int i, int i2, List<Integer> list) {
        if (i2 == 1) {
            list.add(Integer.valueOf(i));
        } else if (i < i2) {
            int onesToAdd = i;
            while (onesToAdd-- > 0) {
                list.add(1);
            }
        } else {
            final int baseShare = i / i2;
            final int workersWithExtra = i % i2;
            int assigned = 0;
            int slot = 0;
            while (slot < workersWithExtra) {
                list.add(Integer.valueOf(baseShare + 1));
                assigned += baseShare + 1;
                slot++;
            }
            slot = 0;
            while (slot < i2 - workersWithExtra) {
                list.add(Integer.valueOf(baseShare));
                assigned += baseShare;
                slot++;
            }
            // Sanity check: the quotas must add up to the partition count.
            Preconditions.checkArgument(assigned == i);
        }
        Collections.sort(list, Collections.reverseOrder());
        LOG.info("worker: " + this.workerId + " calculate target partitions done: " + list);
    }

    /**
     * Appends one {@link WorkerPair} (worker id, size of its partition list) per entry of
     * {@code map} to {@code list} and sorts the list by the pairs' natural ordering.
     *
     * @param map  worker id -> partition list
     * @param list output list receiving the sorted pairs
     */
    private void calculateWorkerPairs(Map<String, List<Integer>> map, List<WorkerPair> list) {
        for (String id : map.keySet()) {
            int partitionCount = map.get(id).size();
            list.add(new WorkerPair(id, partitionCount));
        }
        Collections.sort(list);
        LOG.info("worker: " + this.workerId + " calculate sorted worker pairs: " + list);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs one balancing pass for this worker.
     *
     * <p>Takes a snapshot of the worker map, computes the descending list of per-worker quotas
     * and the sorted list of workers, and compares the number of partitions this worker is
     * serving with the quota at the worker's position in the sorted list:
     * <ul>
     *   <li>serving more than the quota: the surplus partitions, taken from the front of the
     *       serving list, are released;</li>
     *   <li>serving fewer: up to the missing number of idle partitions, chosen at random, are
     *       locked;</li>
     *   <li>otherwise, or when the worker has no quota slot because there are more workers
     *       than quotas, nothing changes.</li>
     * </ul>
     * If the snapshot does not contain this worker, an error is logged and nothing is done.
     */
    public void makeBalance() {
        int totalPartitions = this.partitionNumber;
        Map<String, List<Integer>> workerSnapshot = deepCopyWorkerInfoMap();
        if (!workerSnapshot.containsKey(this.workerId)) {
            LOG.error("WorkerInfoMap not contains worker: " + this.workerId + ". There may be some error for renew task.");
            return;
        }

        List<Integer> targetList = new ArrayList<Integer>();
        List<WorkerPair> sortedWorkers = new ArrayList<WorkerPair>();
        calculateTargetList(totalPartitions, workerSnapshot.size(), targetList);
        calculateWorkerPairs(workerSnapshot, sortedWorkers);

        List<Integer> toSteal = new ArrayList<Integer>();
        List<Integer> toRelease = new ArrayList<Integer>();

        for (int rank = 0; rank < sortedWorkers.size(); rank++) {
            if (!sortedWorkers.get(rank).workerId.equals(this.workerId)) {
                continue;
            }
            // More workers than quota slots: this worker has no target and stays idle.
            if (rank < targetList.size()) {
                List<Integer> hasList = getHasList();
                int has = hasList.size();
                int target = targetList.get(rank).intValue();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Worker: " + this.workerId + " has: " + has + " target: " + target);
                }
                if (has > target) {
                    // Give up the surplus, starting from the front of the serving list.
                    for (int k = 0; k < has - target; k++) {
                        toRelease.add(hasList.get(k));
                    }
                } else if (has < target) {
                    List<Integer> idlePartitions = getIdlePartitions();
                    for (int missing = target - has; missing > 0 && !idlePartitions.isEmpty(); missing--) {
                        toSteal.add(idlePartitions.remove(this.random.nextInt(idlePartitions.size())));
                    }
                }
            }
            // Own entry handled; stop scanning so the pass always terminates.
            break;
        }

        // A single pass either acquires or releases, never both.
        Preconditions.checkArgument(toSteal.isEmpty() || toRelease.isEmpty());
        if (!toSteal.isEmpty()) {
            stealPartitionLock(toSteal);
        } else if (!toRelease.isEmpty()) {
            releasePartitionLock(toRelease);
        } else {
            LOG.info("The worker: " + this.workerId + " have reached a balance state.");
        }
    }

    /**
     * Creates a {@link PartitionFetcher} for every listed partition that does not have one
     * yet and calls {@code lock()} on the fetcher of each listed partition. The whole
     * operation runs under the write lock.
     *
     * @param list ids of the partitions to acquire
     */
    private void stealPartitionLock(List<Integer> list) {
        LOG.info("Worker: " + this.workerId + " try to steal " + list.size() + " partition: " + list);
        this.readWriteLock.writeLock().lock();
        try {
            for (Integer num : list) {
                if (!this.partitionFetcherMap.containsKey(num)) {
                    this.partitionFetcherMap.put(num, new PartitionFetcher(this.consumerGroup, this.topicName, this.topicTalosResourceName, num.intValue(), this.talosConsumerConfig, this.workerId, this.consumerClient, this.talosClientFactory.newMessageClient(), this.messageProcessorFactory.createProcessor(), this.messageReaderFactory.createMessageReader(this.talosConsumerConfig)));
                }
                this.partitionFetcherMap.get(num).lock();
            }
        } finally {
            // Always release: a failure while building or locking a fetcher must not leave
            // the write lock held and block every later reader and writer.
            this.readWriteLock.writeLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Calls {@code unlock()} on the fetcher of every listed partition.
     *
     * @param list ids of the partitions to release
     * @throws IllegalArgumentException if a listed partition has no fetcher in the map
     */
    public void releasePartitionLock(List<Integer> list) {
        LOG.info("Worker: " + this.workerId + " try to release " + list.size() + " partition: " + list);
        Iterator<Integer> partitions = list.iterator();
        while (partitions.hasNext()) {
            Integer partitionId = partitions.next();
            Preconditions.checkArgument(this.partitionFetcherMap.containsKey(partitionId));
            PartitionFetcher fetcher = this.partitionFetcherMap.get(partitionId);
            fetcher.unlock();
        }
    }

    /**
     * Schedules {@link CheckPartitionTask} with a fixed delay; the configured partition-check
     * interval (milliseconds) is used both as initial delay and as delay between runs.
     */
    private void initCheckPartitionTask() {
        final long intervalMillis = this.talosConsumerConfig.getPartitionCheckInterval();
        this.partitionScheduledExecutor.scheduleWithFixedDelay(new CheckPartitionTask(), intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules {@link CheckWorkerInfoTask} with a fixed delay; the configured worker-info
     * check interval (milliseconds) is used both as initial delay and as delay between runs.
     */
    private void initCheckWorkerInfoTask() {
        final long intervalMillis = this.talosConsumerConfig.getWorkerInfoCheckInterval();
        this.workerScheduleExecutor.scheduleWithFixedDelay(new CheckWorkerInfoTask(), intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules {@link ReNewTask} at a fixed rate; the configured renew interval
     * (milliseconds) is used both as initial delay and as period.
     */
    private void initRenewTask() {
        final long intervalMillis = this.talosConsumerConfig.getReNewCheckInterval();
        this.renewScheduleExecutor.scheduleAtFixedRate(new ReNewTask(), intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Stores the topic's partition count while holding the write lock.
     *
     * @param i new partition count
     */
    public void setPartitionNumber(int i) {
        this.readWriteLock.writeLock().lock();
        try {
            this.partitionNumber = i;
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
    }

    /**
     * Returns, in ascending order, the partition ids in {@code [0, partitionNumber)} that do
     * not appear in any worker's partition list in {@code workerInfoMap}.
     *
     * @return a new mutable list of idle partition ids
     * @throws IllegalArgumentException if {@code partitionNumber} is not positive
     */
    private List<Integer> getIdlePartitions() {
        this.readWriteLock.readLock().lock();
        try {
            Preconditions.checkArgument(this.partitionNumber > 0);
            final int total = this.partitionNumber;

            // Mark every partition some worker already lists; ids outside the known range
            // cannot match a candidate and are skipped.
            boolean[] taken = new boolean[total];
            for (List<Integer> ownedByWorker : this.workerInfoMap.values()) {
                for (Integer owned : ownedByWorker) {
                    int partitionId = owned.intValue();
                    if (partitionId >= 0 && partitionId < total) {
                        taken[partitionId] = true;
                    }
                }
            }

            List<Integer> idle = new ArrayList<Integer>();
            for (int partitionId = 0; partitionId < total; partitionId++) {
                if (!taken[partitionId]) {
                    idle.add(Integer.valueOf(partitionId));
                }
            }
            return idle;
        } finally {
            // Release in finally: the precondition check above can throw while the lock is held.
            this.readWriteLock.readLock().unlock();
        }
    }

    /**
     * Collects the ids of all partitions whose fetcher reports {@code isServing()}.
     *
     * @return a new mutable list of partition ids (possibly empty)
     */
    private List<Integer> getHasList() {
        List<Integer> servingPartitions = new ArrayList<Integer>();
        this.readWriteLock.readLock().lock();
        try {
            for (Map.Entry<Integer, PartitionFetcher> entry : this.partitionFetcherMap.entrySet()) {
                if (entry.getValue().isServing()) {
                    servingPartitions.add(entry.getKey());
                }
            }
        } finally {
            // Release in finally so a failing fetcher cannot leave the read lock held.
            this.readWriteLock.readLock().unlock();
        }
        return servingPartitions;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Releases every partition whose fetcher is currently reported as serving. */
    public void cancelAllConsumingTask() {
        List<Integer> servingPartitions = getHasList();
        releasePartitionLock(servingPartitions);
    }

    /** Calls {@code shutDown()} on every fetcher in the map. */
    private void shutDownAllFetcher() {
        for (PartitionFetcher fetcher : this.partitionFetcherMap.values()) {
            fetcher.shutDown();
        }
    }

    /**
     * Stops this consumer: shuts down every partition fetcher, then calls
     * {@code shutdownNow()} on the partition-check, worker-info, renew and re-balance
     * executors. The method does not wait for the executors to terminate.
     */
    public void shutDown() {
        LOG.info("Worker: " + this.workerId + " is shutting down...");
        // Fetchers are shut down before the background executors.
        shutDownAllFetcher();
        this.partitionScheduledExecutor.shutdownNow();
        this.workerScheduleExecutor.shutdownNow();
        this.renewScheduleExecutor.shutdownNow();
        this.reBalanceExecutor.shutdownNow();
        LOG.info("Worker: " + this.workerId + " shutdown.");
    }

    /**
     * Returns a snapshot of {@code workerInfoMap} taken under the read lock. Both the map and
     * each partition list are copied, so the caller may modify the result freely.
     *
     * <p>{@code workerInfoMap} stays null until {@code getWorkerInfo()} accepts a response; in
     * that case an empty map is returned instead of failing with the lock held.
     *
     * @return a new map of worker id -> copied partition list, never null
     */
    private Map<String, List<Integer>> deepCopyWorkerInfoMap() {
        this.readWriteLock.readLock().lock();
        try {
            if (this.workerInfoMap == null) {
                return new HashMap<String, List<Integer>>();
            }
            Map<String, List<Integer>> snapshot = new HashMap<String, List<Integer>>(this.workerInfoMap.size());
            for (Map.Entry<String, List<Integer>> entry : this.workerInfoMap.entrySet()) {
                snapshot.put(entry.getKey(), new ArrayList<Integer>(entry.getValue()));
            }
            return snapshot;
        } finally {
            this.readWriteLock.readLock().unlock();
        }
    }
}
