/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QueueBalancer;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcExecutor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.LimitedPrivate(value={"Coprocesssor", "Phoenix"})
@InterfaceStability.Evolving
public class RWQueueRpcExecutor
extends RpcExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
    public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.read.ratio";
    public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.scan.ratio";
    private final QueueBalancer writeBalancer;
    private final QueueBalancer readBalancer;
    private final QueueBalancer scanBalancer;
    private final int writeHandlersCount;
    private final int readHandlersCount;
    private final int scanHandlersCount;
    private final int numWriteQueues;
    private final int numReadQueues;
    private final int numScanQueues;
    private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0);
    private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0);
    private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0);

    public RWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength, PriorityFunction priority, Configuration conf, Abortable abortable) {
        super(name, handlerCount, maxQueueLength, priority, conf, abortable);
        float callqReadShare = this.getReadShare(conf);
        float callqScanShare = this.getScanShare(conf);
        this.numWriteQueues = RWQueueRpcExecutor.calcNumWriters(this.numCallQueues, callqReadShare);
        this.writeHandlersCount = Math.max(this.numWriteQueues, RWQueueRpcExecutor.calcNumWriters(handlerCount, callqReadShare));
        int readQueues = RWQueueRpcExecutor.calcNumReaders(this.numCallQueues, callqReadShare);
        int readHandlers = Math.max(readQueues, RWQueueRpcExecutor.calcNumReaders(handlerCount, callqReadShare));
        int scanQueues = Math.max(0, (int)Math.floor((float)readQueues * callqScanShare));
        int scanHandlers = Math.max(0, (int)Math.floor((float)readHandlers * callqScanShare));
        if (readQueues - scanQueues > 0) {
            readQueues -= scanQueues;
            readHandlers -= scanHandlers;
        } else {
            scanQueues = 0;
            scanHandlers = 0;
        }
        this.numReadQueues = readQueues;
        this.readHandlersCount = readHandlers;
        this.numScanQueues = scanQueues;
        this.scanHandlersCount = scanHandlers;
        this.initializeQueues(this.numWriteQueues);
        this.initializeQueues(this.numReadQueues);
        this.initializeQueues(this.numScanQueues);
        this.writeBalancer = RWQueueRpcExecutor.getBalancer(name, conf, this.queues.subList(0, this.numWriteQueues));
        this.readBalancer = RWQueueRpcExecutor.getBalancer(name, conf, this.queues.subList(this.numWriteQueues, this.numWriteQueues + this.numReadQueues));
        this.scanBalancer = this.numScanQueues > 0 ? RWQueueRpcExecutor.getBalancer(name, conf, this.queues.subList(this.numWriteQueues + this.numReadQueues, this.numWriteQueues + this.numReadQueues + this.numScanQueues)) : null;
        LOG.info(this.getName() + " writeQueues=" + this.numWriteQueues + " writeHandlers=" + this.writeHandlersCount + " readQueues=" + this.numReadQueues + " readHandlers=" + this.readHandlersCount + " scanQueues=" + this.numScanQueues + " scanHandlers=" + this.scanHandlersCount);
    }

    @Override
    protected int computeNumCallQueues(int handlerCount, float callQueuesHandlersFactor) {
        return Math.max(2, Math.round((float)handlerCount * callQueuesHandlersFactor));
    }

    @Override
    protected void startHandlers(int port) {
        this.startHandlers(".write", this.writeHandlersCount, this.queues, 0, this.numWriteQueues, port, this.activeWriteHandlerCount);
        this.startHandlers(".read", this.readHandlersCount, this.queues, this.numWriteQueues, this.numReadQueues, port, this.activeReadHandlerCount);
        if (this.numScanQueues > 0) {
            this.startHandlers(".scan", this.scanHandlersCount, this.queues, this.numWriteQueues + this.numReadQueues, this.numScanQueues, port, this.activeScanHandlerCount);
        }
    }

    @Override
    public boolean dispatch(CallRunner callTask) {
        RpcCall call = callTask.getRpcCall();
        return this.dispatchTo(this.isWriteRequest(call.getHeader(), call.getParam()), this.shouldDispatchToScanQueue(callTask), callTask);
    }

    protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue, CallRunner callTask) {
        int queueIndex = toWriteQueue ? this.writeBalancer.getNextQueue(callTask) : (toScanQueue ? this.numWriteQueues + this.numReadQueues + this.scanBalancer.getNextQueue(callTask) : this.numWriteQueues + this.readBalancer.getNextQueue(callTask));
        Queue queue = (Queue)this.queues.get(queueIndex);
        if (queue.size() >= this.currentQueueLimit) {
            return false;
        }
        return queue.offer(callTask);
    }

    @Override
    public int getWriteQueueLength() {
        int length = 0;
        for (int i = 0; i < this.numWriteQueues; ++i) {
            length += ((BlockingQueue)this.queues.get(i)).size();
        }
        return length;
    }

    @Override
    public int getReadQueueLength() {
        int length = 0;
        for (int i = this.numWriteQueues; i < this.numWriteQueues + this.numReadQueues; ++i) {
            length += ((BlockingQueue)this.queues.get(i)).size();
        }
        return length;
    }

    @Override
    public int getScanQueueLength() {
        int length = 0;
        for (int i = this.numWriteQueues + this.numReadQueues; i < this.numWriteQueues + this.numReadQueues + this.numScanQueues; ++i) {
            length += ((BlockingQueue)this.queues.get(i)).size();
        }
        return length;
    }

    @Override
    public int getActiveHandlerCount() {
        return this.activeWriteHandlerCount.get() + this.activeReadHandlerCount.get() + this.activeScanHandlerCount.get();
    }

    @Override
    public int getActiveWriteHandlerCount() {
        return this.activeWriteHandlerCount.get();
    }

    @Override
    public int getActiveReadHandlerCount() {
        return this.activeReadHandlerCount.get();
    }

    @Override
    public int getActiveScanHandlerCount() {
        return this.activeScanHandlerCount.get();
    }

    protected boolean isWriteRequest(RPCProtos.RequestHeader header, Message param) {
        if (param instanceof ClientProtos.MultiRequest) {
            ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest)param;
            for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
                for (ClientProtos.Action action : regionAction.getActionList()) {
                    if (!action.hasMutation()) continue;
                    return true;
                }
            }
        }
        if (param instanceof ClientProtos.MutateRequest) {
            return true;
        }
        if (param instanceof RegionServerStatusProtos.ReportRegionStateTransitionRequest) {
            return true;
        }
        if (param instanceof RegionServerStatusProtos.RegionServerStartupRequest) {
            return true;
        }
        return param instanceof RegionServerStatusProtos.RegionServerReportRequest;
    }

    QueueBalancer getWriteBalancer() {
        return this.writeBalancer;
    }

    QueueBalancer getReadBalancer() {
        return this.readBalancer;
    }

    QueueBalancer getScanBalancer() {
        return this.scanBalancer;
    }

    private boolean isScanRequest(RPCProtos.RequestHeader header, Message param) {
        return param instanceof ClientProtos.ScanRequest;
    }

    protected boolean shouldDispatchToScanQueue(CallRunner task) {
        RpcCall call = task.getRpcCall();
        return this.numScanQueues > 0 && this.isScanRequest(call.getHeader(), call.getParam());
    }

    protected float getReadShare(Configuration conf) {
        return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0.0f);
    }

    protected float getScanShare(Configuration conf) {
        return conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.0f);
    }

    private static int calcNumWriters(int count, float readShare) {
        return Math.max(1, count - Math.max(1, Math.round((float)count * readShare)));
    }

    private static int calcNumReaders(int count, float readShare) {
        return count - RWQueueRpcExecutor.calcNumWriters(count, readShare);
    }

    @Override
    public void onConfigurationChange(Configuration conf) {
        super.onConfigurationChange(conf);
        this.propagateBalancerConfigChange(this.writeBalancer, conf);
        this.propagateBalancerConfigChange(this.readBalancer, conf);
        this.propagateBalancerConfigChange(this.scanBalancer, conf);
    }

    private void propagateBalancerConfigChange(QueueBalancer balancer, Configuration conf) {
        if (balancer instanceof ConfigurationObserver) {
            ((ConfigurationObserver)balancer).onConfigurationChange(conf);
        }
    }
}

