package com.alipay.sofa.jraft.rhea.client.pd;

import com.alipay.sofa.jraft.Lifecycle;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.rhea.StoreEngine;
import com.alipay.sofa.jraft.rhea.cmd.pd.BaseRequest;
import com.alipay.sofa.jraft.rhea.cmd.pd.BaseResponse;
import com.alipay.sofa.jraft.rhea.cmd.pd.RegionHeartbeatRequest;
import com.alipay.sofa.jraft.rhea.cmd.pd.StoreHeartbeatRequest;
import com.alipay.sofa.jraft.rhea.errors.ErrorsHelper;
import com.alipay.sofa.jraft.rhea.metadata.Instruction;
import com.alipay.sofa.jraft.rhea.metadata.Region;
import com.alipay.sofa.jraft.rhea.metadata.RegionStats;
import com.alipay.sofa.jraft.rhea.metadata.TimeInterval;
import com.alipay.sofa.jraft.rhea.options.HeartbeatOptions;
import com.alipay.sofa.jraft.rhea.rpc.ExtSerializerSupports;
import com.alipay.sofa.jraft.rhea.storage.BaseKVStoreClosure;
import com.alipay.sofa.jraft.rhea.util.Lists;
import com.alipay.sofa.jraft.rhea.util.Pair;
import com.alipay.sofa.jraft.rhea.util.StackTraceUtil;
import com.alipay.sofa.jraft.rhea.util.concurrent.DiscardOldPolicyWithReport;
import com.alipay.sofa.jraft.rhea.util.concurrent.NamedThreadFactory;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.InvokeContext;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.alipay.sofa.jraft.util.timer.HashedWheelTimer;
import com.alipay.sofa.jraft.util.timer.Timeout;
import com.alipay.sofa.jraft.util.timer.TimerTask;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alipay/sofa/jraft/rhea/client/pd/HeartbeatSender.class */
public class HeartbeatSender implements Lifecycle<HeartbeatOptions> {
    /** Logger used by the sender and by its nested timer tasks. */
    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatSender.class);
    /** Store engine passed to the constructor; read for cluster id, store id, store options and leader region ids. */
    private final StoreEngine storeEngine;
    /** Placement-driver client obtained from the store engine; used to look up the PD leader and regions by id. */
    private final PlacementDriverClient pdClient;
    /** RPC client obtained from {@link #pdClient}; used to send heartbeat requests asynchronously. */
    private final RpcClient rpcClient;
    /** Collects store and region statistics for heartbeat requests; created in {@code init}. */
    private StatsCollector statsCollector;
    /** Processes the instructions returned by a successful region heartbeat; created in {@code init}. */
    private InstructionProcessor instructionProcessor;
    /** Timeout passed to the PD leader lookup and to the async RPC call; must be positive (checked in {@code init}). */
    private int heartbeatRpcTimeoutMillis;
    /** Executor handed to the RPC layer for running heartbeat callbacks; created in {@code init}. */
    private ThreadPoolExecutor heartbeatRpcCallbackExecutor;
    /** Timer on which every store/region heartbeat task is scheduled; created in {@code init}. */
    private HashedWheelTimer heartbeatTimer;
    /** Set to {@code true} at the end of a successful {@code init}; read and written only inside synchronized methods. */
    private boolean started;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alipay/sofa/jraft/rhea/client/pd/HeartbeatSender$HeartbeatClosure.class */
    /**
     * Closure handed to {@code callAsyncWithRpc}: it carries the value of a
     * successful response so that the subclass's {@code run(Status)} can read it.
     *
     * @param <V> type of the value carried by a successful response
     */
    public abstract static class HeartbeatClosure<V> extends BaseKVStoreClosure {
        /** Response value; stays {@code null} until {@link #setResult(Object)} is called. */
        private volatile V result;

        /** Private so that only code inside the enclosing class can create subclasses. */
        private HeartbeatClosure() {
        }

        /**
         * @return the value stored by {@link #setResult(Object)}, or {@code null} if none was stored
         */
        public V getResult() {
            return result;
        }

        /**
         * Stores the response value for later retrieval through {@link #getResult()}.
         *
         * @param value the response value
         */
        public void setResult(V value) {
            result = value;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alipay/sofa/jraft/rhea/client/pd/HeartbeatSender$RegionHeartbeatTask.class */
    public final class RegionHeartbeatTask implements TimerTask {
        private final long nextDelay;
        private final long lastTime;
        private final boolean forceRefreshLeader;

        private RegionHeartbeatTask(long j, long j2, boolean z) {
            this.nextDelay = j;
            this.lastTime = j2;
            this.forceRefreshLeader = z;
        }

        public void run(Timeout timeout) throws Exception {
            try {
                HeartbeatSender.this.sendRegionHeartbeat(this.nextDelay, this.lastTime, this.forceRefreshLeader);
            } catch (Throwable th) {
                HeartbeatSender.LOG.error("Caught a error on sending [RegionHeartbeat]: {}.", StackTraceUtil.stackTrace(th));
            }
        }

        public long getNextDelay() {
            return this.nextDelay;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alipay/sofa/jraft/rhea/client/pd/HeartbeatSender$StoreHeartbeatTask.class */
    public final class StoreHeartbeatTask implements TimerTask {
        private final long nextDelay;
        private final long lastTime;
        private final boolean forceRefreshLeader;

        private StoreHeartbeatTask(long j, long j2, boolean z) {
            this.nextDelay = j;
            this.lastTime = j2;
            this.forceRefreshLeader = z;
        }

        public void run(Timeout timeout) throws Exception {
            try {
                HeartbeatSender.this.sendStoreHeartbeat(this.nextDelay, this.forceRefreshLeader, this.lastTime);
            } catch (Throwable th) {
                HeartbeatSender.LOG.error("Caught a error on sending [StoreHeartbeat]: {}.", StackTraceUtil.stackTrace(th));
            }
        }

        public long getNextDelay() {
            return this.nextDelay;
        }
    }

    public HeartbeatSender(StoreEngine storeEngine) {
        this.storeEngine = storeEngine;
        this.pdClient = storeEngine.getPlacementDriverClient();
        this.rpcClient = ((AbstractPlacementDriverClient) this.pdClient).getRpcClient();
    }

    /**
     * Starts the heartbeat loops: creates the statistics collector, the
     * instruction processor, the timer and the RPC callback executor, then
     * schedules the first store heartbeat and the first region heartbeat.
     *
     * <p>All options are validated before any resource is created, so a
     * rejected configuration leaves this instance untouched. Calling this
     * method while already started is a no-op that returns {@code true}.
     *
     * @param heartbeatOptions RPC timeout (millis) and store/region heartbeat intervals (seconds)
     * @return always {@code true}
     * @throws IllegalArgumentException if the RPC timeout or either interval is not positive
     */
    public synchronized boolean init(HeartbeatOptions heartbeatOptions) {
        if (this.started) {
            LOG.info("[HeartbeatSender] already started.");
            return true;
        }

        // Validate first: nothing below runs (and nothing is assigned) for a bad configuration.
        final int rpcTimeoutMillis = heartbeatOptions.getHeartbeatRpcTimeoutMillis();
        if (rpcTimeoutMillis <= 0) {
            throw new IllegalArgumentException("Heartbeat rpc timeout millis must > 0, " + rpcTimeoutMillis);
        }
        final long storeIntervalSeconds = heartbeatOptions.getStoreHeartbeatIntervalSeconds();
        if (storeIntervalSeconds <= 0) {
            throw new IllegalArgumentException("Store heartbeat interval seconds must > 0, " + storeIntervalSeconds);
        }
        final long regionIntervalSeconds = heartbeatOptions.getRegionHeartbeatIntervalSeconds();
        if (regionIntervalSeconds <= 0) {
            throw new IllegalArgumentException("Region heartbeat interval seconds must > 0, " + regionIntervalSeconds);
        }

        this.heartbeatRpcTimeoutMillis = rpcTimeoutMillis;
        this.statsCollector = new StatsCollector(this.storeEngine);
        this.instructionProcessor = new InstructionProcessor(this.storeEngine);
        this.heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("heartbeat-timer", true), 50L, TimeUnit.MILLISECONDS, 4096);

        final String callbackPoolName = "rheakv-heartbeat-callback";
        this.heartbeatRpcCallbackExecutor = ThreadPoolUtil.newBuilder()
            .poolName(callbackPoolName)
            .enableMetric(true)
            .coreThreads(4)
            .maximumThreads(4)
            .keepAliveSeconds(120L)
            .workQueue(new ArrayBlockingQueue<Runnable>(1024))
            .threadFactory(new NamedThreadFactory(callbackPoolName, true))
            .rejectedHandler(new DiscardOldPolicyWithReport(callbackPoolName))
            .build();

        // Both loops start from the same "last time" so their first statistics intervals begin now.
        final long now = System.currentTimeMillis();
        final StoreHeartbeatTask firstStoreTask = new StoreHeartbeatTask(storeIntervalSeconds, now, false);
        final RegionHeartbeatTask firstRegionTask = new RegionHeartbeatTask(regionIntervalSeconds, now, false);
        this.heartbeatTimer.newTimeout(firstStoreTask, firstStoreTask.getNextDelay(), TimeUnit.SECONDS);
        this.heartbeatTimer.newTimeout(firstRegionTask, firstRegionTask.getNextDelay(), TimeUnit.SECONDS);
        LOG.info("[HeartbeatSender] start successfully, options: {}.", heartbeatOptions);
        this.started = true;
        return true;
    }

    /**
     * Stops the heartbeat loops: shuts down the RPC callback executor, stops
     * the timer, and clears the started flag.
     *
     * <p>The executor is shut down before the timer is stopped, the same
     * order as before this change. Clearing the flag lets a later
     * {@code init} really start again instead of reporting "already started"
     * while nothing is running.
     *
     * <p>NOTE(review): an RPC response that is still in flight when the
     * sender is re-initialized may be delivered to the new executor; confirm
     * whether callers ever re-init the same instance immediately after
     * shutdown.
     */
    public synchronized void shutdown() {
        ExecutorServiceHelper.shutdownAndAwaitTermination(this.heartbeatRpcCallbackExecutor);
        if (this.heartbeatTimer != null) {
            this.heartbeatTimer.stop();
        }
        this.started = false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends one store heartbeat to the PD leader and makes sure the next one
     * gets scheduled.
     *
     * <p>The next {@link StoreHeartbeatTask} is scheduled by the closure once
     * the RPC completes (successfully or not). If anything fails before the
     * request has been handed to {@code callAsyncWithRpc} (building the
     * request, collecting stats, looking up the PD leader), the closure is run
     * here with a failed status so the loop keeps going; the original
     * exception still propagates to the caller.
     *
     * @param j  delay, in seconds, before the next store heartbeat
     * @param z  whether the PD leader lookup should force a refresh
     * @param j2 timestamp (millis) of the previous heartbeat; start of the statistics interval
     */
    public void sendStoreHeartbeat(final long j, boolean z, long j2) {
        final long now = System.currentTimeMillis();
        final HeartbeatClosure<Object> rescheduleClosure = new HeartbeatClosure<Object>() {
            public void run(Status status) {
                // Ask for a leader refresh next time only when the failure was an invalid-peer error.
                final boolean refreshLeader = !status.isOk() && ErrorsHelper.isInvalidPeer(getError());
                final StoreHeartbeatTask next = new StoreHeartbeatTask(j, now, refreshLeader);
                HeartbeatSender.this.heartbeatTimer.newTimeout(next, next.getNextDelay(), TimeUnit.SECONDS);
            }
        };

        boolean handedOff = false;
        try {
            final StoreHeartbeatRequest request = new StoreHeartbeatRequest();
            request.setClusterId(this.storeEngine.getClusterId());
            request.setStats(this.statsCollector.collectStoreStats(new TimeInterval(j2, now)));
            callAsyncWithRpc(this.pdClient.getPdLeader(z, this.heartbeatRpcTimeoutMillis), request, rescheduleClosure);
            handedOff = true;
        } finally {
            if (!handedOff) {
                // Nothing will ever invoke the closure for this round, so do it here;
                // otherwise the store heartbeat loop would stop for good.
                rescheduleClosure.run(new Status(-1, "Failed to send [StoreHeartbeat]"));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends one region heartbeat, covering every region this store currently
     * leads, and makes sure the next one gets scheduled.
     *
     * <p>When the store leads no region, no RPC is sent and the next
     * {@link RegionHeartbeatTask} is scheduled directly. Otherwise the closure
     * schedules it once the RPC completes; on success it first passes any
     * returned instructions to the instruction processor. Rescheduling is
     * guaranteed even if instruction processing throws. If anything fails
     * before the request has been handed to {@code callAsyncWithRpc}, the
     * closure is run here with a failed status so the loop keeps going; the
     * original exception still propagates to the caller.
     *
     * @param j  delay, in seconds, before the next region heartbeat
     * @param j2 timestamp (millis) of the previous heartbeat; start of the statistics interval
     * @param z  whether the PD leader lookup should force a refresh
     */
    public void sendRegionHeartbeat(final long j, long j2, boolean z) {
        final long now = System.currentTimeMillis();
        final HeartbeatClosure<List<Instruction>> closure = new HeartbeatClosure<List<Instruction>>() {
            public void run(Status status) {
                final boolean ok = status.isOk();
                try {
                    final List<Instruction> instructions = ok ? getResult() : null;
                    if (instructions != null && !instructions.isEmpty()) {
                        HeartbeatSender.this.instructionProcessor.process(instructions);
                    }
                } finally {
                    // Always reschedule, even when processing the instructions failed.
                    final boolean refreshLeader = !ok && ErrorsHelper.isInvalidPeer(getError());
                    final RegionHeartbeatTask next = new RegionHeartbeatTask(j, now, refreshLeader);
                    HeartbeatSender.this.heartbeatTimer.newTimeout(next, next.getNextDelay(), TimeUnit.SECONDS);
                }
            }
        };

        boolean handedOff = false;
        try {
            final RegionHeartbeatRequest request = new RegionHeartbeatRequest();
            request.setClusterId(this.storeEngine.getClusterId());
            request.setStoreId(this.storeEngine.getStoreId().longValue());
            request.setLeastKeysOnSplit(this.storeEngine.getStoreOpts().getLeastKeysOnSplit());

            final List<Long> leaderRegionIds = this.storeEngine.getLeaderRegionIds();
            if (leaderRegionIds.isEmpty()) {
                // Nothing to report: skip the RPC and just schedule the next round.
                final RegionHeartbeatTask idleTask = new RegionHeartbeatTask(j, now, false);
                this.heartbeatTimer.newTimeout(idleTask, idleTask.getNextDelay(), TimeUnit.SECONDS);
                handedOff = true;
                if (LOG.isInfoEnabled()) {
                    LOG.info("So sad, there is no even a region leader on [clusterId:{}, storeId: {}, endpoint:{}].", new Object[]{Long.valueOf(this.storeEngine.getClusterId()), this.storeEngine.getStoreId(), this.storeEngine.getSelfEndpoint()});
                }
                return;
            }

            final List<Pair<Region, RegionStats>> regionStatsList = Lists.newArrayListWithCapacity(leaderRegionIds.size());
            final TimeInterval interval = new TimeInterval(j2, now);
            for (final long regionId : leaderRegionIds) {
                final Region region = this.pdClient.getRegionById(regionId);
                final RegionStats stats = this.statsCollector.collectRegionStats(region, interval);
                // Regions for which no stats were collected are left out of the request.
                if (stats != null) {
                    regionStatsList.add(Pair.of(region, stats));
                }
            }
            request.setRegionStatsList(regionStatsList);

            callAsyncWithRpc(this.pdClient.getPdLeader(z, this.heartbeatRpcTimeoutMillis), request, closure);
            handedOff = true;
        } finally {
            if (!handedOff) {
                // Nothing will ever invoke the closure for this round, so do it here;
                // otherwise the region heartbeat loop would stop for good.
                closure.run(new Status(-1, "Failed to send [RegionHeartbeat]"));
            }
        }
    }

    /**
     * Sends {@code baseRequest} to {@code endpoint} asynchronously and reports
     * the outcome by invoking {@code heartbeatClosure.run(Status)} exactly
     * once per outcome:
     * <ul>
     *   <li>invocation or transport failure: failed status carrying the throwable's message;</li>
     *   <li>a result that is {@code null} or not a {@link BaseResponse}: failed status;</li>
     *   <li>unsuccessful response: the response error is set on the closure, then failed status;</li>
     *   <li>successful response: the response value is set as the closure result, then OK status.</li>
     * </ul>
     * The callback runs on the heartbeat RPC callback executor.
     *
     * @param endpoint         target address of the request
     * @param baseRequest      request to send
     * @param heartbeatClosure closure notified of the outcome
     * @param <V>              type of the value expected in a successful response
     */
    private <V> void callAsyncWithRpc(final Endpoint endpoint, BaseRequest baseRequest, final HeartbeatClosure<V> heartbeatClosure) {
        final InvokeContext invokeContext = new InvokeContext();
        invokeContext.put("BOLT_CTX", ExtSerializerSupports.getInvokeContext());
        final InvokeCallback callback = new InvokeCallback() {
            public void complete(Object obj, Throwable th) {
                if (th != null) {
                    heartbeatClosure.run(new Status(-1, th.getMessage()));
                    return;
                }
                if (!(obj instanceof BaseResponse)) {
                    // A null or unexpected result must still complete the closure;
                    // otherwise the heartbeat would never be rescheduled.
                    heartbeatClosure.run(new Status(-1, "RPC failed with address: %s, response: %s", new Object[]{endpoint, obj}));
                    return;
                }
                final BaseResponse baseResponse = (BaseResponse) obj;
                if (baseResponse.isSuccess()) {
                    // The value type is chosen by the caller through V and cannot be checked here.
                    @SuppressWarnings("unchecked")
                    final V value = (V) baseResponse.getValue();
                    heartbeatClosure.setResult(value);
                    heartbeatClosure.run(Status.OK());
                } else {
                    heartbeatClosure.setError(baseResponse.getError());
                    heartbeatClosure.run(new Status(-1, "RPC failed with address: %s, response: %s", new Object[]{endpoint, baseResponse}));
                }
            }

            public Executor executor() {
                return HeartbeatSender.this.heartbeatRpcCallbackExecutor;
            }
        };
        try {
            this.rpcClient.invokeAsync(endpoint, baseRequest, invokeContext, callback, this.heartbeatRpcTimeoutMillis);
        } catch (Throwable th) {
            heartbeatClosure.run(new Status(-1, th.getMessage()));
        }
    }
}
