/*
 * Decompiled with CFR 0.152.
 */
package com.arcadedb.server.ha;

import com.arcadedb.database.Binary;
import com.arcadedb.database.DatabaseContext;
import com.arcadedb.database.DatabaseFactory;
import com.arcadedb.database.DatabaseInternal;
import com.arcadedb.engine.ComponentFile;
import com.arcadedb.log.LogManager;
import com.arcadedb.network.HostUtil;
import com.arcadedb.network.binary.ChannelBinaryClient;
import com.arcadedb.network.binary.ConnectionException;
import com.arcadedb.network.binary.NetworkProtocolException;
import com.arcadedb.network.binary.ServerIsNotTheLeaderException;
import com.arcadedb.server.ReplicationCallback;
import com.arcadedb.server.ServerDatabase;
import com.arcadedb.server.ServerException;
import com.arcadedb.server.ha.HAServer;
import com.arcadedb.server.ha.ReplicationException;
import com.arcadedb.server.ha.ReplicationMessage;
import com.arcadedb.server.ha.message.DatabaseStructureRequest;
import com.arcadedb.server.ha.message.DatabaseStructureResponse;
import com.arcadedb.server.ha.message.FileContentRequest;
import com.arcadedb.server.ha.message.FileContentResponse;
import com.arcadedb.server.ha.message.HACommand;
import com.arcadedb.server.ha.message.ReplicaConnectFullResyncResponse;
import com.arcadedb.server.ha.message.ReplicaConnectRequest;
import com.arcadedb.server.ha.message.ReplicaReadyRequest;
import com.arcadedb.server.ha.message.TxRequest;
import com.arcadedb.utility.FileUtils;
import com.arcadedb.utility.Pair;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;

