/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.loghub.client;

import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.loghub.client.FetchedLogGroup;
import com.aliyun.openservices.loghub.client.InnerFetcherProcessorFactory;
import com.aliyun.openservices.loghub.client.LogHubClientAdapter;
import com.aliyun.openservices.loghub.client.LogHubHeartBeat;
import com.aliyun.openservices.loghub.client.LogThreadFactory;
import com.aliyun.openservices.loghub.client.ShardConsumer;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubShardListener;
import com.aliyun.openservices.loghub.client.throttle.UnlimitedResourceBarrier;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Polling-style entry point for consuming LogHub shards.
 *
 * <p>A background task periodically asks the heartbeat which shards are held,
 * keeps one {@link ShardConsumer} per held shard, and drops consumers that have
 * shut down. Fetched data is cached per shard via {@link #updateCachedData} and
 * handed out by {@link #nextNoBlock()}.
 *
 * <p>All access to {@code mShardConsumer}, {@code mShardList}, {@code mCachedData}
 * and {@code _curShardIndex} is guarded by the monitor of {@code mShardList}.
 */
public class ClientFetcher {
    private final ILogHubProcessorFactory mLogHubProcessorFactory;
    private final LogHubConfig mLogHubConfig;
    private final LogHubHeartBeat mLogHubHeartBeat;
    private LogHubClientAdapter mLogHubClientAdapter;
    /** Consumers keyed by shard id. */
    private final Map<Integer, ShardConsumer> mShardConsumer = new HashMap<Integer, ShardConsumer>();
    /** Position in {@code mShardList} where the next {@link #nextNoBlock()} scan starts. */
    private int _curShardIndex = 0;
    /** Shard ids that currently have a consumer; also serves as the lock object. */
    private final List<Integer> mShardList = new ArrayList<Integer>();
    /** Most recently fetched, not yet delivered log group per shard id. */
    private final Map<Integer, FetchedLogGroup> mCachedData = new HashMap<Integer, FetchedLogGroup>();
    private final ExecutorService mExecutorService = Executors.newCachedThreadPool(new LogThreadFactory());
    private final ScheduledExecutorService mShardListUpdateService = Executors.newScheduledThreadPool(1);
    /** Delay, in milliseconds, between two runs of the shard-list refresh task. */
    private final long mShardListUpdateIntervalInMills = 500L;
    private ILogHubShardListener mLogHubShardListener = null;

    /**
     * Creates the fetcher and makes sure the consumer group exists.
     *
     * <p>The consumer group is created first; if the service reports that it
     * already exists, it is updated with the configured timeout and ordering
     * instead.
     *
     * @param config client configuration
     * @throws LogHubClientWorkerException if the consumer group can neither be
     *         created nor (when it already exists) updated
     */
    public ClientFetcher(LogHubConfig config) throws LogHubClientWorkerException {
        this.mLogHubProcessorFactory = new InnerFetcherProcessorFactory(this);
        this.mLogHubConfig = config;
        this.mLogHubClientAdapter = new LogHubClientAdapter(config);
        int timeout = config.getTimeoutInSeconds();
        try {
            this.mLogHubClientAdapter.CreateConsumerGroup(timeout, config.isConsumeInOrder());
        } catch (LogException e) {
            // Literal-first comparison keeps a null error code from raising NPE.
            if (!"ConsumerGroupAlreadyExist".equalsIgnoreCase(e.GetErrorCode())) {
                throw new LogHubClientWorkerException("error occour when create consumer group, errorCode: " + e.GetErrorCode() + ", errorMessage: " + e.GetErrorMessage());
            }
            // The group already exists: bring it in line with the current configuration.
            // A successful update must NOT fall through to the "create" failure above.
            try {
                this.mLogHubClientAdapter.UpdateConsumerGroup(timeout, config.isConsumeInOrder());
            } catch (LogException e1) {
                throw new LogHubClientWorkerException("error occour when update consumer group, errorCode: " + e1.GetErrorCode() + ", errorMessage: " + e1.GetErrorMessage());
            }
        }
        this.mLogHubHeartBeat = new LogHubHeartBeat(this.mLogHubClientAdapter, config);
    }

    /**
     * Switches the underlying client to new credentials, without an STS token.
     *
     * @param accessKeyId new access key id
     * @param accessKey new access key
     */
    public void SwitchClient(String accessKeyId, String accessKey) {
        this.mLogHubClientAdapter.SwitchClient(this.mLogHubConfig.getEndpoint(), accessKeyId, accessKey, null);
    }

    /**
     * Switches the underlying client to new credentials.
     *
     * @param accessKeyId new access key id
     * @param accessKey new access key
     * @param stsToken STS token passed through to the client adapter
     */
    public void SwitchClient(String accessKeyId, String accessKey, String stsToken) {
        this.mLogHubClientAdapter.SwitchClient(this.mLogHubConfig.getEndpoint(), accessKeyId, accessKey, stsToken);
    }

    /** Starts the heartbeat and schedules the periodic shard-list refresh. */
    public void start() {
        this.mLogHubHeartBeat.start();
        this.mShardListUpdateService.scheduleWithFixedDelay(
                new ShardListUpdator(), 0L, this.mShardListUpdateIntervalInMills, TimeUnit.MILLISECONDS);
    }

    /**
     * Shuts down all consumers, waits up to 30 seconds for the worker pool to
     * drain, then stops the heartbeat and the shard-list refresh task.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is
     * abandoned, the interrupt flag is restored, and the remaining shutdown
     * steps still run.
     */
    public void shutdown() {
        // The refresh task mutates mShardConsumer under this lock; iterate under it too.
        synchronized (this.mShardList) {
            for (ShardConsumer consumer : this.mShardConsumer.values()) {
                consumer.shutdown();
            }
        }
        this.mExecutorService.shutdown();
        try {
            this.mExecutorService.awaitTermination(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
        }
        this.mLogHubHeartBeat.stop();
        this.mShardListUpdateService.shutdown();
    }

    /**
     * Registers the shard listener exposed through {@link #getShardListener()}.
     *
     * @param listener listener to store; may be {@code null}
     */
    public void registerShardListener(ILogHubShardListener listener) {
        this.mLogHubShardListener = listener;
    }

    /**
     * @return the listener last passed to {@link #registerShardListener}, or
     *         {@code null} if none was registered
     */
    public ILogHubShardListener getShardListener() {
        return this.mLogHubShardListener;
    }

    /**
     * Returns the next cached log group without blocking.
     *
     * <p>Shards are visited round-robin starting after the shard visited last.
     * For every visited shard the cached entry is taken out of the cache and
     * the shard's consumer (if any) is asked to continue via
     * {@code consume(true)}. The scan stops at the first shard that had data.
     *
     * @return a cached log group, or {@code null} if no shard has data
     */
    public FetchedLogGroup nextNoBlock() {
        synchronized (this.mShardList) {
            int shardCount = this.mShardList.size();
            for (int visited = 0; visited < shardCount; ++visited) {
                // The list may have shrunk since the previous call; wrap the cursor first.
                this._curShardIndex %= shardCount;
                int shardId = this.mShardList.get(this._curShardIndex);
                // Taking the entry out ensures each group is delivered at most once.
                FetchedLogGroup cached = this.mCachedData.remove(shardId);
                ShardConsumer consumer = this.mShardConsumer.get(shardId);
                if (consumer != null) {
                    consumer.consume(true);
                }
                this._curShardIndex = (this._curShardIndex + 1) % shardCount;
                if (cached != null) {
                    return cached;
                }
            }
        }
        return null;
    }

    /**
     * Forwards a checkpoint to the consumer of the given shard.
     *
     * @param shardId shard whose checkpoint is saved
     * @param cursor cursor to record
     * @param persistent passed through to {@code ShardConsumer.saveCheckPoint}
     * @throws LogHubCheckPointException if there is no consumer for
     *         {@code shardId}, or if the consumer fails to save the checkpoint
     */
    public void saveCheckPoint(int shardId, String cursor, boolean persistent) throws LogHubCheckPointException {
        synchronized (this.mShardList) {
            ShardConsumer consumer = this.mShardConsumer.get(shardId);
            if (consumer == null) {
                throw new LogHubCheckPointException("Invalid shardId when saving checkpoint");
            }
            consumer.saveCheckPoint(cursor, persistent);
        }
    }

    /**
     * Stores {@code data} as the cached log group of {@code shardId},
     * replacing any previous entry.
     *
     * @param shardId shard the data belongs to
     * @param data fetched log group to cache
     */
    public void updateCachedData(int shardId, FetchedLogGroup data) {
        synchronized (this.mShardList) {
            this.mCachedData.put(shardId, data);
        }
    }

    /**
     * Drops the cached log group of {@code shardId}, if any.
     *
     * @param shardId shard whose cache entry is removed
     */
    public void cleanCachedData(int shardId) {
        synchronized (this.mShardList) {
            this.mCachedData.remove(shardId);
        }
    }

    /**
     * Shuts down consumers of shards that are no longer owned and forgets every
     * consumer that reports it has finished shutting down.
     *
     * <p>The ids of all forgotten shards are passed to the heartbeat's
     * {@code unsubscribe} in a single call (also when the list is empty).
     *
     * @param ownedShard shard ids that should keep their consumer
     */
    private void cleanConsumer(List<Integer> ownedShard) {
        synchronized (this.mShardList) {
            ArrayList<Integer> removeShards = new ArrayList<Integer>();
            Iterator<Map.Entry<Integer, ShardConsumer>> entries = this.mShardConsumer.entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry<Integer, ShardConsumer> entry = entries.next();
                Integer shardId = entry.getKey();
                ShardConsumer consumer = entry.getValue();
                if (!ownedShard.contains(shardId)) {
                    consumer.shutdown();
                }
                if (!consumer.isShutdown()) {
                    continue;
                }
                // Remove through the iterator: Map.remove() during iteration would make
                // the next step throw ConcurrentModificationException.
                entries.remove();
                // shardId is an Integer, so this is remove(Object), not remove(int index).
                this.mShardList.remove(shardId);
                removeShards.add(shardId);
            }
            this.mLogHubHeartBeat.unsubscribe(removeShards);
        }
    }

    /**
     * Returns the consumer of {@code shardId}, creating, registering and
     * starting one via {@code consume(true)} if none exists yet.
     *
     * @param shardId shard to look up
     * @return the existing or newly created consumer
     */
    private ShardConsumer getConsumer(int shardId) {
        synchronized (this.mShardList) {
            ShardConsumer consumer = this.mShardConsumer.get(shardId);
            if (consumer != null) {
                return consumer;
            }
            consumer = new ShardConsumer(this.mLogHubClientAdapter, shardId, this.mLogHubProcessorFactory.generatorProcessor(), this.mExecutorService, this.mLogHubConfig, this.mLogHubHeartBeat, new UnlimitedResourceBarrier());
            this.mShardConsumer.put(shardId, consumer);
            this.mShardList.add(shardId);
            consumer.consume(true);
            return consumer;
        }
    }

    /**
     * Periodic task that reconciles the consumers with the shards reported by
     * the heartbeat's {@code getHeldShards()}.
     */
    private class ShardListUpdator
    implements Runnable {
        private ShardListUpdator() {
        }

        @Override
        public void run() {
            try {
                List<Integer> heldShards = ClientFetcher.this.mLogHubHeartBeat.getHeldShards();
                for (int shard : heldShards) {
                    ClientFetcher.this.getConsumer(shard);
                }
                ClientFetcher.this.cleanConsumer(heldShards);
            }
            catch (Throwable ignored) {
                // Deliberately best-effort: a task that throws is never rescheduled by
                // scheduleWithFixedDelay, so a failed refresh is dropped and retried on
                // the next tick instead of stopping shard-list updates for good.
            }
        }
    }
}

