/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.loghub.client;

import com.aliyun.openservices.loghub.client.LogHubClientAdapter;
import com.aliyun.openservices.loghub.client.LogThreadFactory;
import com.aliyun.openservices.loghub.client.LoghubClientUtil;
import com.aliyun.openservices.loghub.client.ShardFilter;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogHubHeartBeat {
    private static final Logger LOG = LoggerFactory.getLogger(LogHubHeartBeat.class);
    private static final long STOP_TIMEOUT_SECS = 10L;
    private ScheduledExecutorService executorService;
    private final LogHubClientAdapter client;
    private final long intervalMills;
    private final int timeoutSecs;
    private Set<Integer> heldShards;
    private Set<Integer> heartShards;
    private long lastSuccessTime;
    private ShardFilter shardFilter;

    public LogHubHeartBeat(LogHubClientAdapter client, LogHubConfig config) {
        this.client = client;
        this.intervalMills = config.getHeartBeatIntervalMillis();
        this.timeoutSecs = config.getTimeoutInSeconds();
        this.heldShards = new HashSet<Integer>();
        this.heartShards = new HashSet<Integer>();
    }

    public void start() {
        this.executorService = Executors.newScheduledThreadPool(1, new LogThreadFactory());
        this.executorService.scheduleWithFixedDelay(new HeartBeatRunnable(), 0L, this.intervalMills, TimeUnit.MILLISECONDS);
        LOG.info("Background heartbeat thread started, interval {}", (Object)this.intervalMills);
    }

    public synchronized void setShardFilter(ShardFilter shardFilter) {
        this.shardFilter = shardFilter;
    }

    public void stop() {
        LoghubClientUtil.shutdownThreadPool(this.executorService, 10L);
    }

    public synchronized List<Integer> getHeldShards() {
        return this.shardFilter == null ? new ArrayList<Integer>(this.heldShards) : this.shardFilter.filter(new ArrayList<Integer>(this.heldShards));
    }

    public synchronized void unsubscribe(List<Integer> shards) {
        this.heartShards.removeAll(shards);
    }

    private synchronized void unsubscribeIdle(Set<Integer> shards) {
        if (this.shardFilter == null || shards.isEmpty()) {
            return;
        }
        List<Integer> filtered = this.shardFilter.filter(new ArrayList<Integer>(shards));
        ArrayList<Integer> additional = new ArrayList<Integer>();
        for (Integer shard : shards) {
            if (filtered.contains(shard)) continue;
            additional.add(shard);
        }
        this.unsubscribe(additional);
    }

    private synchronized void heartBeat() {
        block2: {
            long nowMillis = System.currentTimeMillis();
            try {
                LOG.debug("Sending heartbeat {}", (Object)Arrays.toString(this.heartShards.toArray()));
                List<Integer> shards = this.client.HeartBeat(new ArrayList<Integer>(this.heartShards));
                LOG.info("Heartbeat response: {}", shards);
                this.heldShards = new HashSet<Integer>(shards);
                this.heartShards.addAll(shards);
                this.lastSuccessTime = nowMillis;
            }
            catch (Exception ex) {
                LOG.error("Error sending heartbeat, project {}, logstore {}, consumer {}", new Object[]{this.client.getProject(), this.client.getLogstore(), this.client.getConsumer(), ex});
                if (nowMillis - this.lastSuccessTime <= (long)(this.timeoutSecs * 1000) + this.intervalMills) break block2;
                this.heldShards.clear();
                this.unsubscribeIdle(this.heartShards);
                LOG.warn("Heartbeat failed since {}, clear held shards", (Object)this.lastSuccessTime);
            }
        }
    }

    public synchronized void markIdle(int shard) {
        this.heldShards.remove(shard);
    }

    private class HeartBeatRunnable
    implements Runnable {
        private HeartBeatRunnable() {
        }

        @Override
        public void run() {
            LogHubHeartBeat.this.heartBeat();
        }
    }
}