public class Replica2LeaderNetworkExecutor
extends Thread {
    private final HAServer server;
    private String host;
    private int port;
    private String leaderServerName = "?";
    private String leaderServerHTTPAddress;
    private ChannelBinaryClient channel;
    private volatile boolean shutdown = false;
    private final Object channelOutputLock = new Object();
    private final Object channelInputLock = new Object();
    private long installDatabaseLastLogNumber = -1L;

    public Replica2LeaderNetworkExecutor(HAServer ha, String host, int port) {
        this.server = ha;
        this.host = host;
        this.port = port;
        this.connect();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        LogManager.instance().setContext(this.server.getServer().getServerName());
        Binary buffer = new Binary(8192);
        buffer.setAllocationChunkSize(1024);
        long lastReqId = -1L;
        while (!this.shutdown) {
            long reqId = -1L;
            try {
                byte[] requestBytes = this.receiveResponse();
                if (this.shutdown) break;
                Pair<ReplicationMessage, HACommand> request = this.server.getMessageFactory().deserializeCommand(buffer, requestBytes);
                if (request == null) {
                    LogManager.instance().log((Object)this, Level.SEVERE, "Error on receiving message NULL, reconnecting (threadId=%d)", (Object)Thread.currentThread().getId());
                    this.reconnect(null);
                    continue;
                }
                ReplicationMessage message = (ReplicationMessage)request.getFirst();
                lastReqId = reqId = message.messageNumber;
                if (reqId > -1L) {
                    LogManager.instance().log((Object)this, Level.FINE, "Received request %d from the Leader (threadId=%d)", (Object)reqId, (Object)Thread.currentThread().getId());
                } else {
                    LogManager.instance().log((Object)this, Level.FINE, "Received response %d from the Leader (threadId=%d)", (Object)reqId, (Object)Thread.currentThread().getId());
                }
                if (reqId > -1L) {
                    long lastMessage = this.server.getReplicationLogFile().getLastMessageNumber();
                    if (reqId <= lastMessage) {
                        LogManager.instance().log((Object)this, Level.FINE, "Message %d already applied on local server (last=%d). Skip this", (Object)reqId, (Object)lastMessage);
                        continue;
                    }
                    if (!this.server.getReplicationLogFile().checkMessageOrder(message)) {
                        this.closeChannel();
                        this.connect();
                        this.startup();
                        continue;
                    }
                }
                if (this.installDatabaseLastLogNumber > -1L && request.getSecond() instanceof TxRequest) {
                    ((TxRequest)request.getSecond()).installDatabaseLastLogNumber = this.installDatabaseLastLogNumber;
                }
                HACommand response = ((HACommand)request.getSecond()).execute(this.server, this.leaderServerName, reqId);
                if (reqId > -1L && !this.server.getReplicationLogFile().appendMessage(message)) {
                    this.closeChannel();
                    this.connect();
                    this.startup();
                    continue;
                }
                this.server.getServer().lifecycleEvent(ReplicationCallback.TYPE.REPLICA_MSG_RECEIVED, request);
                if (response != null) {
                    this.sendCommandToLeader(buffer, response, reqId);
                }
                reqId = -1L;
            }
            catch (SocketTimeoutException requestBytes) {
            }
            catch (Exception e) {
                LogManager.instance().log((Object)this, Level.INFO, "Exception during execution of request %d (shutdown=%s name=%s error=%s)", (Object)reqId, (Object)this.shutdown, (Object)this.getName(), (Object)e.toString());
                this.reconnect(e);
            }
        }
        LogManager.instance().log((Object)this, Level.INFO, "Replica message thread closed (shutdown=%s name=%s threadId=%d lastReqId=%d)", (Object)this.shutdown, (Object)this.getName(), (Object)Thread.currentThread().getId(), (Object)lastReqId);
    }

    public String getRemoteServerName() {
        return this.leaderServerName;
    }

    public String getRemoteAddress() {
        return this.host + ":" + this.port;
    }

    private void reconnect(Exception e) {
        if (Thread.currentThread().isInterrupted()) {
            this.shutdown();
        }
        if (!this.shutdown) {
            this.closeChannel();
            if (this.server.getLeader() != this) {
                LogManager.instance().log((Object)this, Level.SEVERE, "Removing connection to the previous Leader ('%s'). New Leader is: %s", (Object)this.getRemoteServerName(), (Object)this.server.getLeader().getRemoteServerName());
                this.close();
                return;
            }
            LogManager.instance().log((Object)this, Level.FINE, "Error on communication between current replica and the Leader ('%s'), reconnecting... (error=%s)", (Object)this.getRemoteServerName(), (Object)e);
            if (!this.shutdown) {
                try {
                    this.connect();
                    this.startup();
                }
                catch (Exception e1) {
                    LogManager.instance().log((Object)this, Level.SEVERE, "Error on re-connecting to the Leader ('%s') (error=%s)", (Object)this.getRemoteServerName(), (Object)e1);
                    HashSet<String> serverAddressListCopy = new HashSet<String>(Arrays.asList(this.server.getServerAddressList().split(",")));
                    for (int retry = 0; retry < 3 && !this.shutdown && !serverAddressListCopy.isEmpty(); ++retry) {
                        for (String serverAddress : serverAddressListCopy) {
                            try {
                                if (this.server.isCurrentServer(serverAddress)) continue;
                                String[] parts = HostUtil.parseHostAddress((String)serverAddress, (String)"2424");
                                this.host = parts[0];
                                this.port = Integer.parseInt(parts[1]);
                                this.connect();
                                this.startup();
                                return;
                            }
                            catch (Exception e2) {
                                LogManager.instance().log((Object)this, Level.SEVERE, "Error on re-connecting to the server '%s' (error=%s)", (Object)this.getRemoteAddress(), (Object)e2);
                            }
                        }
                        try {
                            Thread.sleep(2000L);
                        }
                        catch (InterruptedException interruptedException) {
                            Thread.currentThread().interrupt();
                            this.shutdown = true;
                            return;
                        }
                        serverAddressListCopy = new HashSet<String>(Arrays.asList(this.server.getServerAddressList().split(",")));
                    }
                    this.server.startElection(true);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendCommandToLeader(Binary buffer, HACommand response, long messageNumber) throws IOException {
        if (messageNumber > -1L) {
            LogManager.instance().log((Object)this, Level.FINE, "Sending message (response to %d) to the Leader '%s'...", (Object)messageNumber, (Object)response);
        } else {
            LogManager.instance().log((Object)this, Level.FINE, "Sending message (request %d) to the Leader '%s'...", (Object)messageNumber, (Object)response);
        }
        this.server.getMessageFactory().serializeCommand(response, buffer, messageNumber);
        Object object = this.channelOutputLock;
        synchronized (object) {
            ChannelBinaryClient c = this.channel;
            if (c == null) {
                throw new ReplicationException("Error on sending command back to the leader server '" + this.leaderServerName + "' (cause=socket closed)");
            }
            c.writeVarLengthBytes(buffer.getContent(), buffer.size());
            c.flush();
        }
    }

    public void close() {
        this.shutdown();
        this.closeChannel();
    }

    public void kill() {
        this.shutdown();
        this.interrupt();
        this.close();
        try {
            this.join();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void closeChannel() {
        ChannelBinaryClient c = this.channel;
        if (c != null) {
            c.close();
            this.channel = null;
        }
    }

    public String getRemoteHTTPAddress() {
        return this.leaderServerHTTPAddress;
    }

    @Override
    public String toString() {
        return this.leaderServerName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private byte[] receiveResponse() throws IOException {
        Object object = this.channelInputLock;
        synchronized (object) {
            return this.channel.readBytes();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void connect() {
        LogManager.instance().log((Object)this, Level.FINE, "Connecting to server %s:%d...", (Object)this.host, (Object)this.port);
        try {
            this.channel = this.server.createNetworkConnection(this.host, this.port, (short)0);
            this.channel.flush();
            Object object = this.channelInputLock;
            synchronized (object) {
                boolean connectionAccepted = this.channel.readBoolean();
                if (!connectionAccepted) {
                    byte reasonCode = this.channel.readByte();
                    String reason = this.channel.readString();
                    switch (reasonCode) {
                        case 0: {
                            String leaderServerName = this.channel.readString();
                            String leaderAddress = this.channel.readString();
                            LogManager.instance().log((Object)this, Level.INFO, "Cannot accept incoming connections: remote server is not a Leader, connecting to the current Leader '%s' (%s)", (Object)leaderServerName, (Object)leaderAddress);
                            this.closeChannel();
                            throw new ServerIsNotTheLeaderException("Remote server is not a Leader, connecting to the current Leader '" + leaderServerName + "' (" + leaderAddress + ")", leaderAddress);
                        }
                        case 1: {
                            LogManager.instance().log((Object)this, Level.INFO, "Cannot accept incoming connections: an election for the Leader server is in progress");
                            this.closeChannel();
                            throw new ReplicationException("An election for the Leader server is pending");
                        }
                        case 2: {
                            LogManager.instance().log((Object)this, Level.INFO, "Cannot accept incoming connections: remote server does not support protocol %d", (Object)0);
                            break;
                        }
                        case 3: {
                            LogManager.instance().log((Object)this, Level.INFO, "Cannot accept incoming connections: remote server joined a different cluster than '%s'", (Object)this.server.getClusterName());
                            break;
                        }
                        case 4: {
                            LogManager.instance().log((Object)this, Level.INFO, "Cannot accept incoming connections: remote server has the same name as the local server '%s'", (Object)this.server.getServerName());
                            break;
                        }
                        default: {
                            LogManager.instance().log((Object)this, Level.INFO, "Cannot accept incoming connections: unknown reason code '%s'", (Object)reasonCode);
                        }
                    }
                    this.closeChannel();
                    throw new ConnectionException(this.host + ":" + this.port, reason);
                }
                this.leaderServerName = this.channel.readString();
                long leaderElectedAtTurn = this.channel.readLong();
                this.leaderServerHTTPAddress = this.channel.readString();
                String memberList = this.channel.readString();
                this.server.lastElectionVote = new Pair((Object)leaderElectedAtTurn, (Object)this.leaderServerName);
                this.server.setServerAddresses(memberList);
            }
        }
        catch (Exception e) {
            LogManager.instance().log((Object)this, Level.FINE, "Error on connecting to the server %s:%d (cause=%s)", (Object)this.host, (Object)this.port, (Object)e.toString());
            throw new ConnectionException(this.host + ":" + this.port, (Throwable)e);
        }
    }

    public void startup() {
        LogManager.instance().log((Object)this, Level.INFO, "Server connected to the Leader server %s:%d, members=[%s]", (Object)this.host, (Object)this.port, (Object)this.server.getServerAddressList());
        this.setName(this.server.getServerName() + " replica2leader<-" + this.getRemoteServerName());
        LogManager.instance().log((Object)this, Level.INFO, "Server started as Replica in HA mode (cluster=%s leader=%s:%d)", (Object)this.server.getClusterName(), (Object)this.host, (Object)this.port);
        this.installDatabases();
    }

    private void installDatabases() {
        Binary buffer = new Binary(8192);
        buffer.setAllocationChunkSize(1024);
        long lastLogNumber = this.server.getReplicationLogFile().getLastMessageNumber();
        LogManager.instance().log((Object)this, Level.INFO, "Requesting install of databases up to log %d...", (Object)lastLogNumber);
        try {
            this.sendCommandToLeader(buffer, new ReplicaConnectRequest(lastLogNumber), -1L);
            HACommand response = this.receiveCommandFromLeaderDuringJoin(buffer);
            if (response instanceof ReplicaConnectFullResyncResponse) {
                ReplicaConnectFullResyncResponse fullSync = (ReplicaConnectFullResyncResponse)response;
                LogManager.instance().log((Object)this, Level.INFO, "Asking for a full resync...");
                this.server.getServer().lifecycleEvent(ReplicationCallback.TYPE.REPLICA_FULL_RESYNC, null);
                Set<String> databases = fullSync.getDatabases();
                for (String db : databases) {
                    this.requestInstallDatabase(buffer, db);
                }
            } else {
                LogManager.instance().log((Object)this, Level.INFO, "Receiving hot resync (from=%d)...", (Object)lastLogNumber);
                this.server.getServer().lifecycleEvent(ReplicationCallback.TYPE.REPLICA_HOT_RESYNC, null);
            }
            this.sendCommandToLeader(buffer, new ReplicaReadyRequest(), -1L);
        }
        catch (Exception e) {
            this.shutdown();
            LogManager.instance().log((Object)this, Level.SEVERE, "Error starting HA service (error=%s)", (Throwable)e, (Object)e.getMessage());
            throw new ServerException("Cannot start HA service", e);
        }
    }

    public void requestInstallDatabase(Binary buffer, String db) throws IOException {
        this.sendCommandToLeader(buffer, new DatabaseStructureRequest(db), -1L);
        DatabaseStructureResponse dbStructure = (DatabaseStructureResponse)this.receiveCommandFromLeaderDuringJoin(buffer);
        this.server.getReplicationLogFile().setLastMessageNumber(dbStructure.getCurrentLogNumber());
        ServerDatabase database = this.server.getServer().getOrCreateDatabase(db);
        try (FileWriter schemaFile = new FileWriter(database.getDatabasePath() + File.separator + "schema.json", DatabaseFactory.getDefaultCharset());){
            schemaFile.write(dbStructure.getSchemaJson());
        }
        long databaseSize = 0L;
        ArrayList<Map.Entry<Integer, String>> list = new ArrayList<Map.Entry<Integer, String>>(dbStructure.getFileNames().entrySet());
        for (int i = 0; i < list.size(); ++i) {
            Map.Entry f = (Map.Entry)list.get(i);
            try {
                databaseSize += this.installFile(buffer, db, (Integer)f.getKey(), (String)f.getValue(), 0, -1);
                continue;
            }
            catch (Exception e) {
                LogManager.instance().log((Object)this, Level.SEVERE, "Error on installing file '%s' (%s %d/%d files)", (Throwable)e, f.getKey(), (Object)FileUtils.getSizeAsString((long)databaseSize), (Object)i, (Object)list.size());
                database.getEmbedded().drop();
                throw new ReplicationException("Error on installing database '" + db + "'", e);
            }
        }
        this.sendCommandToLeader(buffer, new DatabaseStructureRequest(db), -1L);
        DatabaseStructureResponse lastStructure = (DatabaseStructureResponse)this.receiveCommandFromLeaderDuringJoin(buffer);
        this.installDatabaseLastLogNumber = lastStructure.getCurrentLogNumber();
        database.getSchema().getEmbedded().close();
        DatabaseContext.INSTANCE.init((DatabaseInternal)database);
        database.getSchema().getEmbedded().load(ComponentFile.MODE.READ_WRITE, true);
        LogManager.instance().log((Object)this, Level.INFO, "Database '%s' installed from the cluster (%s - %d files lastLogNumber=%d)", null, (Object)db, (Object)FileUtils.getSizeAsString((long)databaseSize), (Object)list.size(), (Object)this.installDatabaseLastLogNumber);
    }

    private long installFile(Binary buffer, String db, int fileId, String fileName, int pageFromInclusive, int pageToInclusive) throws IOException {
        int from = pageFromInclusive;
        LogManager.instance().log((Object)this, Level.FINE, "Installing file '%s'...", (Object)fileName);
        int pagesWritten = 0;
        long fileSize = 0L;
        while (true) {
            this.sendCommandToLeader(buffer, new FileContentRequest(db, fileId, from, pageToInclusive), -1L);
            FileContentResponse fileChunk = (FileContentResponse)this.receiveCommandFromLeaderDuringJoin(buffer);
            fileSize += (long)fileChunk.getPagesContent().size();
            fileChunk.execute(this.server, null, -1L);
            if (fileChunk.getPages() == 0) break;
            pagesWritten += fileChunk.getPages();
            if (fileChunk.isLast()) break;
            from += fileChunk.getPages();
        }
        LogManager.instance().log((Object)this, Level.FINE, "File '%s' installed (pagesWritten=%d size=%s)", (Object)fileName, (Object)pagesWritten, (Object)FileUtils.getSizeAsString((long)fileSize));
        return fileSize;
    }

    private HACommand receiveCommandFromLeaderDuringJoin(Binary buffer) throws IOException {
        byte[] response = this.receiveResponse();
        Pair<ReplicationMessage, HACommand> command = this.server.getMessageFactory().deserializeCommand(buffer, response);
        if (command == null) {
            throw new NetworkProtocolException("Error on reading response, message " + response[0] + " not valid");
        }
        return (HACommand)command.getSecond();
    }

    private void shutdown() {
        LogManager.instance().log((Object)this, Level.FINE, "Shutting down thread %s (id=%d)...", (Object)this.getName(), (Object)this.getId());
        this.shutdown = true;
    }
}

