/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.journal.raft;

import alluxio.ProcessUtils;
import alluxio.annotation.SuppressFBWarnings;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.AddQuorumServerRequest;
import alluxio.grpc.JournalQueryRequest;
import alluxio.master.journal.CatchupFuture;
import alluxio.master.journal.JournalUtils;
import alluxio.master.journal.Journaled;
import alluxio.master.journal.checkpoint.CheckpointInputStream;
import alluxio.master.journal.raft.BufferedJournalApplier;
import alluxio.master.journal.raft.RaftJournal;
import alluxio.master.journal.raft.RaftJournalSystem;
import alluxio.master.journal.raft.RaftJournalUtils;
import alluxio.master.journal.raft.SnapshotReplicationManager;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.proto.journal.Journal;
import alluxio.util.LogUtils;
import alluxio.util.StreamUtils;
import alluxio.util.logging.SamplingLogger;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.MD5FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class JournalStateMachine
extends BaseStateMachine {
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(JournalStateMachine.class);
    /**
     * Sampling wrapper around {@link #LOG} used for messages that may repeat frequently.
     * NOTE(review): 600000 is presumably a 10-minute interval in milliseconds - confirm against
     * SamplingLogger.
     */
    private static final Logger SAMPLING_LOG = new SamplingLogger(LOG, 600000L);
    /** Shared, already-completed future returned by successful {@code applyTransaction} calls. */
    private static final CompletableFuture<Message> EMPTY_FUTURE = CompletableFuture.completedFuture(Message.EMPTY);
    /** Journals whose state machines are snapshotted, restored and reset by this class. */
    private final Map<String, RaftJournal> mJournals;
    /** Owning journal system; notified of leadership changes and asked for sinks/group updates. */
    private final RaftJournalSystem mJournalSystem;
    /** Handles snapshot transfer requests and delegated snapshots between quorum members. */
    private final SnapshotReplicationManager mSnapshotManager;
    /** When true, entries are still sequence-checked but no longer handed to the applier. Set by {@code upgrade()}. */
    @GuardedBy(value="this")
    private boolean mIgnoreApplys = false;
    /** Set by {@code close()}; once true, applies, installs, resets and snapshots become no-ops. */
    @GuardedBy(value="this")
    private boolean mClosed = false;
    /** Raft log index of the last commit passed through {@code applyJournalEntryCommand}; -1 until the first one. */
    private volatile long mLastAppliedCommitIndex = -1L;
    /** Sequence number of the most recent entry seen with a negative sequence number. */
    private volatile long mLastPrimaryStartSequenceNumber = 0L;
    /** Sequence number the next applied journal entry is expected to carry. */
    private volatile long mNextSequenceNumberToRead = 0L;
    /** True while {@code takeLocalSnapshot()} is writing a snapshot. */
    private volatile boolean mSnapshotting = false;
    /** Last leadership state reported through the notify callbacks. */
    private volatile boolean mIsLeader = false;
    /** Executor on which {@code notifyTermIndexUpdated} schedules the journal system group update. */
    private final ExecutorService mJournalPool;
    /** Callback registered by {@code suspend()}; run and cleared by {@code pause()}, cleared by {@code resume()}. */
    private volatile Runnable mInterruptCallback;
    /** {@code System.currentTimeMillis()} value captured when the latest local snapshot started. */
    private volatile long mLastSnapshotStartTime = 0L;
    /** Raft log index covered by the latest known snapshot; -1 when there is none. Exported as a gauge. */
    private volatile long mSnapshotLastIndex = -1L;
    /** Applies journal entries to the state machines; replaced with a fresh instance by {@code resetState()}. */
    private BufferedJournalApplier mJournalApplier;
    /** Ratis snapshot storage for this state machine. */
    private final SimpleStateMachineStorage mStorage = new SimpleStateMachineStorage();
    /** Raft group this state machine belongs to; assigned in {@code initialize}. */
    private RaftGroupId mRaftGroupId;
    /** Raft server hosting this state machine; assigned in {@code initialize}. */
    private RaftServer mServer;
    /** {@code System.currentTimeMillis()} value of the last snapshot attempt bookkeeping; -1 until first set. */
    private long mLastCheckPointTime = -1L;

    /**
     * Creates a journal state machine over the given journals and registers its metrics gauges.
     *
     * @param journals the journals whose state machines this class applies entries to
     * @param journalSystem the owning raft journal system
     * @param maxConcurrencyPoolSize parallelism of the pool used by
     *        {@link #notifyTermIndexUpdated(long, long)}; must not be null
     */
    public JournalStateMachine(Map<String, RaftJournal> journals, RaftJournalSystem journalSystem, Integer maxConcurrencyPoolSize) {
        mJournalPool = new ForkJoinPool(maxConcurrencyPoolSize);
        LOG.info("The max concurrency for notifyTermIndexUpdated is loading with max threads {}", maxConcurrencyPoolSize);
        mJournals = journals;
        // Assign the journal system before resetState(): the applier created there captures
        // this field through its sink supplier, so it must never observe a null reference.
        mJournalSystem = journalSystem;
        mJournalApplier = new BufferedJournalApplier(journals, () -> journalSystem.getJournalSinks(null));
        resetState();
        LOG.info("Initialized new journal state machine");
        mSnapshotManager = new SnapshotReplicationManager(journalSystem, mStorage);

        MetricsSystem.registerGaugeIfAbsent(
            MetricKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_LAST_INDEX.getName(),
            () -> mSnapshotLastIndex);
        MetricsSystem.registerGaugeIfAbsent(
            MetricKey.MASTER_JOURNAL_ENTRIES_SINCE_CHECKPOINT.getName(),
            () -> getLastAppliedTermIndex().getIndex() - mSnapshotLastIndex);
        MetricsSystem.registerGaugeIfAbsent(
            MetricKey.MASTER_JOURNAL_LAST_CHECKPOINT_TIME.getName(),
            () -> mLastCheckPointTime);
        MetricsSystem.registerGaugeIfAbsent(
            MetricKey.MASTER_JOURNAL_LAST_APPLIED_COMMIT_INDEX.getName(),
            () -> mLastAppliedCommitIndex);
        // Warn gauge: true when both the entry count and the elapsed time since the last
        // checkpoint exceed their configured thresholds.
        MetricsSystem.registerGaugeIfAbsent(
            MetricKey.MASTER_JOURNAL_CHECKPOINT_WARN.getName(),
            () -> getLastAppliedTermIndex().getIndex() - mSnapshotLastIndex
                    > ServerConfiguration.getLong(PropertyKey.MASTER_JOURNAL_CHECKPOINT_PERIOD_ENTRIES)
                && System.currentTimeMillis() - mLastCheckPointTime
                    > ServerConfiguration.getMs(PropertyKey.MASTER_WEB_JOURNAL_CHECKPOINT_WARNING_THRESHOLD_TIME));
    }

    /**
     * Initializes the state machine inside a life-cycle start transition: delegates to the base
     * class, records the server and group, initializes snapshot storage and loads the latest
     * snapshot, if any.
     *
     * @param server the raft server hosting this state machine
     * @param groupId the raft group this state machine belongs to
     * @param raftStorage the raft storage backing the snapshot storage
     * @throws IOException if initialization or snapshot loading fails
     */
    public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException {
        getLifeCycle().startAndTransition(() -> {
            super.initialize(server, groupId, raftStorage);
            mServer = server;
            mRaftGroupId = groupId;
            mStorage.init(raftStorage);
            loadSnapshot(mStorage.getLatestSnapshot());
        });
    }

    /**
     * Reloads the latest snapshot from storage, installs it, and then unpauses the state machine.
     *
     * @throws IOException if the snapshot cannot be loaded
     */
    public void reinitialize() throws IOException {
        LOG.info("Reinitializing state machine.");
        mStorage.loadLatestSnapshot();
        final SingleFileSnapshotInfo latest = mStorage.getLatestSnapshot();
        loadSnapshot(latest);
        unpause();
    }

    /**
     * Resets the state machines and restores them from the given snapshot.
     *
     * @param snapshot the snapshot to load; a null value is logged and ignored
     * @throws FileNotFoundException if the snapshot file does not exist
     * @throws IOException if resetting or installing the snapshot fails
     */
    private synchronized void loadSnapshot(SingleFileSnapshotInfo snapshot) throws IOException {
        if (snapshot == null) {
            LOG.info("No snapshot to load");
            return;
        }
        LOG.info("Loading Snapshot {}", snapshot);
        final File file = snapshot.getFile().getPath().toFile();
        if (!file.exists()) {
            throw new FileNotFoundException(
                String.format("The snapshot file %s does not exist", file.getPath()));
        }
        try {
            resetState();
            setLastAppliedTermIndex(snapshot.getTermIndex());
            install(file);
            if (getLatestSnapshot() == null) {
                mSnapshotLastIndex = -1L;
            } else {
                mSnapshotLastIndex = getLatestSnapshot().getIndex();
            }
        } catch (Exception e) {
            throw new IOException(String.format("Failed to load snapshot %s", snapshot), e);
        }
    }

    /**
     * Takes a snapshot. A non-leader snapshots locally; a leader asks the snapshot manager to copy
     * a snapshot from a follower instead.
     *
     * @return the index returned by the local or delegated snapshot, or {@code -1} when the leader
     *         has fewer than two peers in its group
     */
    public long takeSnapshot() {
        if (!mIsLeader) {
            return takeLocalSnapshot();
        }
        try {
            Preconditions.checkState(mServer.getGroups().iterator().hasNext());
            final RaftGroup group = mServer.getGroups().iterator().next();
            Preconditions.checkState(group.getGroupId().equals(mRaftGroupId));
            if (group.getPeers().size() < 2) {
                SAMPLING_LOG.warn("No follower to perform delegated snapshot. Please add more masters to the quorum or manually take snapshot using 'alluxio fsadmin journal checkpoint'");
                return -1L;
            }
        } catch (IOException e) {
            // Group info is advisory here; fall through and still attempt the delegated snapshot.
            SAMPLING_LOG.warn("Failed to get raft group info: {}", e.getMessage());
        }
        final long copiedIndex = mSnapshotManager.maybeCopySnapshotFromFollower();
        if (copiedIndex != -1L) {
            mSnapshotLastIndex = copiedIndex;
        }
        mLastCheckPointTime = System.currentTimeMillis();
        return copiedIndex;
    }

    /**
     * @return the latest snapshot known to the snapshot storage, as reported by
     *         {@code mStorage.getLatestSnapshot()}
     */
    public SnapshotInfo getLatestSnapshot() {
        final SnapshotInfo latest = mStorage.getLatestSnapshot();
        return latest;
    }

    /**
     * @return the snapshot storage used by this state machine
     */
    public StateMachineStorage getStateMachineStorage() {
        return mStorage;
    }

    /**
     * Handles a query message. Snapshot-related requests are answered by the snapshot manager;
     * add-quorum-server requests are executed asynchronously; anything else is delegated to the
     * base class.
     *
     * @param request the serialized {@link JournalQueryRequest}
     * @return a future holding the reply, or completed exceptionally if processing failed
     */
    public CompletableFuture<Message> query(Message request) {
        try {
            final JournalQueryRequest queryRequest =
                JournalQueryRequest.parseFrom(request.getContent().asReadOnlyByteBuffer());
            LOG.debug("Received query request: {}", queryRequest);

            final Message snapshotReply = mSnapshotManager.handleRequest(queryRequest);
            if (snapshotReply != null) {
                return CompletableFuture.completedFuture(snapshotReply);
            }

            if (queryRequest.hasAddQuorumServerRequest()) {
                final AddQuorumServerRequest addRequest = queryRequest.getAddQuorumServerRequest();
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        mJournalSystem.addQuorumServer(addRequest.getServerAddress());
                    } catch (IOException e) {
                        throw new CompletionException(e);
                    }
                    return Message.EMPTY;
                });
            }
        } catch (Exception e) {
            LOG.error("failed processing request {}", request, e);
            final CompletableFuture<Message> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        return super.query(request);
    }

    /**
     * Closes the snapshot manager and marks this state machine as closed.
     * NOTE(review): {@code mClosed} is annotated as guarded by {@code this}, but this method is
     * not synchronized - confirm whether that is intentional.
     */
    public void close() {
        mSnapshotManager.close();
        mClosed = true;
    }

    /**
     * Applies the journal entry carried by the transaction and advances the last applied
     * term-index.
     *
     * @param trx the committed transaction
     * @return an already-completed empty future on success, or an exceptionally completed future
     *         if applying the entry threw
     */
    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
        try {
            applyJournalEntryCommand(trx);
            final RaftProtos.LogEntryProto logEntry = Objects.requireNonNull(trx.getLogEntry());
            updateLastAppliedTermIndex(logEntry.getTerm(), logEntry.getIndex());
        } catch (Exception e) {
            return RaftJournalUtils.completeExceptionally(e);
        }
        return EMPTY_FUTURE;
    }

    /**
     * Records that this server is not the leader and forwards that to the journal system.
     *
     * @param pendingEntries pending transactions; not used by this implementation
     */
    public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
        final boolean leader = false;
        mIsLeader = leader;
        mJournalSystem.notifyLeadershipStateChanged(leader);
    }

    /**
     * Forwards the term/index update to the base class, then schedules a journal system group
     * update on the dedicated pool without waiting for it.
     *
     * @param term the updated term
     * @param index the updated log index
     */
    public void notifyTermIndexUpdated(long term, long index) {
        super.notifyTermIndexUpdated(term, index);
        final Runnable groupUpdate = mJournalSystem::updateGroup;
        CompletableFuture.runAsync(groupUpdate, mJournalPool);
    }

    /**
     * @return the next index of the raft log of this server's division for the current group
     * @throws IllegalStateException if the division cannot be obtained
     */
    private long getNextIndex() {
        final long nextIndex;
        try {
            nextIndex = mServer.getDivision(mRaftGroupId).getRaftLog().getNextIndex();
        } catch (IOException e) {
            throw new IllegalStateException("Cannot obtain raft log index", e);
        }
        return nextIndex;
    }

    /**
     * Installs a snapshot from the leader via the snapshot manager.
     *
     * @param roleInfoProto the current role of this server; must be follower
     * @param firstTermIndexInLog not used by this implementation
     * @return a future with the term-index of the installed snapshot; it fails if this server is
     *         not a follower or if the downloaded snapshot is not newer than the latest log entry
     */
    public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
        final RaftProtos.RaftPeerRole role = roleInfoProto.getRole();
        if (role != RaftProtos.RaftPeerRole.FOLLOWER) {
            final String message = String.format("Server should be a follower when installing a snapshot from leader. Actual: %s", roleInfoProto.getRole());
            return RaftJournalUtils.completeExceptionally(new IllegalStateException(message));
        }
        return mSnapshotManager.installSnapshotFromLeader().thenApply(installed -> {
            final long latestJournalIndex = getNextIndex() - 1L;
            if (installed.getIndex() <= latestJournalIndex) {
                throw new IllegalArgumentException(String.format("Downloaded snapshot index %d is older than the latest entry index %d", installed.getIndex(), latestJournalIndex));
            }
            mSnapshotLastIndex = installed.getIndex();
            return installed;
        });
    }

    /**
     * Pauses the state machine: moves the life cycle to PAUSING, fires and clears any registered
     * interrupt callback, resumes the journal applier if it was suspended, then moves to PAUSED.
     *
     * @throws IllegalStateException if resuming the applier fails
     */
    public synchronized void pause() {
        LOG.info("Pausing raft state machine.");
        getLifeCycle().transition(LifeCycle.State.PAUSING);

        if (mInterruptCallback != null) {
            LOG.info("Invoking suspension interrupt callback.");
            mInterruptCallback.run();
            mInterruptCallback = null;
        }

        try {
            if (mJournalApplier.isSuspended()) {
                LOG.info("Resuming journal applier.");
                mJournalApplier.resume();
            }
        } catch (IOException e) {
            throw new IllegalStateException("State machine pause failed", e);
        }

        getLifeCycle().transition(LifeCycle.State.PAUSED);
        LOG.info("Raft state machine is paused.");
    }

    /**
     * Unpauses the state machine by running an empty start transition on the life cycle.
     * Logs a warning if the journal applier is unexpectedly suspended.
     */
    public synchronized void unpause() {
        LOG.info("Unpausing raft state machine.");
        final boolean applierSuspended = mJournalApplier.isSuspended();
        if (applierSuspended) {
            LOG.warn("Journal should not be suspended while state machine is paused.");
        }
        // Nothing to start; the call is made only for its life-cycle transition.
        getLifeCycle().startAndTransition(() -> { });
        LOG.info("Raft state machine is unpaused.");
    }

    /**
     * Parses the journal entry carried by a commit and applies it. The last applied commit index
     * is advanced whether or not applying the entry succeeds.
     *
     * @param commit the committed transaction holding a serialized journal entry
     * @throws IllegalStateException if the commit index does not exceed the last applied one
     */
    public synchronized void applyJournalEntryCommand(TransactionContext commit) {
        final Journal.JournalEntry parsed;
        try {
            parsed = Journal.JournalEntry.parseFrom(
                commit.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
        } catch (Exception e) {
            ProcessUtils.fatalError(LOG, e, "Encountered invalid journal entry in commit: %s.", commit);
            System.exit(-1);
            throw new IllegalStateException(e);
        }
        try {
            applyEntry(parsed);
        } finally {
            // Runs on both the normal and the exceptional path, like the original duplicated tail.
            Preconditions.checkState(commit.getLogEntry().getIndex() > mLastAppliedCommitIndex);
            mLastAppliedCommitIndex = commit.getLogEntry().getIndex();
        }
    }

    /**
     * Applies a journal entry. An entry holding nested entries is applied recursively; an entry
     * with a negative sequence number only updates the last primary start sequence number; an
     * entry that is empty apart from its sequence number is skipped.
     *
     * @param entry the entry to apply
     * @throws IllegalStateException if the entry sets more fields than allowed
     */
    private void applyEntry(Journal.JournalEntry entry) {
        final boolean fieldCountValid = entry.getAllFields().size() <= 2
            || (entry.getAllFields().size() == 3 && entry.hasSequenceNumber());
        Preconditions.checkState(fieldCountValid, "Raft journal entries should never set multiple fields in addition to sequence number, but found %s", entry);

        if (entry.getJournalEntriesCount() > 0) {
            for (Journal.JournalEntry nested : entry.getJournalEntriesList()) {
                applyEntry(nested);
            }
            return;
        }
        if (entry.getSequenceNumber() < 0L) {
            mLastPrimaryStartSequenceNumber = entry.getSequenceNumber();
            return;
        }
        final Journal.JournalEntry withoutSequenceNumber =
            entry.toBuilder().clearSequenceNumber().build();
        if (!withoutSequenceNumber.equals(Journal.JournalEntry.getDefaultInstance())) {
            applySingleEntry(entry);
        }
    }

    /**
     * Applies one non-empty journal entry, enforcing sequence number order. Entries below the
     * expected sequence number are ignored as duplicates; an entry above it is reported as a
     * fatal error. The entry is handed to the journal applier unless applies are being ignored.
     *
     * @param entry the entry to apply
     */
    @SuppressFBWarnings(value={"VO_VOLATILE_INCREMENT"}, justification="All calls to applyJournalEntryCommand() are synchronized by ratis")
    private void applySingleEntry(Journal.JournalEntry entry) {
        if (mClosed) {
            return;
        }
        final long newSN = entry.getSequenceNumber();
        if (newSN < mNextSequenceNumberToRead) {
            LOG.debug("Ignoring duplicate journal entry with SN {} when next SN is {}", newSN, mNextSequenceNumberToRead);
            return;
        }
        if (newSN > mNextSequenceNumberToRead) {
            ProcessUtils.fatalError(LOG, "Unexpected journal entry. The next expected SN is %s, but encountered an entry with SN %s. Full journal entry: %s", mNextSequenceNumberToRead, newSN, entry);
        }
        mNextSequenceNumberToRead++;
        if (mIgnoreApplys) {
            return;
        }
        mJournalApplier.processJournalEntry(entry);
    }

    /*
     * Loose catch block
     */
    public synchronized long takeLocalSnapshot() {
        if (this.mClosed) {
            SAMPLING_LOG.info("Skip taking snapshot because state machine is closed.");
            return -1L;
        }
        if (this.mServer.getLifeCycleState() != LifeCycle.State.RUNNING) {
            SAMPLING_LOG.info("Skip taking snapshot because raft server is not in running state: current state is {}.", (Object)this.mServer.getLifeCycleState());
            return -1L;
        }
        if (this.mJournalApplier.isSuspended()) {
            SAMPLING_LOG.info("Skip taking snapshot while journal application is suspended.");
            return -1L;
        }
        if (!this.mJournalSystem.isSnapshotAllowed()) {
            SAMPLING_LOG.info("Skip taking snapshot when it is not allowed by the journal system.");
            return -1L;
        }
        LOG.debug("Calling snapshot");
        Preconditions.checkState((!this.mSnapshotting ? 1 : 0) != 0, (Object)"Cannot call snapshot multiple times concurrently");
        this.mSnapshotting = true;
        try {
            File snapshotFile;
            File tempFile;
            TermIndex last;
            long snapshotId;
            Throwable throwable;
            Timer.Context ctx;
            block73: {
                long l;
                block74: {
                    block75: {
                        ctx = MetricsSystem.timer((String)MetricKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_GENERATE_TIMER.getName()).time();
                        throwable = null;
                        this.mLastSnapshotStartTime = System.currentTimeMillis();
                        snapshotId = this.mNextSequenceNumberToRead - 1L;
                        last = this.getLastAppliedTermIndex();
                        try {
                            tempFile = RaftJournalUtils.createTempSnapshotFile(this.mStorage);
                        }
                        catch (IOException e) {
                            LogUtils.warnWithException((Logger)LOG, (String)"Failed to create temp snapshot file", (Object[])new Object[]{e});
                            long l2 = -1L;
                            if (ctx != null) {
                                if (throwable != null) {
                                    try {
                                        ctx.close();
                                    }
                                    catch (Throwable throwable2) {
                                        throwable.addSuppressed(throwable2);
                                    }
                                } else {
                                    ctx.close();
                                }
                            }
                            this.mSnapshotting = false;
                            return l2;
                        }
                        LOG.info("Taking a snapshot to file {}", (Object)tempFile);
                        snapshotFile = this.mStorage.getSnapshotFile(last.getTerm(), last.getIndex());
                        try (DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(tempFile));){
                            outputStream.writeLong(snapshotId);
                            JournalUtils.writeToCheckpoint(outputStream, this.getStateMachines());
                        }
                        catch (Exception e) {
                            tempFile.delete();
                            LogUtils.warnWithException((Logger)LOG, (String)"Failed to write snapshot {} to file {}", (Object[])new Object[]{snapshotId, tempFile, e});
                            long l3 = -1L;
                            if (ctx != null) {
                                if (throwable != null) {
                                    try {
                                        ctx.close();
                                    }
                                    catch (Throwable throwable3) {
                                        throwable.addSuppressed(throwable3);
                                    }
                                } else {
                                    ctx.close();
                                }
                            }
                            this.mSnapshotting = false;
                            return l3;
                        }
                        MD5Hash digest = MD5FileUtil.computeMd5ForFile((File)tempFile);
                        LOG.info("Saving digest for snapshot file {}", (Object)snapshotFile);
                        MD5FileUtil.saveMD5File((File)snapshotFile, (MD5Hash)digest);
                        LOG.info("Renaming a snapshot file {} to {}", (Object)tempFile, (Object)snapshotFile);
                        if (tempFile.renameTo(snapshotFile)) break block73;
                        tempFile.delete();
                        LOG.warn("Failed to rename snapshot from {} to {}", (Object)tempFile, (Object)snapshotFile);
                        l = -1L;
                        if (ctx == null) break block74;
                        if (throwable == null) break block75;
                        try {
                            ctx.close();
                        }
                        catch (Throwable throwable4) {
                            throwable.addSuppressed(throwable4);
                        }
                        break block74;
                    }
                    ctx.close();
                }
                return l;
            }
            try {
                try {
                    LOG.info("Completed snapshot up to SN {} in {}ms", (Object)snapshotId, (Object)(System.currentTimeMillis() - this.mLastSnapshotStartTime));
                }
                catch (Exception e) {
                    tempFile.delete();
                    LogUtils.warnWithException((Logger)LOG, (String)"Failed to complete snapshot: {} - {}", (Object[])new Object[]{snapshotId, snapshotFile, e});
                    long l = -1L;
                    if (ctx != null) {
                        if (throwable != null) {
                            try {
                                ctx.close();
                            }
                            catch (Throwable throwable5) {
                                throwable.addSuppressed(throwable5);
                            }
                        } else {
                            ctx.close();
                        }
                    }
                    this.mSnapshotting = false;
                    return l;
                }
                try {
                    this.mStorage.loadLatestSnapshot();
                }
                catch (Exception e) {
                    snapshotFile.delete();
                    LogUtils.warnWithException((Logger)LOG, (String)"Failed to refresh latest snapshot: {}", (Object[])new Object[]{snapshotId, e});
                    long l = -1L;
                    if (ctx != null) {
                        if (throwable != null) {
                            try {
                                ctx.close();
                            }
                            catch (Throwable throwable6) {
                                throwable.addSuppressed(throwable6);
                            }
                        } else {
                            ctx.close();
                        }
                    }
                    this.mSnapshotting = false;
                    return l;
                }
                this.mSnapshotLastIndex = last.getIndex();
                this.mLastCheckPointTime = System.currentTimeMillis();
                long l = last.getIndex();
                return l;
            }
            catch (Throwable throwable7) {
                throwable = throwable7;
                throw throwable7;
            }
            catch (Throwable throwable8) {
                throw throwable8;
            }
            finally {
                if (ctx != null) {
                    if (throwable != null) {
                        try {
                            ctx.close();
                        }
                        catch (Throwable throwable9) {
                            throwable.addSuppressed(throwable9);
                        }
                    } else {
                        ctx.close();
                    }
                }
            }
            {
                catch (Throwable throwable10) {
                    throw throwable10;
                }
            }
        }
        finally {
            this.mSnapshotting = false;
        }
    }

    /**
     * Restores the state machines from a snapshot file and moves the next expected sequence
     * number to just past the snapshot id read from the file. Does nothing when the state
     * machine is closed or is ignoring applies.
     *
     * @param snapshotFile the snapshot file, starting with a {@code long} snapshot id
     */
    private void install(File snapshotFile) {
        if (mClosed) {
            return;
        }
        if (mIgnoreApplys) {
            LOG.warn("Unexpected request to install a snapshot on a read-only journal state machine");
            return;
        }

        long snapshotId = 0L;
        try (Timer.Context ctx = MetricsSystem.timer(MetricKey.MASTER_EMBEDDED_JOURNAL_SNAPSHOT_REPLAY_TIMER.getName()).time();
             DataInputStream stream = new DataInputStream(new FileInputStream(snapshotFile))) {
            snapshotId = stream.readLong();
            JournalUtils.restoreFromCheckpoint(new CheckpointInputStream(stream), getStateMachines());
        } catch (Exception e) {
            JournalUtils.handleJournalReplayFailure(LOG, e, "Failed to install snapshot: %s", snapshotId);
            // When corruption is tolerated, stop here and leave the sequence number untouched.
            if (ServerConfiguration.getBoolean(PropertyKey.MASTER_JOURNAL_TOLERATE_CORRUPTION)) {
                return;
            }
        }

        if (snapshotId < mNextSequenceNumberToRead - 1L) {
            LOG.warn("Installed snapshot for SN {} but next SN to read is {}", snapshotId, mNextSequenceNumberToRead);
        }
        mNextSequenceNumberToRead = snapshotId + 1L;
        LOG.info("Successfully installed snapshot up to SN {}", snapshotId);
    }

    /**
     * Suspends the journal applier and registers a callback that {@link #pause()} runs if the
     * state machine is paused while suspended.
     *
     * @param interruptCallback the callback to store
     * @throws UnavailableException if the state machine life cycle is not running
     * @throws IOException if the applier fails to suspend
     */
    public synchronized void suspend(Runnable interruptCallback) throws IOException {
        LOG.info("Suspending raft state machine.");
        final boolean running = getLifeCycleState().isRunning();
        if (!running) {
            throw new UnavailableException("Cannot suspend journal when state machine is paused.");
        }
        mJournalApplier.suspend();
        mInterruptCallback = interruptCallback;
        LOG.info("Raft state machine is suspended.");
    }

    /**
     * Clears the interrupt callback and resumes the journal applier if it is suspended.
     *
     * @throws IOException if the applier fails to resume
     */
    public synchronized void resume() throws IOException {
        LOG.info("Resuming raft state machine");
        mInterruptCallback = null;
        if (!mJournalApplier.isSuspended()) {
            LOG.info("Raft state machine is already resumed");
            return;
        }
        mJournalApplier.resume();
        LOG.info("Raft state machine resumed");
    }

    /**
     * Delegates a catch-up request to the journal applier.
     *
     * @param sequence the target sequence passed through to the applier
     * @return the future returned by the applier
     */
    public synchronized CatchupFuture catchup(long sequence) {
        final CatchupFuture pending = mJournalApplier.catchup(sequence);
        return pending;
    }

    /**
     * @return the state machine of every registered journal
     */
    private List<Journaled> getStateMachines() {
        final Collection<RaftJournal> journals = mJournals.values();
        return StreamUtils.map(RaftJournal::getStateMachine, journals);
    }

    /**
     * Closes the current journal applier, replaces it with a fresh one, and resets the state
     * machine of every journal. Does nothing when closed or when applies are being ignored.
     */
    private synchronized void resetState() {
        if (mClosed) {
            return;
        }
        if (mIgnoreApplys) {
            LOG.warn("Unexpected call to resetState() on a read-only journal state machine");
            return;
        }
        mJournalApplier.close();
        mJournalApplier = new BufferedJournalApplier(mJournals, () -> mJournalSystem.getJournalSinks(null));
        for (RaftJournal raftJournal : mJournals.values()) {
            raftJournal.getStateMachine().resetState();
        }
    }

    /**
     * Resumes the applier if it is suspended, then switches this state machine to ignoring
     * applies.
     *
     * @return the last applied sequence number at the time of the call
     */
    public synchronized long upgrade() {
        if (mJournalApplier.isSuspended()) {
            try {
                resume();
            } catch (IOException e) {
                ProcessUtils.fatalError(LOG, e, "State-machine failed to catch up after suspension.");
            }
        }
        mIgnoreApplys = true;
        final long lastApplied = mNextSequenceNumberToRead - 1L;
        return lastApplied;
    }

    /**
     * @return the sequence number just before the next one expected to be read
     */
    public long getLastAppliedSequenceNumber() {
        final long nextToRead = mNextSequenceNumberToRead;
        return nextToRead - 1L;
    }

    /**
     * @return the sequence number of the last applied entry that carried a negative sequence
     *         number, or 0 if none has been seen
     */
    public long getLastPrimaryStartSequenceNumber() {
        return mLastPrimaryStartSequenceNumber;
    }

    /**
     * @return the raft log index of the last commit processed, or -1 if none has been
     */
    public long getLastAppliedCommitIndex() {
        return mLastAppliedCommitIndex;
    }

    /**
     * @return whether a local snapshot is currently being taken
     */
    public boolean isSnapshotting() {
        return mSnapshotting;
    }

    /**
     * Updates the cached leadership flag when the leader of this state machine's group changes
     * and forwards the new state to the journal system. Notifications for any other group are
     * logged and ignored.
     *
     * @param groupMemberId identifies this server (peer id) and the group the change applies to
     * @param raftPeerId the peer id of the new leader
     */
    public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId raftPeerId) {
        // Compare ids by value: two id objects that denote the same group or peer are not
        // guaranteed to be the same instance, so reference equality can misreport leadership.
        if (Objects.equals(mRaftGroupId, groupMemberId.getGroupId())) {
            mIsLeader = Objects.equals(groupMemberId.getPeerId(), raftPeerId);
            mJournalSystem.notifyLeadershipStateChanged(mIsLeader);
        } else {
            LOG.warn("Received notification for unrecognized group {}, current group is {}", groupMemberId.getGroupId(), mRaftGroupId);
        }
    }

    /**
     * @return the snapshot replication manager owned by this state machine
     */
    public SnapshotReplicationManager getSnapshotReplicationManager() {
        return mSnapshotManager;
    }

    /**
     * @return whether the journal applier is currently suspended
     */
    public synchronized boolean isSuspended() {
        final boolean suspended = mJournalApplier.isSuspended();
        return suspended;
    }
}

