/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.journal.raft;

import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.status.CancelledException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.AddQuorumServerRequest;
import alluxio.grpc.GrpcService;
import alluxio.grpc.JournalQueryRequest;
import alluxio.grpc.NetAddress;
import alluxio.grpc.QuorumServerInfo;
import alluxio.grpc.QuorumServerState;
import alluxio.grpc.ServiceType;
import alluxio.grpc.TransferLeaderMessage;
import alluxio.master.Master;
import alluxio.master.PrimarySelector;
import alluxio.master.journal.AbstractJournalSystem;
import alluxio.master.journal.AsyncJournalWriter;
import alluxio.master.journal.CatchupFuture;
import alluxio.master.journal.Journal;
import alluxio.master.journal.raft.JournalEntryCommand;
import alluxio.master.journal.raft.JournalStateMachine;
import alluxio.master.journal.raft.RaftJournal;
import alluxio.master.journal.raft.RaftJournalAppender;
import alluxio.master.journal.raft.RaftJournalConfiguration;
import alluxio.master.journal.raft.RaftJournalProgressLogger;
import alluxio.master.journal.raft.RaftJournalServiceHandler;
import alluxio.master.journal.raft.RaftJournalUtils;
import alluxio.master.journal.raft.RaftJournalWriter;
import alluxio.master.journal.raft.RaftPrimarySelector;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.metrics.sink.RatisDropwizardExports;
import alluxio.proto.journal.Journal;
import alluxio.util.CommonUtils;
import alluxio.util.LogUtils;
import alluxio.util.WaitForOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import io.grpc.BindableService;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.io.FileUtils;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.retry.ExponentialBackoffRetry;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class RaftJournalSystem
extends AbstractJournalSystem {
    /** Hard-coded UUID from which {@link #RAFT_GROUP_ID} is derived. */
    public static final UUID RAFT_GROUP_UUID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
    /** Raft group id used for every group-scoped request issued by this class. */
    public static final RaftGroupId RAFT_GROUP_ID = RaftGroupId.valueOf((UUID)RAFT_GROUP_UUID);
    private static final Logger LOG = LoggerFactory.getLogger(RaftJournalSystem.class);
    /** Backing counter for {@link #nextCallId()}. */
    private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
    // NOTE(review): not referenced by name in the visible code; processRaftConfiguration uses
    // the literal 500L - presumably the same value. Confirm before changing either.
    private static final long SINGLE_MASTER_ELECTION_TIMEOUT_MS = 500L;
    // NOTE(review): no use of this constant is visible in this part of the file.
    private static final String WAITING_FOR_ELECTION = "WAITING_FOR_ELECTION";
    /** Configuration as returned by processRaftConfiguration (validated, possibly adjusted). */
    private final RaftJournalConfiguration mConf;
    /**
     * Starts as true. Cleared by gainPrimacy() and suspend(); set by resume() and losePrimacy();
     * checkpoint() sets it for the duration of the snapshot and clears it afterwards.
     */
    private final AtomicBoolean mSnapshotAllowed;
    /**
     * Starts as false. Set at the end of gainPrimacy(), cleared by losePrimacy(), and consumed
     * with getAndSet(false) by transferLeadership().
     */
    private final AtomicBoolean mTransferLeaderAllowed;
    /** Handed to RatisDropwizardExports.registerRatisMetricReporters in initServer(). */
    private final Map<String, RatisDropwizardExports> mRatisMetricsMap = new ConcurrentHashMap<String, RatisDropwizardExports>();
    /** Its state is consulted by catchUp() and isLeader() to tell whether this master is PRIMARY. */
    private final RaftPrimarySelector mPrimarySelector;
    /** Journals created through createJournal(), keyed by master name. */
    private final ConcurrentHashMap<String, RaftJournal> mJournals;
    /** Replaced (after closing the previous one) every time initServer() runs. */
    private JournalStateMachine mStateMachine;
    /** Built by initServer(); closed and rebuilt by losePrimacy(). */
    private RaftServer mServer;
    /** Set by gainPrimacy() and reset to null by losePrimacy(). */
    private RaftJournalWriter mRaftJournalWriter;
    /** Shared with every RaftJournal created by createJournal(); set and cleared together with mRaftJournalWriter. */
    private final AtomicReference<AsyncJournalWriter> mAsyncJournalWriter;
    /** Client id used by createClient(). */
    private final ClientId mClientId = ClientId.randomId();
    /** Client id used for directly built requests (sendMessageAsync, getGroupInfo) and for RaftJournalAppender. */
    private final ClientId mRawClientId = ClientId.randomId();
    /** Initial group built in startInternal() from the configured cluster addresses. */
    private RaftGroup mRaftGroup;
    /** Peer id derived from the local address in startInternal(). */
    private RaftPeerId mPeerId;
    /** Messages recorded by transferLeadership(), keyed by the generated transfer id. */
    private Map<String, TransferLeaderMessage> mErrorMessages;
    // NOTE(review): no use of this constant is visible in this part of the file.
    private static final long JOURNAL_STAT_LOG_MAX_INTERVAL_MS = 30000L;

    /**
     * Produces the next call id for a raw Raft request.
     *
     * @return the next counter value with the sign bit masked off, so the id is never negative
     */
    static long nextCallId() {
        final long raw = CALL_ID_COUNTER.getAndIncrement();
        return raw & Long.MAX_VALUE;
    }

    /**
     * Builds the journal system from the given configuration. The Raft server itself is not
     * created here.
     *
     * @param conf raft journal configuration; it is validated (and possibly adjusted) first
     */
    private RaftJournalSystem(RaftJournalConfiguration conf) {
        mConf = processRaftConfiguration(conf);
        mPrimarySelector = new RaftPrimarySelector();
        mJournals = new ConcurrentHashMap<>();
        mAsyncJournalWriter = new AtomicReference<>();
        mErrorMessages = new ConcurrentHashMap<>();
        // Snapshots are allowed from the start; leadership transfer only after gaining primacy.
        mSnapshotAllowed = new AtomicBoolean(true);
        mTransferLeaderAllowed = new AtomicBoolean(false);
    }

    /**
     * Moves a journal found directly under the configured journal path into the raft journal
     * directory. Nothing happens unless the old group directory exists and the new base
     * directory does not. Failures are logged as warnings and otherwise ignored.
     */
    private void maybeMigrateOldJournal() {
        final String groupDirName = RAFT_GROUP_UUID.toString();
        final File legacyDir = new File(mConf.getPath(), groupDirName);
        final File raftBaseDir = RaftJournalUtils.getRaftJournalDir(mConf.getPath());
        final File targetDir = new File(raftBaseDir, groupDirName);

        if (!legacyDir.isDirectory() || raftBaseDir.exists()) {
            return;
        }

        LOG.info("Old journal detected at {} . moving journal to {}", legacyDir, targetDir);
        if (!raftBaseDir.mkdirs()) {
            LOG.warn("Cannot create journal directory {}", raftBaseDir);
        }
        // The move is attempted even if creating the base directory failed.
        if (!legacyDir.renameTo(targetDir)) {
            LOG.warn("Failed to move journal from {} to {}", legacyDir, targetDir);
        }
    }

    /**
     * Static factory for a raft journal system.
     *
     * @param conf raft journal configuration
     * @return a new, not yet started journal system
     */
    public static RaftJournalSystem create(RaftJournalConfiguration conf) {
        return new RaftJournalSystem(conf);
    }

    /**
     * Adjusts and validates the configuration. For a single-address cluster where the user set
     * neither election timeout property, the timeouts are overridden to 500ms / 1000ms.
     *
     * @param conf configuration to process; modified in place
     * @return the same {@code conf} instance after validation
     */
    private RaftJournalConfiguration processRaftConfiguration(RaftJournalConfiguration conf) {
        if (conf.getClusterAddresses().size() == 1) {
            final boolean minSetByUser = ServerConfiguration.isSetByUser(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT);
            if (!minSetByUser && !ServerConfiguration.isSetByUser(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT)) {
                LOG.debug("Overriding election timeout to {}ms for single master cluster.", 500L);
                conf.setElectionMinTimeoutMs(500L);
                conf.setElectionMaxTimeoutMs(1000L);
            }
        }
        conf.validate();
        return conf;
    }

    /**
     * @return the peer id of this master; it is assigned in startInternal()
     */
    public synchronized RaftPeerId getLocalPeerId() {
        return mPeerId;
    }

    /**
     * Creates a fresh state machine and Raft server from the current configuration. Any previous
     * state machine is closed first. The server is built but not started.
     *
     * @throws IOException if the state machine or the server cannot be set up
     */
    private synchronized void initServer() throws IOException {
        LOG.debug("Creating journal with max segment size {}", mConf.getMaxLogSize());
        if (mStateMachine != null) {
            mStateMachine.close();
        }
        mStateMachine = new JournalStateMachine(mJournals, this, mConf.getMaxConcurrencyPoolSize());

        final RaftProperties raftProps = new RaftProperties();
        final Parameters raftParams = new Parameters();

        // Transport.
        RaftConfigKeys.Rpc.setType(raftProps, SupportedRpcType.GRPC);
        GrpcConfigKeys.Server.setPort(raftProps, mConf.getLocalAddress().getPort());

        // Storage and log sizing.
        maybeMigrateOldJournal();
        RaftServerConfigKeys.setStorageDir(raftProps,
            Collections.singletonList(RaftJournalUtils.getRaftJournalDir(mConf.getPath())));
        RaftServerConfigKeys.Log.setSegmentSizeMax(raftProps, SizeInBytes.valueOf(mConf.getMaxLogSize()));
        RaftServerConfigKeys.Log.Appender.setBufferByteLimit(raftProps,
            SizeInBytes.valueOf(ServerConfiguration.global().getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_ENTRY_SIZE_MAX)));
        RaftServerConfigKeys.Write.setByteLimit(raftProps,
            SizeInBytes.valueOf(ServerConfiguration.global().getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_FLUSH_SIZE_MAX)));
        RaftServerConfigKeys.Log.setQueueByteLimit(raftProps,
            (int) ServerConfiguration.global().getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_FLUSH_SIZE_MAX));

        // Election and request timeouts.
        final TimeDuration electionMin = TimeDuration.valueOf(mConf.getMinElectionTimeoutMs(), TimeUnit.MILLISECONDS);
        final TimeDuration electionMax = TimeDuration.valueOf(mConf.getMaxElectionTimeoutMs(), TimeUnit.MILLISECONDS);
        RaftServerConfigKeys.Rpc.setTimeoutMin(raftProps, electionMin);
        RaftServerConfigKeys.Rpc.setTimeoutMax(raftProps, electionMax);
        RaftServerConfigKeys.Rpc.setRequestTimeout(raftProps, TimeDuration.valueOf(
            ServerConfiguration.global().getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_TRANSPORT_REQUEST_TIMEOUT_MS),
            TimeUnit.MILLISECONDS));
        RaftServerConfigKeys.RetryCache.setExpiryTime(raftProps, TimeDuration.valueOf(
            ServerConfiguration.getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_RETRY_CACHE_EXPIRY_TIME),
            TimeUnit.MILLISECONDS));

        // Snapshots: keep 3 files, trigger automatically after the configured number of entries.
        RaftServerConfigKeys.Snapshot.setRetentionFileNum(raftProps, 3);
        RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(raftProps, true);
        final long checkpointPeriodEntries =
            ServerConfiguration.global().getLong(PropertyKey.MASTER_JOURNAL_CHECKPOINT_PERIOD_ENTRIES);
        RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(raftProps, checkpointPeriodEntries);
        RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(raftProps, false);

        // Slowness and step-down waits are set to Long.MAX_VALUE milliseconds.
        RaftServerConfigKeys.Rpc.setSlownessTimeout(raftProps,
            TimeDuration.valueOf(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
        RaftServerConfigKeys.LeaderElection.setLeaderStepDownWaitTime(raftProps,
            TimeDuration.valueOf(Long.MAX_VALUE, TimeUnit.MILLISECONDS));

        final long maxInboundMessageSize = ServerConfiguration.global()
            .getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_TRANSPORT_MAX_INBOUND_MESSAGE_SIZE);
        GrpcConfigKeys.setMessageSizeMax(raftProps, SizeInBytes.valueOf(maxInboundMessageSize));

        RatisDropwizardExports.registerRatisMetricReporters(mRatisMetricsMap);

        mServer = RaftServer.newBuilder()
            .setServerId(mPeerId)
            .setGroup(mRaftGroup)
            .setStateMachine(mStateMachine)
            .setProperties(raftProps)
            .setParameters(raftParams)
            .build();

        super.registerMetrics();
        MetricsSystem.registerGaugeIfAbsent(MetricKey.CLUSTER_LEADER_INDEX.getName(), () -> getLeaderIndex());
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_ROLE_ID.getName(), () -> getRoleId());
        MetricsSystem.registerGaugeIfAbsent(MetricKey.CLUSTER_LEADER_ID.getName(), () -> getLeaderId());
    }

    /**
     * Returns the Raft group the local server currently belongs to.
     *
     * <p>If the server cannot report its groups, reports none, or reports a group whose id does
     * not equal {@link #RAFT_GROUP_ID}, a warning is logged and the initial group built at
     * startup is returned instead.
     *
     * @return the server's current group, or the initial group as a fallback
     */
    @VisibleForTesting
    public synchronized RaftGroup getCurrentGroup() {
        try {
            Iterator<RaftGroup> groupIter = mServer.getGroups().iterator();
            Preconditions.checkState(groupIter.hasNext(), "no group info found");
            RaftGroup group = groupIter.next();
            // Compare by value: an equal id need not be the same object as RAFT_GROUP_ID.
            Preconditions.checkState(RAFT_GROUP_ID.equals(group.getGroupId()),
                String.format("Invalid group id %s, expecting %s", group.getGroupId(), RAFT_GROUP_ID));
            return group;
        } catch (IOException | IllegalStateException e) {
            LogUtils.warnWithException(LOG, "Failed to get raft group, falling back to initial group", e);
            return mRaftGroup;
        }
    }

    /**
     * Builds a Raft client for the initial group with a 15 second request timeout and an
     * exponential backoff retry policy (100ms base sleep, at most 10 attempts, sleep capped at
     * the maximum election timeout).
     *
     * @return a new client; the caller is responsible for closing it
     */
    private RaftClient createClient() {
        final RaftProperties clientProps = new RaftProperties();
        final Parameters clientParams = new Parameters();
        RaftClientConfigKeys.Rpc.setRequestTimeout(clientProps, TimeDuration.valueOf(15L, TimeUnit.SECONDS));

        final TimeDuration baseSleep = TimeDuration.valueOf(100L, TimeUnit.MILLISECONDS);
        final TimeDuration maxSleep = TimeDuration.valueOf(mConf.getMaxElectionTimeoutMs(), TimeUnit.MILLISECONDS);
        final RetryPolicy backoff = ExponentialBackoffRetry.newBuilder()
            .setBaseSleepTime(baseSleep)
            .setMaxAttempts(10)
            .setMaxSleepTime(maxSleep)
            .build();

        return RaftClient.newBuilder()
            .setRaftGroup(mRaftGroup)
            .setClientId(mClientId)
            .setLeaderId(null)
            .setProperties(clientProps)
            .setParameters(clientParams)
            .setRetryPolicy(backoff)
            .build();
    }

    /**
     * Creates the journal for a master and registers it under the master's name.
     *
     * @param master the master that will write to the journal
     * @return the newly created journal
     */
    @Override
    public synchronized Journal createJournal(Master master) {
        final RaftJournal created = new RaftJournal(master, mConf.getPath().toURI(), mAsyncJournalWriter);
        mJournals.put(master.getName(), created);
        return created;
    }

    /**
     * Switches this journal system to primary mode: disables snapshots, catches up with the
     * Raft log, upgrades the state machine and installs the journal writers. Leadership
     * transfer is enabled once the writers are in place.
     *
     * @throws RuntimeException wrapping the timeout or interruption if catching up fails
     */
    @Override
    public synchronized void gainPrimacy() {
        LOG.info("Gaining primacy.");
        mSnapshotAllowed.set(false);
        final RaftJournalAppender appender =
            new RaftJournalAppender(mServer, this::createClient, mRawClientId, ServerConfiguration.global());
        final Runnable releaseAppender = () -> {
            try {
                appender.close();
            } catch (IOException e) {
                LOG.warn("Failed to close raft client: {}", e.toString());
            }
        };

        try {
            catchUp(mStateMachine, appender);
        } catch (TimeoutException | InterruptedException e) {
            releaseAppender.run();
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }

        final long firstSequenceNumber = mStateMachine.upgrade() + 1L;
        Preconditions.checkState(mRaftJournalWriter == null);
        mRaftJournalWriter = new RaftJournalWriter(firstSequenceNumber, appender);
        mAsyncJournalWriter.set(new AsyncJournalWriter(mRaftJournalWriter, () -> getJournalSinks(null)));
        mTransferLeaderAllowed.set(true);
        LOG.info("Gained primacy.");
    }

    /**
     * Steps down from primary mode: closes the journal writers, shuts the Raft server down,
     * then builds and starts a fresh server so this master keeps participating in the quorum.
     *
     * <p>Does nothing beyond logging when the server is not in the RUNNING state. A writer that
     * was never installed is skipped, and the raft journal writer is closed even if closing the
     * async writer fails.
     *
     * @throws IllegalStateException if the Raft server cannot be closed, re-initialized or
     *         restarted
     */
    @Override
    public synchronized void losePrimacy() {
        LOG.info("Losing primacy.");
        if (mServer.getLifeCycleState() != LifeCycle.State.RUNNING) {
            return;
        }
        mTransferLeaderAllowed.set(false);
        final AsyncJournalWriter asyncWriter = mAsyncJournalWriter.get();
        try {
            try {
                // Close the async writer first; it sits on top of the raft writer.
                if (asyncWriter != null) {
                    asyncWriter.close();
                }
            } finally {
                // Always release the raft writer, even when the async writer failed to close.
                if (mRaftJournalWriter != null) {
                    mRaftJournalWriter.close();
                }
            }
        } catch (IOException e) {
            LOG.warn("Error closing journal writer: {}", e.toString());
        } finally {
            mAsyncJournalWriter.set(null);
            mRaftJournalWriter = null;
        }
        LOG.info("Shutting down Raft server");
        try {
            mServer.close();
        } catch (IOException e) {
            throw new IllegalStateException("Fatal error: failed to leave Raft cluster while stepping down", e);
        }
        LOG.info("Shut down Raft server");
        try {
            mSnapshotAllowed.set(true);
            initServer();
        } catch (IOException e) {
            throw new IllegalStateException(String.format("Fatal error: failed to init Raft cluster with addresses %s while stepping down", mConf.getClusterAddresses()), e);
        }
        LOG.info("Bootstrapping new Raft server");
        try {
            mServer.start();
        } catch (IOException e) {
            throw new IllegalStateException(String.format("Fatal error: failed to start Raft cluster with addresses %s while stepping down", mConf.getClusterAddresses()), e);
        }
        LOG.info("Raft server successfully restarted and lost primacy");
    }

    /**
     * Reports the last applied sequence number for every registered journal. All journals share
     * the state machine's single sequence, so every entry carries the same value.
     *
     * @return a new map from master name to the last applied sequence number
     * @throws IllegalStateException if the state machine has not been created yet
     */
    @Override
    public synchronized Map<String, Long> getCurrentSequenceNumbers() {
        Preconditions.checkState(mStateMachine != null, "State machine not initialized");
        final long lastApplied = mStateMachine.getLastAppliedSequenceNumber();
        final Map<String, Long> sequences = new HashMap<>();
        mJournals.keySet().forEach(masterName -> sequences.put(masterName, lastApplied));
        return sequences;
    }

    /**
     * Disables snapshots and suspends the state machine.
     *
     * @param interruptCallback callback handed to the state machine's suspend call
     * @throws IOException if the state machine fails to suspend
     */
    @Override
    public synchronized void suspend(Runnable interruptCallback) throws IOException {
        mSnapshotAllowed.set(false);
        mStateMachine.suspend(interruptCallback);
    }

    /**
     * Resumes the state machine. Snapshots are re-enabled whether or not resuming succeeds.
     *
     * @throws IOException if the state machine fails to resume
     */
    @Override
    public synchronized void resume() throws IOException {
        try {
            mStateMachine.resume();
        } finally {
            mSnapshotAllowed.set(true);
        }
    }

    /**
     * @return whether the state machine reports itself as suspended
     */
    public synchronized boolean isSuspended() {
        return mStateMachine.isSuspended();
    }

    /**
     * Asks the state machine to catch up to a target sequence number.
     *
     * @param journalSequenceNumbers target sequence per journal; all values must be identical
     * @return the future returned by the state machine's catchup call
     * @throws IllegalStateException if the map does not contain exactly one distinct value
     */
    @Override
    public synchronized CatchupFuture catchup(Map<String, Long> journalSequenceNumbers) {
        final List<Long> distinctTargets = journalSequenceNumbers.values()
            .stream()
            .distinct()
            .collect(Collectors.toList());
        Preconditions.checkState(distinctTargets.size() == 1, "incorrect journal sequences");
        return mStateMachine.catchup(distinctTargets.get(0));
    }

    /**
     * Wraps a journal entry in a Raft message. The serialized bytes are wrapped without copying.
     *
     * @param entry the journal entry to send
     * @return a Raft message carrying the serialized entry
     */
    static Message toRaftMessage(Journal.JournalEntry entry) {
        final byte[] serialized = new JournalEntryCommand(entry).getSerializedJournalEntry();
        final ByteString payload = UnsafeByteOperations.unsafeWrap(serialized);
        return Message.valueOf(payload);
    }

    /**
     * Takes a local snapshot after catching up with the Raft log. Snapshots are enabled for the
     * duration of the call and disabled again on exit, whatever the outcome.
     *
     * @throws IOException if catching up times out or the snapshot fails
     * @throws CancelledException if the thread is interrupted; the interrupt flag is restored
     */
    @Override
    public synchronized void checkpoint() throws IOException {
        try (RaftJournalAppender appender =
                 new RaftJournalAppender(mServer, this::createClient, mRawClientId, ServerConfiguration.global())) {
            mSnapshotAllowed.set(true);
            catchUp(mStateMachine, appender);
            mStateMachine.takeLocalSnapshot();
        } catch (TimeoutException timeout) {
            LOG.warn("Timeout while performing snapshot: {}", timeout.toString());
            throw new IOException("Timeout while performing snapshot", timeout);
        } catch (InterruptedException interrupted) {
            LOG.warn("Interrupted while performing snapshot: {}", interrupted.toString());
            Thread.currentThread().interrupt();
            throw new CancelledException("Interrupted while performing snapshot", interrupted);
        } finally {
            mSnapshotAllowed.set(false);
        }
    }

    /**
     * Lists the gRPC services exposed by this journal system.
     *
     * @return a new mutable map holding the raft journal service, backed by the state machine's
     *         snapshot replication manager
     */
    @Override
    public synchronized Map<ServiceType, GrpcService> getJournalServices() {
        final RaftJournalServiceHandler handler =
            new RaftJournalServiceHandler(mStateMachine.getSnapshotReplicationManager());
        final Map<ServiceType, GrpcService> serviceMap = new HashMap<>();
        serviceMap.put(ServiceType.RAFT_JOURNAL_SERVICE, new GrpcService(handler));
        return serviceMap;
    }

    /**
     * Blocks until the given state machine has applied everything committed before this call,
     * or until this master is no longer PRIMARY.
     *
     * <p>Steps:
     * <ol>
     *   <li>Wait (up to 600000 ms) for any in-progress snapshot to finish.</li>
     *   <li>Read the local server's commit index from the group info; it is only used for
     *       progress logging, so a failure here is logged and otherwise ignored.</li>
     *   <li>Repeatedly submit a marker entry with a random negative sequence number, wait for the
     *       state machine to report it as the last primary-start sequence number, then confirm
     *       that the last applied sequence number did not move in the meantime. Any failure or
     *       timeout in this step starts another attempt.</li>
     * </ol>
     *
     * @param stateMachine the state machine whose progress is observed
     * @param client the appender used to submit the marker entry
     * @throws TimeoutException if waiting for snapshotting to finish times out
     * @throws InterruptedException if the thread is interrupted while waiting or sleeping
     */
    private void catchUp(JournalStateMachine stateMachine, RaftJournalAppender client) throws TimeoutException, InterruptedException {
        long startTime = System.currentTimeMillis();
        long waitBeforeRetry = ServerConfiguration.global().getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_CATCHUP_RETRY_WAIT);
        // A timeout here is the only TimeoutException that escapes this method.
        CommonUtils.waitFor((String)"snapshotting to finish", () -> !stateMachine.isSnapshotting(), (WaitForOptions)WaitForOptions.defaults().setTimeoutMs(600000));
        OptionalLong endCommitIndex = OptionalLong.empty();
        try {
            RaftJournalSystem raftJournalSystem = this;
            synchronized (raftJournalSystem) {
                RaftPeerId serverId = this.mServer.getId();
                // Pick the commit info entry that belongs to the local server.
                Optional<RaftProtos.CommitInfoProto> commitInfo = this.getGroupInfo().getCommitInfos().stream().filter(commit -> serverId.equals((Object)RaftPeerId.valueOf((ByteString)commit.getServer().getId()))).findFirst();
                if (!commitInfo.isPresent()) {
                    throw new IOException("Commit info was not present. Couldn't find the current server's latest commit");
                }
                endCommitIndex = OptionalLong.of(commitInfo.get().getCommitIndex());
            }
        }
        catch (IOException e) {
            // Best effort: without the commit index only the replay statistics are lost.
            LogUtils.warnWithException((Logger)LOG, (String)"Failed to get raft log information before replay. Replay statistics will not be available", (Object[])new Object[]{e});
        }
        // NOTE(review): uses the mStateMachine field rather than the stateMachine parameter;
        // both visible callers pass mStateMachine, so they are the same object today.
        RaftJournalProgressLogger progressLogger = new RaftJournalProgressLogger(this.mStateMachine, endCommitIndex);
        while (true) {
            Exception ex;
            // Stop quietly if primacy was lost while catching up.
            if (this.mPrimarySelector.getState() != PrimarySelector.State.PRIMARY) {
                return;
            }
            long lastAppliedSN = stateMachine.getLastAppliedSequenceNumber();
            // The marker id is drawn from [Long.MIN_VALUE, 0), i.e. it is always negative.
            long gainPrimacySN = ThreadLocalRandom.current().nextLong(Long.MIN_VALUE, 0L);
            LOG.info("Performing catchup. Last applied SN: {}. Catchup ID: {}", (Object)lastAppliedSN, (Object)gainPrimacySN);
            try {
                CompletableFuture<RaftClientReply> future = client.sendAsync(RaftJournalSystem.toRaftMessage(Journal.JournalEntry.newBuilder().setSequenceNumber(gainPrimacySN).build()), TimeDuration.valueOf((long)5L, (TimeUnit)TimeUnit.SECONDS));
                RaftClientReply reply = future.get(5L, TimeUnit.SECONDS);
                ex = reply.getException();
            }
            catch (IOException | ExecutionException | TimeoutException e) {
                ex = e;
            }
            if (ex != null) {
                // Submission failed: log (progress for LeaderNotReadyException), back off, retry.
                if (ex instanceof LeaderNotReadyException) {
                    progressLogger.logProgress();
                } else {
                    LOG.info("Exception submitting term start entry: {}", (Object)ex.toString());
                }
                Thread.sleep(waitBeforeRetry);
                continue;
            }
            try {
                // Wait up to 5 seconds for the marker entry to reach the state machine.
                CommonUtils.waitFor((String)("term start entry " + gainPrimacySN + " to be applied to state machine"), () -> stateMachine.getLastPrimaryStartSequenceNumber() == gainPrimacySN, (WaitForOptions)WaitForOptions.defaults().setInterval(1000).setTimeoutMs(5000));
            }
            catch (TimeoutException e) {
                LOG.info(e.toString());
                continue;
            }
            try {
                // Accept only if no further entry was applied since lastAppliedSN was sampled
                // and the marker is still the latest primary-start entry.
                CommonUtils.waitFor((String)("check primacySN " + gainPrimacySN + " and lastAppliedSN " + lastAppliedSN + " to be applied to leader"), () -> stateMachine.getLastAppliedSequenceNumber() == lastAppliedSN && stateMachine.getLastPrimaryStartSequenceNumber() == gainPrimacySN, (WaitForOptions)WaitForOptions.defaults().setInterval(1000).setTimeoutMs((int)this.mConf.getMaxElectionTimeoutMs()));
            }
            catch (TimeoutException e) {
                continue;
            }
            break;
        }
        LOG.info("Caught up in {}ms. Last sequence number from previous term: {}.", (Object)(System.currentTimeMillis() - startTime), (Object)stateMachine.getLastAppliedSequenceNumber());
    }

    /**
     * Builds the Raft group from the configured cluster addresses, creates and starts the local
     * Raft server, then asks the quorum to add this server.
     *
     * @throws InterruptedException declared by the overridden method
     * @throws IOException if the server cannot be initialized or started; the original failure
     *         is always attached as the cause
     */
    @Override
    public synchronized void startInternal() throws InterruptedException, IOException {
        LOG.info("Initializing Raft Journal System");
        InetSocketAddress localAddress = mConf.getLocalAddress();
        mPeerId = RaftJournalUtils.getPeerId(localAddress);
        List<InetSocketAddress> clusterAddresses = mConf.getClusterAddresses();
        Set<RaftPeer> peers = clusterAddresses.stream()
            .map(addr -> RaftPeer.newBuilder().setId(RaftJournalUtils.getPeerId(addr)).setAddress(addr).build())
            .collect(Collectors.toSet());
        mRaftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, peers);
        initServer();
        super.registerMetrics();
        LOG.info("Starting Raft journal system. Cluster addresses: {}. Local address: {}", clusterAddresses, localAddress);
        long startTime = System.currentTimeMillis();
        try {
            mServer.start();
        } catch (IOException e) {
            // Report the underlying cause when there is one; otherwise keep the exception itself
            // so the rethrown error never loses its stack trace.
            Throwable rootCause = e.getCause() == null ? e : e.getCause();
            String errorMessage = ExceptionMessage.FAILED_RAFT_BOOTSTRAP.getMessage(
                Arrays.toString(clusterAddresses.toArray()), rootCause.toString());
            throw new IOException(errorMessage, rootCause);
        }
        LOG.info("Started Raft Journal System in {}ms", System.currentTimeMillis() - startTime);
        joinQuorum();
    }

    /**
     * Sends an asynchronous request asking the quorum to add the local server. Failures are
     * only logged; the client is closed once the request completes.
     */
    private void joinQuorum() {
        final InetSocketAddress self = mConf.getLocalAddress();
        final NetAddress.Builder selfAddress = NetAddress.newBuilder()
            .setHost(self.getHostString())
            .setRpcPort(self.getPort());
        final AddQuorumServerRequest addRequest = AddQuorumServerRequest.newBuilder()
            .setServerAddress(selfAddress)
            .build();
        final RaftClient raftClient = createClient();
        final byte[] query = JournalQueryRequest.newBuilder()
            .setAddQuorumServerRequest(addRequest)
            .build()
            .toByteArray();
        final Message queryMessage = Message.valueOf(UnsafeByteOperations.unsafeWrap(query));

        raftClient.async().sendReadOnly(queryMessage).whenComplete((response, failure) -> {
            if (failure != null) {
                LogUtils.warnWithException(LOG, "Exception occurred while joining quorum", failure);
            }
            if (response != null && response.getException() != null) {
                LogUtils.warnWithException(LOG, "Received an error while joining quorum", response.getException());
            }
            try {
                raftClient.close();
            } catch (IOException e) {
                LogUtils.warnWithException(LOG, "Exception occurred closing raft client", e);
            }
        });
    }

    /**
     * Shuts the journal down: closes the raft journal writer if one is installed, then closes
     * the Raft server.
     *
     * @throws InterruptedException declared by the overridden method
     * @throws IOException if closing the journal writer fails
     * @throws RuntimeException if the Raft server fails to close
     */
    @Override
    public synchronized void stopInternal() throws InterruptedException, IOException {
        LOG.info("Shutting down raft journal");
        final RaftJournalWriter writer = mRaftJournalWriter;
        if (writer != null) {
            writer.close();
        }
        try {
            mServer.close();
        } catch (IOException e) {
            throw new RuntimeException("Failed to shut down Raft server", e);
        }
        LOG.info("Journal shutdown complete");
    }

    /**
     * Describes every quorum member as seen from the local server. Followers come from the
     * leader info of the local role; a follower whose last RPC is older than the maximum
     * election timeout is reported UNAVAILABLE. The local server is added as the leader.
     *
     * @return quorum members sorted by the string form of their address
     * @throws UnavailableException if group, role or leader information is missing
     * @throws IOException if the group info request fails or carries an exception
     */
    public synchronized List<QuorumServerInfo> getQuorumServerInfoList() throws IOException {
        final GroupInfoReply groupInfo = getGroupInfo();
        if (groupInfo == null) {
            throw new UnavailableException("Cannot get raft group info");
        }
        if (groupInfo.getException() != null) {
            throw groupInfo.getException();
        }
        final RaftProtos.RoleInfoProto role = groupInfo.getRoleInfoProto();
        if (role == null) {
            throw new UnavailableException("Cannot get server role info");
        }
        final RaftProtos.LeaderInfoProto leader = role.getLeaderInfo();
        if (leader == null) {
            throw new UnavailableException("Cannot get server leader info");
        }

        final LinkedList<QuorumServerInfo> members = new LinkedList<>();
        for (RaftProtos.ServerRpcProto follower : leader.getFollowerInfoList()) {
            final HostAndPort hostPort = HostAndPort.fromString(follower.getId().getAddress());
            final NetAddress followerAddress = NetAddress.newBuilder()
                .setHost(hostPort.getHost())
                .setRpcPort(hostPort.getPort())
                .build();
            final QuorumServerState state;
            if (follower.getLastRpcElapsedTimeMs() > mConf.getMaxElectionTimeoutMs()) {
                state = QuorumServerState.UNAVAILABLE;
            } else {
                state = QuorumServerState.AVAILABLE;
            }
            members.add(QuorumServerInfo.newBuilder()
                .setIsLeader(false)
                .setPriority(follower.getId().getPriority())
                .setServerAddress(followerAddress)
                .setServerState(state)
                .build());
        }

        final InetSocketAddress local = mConf.getLocalAddress();
        final NetAddress localAddress = NetAddress.newBuilder()
            .setHost(local.getHostString())
            .setRpcPort(local.getPort())
            .build();
        members.add(QuorumServerInfo.newBuilder()
            .setIsLeader(true)
            .setPriority(role.getSelf().getPriority())
            .setServerAddress(localAddress)
            .setServerState(QuorumServerState.AVAILABLE)
            .build());

        members.sort(Comparator.comparing(member -> member.getServerAddress().toString()));
        return members;
    }

    /**
     * Sends a message directly to one server as a stale-read request, using a short-lived
     * client that is closed when the request completes.
     *
     * @param server the peer to send the message to
     * @param message the message payload
     * @return a future for the reply; it completes exceptionally with a
     *         {@link CompletionException} if closing the client fails
     */
    public synchronized CompletableFuture<RaftClientReply> sendMessageAsync(RaftPeerId server, Message message) {
        final RaftClient raftClient = createClient();
        final RaftClientRequest staleRead = RaftClientRequest.newBuilder()
            .setClientId(mRawClientId)
            .setServerId(server)
            .setGroupId(RAFT_GROUP_ID)
            .setCallId(nextCallId())
            .setMessage(message)
            .setType(RaftClientRequest.staleReadRequestType(0L))
            .setSlidingWindowEntry(null)
            .build();
        return raftClient.getClientRpc().sendRequestAsync(staleRead).whenComplete((response, failure) -> {
            try {
                raftClient.close();
            } catch (IOException e) {
                throw new CompletionException(e);
            }
        });
    }

    /**
     * Queries the Raft server for information about {@link #RAFT_GROUP_ID}.
     *
     * @return the group info reply from the server
     * @throws IOException if the request fails
     */
    private GroupInfoReply getGroupInfo() throws IOException {
        final GroupInfoRequest request =
            new GroupInfoRequest(mRawClientId, getLocalPeerId(), RAFT_GROUP_ID, nextCallId());
        return getRaftServer().getGroupInfo(request);
    }

    /**
     * @return true only if the Raft server exists, is RUNNING, and the primary selector reports
     *         the PRIMARY state
     */
    @VisibleForTesting
    public synchronized boolean isLeader() {
        if (mServer == null) {
            return false;
        }
        if (mServer.getLifeCycleState() != LifeCycle.State.RUNNING) {
            return false;
        }
        return mPrimarySelector.getState() == PrimarySelector.State.PRIMARY;
    }

    /**
     * Removes a server from the quorum by submitting the server's current peer list minus the
     * given peer as the new configuration.
     *
     * @param serverNetAddress address of the server to remove
     * @throws IOException if the groups cannot be read or the configuration change is rejected
     */
    public synchronized void removeQuorumServer(NetAddress serverNetAddress) throws IOException {
        final InetSocketAddress target =
            InetSocketAddress.createUnresolved(serverNetAddress.getHost(), serverNetAddress.getRpcPort());
        final RaftPeerId targetId = RaftJournalUtils.getPeerId(target);
        try (RaftClient raftClient = createClient()) {
            final Collection<RaftPeer> currentPeers = mServer.getGroups().iterator().next().getPeers();
            final List<RaftPeer> remainingPeers = currentPeers.stream()
                .filter(candidate -> !candidate.getId().equals(targetId))
                .collect(Collectors.toList());
            final RaftClientReply reply = raftClient.admin().setConfiguration(remainingPeers);
            if (reply.getException() != null) {
                throw reply.getException();
            }
        }
    }

    /**
     * Resets the priority of every peer in the current raft group to the neutral value 1.
     *
     * @throws IOException if the configuration change is not successful or the client fails
     */
    public synchronized void resetPriorities() throws IOException {
        // Single source of truth for the neutral priority applied to all peers.
        final int neutralPriority = 1;
        ArrayList<RaftPeer> resetPeers = new ArrayList<RaftPeer>();
        for (RaftPeer peer : this.mRaftGroup.getPeers()) {
            resetPeers.add(RaftPeer.newBuilder(peer).setPriority(neutralPriority).build());
        }
        LOG.info("Resetting RaftPeer priorities");
        try (RaftClient client = this.createClient()) {
            RaftClientReply reply = client.admin().setConfiguration(resetPeers);
            this.processReply(reply, "failed to reset master priorities to 1");
        }
    }

    /**
     * Initiates a leadership transfer to the master at the given address.
     *
     * The method first raises the priority of the target peer (2) above all other peers (1)
     * through a configuration change. It then starts a background thread that waits briefly
     * and asks the raft group to transfer leadership. The call returns immediately with an id
     * that can be passed to {@link #getTransferLeaderMessage(String)} to look up any error
     * recorded for this attempt.
     *
     * Only one transfer may be in flight: the "allowed" flag is cleared on entry and restored
     * only when the attempt fails.
     *
     * @param newLeaderNetAddress the address of the master that should become the leader
     * @return the id under which errors of this transfer attempt are recorded
     */
    public synchronized String transferLeadership(NetAddress newLeaderNetAddress) {
        boolean allowed = this.mTransferLeaderAllowed.getAndSet(false);
        String transferId = UUID.randomUUID().toString();
        if (!allowed) {
            String msg = "transfer is not allowed at the moment because the master is "
                + (this.mRaftJournalWriter == null ? "still gaining primacy" : "already transferring the leadership");
            this.mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder().setMsg(msg).build());
            return transferId;
        }
        try {
            InetSocketAddress serverAddress = InetSocketAddress.createUnresolved(newLeaderNetAddress.getHost(), newLeaderNetAddress.getRpcPort());
            ArrayList<RaftPeer> oldPeers = new ArrayList<RaftPeer>(this.mRaftGroup.getPeers());
            String strAddr = NetUtils.address2String(serverAddress);
            if (oldPeers.stream().map(RaftPeer::getAddress).noneMatch(addr -> addr.equals(strAddr))) {
                throw new IOException(String.format("<%s> is not part of the quorum <%s>.", strAddr, oldPeers.stream().map(RaftPeer::getAddress).collect(Collectors.toList())));
            }
            if (strAddr.equals(this.mRaftGroup.getPeer(this.mPeerId).getAddress())) {
                throw new IOException(String.format("%s is already the leader", strAddr));
            }
            RaftPeerId newLeaderPeerId = RaftJournalUtils.getPeerId(serverAddress);
            ArrayList<RaftPeer> peersWithNewPriorities = new ArrayList<RaftPeer>();
            for (RaftPeer peer : oldPeers) {
                peersWithNewPriorities.add(RaftPeer.newBuilder(peer).setPriority(peer.getId().equals(newLeaderPeerId) ? 2 : 1).build());
            }
            try (RaftClient client = this.createClient()) {
                String stringPeers = "[" + peersWithNewPriorities.stream().map(RaftPeer::toString).collect(Collectors.joining(", ")) + "]";
                LOG.info("Applying new peer state before transferring leadership: {}", (Object)stringPeers);
                RaftClientReply reply = client.admin().setConfiguration(peersWithNewPriorities);
                this.processReply(reply, "failed to set master priorities before initiating election");
            }
            LOG.info("Transferring leadership to master with address <{}> and with RaftPeerId <{}>", (Object)serverAddress, (Object)newLeaderPeerId);
            final long sleepTimeMs = 3000L;
            final long transferLeaderWaitMs = 30000L;
            // Fire and forget. The background thread owns its own client: a client opened by the
            // calling thread in try-with-resources would already be closed when this runs.
            new Thread(() -> {
                try (RaftClient transferClient = this.createClient()) {
                    Thread.sleep(sleepTimeMs);
                    RaftClientReply reply1 = transferClient.admin().transferLeadership(newLeaderPeerId, transferLeaderWaitMs);
                    this.processReply(reply1, "election failed");
                }
                catch (Throwable t) {
                    LOG.error("caught an error when executing transfer: {}", (Object)t.getMessage());
                    // Allow a new attempt and expose the failure to whoever polls this transfer id.
                    this.mTransferLeaderAllowed.set(true);
                    this.mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder().setMsg(t.getMessage()).build());
                }
            }).start();
            LOG.info("Transferring leadership initiated");
        }
        catch (Throwable t) {
            this.mTransferLeaderAllowed.set(true);
            LOG.warn(t.getMessage());
            this.mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder().setMsg(t.getMessage()).build());
        }
        return transferId;
    }

    /**
     * Checks a raft client reply and fails with a user-facing message if it was not successful.
     *
     * The underlying failure is logged and attached as the cause of the thrown exception, so
     * it is not lost to callers. The failure is the exception carried by the reply, or a
     * synthetic one describing the reply when none is present.
     *
     * @param reply the reply to check
     * @param msgToUser the message of the exception thrown when the reply is not successful
     * @throws IOException with message {@code msgToUser} if the reply is not successful
     */
    private void processReply(RaftClientReply reply, String msgToUser) throws IOException {
        if (reply.isSuccess()) {
            return;
        }
        Throwable cause = reply.getException() != null
            ? reply.getException()
            : new IOException(String.format("reply <%s> failed", reply));
        // Both placeholders are filled, so the cause is rendered through its toString().
        LOG.error("{}. Error: {}", (Object)msgToUser, (Object)cause);
        throw new IOException(msgToUser, cause);
    }

    /**
     * Looks up the message recorded for a leadership transfer attempt.
     *
     * @param transferId the id returned when the transfer was requested
     * @return the recorded message, or a message with an empty text if nothing is recorded
     *         under the given id
     */
    public synchronized TransferLeaderMessage getTransferLeaderMessage(String transferId) {
        TransferLeaderMessage recorded = this.mErrorMessages.get(transferId);
        return recorded != null ? recorded : TransferLeaderMessage.newBuilder().setMsg("").build();
    }

    /**
     * Adds a server to the raft quorum.
     *
     * The peer id is derived from the given address. If a peer with that id is already part
     * of the group, the call is a no-op. Otherwise a configuration containing the current
     * peers plus the new one is submitted to the local raft server.
     *
     * @param serverNetAddress the address of the server to add to the quorum
     * @throws IOException if the configuration change reply carries an exception, or the
     *         raft server fails
     */
    public synchronized void addQuorumServer(NetAddress serverNetAddress) throws IOException {
        InetSocketAddress serverAddress = InetSocketAddress.createUnresolved(serverNetAddress.getHost(), serverNetAddress.getRpcPort());
        RaftPeerId peerId = RaftJournalUtils.getPeerId(serverAddress);
        // Keep the element type: with a raw Collection the membership check below cannot
        // resolve getId() and the copy into the typed list is unchecked.
        Collection<RaftPeer> peers = this.mServer.getGroups().iterator().next().getPeers();
        if (peers.stream().anyMatch(peer -> peer.getId().equals(peerId))) {
            return;
        }
        RaftPeer newPeer = RaftPeer.newBuilder().setId(peerId).setAddress(serverAddress).build();
        ArrayList<RaftPeer> newPeers = new ArrayList<RaftPeer>(peers);
        newPeers.add(newPeer);
        RaftClientReply reply = this.mServer.setConfiguration(new SetConfigurationRequest(this.mRawClientId, this.mPeerId, RAFT_GROUP_ID, RaftJournalSystem.nextCallId(), newPeers));
        if (reply.getException() != null) {
            throw reply.getException();
        }
    }

    /**
     * @return true only if a raft journal writer exists and its next sequence number to write
     *         is 0; false when there is no writer
     */
    @Override
    public synchronized boolean isEmpty() {
        if (this.mRaftJournalWriter == null) {
            return false;
        }
        return this.mRaftJournalWriter.getNextSequenceNumberToWrite() == 0L;
    }

    /**
     * @return whether the configured journal path exists on the file system
     */
    @Override
    public boolean isFormatted() {
        File journalLocation = this.mConf.getPath();
        return journalLocation.exists();
    }

    /**
     * Formats the journal location so that it ends up as an empty directory.
     *
     * If the journal path is a directory, it must be an accessible storage directory and its
     * contents are removed. Otherwise anything existing at the path is deleted and the
     * directory (including missing parents) is created.
     *
     * @throws AccessDeniedException if the existing directory is not accessible or the
     *         directory cannot be created
     * @throws IOException if cleaning or deleting the existing content fails
     */
    @Override
    public void format() throws IOException {
        File journalDir = this.mConf.getPath();
        String journalDirPath = journalDir.getPath();
        if (!journalDir.isDirectory()) {
            // Not a directory: clear whatever occupies the path, then create the directory.
            if (journalDir.exists()) {
                FileUtils.forceDelete(journalDir);
            }
            if (!journalDir.mkdirs()) {
                throw new AccessDeniedException(journalDirPath);
            }
            return;
        }
        if (!alluxio.util.io.FileUtils.isStorageDirAccessible(journalDirPath)) {
            throw new AccessDeniedException(journalDirPath);
        }
        FileUtils.cleanDirectory(journalDir);
    }

    /**
     * @return the primary selector held by this journal system
     */
    public PrimarySelector getPrimarySelector() {
        PrimarySelector selector = this.mPrimarySelector;
        return selector;
    }

    /**
     * @return the current value of the snapshot-allowed flag
     */
    public boolean isSnapshotAllowed() {
        boolean snapshotAllowed = this.mSnapshotAllowed.get();
        return snapshotAllowed;
    }

    /**
     * Forwards a leadership change to the primary selector.
     *
     * @param isLeader true to notify the PRIMARY state, false to notify the STANDBY state
     */
    public void notifyLeadershipStateChanged(boolean isLeader) {
        PrimarySelector.State newState;
        if (isLeader) {
            newState = PrimarySelector.State.PRIMARY;
        } else {
            newState = PrimarySelector.State.STANDBY;
        }
        this.mPrimarySelector.notifyStateChanged(newState);
    }

    /**
     * @return the raft server currently held by this journal system
     */
    @VisibleForTesting
    synchronized RaftServer getRaftServer() {
        RaftServer server = this.mServer;
        return server;
    }

    /**
     * Refreshes the cached raft group from the current group, logging when it changed.
     */
    public synchronized void updateGroup() {
        RaftGroup currentGroup = this.getCurrentGroup();
        if (currentGroup.equals((Object)this.mRaftGroup)) {
            // Nothing changed, keep the cached group.
            return;
        }
        LOG.info("Raft group updated: old {}, new {}", (Object)this.mRaftGroup, (Object)currentGroup);
        this.mRaftGroup = currentGroup;
    }

    /**
     * Fetches the role info of the local raft server from the group info.
     *
     * @return the role info, or null if the group info could not be retrieved, is null, or
     *         carries an exception
     */
    @Nullable
    private RaftProtos.RoleInfoProto getRaftRoleInfo() {
        GroupInfoReply reply;
        try {
            reply = this.getGroupInfo();
        }
        catch (IOException e) {
            LOG.error("Error while getting RAFT group info", (Throwable)e);
            return null;
        }
        boolean usable = reply != null && reply.getException() == null;
        return usable ? reply.getRoleInfoProto() : null;
    }

    /**
     * @return the numeric role value from the raft role info, or -1 if the role info is
     *         unavailable
     */
    public int getRoleId() {
        RaftProtos.RoleInfoProto role = this.getRaftRoleInfo();
        return role == null ? -1 : role.getRoleValue();
    }

    /**
     * Determines the id of the current raft leader as seen by this server.
     *
     * @return the local peer id if this server's role is LEADER; otherwise the leader id taken
     *         from the follower info, decoded as UTF-8; or {@code WAITING_FOR_ELECTION} if the
     *         role info, follower info or leader id is unavailable
     */
    public String getLeaderId() {
        RaftProtos.RoleInfoProto role = this.getRaftRoleInfo();
        if (role == null) {
            return WAITING_FOR_ELECTION;
        }
        if (role.getRole() == RaftProtos.RaftPeerRole.LEADER) {
            return this.getLocalPeerId().toString();
        }
        RaftProtos.FollowerInfoProto follower = role.getFollowerInfo();
        boolean leaderKnown = follower != null
            && follower.getLeaderInfo().getId() != null
            && follower.getLeaderInfo().getId().getId() != null;
        if (!leaderKnown) {
            return WAITING_FOR_ELECTION;
        }
        return follower.getLeaderInfo().getId().getId().toStringUtf8();
    }

    /**
     * Finds the position of the current leader in the configured list of cluster addresses.
     *
     * The leader id is turned into an address string by replacing every '_' with ':'. That
     * string is compared with the "host:port" form of each configured address.
     *
     * @return the zero-based index of the leader in the cluster address list, or -1 if no
     *         leader is known yet or the leader matches none of the configured addresses
     */
    protected int getLeaderIndex() {
        String leaderId = this.getLeaderId();
        if (WAITING_FOR_ELECTION.equals(leaderId)) {
            return -1;
        }
        String leaderAddress = leaderId.replace('_', ':');
        int index = 0;
        for (InetSocketAddress address : this.mConf.getClusterAddresses()) {
            // Build "host:port" explicitly. InetSocketAddress.toString() is not a stable
            // format: it renders resolved addresses as "host/ip:port" and, on newer JDKs,
            // unresolved ones as "host/<unresolved>:port", which would never match.
            String hostAndPort = address.getHostString() + ":" + address.getPort();
            if (hostAndPort.equals(leaderAddress)) {
                return index;
            }
            ++index;
        }
        return -1;
    }
}

