package com.aliyun.openservices.loghub.client;

import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
import com.aliyun.openservices.loghub.client.throttle.FixedResourceBarrier;
import com.aliyun.openservices.loghub.client.throttle.ResourceBarrier;
import com.aliyun.openservices.loghub.client.throttle.UnlimitedResourceBarrier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/openservices/loghub/client/ClientWorker.class */
/**
 * Main loop of a consumer: repeatedly asks the heartbeat for the shards this
 * instance currently holds, drives one {@link ShardConsumer} per held shard,
 * and retires consumers whose shard is no longer held.
 *
 * <p>{@link #run()} is expected to execute on a single thread; {@link #shutdown()}
 * may be called from another thread and communicates with the loop through the
 * two volatile flags.
 *
 * <p>NOTE(review): the shard-to-consumer map is a plain {@link HashMap}. If the
 * main loop has not exited by the time {@link #shutdown()} stops waiting, both
 * threads can touch the map at once — confirm whether that window matters.
 */
public class ClientWorker implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ClientWorker.class);

    /** Maximum number of polls {@link #shutdown()} performs while waiting for the main loop. */
    private static final int MAX_SHUTDOWN_WAIT_ROUNDS = 20;

    /** Pause between two polls of the main-loop-exit flag, in milliseconds. */
    private static final long SHUTDOWN_POLL_MILLIS = 1000L;

    private final ILogHubProcessorFactory processorFactory;
    private final LogHubConfig logHubConfig;
    private final LogHubHeartBeat logHubHeartBeat;
    /** Live consumers keyed by shard id; mutated by the main loop. */
    private final Map<Integer, ShardConsumer> shardConsumer = new HashMap<Integer, ShardConsumer>();
    private final ExecutorService executorService;
    private final LogHubClientAdapter loghubClient;
    private final ResourceBarrier resourceBarrier;
    /** True when the executor was supplied by the caller and therefore must not be shut down here. */
    private final boolean shareThreadPool;
    /** Set by {@link #shutdown()} to ask the main loop to stop. */
    private volatile boolean shutDown;
    /** Set by {@link #run()} once the main loop has ended, normally or exceptionally. */
    private volatile boolean mainLoopExit;
    /**
     * Lowest shard id whose {@code consume} call returned {@code false} in the
     * previous pass (0 when there was none); written and read only by the main loop.
     */
    private int lastFetchThrottleMinShard;

    /**
     * Creates a worker that owns a private cached thread pool.
     *
     * @param iLogHubProcessorFactory factory used to create one processor per shard consumer
     * @param logHubConfig            consumer configuration
     * @throws LogHubClientWorkerException propagated from the three-argument constructor
     */
    public ClientWorker(ILogHubProcessorFactory iLogHubProcessorFactory, LogHubConfig logHubConfig) throws LogHubClientWorkerException {
        this(iLogHubProcessorFactory, logHubConfig, null);
    }

    /**
     * Creates a worker, ensures the consumer group exists and prepares the heartbeat.
     *
     * @param iLogHubProcessorFactory factory used to create one processor per shard consumer
     * @param logHubConfig            consumer configuration
     * @param executorService         executor handed to every shard consumer; when {@code null}
     *                                a cached thread pool is created and owned by this worker
     * @throws LogHubClientWorkerException if set-up inside the guarded section fails; the client
     *                                     adapter is shut down before the exception is rethrown
     */
    public ClientWorker(ILogHubProcessorFactory iLogHubProcessorFactory, LogHubConfig logHubConfig, ExecutorService executorService) throws LogHubClientWorkerException {
        this.processorFactory = iLogHubProcessorFactory;
        this.logHubConfig = logHubConfig;
        this.shareThreadPool = executorService != null;
        this.executorService = this.shareThreadPool
                ? executorService
                : Executors.newCachedThreadPool(new LogThreadFactory());
        this.loghubClient = new LogHubClientAdapter(logHubConfig);
        try {
            this.loghubClient.createConsumerGroupIfNotExist(logHubConfig);
            this.logHubHeartBeat = new LogHubHeartBeat(this.loghubClient, logHubConfig);
            final int maxInProgressMb = this.logHubConfig.getMaxInProgressingDataSizeInMB();
            if (maxInProgressMb > 0) {
                // NOTE(review): this is int arithmetic and wraps for values >= 2048.
                // Widen to long if FixedResourceBarrier accepts a long — confirm its signature.
                this.resourceBarrier = new FixedResourceBarrier(maxInProgressMb * 1024 * 1024);
            } else {
                this.resourceBarrier = new UnlimitedResourceBarrier();
            }
        } catch (LogHubClientWorkerException e) {
            // Do not leave the freshly created client open when construction fails.
            this.loghubClient.shutdown();
            throw e;
        }
    }

    /**
     * Delegates to the client adapter's {@code SwitchClient} with the configured endpoint,
     * the two given values and {@code null} as the final argument.
     *
     * @param str  first value forwarded to the adapter (presumably an access key id — confirm)
     * @param str2 second value forwarded to the adapter (presumably an access key secret — confirm)
     */
    public void SwitchClient(String str, String str2) {
        this.loghubClient.SwitchClient(this.logHubConfig.getEndpoint(), str, str2, null);
    }

    /**
     * Delegates to the client adapter's {@code SwitchClient} with the configured endpoint
     * and the three given values.
     *
     * @param str  first value forwarded to the adapter (presumably an access key id — confirm)
     * @param str2 second value forwarded to the adapter (presumably an access key secret — confirm)
     * @param str3 third value forwarded to the adapter (presumably a security token — confirm)
     */
    public void SwitchClient(String str, String str2, String str3) {
        this.loghubClient.SwitchClient(this.logHubConfig.getEndpoint(), str, str2, str3);
    }

    /**
     * Installs a shard filter on the heartbeat.
     *
     * @param shardFilter filter passed straight through to the heartbeat
     */
    public void setShardFilter(ShardFilter shardFilter) {
        this.logHubHeartBeat.setShardFilter(shardFilter);
    }

    /**
     * Sorts the given shard ids in ascending order, in place.
     *
     * @param list shard ids, possibly {@code null}
     * @return the same list after sorting, or an empty list when {@code list} is {@code null}
     */
    private static List<Integer> sortShards(List<Integer> list) {
        if (list == null) {
            return Collections.emptyList();
        }
        Collections.sort(list);
        return list;
    }

    /**
     * Starts the heartbeat and loops until {@link #shutdown()} is requested. Each pass
     * consumes from every held shard in ascending id order, retires consumers of shards
     * that are no longer held, then sleeps for the configured fetch interval.
     *
     * <p>A {@code null} held-shard list is treated exactly like an empty one for the
     * whole pass. The exit flag is raised in a {@code finally} block so that
     * {@link #shutdown()} does not wait for a loop that already died from an exception.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            this.logHubHeartBeat.start();
            final long fetchIntervalMillis = this.logHubConfig.getFetchIntervalMillis();
            while (!this.shutDown) {
                // Normalise once so that consumption and clean-up see the same non-null list.
                final List<Integer> heldShards = sortShards(this.logHubHeartBeat.getHeldShards());
                int firstRefusedShard = -1;
                for (Integer held : heldShards) {
                    final int shardId = held.intValue();
                    // The flag is true only for shards at or above the lowest shard whose consume()
                    // returned false in the previous pass (presumably a throttled fetch — confirm
                    // against ShardConsumer.consume).
                    final boolean accepted = consumerForShard(shardId).consume(shardId >= this.lastFetchThrottleMinShard);
                    if (!accepted && firstRefusedShard < 0) {
                        firstRefusedShard = shardId;
                    }
                }
                this.lastFetchThrottleMinShard = Math.max(firstRefusedShard, 0);
                cleanConsumer(heldShards);
                LoghubClientUtil.sleep(fetchIntervalMillis);
            }
        } finally {
            this.mainLoopExit = true;
        }
    }

    /**
     * Requests the main loop to stop, waits a bounded time for it to exit, then shuts down
     * every shard consumer, the thread pool (only when this worker created it), the
     * heartbeat and finally the client adapter.
     */
    public void shutdown() {
        this.shutDown = true;
        // Bounded wait: at most MAX_SHUTDOWN_WAIT_ROUNDS sleeps, then proceed regardless.
        for (int round = 0; !this.mainLoopExit && round < MAX_SHUTDOWN_WAIT_ROUNDS; round++) {
            LoghubClientUtil.sleep(SHUTDOWN_POLL_MILLIS);
        }
        for (ShardConsumer consumer : this.shardConsumer.values()) {
            consumer.shutdown();
        }
        if (!this.shareThreadPool) {
            // 30 is the timeout handed to the helper (presumably seconds — confirm).
            LoghubClientUtil.shutdownThreadPool(this.executorService, 30L);
        }
        this.logHubHeartBeat.stop();
        this.loghubClient.shutdown();
    }

    /**
     * Retires consumers whose shard is absent from {@code heldShards} and tells the
     * heartbeat to unsubscribe from shards that no longer have a consumer.
     *
     * <p>A consumer is shut down only when it reports that it can be unloaded, and it is
     * dropped from the map only once it reports that it is shut down; otherwise it is
     * retried on a later pass.
     *
     * @param heldShards shard ids currently held; must not be {@code null}
     */
    private void cleanConsumer(List<Integer> heldShards) {
        // Declared as ArrayList so the unsubscribe call compiles whichever collection type it expects.
        final ArrayList<Integer> unsubscribed = new ArrayList<Integer>();
        final Iterator<Map.Entry<Integer, ShardConsumer>> entries = this.shardConsumer.entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<Integer, ShardConsumer> entry = entries.next();
            final Integer shardId = entry.getKey();
            if (heldShards.contains(shardId)) {
                continue;
            }
            LOG.warn("Shard {} has been assigned to another consumer.", shardId);
            final ShardConsumer consumer = entry.getValue();
            if (consumer.canBeUnloaded()) {
                LOG.info("Shutting down consumer of shard: {}", shardId);
                consumer.shutdown();
            } else {
                LOG.warn("Shard {} cannot be unloaded as it's checkpoint has not been committed yet", shardId);
            }
            if (consumer.isShutdown()) {
                unsubscribed.add(shardId);
                LOG.info("Shard shutdown done, removing from heartbeat list: {}", shardId);
                // Safe removal while iterating over the entry set.
                entries.remove();
            }
        }
        // Held shards that have no consumer are unsubscribed as well.
        for (Integer shardId : heldShards) {
            if (!this.shardConsumer.containsKey(shardId)) {
                unsubscribed.add(shardId);
            }
        }
        if (unsubscribed.isEmpty()) {
            return;
        }
        this.logHubHeartBeat.unsubscribe(unsubscribed);
        LOG.warn("Cancel heart beating for {}", Arrays.toString(unsubscribed.toArray()));
    }

    /**
     * Returns the consumer registered for the given shard, creating and registering one
     * on first use.
     *
     * @param i shard id
     * @return the existing or newly created consumer for shard {@code i}
     */
    private ShardConsumer consumerForShard(int i) {
        final Integer key = Integer.valueOf(i);
        ShardConsumer consumer = this.shardConsumer.get(key);
        if (consumer == null) {
            consumer = new ShardConsumer(this.loghubClient, i, this.processorFactory.generatorProcessor(), this.executorService, this.logHubConfig, this.logHubHeartBeat, this.resourceBarrier);
            this.shardConsumer.put(key, consumer);
            LOG.info("Create a consumer for shard: {}", key);
        }
        return consumer;
    }
}
