/*
 * Decompiled with CFR 0.152.
 */
package com.arcadedb.server.ha;

import com.arcadedb.ContextConfiguration;
import com.arcadedb.GlobalConfiguration;
import com.arcadedb.database.Binary;
import com.arcadedb.exception.TimeoutException;
import com.arcadedb.log.LogManager;
import com.arcadedb.network.binary.ChannelBinaryServer;
import com.arcadedb.network.binary.ConnectionException;
import com.arcadedb.server.ha.HAServer;
import com.arcadedb.server.ha.Replica2LeaderNetworkExecutor;
import com.arcadedb.server.ha.ReplicationException;
import com.arcadedb.server.ha.ReplicationMessage;
import com.arcadedb.server.ha.message.CommandForwardRequest;
import com.arcadedb.server.ha.message.HACommand;
import com.arcadedb.server.ha.message.ReplicaConnectHotResyncResponse;
import com.arcadedb.server.ha.message.TxForwardRequest;
import com.arcadedb.utility.Callable;
import com.arcadedb.utility.FileUtils;
import com.arcadedb.utility.Pair;
import com.conversantmedia.util.concurrent.PushPullBlockingQueue;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

public class Leader2ReplicaNetworkExecutor
extends Thread {
    // HA server this executor belongs to; used for configuration, election state,
    // message (de)serialization and replica status updates.
    private final HAServer server;
    // Identity of the remote replica, exactly as received by the constructor.
    private final String remoteServerName;
    private final String remoteServerAddress;
    private final String remoteServerHTTPAddress;
    // Outgoing replication messages; filled by enqueueMessage()/mergeFrom() and
    // drained by the sender thread started in run().
    private final BlockingQueue<Binary> senderQueue;
    private Thread senderThread;
    // TxForwardRequest/CommandForwardRequest requests read in run(); executed
    // by the forwarder thread instead of the reading thread.
    private final BlockingQueue<Pair<ReplicationMessage, HACommand>> forwarderQueue;
    private Thread forwarderThread;
    // Timestamps (System.currentTimeMillis()) maintained by setStatus().
    private long joinedOn;
    private long leftOn = 0L;
    // Connection to the replica; set to null by closeChannel().
    private ChannelBinaryServer channel;
    private STATUS status = STATUS.JOINING;
    // Monitor used by executeInLock() (and by mergeFrom() on the previous connection).
    private final Object lock = new Object();
    // Serializes writes to the channel (constructor handshake and sendMessage()).
    private final Object channelOutputLock = new Object();
    // Serializes reads from the channel (readRequest()).
    private final Object channelInputLock = new Object();
    // Set to true by close() and by the worker threads on I/O errors; polled by all loops in run().
    private volatile boolean shutdownCommunication = false;
    // Statistics: totalMessages and the latency* fields are updated by updateStats(),
    // totalBytes by enqueueMessage(). All start at the Java default of 0.
    private long totalMessages;
    private long totalBytes;
    private long latencyMin;
    private long latencyMax;
    private long latencyTotalTime;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates the executor for a replica that has just connected and performs the
     * initial handshake on the channel.
     *
     * <p>The sender and forwarder queues are sized from
     * {@code HA_REPLICATION_QUEUE_SIZE}; their implementation is chosen from
     * {@code ASYNC_OPERATIONS_QUEUE_IMPL} ("fast" or "standard", anything else logs
     * a warning and falls back to the standard implementation).
     *
     * <p>If this server is not the leader, or the election is neither DONE nor
     * LEADER_WAITING_FOR_QUORUM, a negative response is written to the channel and
     * a {@link ConnectionException} is thrown. The channel is flushed in every case.
     *
     * @param ha                      HA server owning this connection
     * @param channel                 channel connected to the replica
     * @param remoteServerName        name of the replica
     * @param remoteServerAddress     replication address of the replica
     * @param remoteServerHTTPAddress HTTP address of the replica
     * @throws IOException if writing to or flushing the channel fails
     */
    public Leader2ReplicaNetworkExecutor(HAServer ha, ChannelBinaryServer channel, String remoteServerName, String remoteServerAddress, String remoteServerHTTPAddress) throws IOException {
        this.server = ha;
        this.remoteServerName = remoteServerName;
        this.remoteServerAddress = remoteServerAddress;
        this.remoteServerHTTPAddress = remoteServerHTTPAddress;
        this.channel = channel;

        final ContextConfiguration configuration = ha.getServer().getConfiguration();
        final int capacity = configuration.getValueAsInteger(GlobalConfiguration.HA_REPLICATION_QUEUE_SIZE);
        final String queueImpl = configuration.getValueAsString(GlobalConfiguration.ASYNC_OPERATIONS_QUEUE_IMPL);

        if ("fast".equalsIgnoreCase(queueImpl)) {
            this.senderQueue = new PushPullBlockingQueue<>(capacity);
            this.forwarderQueue = new PushPullBlockingQueue<>(capacity);
        } else {
            // "standard" and any unsupported value share the same implementation;
            // only the unsupported value is reported.
            if (!"standard".equalsIgnoreCase(queueImpl)) {
                LogManager.instance().log(this, Level.WARNING, "Error on async operation queue implementation setting: %s is not supported", null, (Object) queueImpl);
            }
            this.senderQueue = new ArrayBlockingQueue<>(capacity);
            this.forwarderQueue = new ArrayBlockingQueue<>(capacity);
        }

        // Provisional thread name until the handshake succeeds.
        this.setName(this.server.getServer().getServerName() + " leader2replica->?");

        synchronized (this.channelOutputLock) {
            try {
                if (!ha.isLeader()) {
                    final Replica2LeaderNetworkExecutor currentLeader = this.server.getLeader();
                    final String leaderName = currentLeader != null ? currentLeader.getRemoteServerName() : "";
                    final String leaderAddress = currentLeader != null ? currentLeader.getRemoteAddress() : "";
                    final String reason = "Current server '" + ha.getServerName() + "' is not the Leader";

                    // Rejection: error code 0 followed by the reason and the known leader, if any.
                    this.channel.writeBoolean(false);
                    this.channel.writeByte((byte) 0);
                    this.channel.writeString(reason);
                    this.channel.writeString(leaderName);
                    this.channel.writeString(leaderAddress);
                    throw new ConnectionException(channel.socket.getInetAddress().toString(), reason);
                }

                final HAServer.ELECTION_STATUS election = ha.getElectionStatus();
                final boolean electionSettled = election == HAServer.ELECTION_STATUS.DONE || election == HAServer.ELECTION_STATUS.LEADER_WAITING_FOR_QUORUM;
                if (!electionSettled) {
                    // Rejection: error code 1, the replica is expected to retry later.
                    this.channel.writeBoolean(false);
                    this.channel.writeByte((byte) 1);
                    this.channel.writeString("Election for the Leader is pending");
                    throw new ConnectionException(channel.socket.getInetAddress().toString(), "Election for Leader is pending");
                }

                this.setName(this.server.getServer().getServerName() + " leader2replica->" + remoteServerName + "(" + remoteServerAddress + ")");

                // Acceptance: leader name, last election vote (1 when none), HTTP address, server list.
                this.channel.writeBoolean(true);
                this.channel.writeString(this.server.getServerName());
                this.channel.writeLong(this.server.lastElectionVote != null ? (Long) this.server.lastElectionVote.getFirst() : 1L);
                this.channel.writeString(this.server.getServer().getHttpServer().getListeningAddress());
                this.channel.writeString(this.server.getServerAddressList());

                LogManager.instance().log(this, Level.INFO, "Remote Replica server '%s' (%s) successfully connected", (Object) remoteServerName, (Object) remoteServerAddress);
            } finally {
                this.channel.flush();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Takes over the messages still queued on a previous connection to the same
     * replica and then closes that connection. Both steps run while holding the
     * previous connection's lock.
     *
     * @param previousConnection connection being replaced by this one
     */
    public void mergeFrom(Leader2ReplicaNetworkExecutor previousConnection) {
        synchronized (previousConnection.lock) {
            final BlockingQueue<Binary> pending = previousConnection.senderQueue;
            this.senderQueue.addAll(pending);
            previousConnection.close();
        }
    }

    /**
     * Starts the sender and forwarder worker threads, then reads requests from the
     * replica until communication is shut down.
     *
     * <ul>
     *   <li>The sender thread drains {@code senderQueue} and writes each message to
     *       the replica while its status is ONLINE; otherwise it keeps the message
     *       and retries after 500 ms.</li>
     *   <li>The forwarder thread drains {@code forwarderQueue} and executes the
     *       queued requests.</li>
     *   <li>This thread deserializes incoming requests: {@link TxForwardRequest} and
     *       {@link CommandForwardRequest} are handed to the forwarder queue, every
     *       other command is executed inline.</li>
     * </ul>
     *
     * On an I/O or unexpected error while reading, the replica is marked as not
     * online and the connection is closed.
     */
    @Override
    public void run() {
        LogManager.instance().setContext(this.server.getServerName());

        this.senderThread = new Thread(new Runnable() {
            @Override
            public void run() {
                LogManager.instance().setContext(server.getServerName());
                // Message polled but not yet delivered; kept across iterations so it
                // is retried once the replica becomes ONLINE.
                Binary lastMessage = null;
                while (!shutdownCommunication || !senderQueue.isEmpty()) {
                    try {
                        if (lastMessage == null) {
                            lastMessage = senderQueue.poll(500L, TimeUnit.MILLISECONDS);
                        }
                        if (lastMessage == null) {
                            continue;
                        }
                        if (shutdownCommunication) {
                            break;
                        }
                        if (status == STATUS.ONLINE) {
                            LogManager.instance().log(this, Level.FINE, "Sending message to replica '%s' (msgSize=%d buffered=%d)...", (Object) remoteServerName, (Object) lastMessage.size(), (Object) senderQueue.size());
                            sendMessage(lastMessage);
                            lastMessage = null;
                            continue;
                        }
                        LogManager.instance().log(this, Level.FINE, "Replica '%s' is not online, waiting and retry (buffered=%d)...", (Object) remoteServerName, (Object) senderQueue.size());
                        Thread.sleep(500L);
                    } catch (IOException e) {
                        LogManager.instance().log(this, Level.INFO, "Error on sending replication message to remote server '%s' (error=%s)", (Object) remoteServerName, (Object) e);
                        shutdownCommunication = true;
                        return;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
                LogManager.instance().log(this, Level.FINE, "Replication thread to remote server '%s' is off (buffered=%d)", (Object) remoteServerName, (Object) senderQueue.size());
            }
        });
        // Name the thread before starting it so its first log lines carry the final name.
        this.senderThread.setName(this.server.getServer().getServerName() + " leader2replica-sender->" + this.remoteServerName);
        this.senderThread.start();

        this.forwarderThread = new Thread(new Runnable() {
            @Override
            public void run() {
                LogManager.instance().setContext(server.getServerName());
                // Private serialization buffer: responses produced here must not share
                // the buffer used by the reading thread below.
                final Binary buffer = new Binary(8192);
                buffer.setAllocationChunkSize(1024);
                while (!shutdownCommunication || !forwarderQueue.isEmpty()) {
                    try {
                        final Pair<ReplicationMessage, HACommand> lastMessage = forwarderQueue.poll(500L, TimeUnit.MILLISECONDS);
                        if (lastMessage == null) {
                            continue;
                        }
                        if (shutdownCommunication) {
                            break;
                        }
                        executeMessage(buffer, lastMessage);
                    } catch (IOException e) {
                        LogManager.instance().log(this, Level.INFO, "Error on sending replication message to remote server '%s' (error=%s)", (Object) remoteServerName, (Object) e);
                        shutdownCommunication = true;
                        return;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
                LogManager.instance().log(this, Level.FINE, "Replication thread to remote server '%s' is off (buffered=%d)", (Object) remoteServerName, (Object) forwarderQueue.size());
            }
        });
        this.forwarderThread.setName(this.server.getServer().getServerName() + " leader-forwarder");
        this.forwarderThread.start();

        final Binary buffer = new Binary(8192);
        while (!this.shutdownCommunication) {
            // Declared outside the try so the timeout handler can report it.
            Pair<ReplicationMessage, HACommand> request = null;
            try {
                request = this.server.getMessageFactory().deserializeCommand(buffer, this.readRequest());
                if (request == null) {
                    this.channel.clearInput();
                    continue;
                }
                final HACommand command = request.getSecond();
                LogManager.instance().log(this, Level.FINE, "Leader received message %d from replica %s: %s", (Object) request.getFirst().messageNumber, (Object) this.remoteServerName, (Object) command);
                if (command instanceof TxForwardRequest || command instanceof CommandForwardRequest) {
                    // Forwarded work is executed by the forwarder thread so this
                    // thread can keep reading from the channel.
                    this.forwarderQueue.put(request);
                    continue;
                }
                this.executeMessage(buffer, request);
            } catch (TimeoutException e) {
                LogManager.instance().log(this, Level.FINE, "Request %s in timeout (cause=%s)", request, (Object) e.getCause());
            } catch (IOException e) {
                // Pass the description as a format argument: a Throwable in this
                // position would bind to the exception parameter of the logger and
                // leave the %s placeholder without a value.
                LogManager.instance().log(this, Level.FINE, "IO Error from reading requests (cause=%s)", (Object) e.toString());
                this.server.setReplicaStatus(this.remoteServerName, false);
                this.close();
            } catch (Exception e) {
                LogManager.instance().log(this, Level.SEVERE, "Generic error during applying of request from Leader (cause=%s)", (Object) e.toString());
                this.server.setReplicaStatus(this.remoteServerName, false);
                this.close();
            }
        }
    }

    /**
     * @return number of messages currently waiting in the sender queue
     */
    public int getMessagesInQueue() {
        final int buffered = senderQueue.size();
        return buffered;
    }

    /**
     * Executes a request received from the replica and, when the command yields a
     * response, serializes it into {@code buffer} and sends it back.
     *
     * <p>After a {@link ReplicaConnectHotResyncResponse} has been sent, the messages
     * starting from the response's message number are resent to the replica and
     * the replica is marked as online.
     *
     * @param buffer  buffer used to serialize the response
     * @param request replication message paired with the command to execute
     * @throws IOException if the response cannot be written to the channel
     */
    private void executeMessage(Binary buffer, Pair<ReplicationMessage, HACommand> request) throws IOException {
        final ReplicationMessage envelope = request.getFirst();
        final HACommand command = request.getSecond();

        final HACommand reply = command.execute(this.server, this.remoteServerName, envelope.messageNumber);
        if (reply == null) {
            // Nothing to send back for this command.
            return;
        }

        this.server.getMessageFactory().serializeCommand(reply, buffer, envelope.messageNumber);
        LogManager.instance().log(this, Level.FINE, "Request %s -> %s to '%s'", request.getSecond(), (Object) reply, (Object) this.remoteServerName);
        this.sendMessage(buffer);

        if (!(reply instanceof ReplicaConnectHotResyncResponse)) {
            return;
        }
        final ReplicaConnectHotResyncResponse hotResync = (ReplicaConnectHotResyncResponse) reply;
        this.server.resendMessagesToReplica(hotResync.getMessageNumber(), this.remoteServerName);
        this.server.setReplicaStatus(this.remoteServerName, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the next request from the channel while holding the input lock.
     *
     * @return bytes returned by the channel's {@code readBytes()}
     * @throws IOException if reading from the channel fails
     */
    private byte[] readRequest() throws IOException {
        synchronized (this.channelInputLock) {
            final byte[] payload = this.channel.readBytes();
            return payload;
        }
    }

    /**
     * Closes the channel to the replica, if one is still set, and clears the
     * reference. Does nothing when the channel is already {@code null}.
     */
    public void closeChannel() {
        final ChannelBinaryServer current = this.channel;
        if (current == null) {
            return;
        }
        current.close();
        this.channel = null;
    }

    /**
     * Shuts down communication with the replica: raises the shutdown flag, waits up
     * to one second for each worker thread to finish and closes the channel. The
     * whole sequence runs inside {@link #executeInLock}.
     *
     * <p>This method can be reached from the worker threads themselves (for example
     * through {@link #sendMessage} when the channel is gone). A thread never waits
     * for itself: joining the current thread would only block for the full timeout.
     *
     * <p>Closing is best effort: unexpected errors are logged at FINE level and not
     * propagated.
     */
    public void close() {
        this.executeInLock(ignore -> {
            this.shutdownCommunication = true;
            try {
                if (joinWorker(this.senderThread)) {
                    this.senderThread = null;
                }
                if (joinWorker(this.forwarderThread)) {
                    this.forwarderThread = null;
                }
                this.closeChannel();
            } catch (Exception e) {
                // Best effort: report the failure instead of silently dropping it.
                LogManager.instance().log(this, Level.FINE, "Error on closing connection to replica '%s' (error=%s)", (Object) this.remoteServerName, (Object) e.toString());
            }
            return null;
        });
    }

    /**
     * Waits up to one second for a worker thread to terminate.
     *
     * @param worker thread to wait for, may be {@code null}
     * @return {@code true} if the reference can be cleared (the wait completed or was
     *         skipped because {@code worker} is the calling thread); {@code false}
     *         if {@code worker} is {@code null} or the wait was interrupted
     */
    private static boolean joinWorker(Thread worker) {
        if (worker == null) {
            return false;
        }
        if (worker == Thread.currentThread()) {
            // Self-join can never succeed before the timeout expires.
            return true;
        }
        try {
            worker.join(1000L);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Queues a replication message for delivery to the replica.
     *
     * <p>When the sender queue is full, a warning is logged, the caller is delayed
     * by one second and the offer is retried once. If the queue is still full the
     * queue is cleared, the replica is marked as not online and a
     * {@link ReplicationException} is thrown. The queueing logic runs inside
     * {@link #executeInLock}.
     *
     * @param msgNumber number of the message, used for logging only
     * @param message   serialized message to send
     * @return {@code true} if the message was queued, {@code false} if the replica
     *         is OFFLINE
     * @throws ReplicationException if the wait is interrupted or the replica does
     *                              not drain its queue in time
     */
    public boolean enqueueMessage(final long msgNumber, final Binary message) {
        if (STATUS.OFFLINE == this.status) {
            return false;
        }

        final Callable<Object, Object> enqueueTask = new Callable<Object, Object>() {
            @Override
            public Object call(Object iArgument) {
                final Leader2ReplicaNetworkExecutor executor = Leader2ReplicaNetworkExecutor.this;

                if (executor.senderQueue.size() > 1) {
                    LogManager.instance().log(this, Level.FINE, "Buffering request %d to server '%s' (status=%s buffered=%d)", (Object) msgNumber, (Object) executor.remoteServerName, (Object) executor.status, (Object) executor.senderQueue.size());
                }

                boolean queued = executor.senderQueue.offer(message);
                if (!queued) {
                    if (executor.status == STATUS.OFFLINE) {
                        return false;
                    }

                    LogManager.instance().log(this, Level.WARNING, "Applying back-pressure on replicating messages to server '%s' (latency=%s buffered=%d)...", (Object) executor.getRemoteServerName(), (Object) executor.getLatencyStats(), (Object) executor.senderQueue.size());
                    try {
                        // Give the sender thread time to drain the queue before the single retry.
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new ReplicationException("Error on replicating to server '" + executor.remoteServerName + "'");
                    }

                    if (executor.status == STATUS.OFFLINE) {
                        return false;
                    }

                    queued = executor.senderQueue.offer(message);
                    if (!queued) {
                        LogManager.instance().log(this, Level.INFO, "Timeout on writing request to server '%s', setting it offline...", (Object) executor.getRemoteServerName());
                        executor.senderQueue.clear();
                        executor.server.setReplicaStatus(executor.remoteServerName, false);
                        throw new ReplicationException("Replica '" + executor.remoteServerName + "' is not reading replication messages");
                    }
                }

                executor.totalBytes += (long) message.size();
                return true;
            }
        };

        return (Boolean) this.executeInLock(enqueueTask);
    }

    /**
     * Changes the status of the replica. Does nothing when the status is unchanged.
     *
     * <p>Inside {@link #executeInLock}: ONLINE records the join time and resets
     * {@code leftOn} to 0; every other status records the current time in
     * {@code leftOn}, and OFFLINE additionally closes the connection. Afterwards
     * the cluster configuration is printed if the server is started.
     *
     * @param status new status of the replica
     */
    public void setStatus(final STATUS status) {
        if (status == this.status) {
            return;
        }

        final Callable<Object, Object> transition = new Callable<Object, Object>() {
            @Override
            public Object call(Object iArgument) {
                final Leader2ReplicaNetworkExecutor executor = Leader2ReplicaNetworkExecutor.this;
                executor.status = status;
                LogManager.instance().log(this, Level.INFO, "Replica server '%s' is %s", (Object) executor.remoteServerName, (Object) status);

                final long now = System.currentTimeMillis();
                if (status == STATUS.ONLINE) {
                    executor.joinedOn = now;
                    executor.leftOn = 0L;
                } else {
                    executor.leftOn = now;
                    if (status == STATUS.OFFLINE) {
                        executor.close();
                    }
                }
                return null;
            }
        };
        this.executeInLock(transition);

        if (this.server.getServer().isStarted()) {
            this.server.printClusterConfiguration();
        }
    }

    /**
     * @return name of the remote replica, as given to the constructor
     */
    public String getRemoteServerName() {
        return remoteServerName;
    }

    /**
     * @return address of the remote replica, as given to the constructor
     */
    public String getRemoteServerAddress() {
        return remoteServerAddress;
    }

    /**
     * @return HTTP address of the remote replica, as given to the constructor
     */
    public String getRemoteServerHTTPAddress() {
        return remoteServerHTTPAddress;
    }

    /**
     * @return time in milliseconds recorded when the replica was last set ONLINE,
     *         or 0 if that never happened
     */
    public long getJoinedOn() {
        return joinedOn;
    }

    /**
     * @return value of {@code leftOn} as last written by {@link #setStatus}
     *         (0 while the replica is ONLINE)
     */
    public long getLeftOn() {
        return leftOn;
    }

    /**
     * Records the latency of one message exchanged with the replica.
     *
     * <p>The first recorded sample seeds both the minimum and the maximum. The
     * latency fields start at 0, so comparing against the initial value alone
     * would pin the minimum at 0 for any non-negative latency.
     *
     * @param sentOn     time in milliseconds when the message was sent
     * @param receivedOn time in milliseconds when the message was received
     */
    public void updateStats(long sentOn, long receivedOn) {
        ++this.totalMessages;
        final long delta = receivedOn - sentOn;
        this.latencyTotalTime += delta;

        final boolean firstSample = this.totalMessages == 1L;
        if (firstSample || delta < this.latencyMin) {
            this.latencyMin = delta;
        }
        if (firstSample || delta > this.latencyMax) {
            this.latencyMax = delta;
        }
    }

    /**
     * @return current status of the replica connection
     */
    public STATUS getStatus() {
        return status;
    }

    /**
     * Formats the latency statistics collected by {@link #updateStats}.
     *
     * @return {@code "avg=A (min=B max=C)"}, or an empty string when no message
     *         has been recorded yet
     */
    public String getLatencyStats() {
        final long samples = this.totalMessages;
        if (samples == 0L) {
            return "";
        }
        final long average = this.latencyTotalTime / samples;
        final StringBuilder stats = new StringBuilder();
        stats.append("avg=").append(average);
        stats.append(" (min=").append(this.latencyMin);
        stats.append(" max=").append(this.latencyMax).append(")");
        return stats.toString();
    }

    /**
     * Formats the amount of data queued for the replica and the average rate since
     * {@code joinedOn}.
     *
     * @return {@code "<total> (<rate>/s)"} with both sizes rendered by
     *         {@code FileUtils.getSizeAsString}, or an empty string when no bytes
     *         have been counted yet
     */
    public String getThroughputStats() {
        if (this.totalBytes == 0L) {
            return "";
        }
        final String total = FileUtils.getSizeAsString(this.totalBytes);
        final long elapsedMs = System.currentTimeMillis() - this.joinedOn;
        final double bytesPerSecond = (double) this.totalBytes / (double) elapsedMs * 1000.0;
        // The rate is narrowed to int before formatting, as in the original computation.
        final long rate = (int) bytesPerSecond;
        return total + " (" + FileUtils.getSizeAsString(rate) + "/s)";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes a message to the replica and flushes the channel, holding the output
     * lock for the whole operation.
     *
     * @param msg message to send; its content is written as variable-length bytes
     * @throws IOException if the channel has already been closed (in which case
     *                     {@link #close()} is invoked first) or if writing fails
     */
    public void sendMessage(Binary msg) throws IOException {
        synchronized (this.channelOutputLock) {
            final ChannelBinaryServer out = this.channel;
            if (out != null) {
                out.writeVarLengthBytes(msg.getContent(), msg.size());
                out.flush();
                return;
            }
            this.close();
            throw new IOException("Channel closed");
        }
    }

    /**
     * @return name of the remote replica
     */
    @Override
    public String toString() {
        return remoteServerName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the callback while holding this executor's lock.
     *
     * @param callback code to execute; it is invoked with a {@code null} argument
     * @return whatever the callback returns
     */
    protected Object executeInLock(Callable<Object, Object> callback) {
        synchronized (this.lock) {
            final Object result = callback.call(null);
            return result;
        }
    }

    /**
     * Status of a replica connection as tracked by the leader. New connections
     * start as {@code JOINING}.
     */
    public enum STATUS {
        JOINING, OFFLINE, ONLINE
    }
}

