/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.journal.raft;

import alluxio.ProcessUtils;
import alluxio.master.journal.AbstractCatchupThread;
import alluxio.master.journal.CatchupFuture;
import alluxio.master.journal.JournalEntryAssociation;
import alluxio.master.journal.JournalUtils;
import alluxio.master.journal.Journaled;
import alluxio.master.journal.raft.RaftJournal;
import alluxio.master.journal.sink.JournalSink;
import alluxio.proto.journal.Journal;
import alluxio.resource.LockResource;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class BufferedJournalApplier {
    private static final Logger LOG = LoggerFactory.getLogger(BufferedJournalApplier.class);
    private static final int RESUME_LOCK_BUFFER_SIZE_WATERMARK = 100;
    private static final int RESUME_LOCK_TIME_LIMIT_MS = 30000;
    private final Map<String, RaftJournal> mJournals;
    private final Supplier<Set<JournalSink>> mJournalSinks;
    private long mLastAppliedSequence = -1L;
    @GuardedBy(value="mStateLock")
    private boolean mSuspended = false;
    @GuardedBy(value="mStateLock")
    private boolean mResumeInProgress = false;
    private Queue<Journal.JournalEntry> mSuspendBuffer = new ConcurrentLinkedQueue<Journal.JournalEntry>();
    private AbstractCatchupThread mCatchupThread;
    private ReentrantLock mStateLock = new ReentrantLock(true);

    public BufferedJournalApplier(Map<String, RaftJournal> journals, Supplier<Set<JournalSink>> journalSinks) {
        this.mJournals = journals;
        this.mJournalSinks = journalSinks;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processJournalEntry(Journal.JournalEntry journalEntry) {
        block16: {
            try (LockResource stateLock = new LockResource((Lock)this.mStateLock);){
                if (this.mSuspended) {
                    Queue<Journal.JournalEntry> queue = this.mSuspendBuffer;
                    synchronized (queue) {
                        this.mSuspendBuffer.offer(journalEntry);
                        this.mSuspendBuffer.notifyAll();
                        break block16;
                    }
                }
                this.applyToMaster(journalEntry);
            }
        }
    }

    public boolean isSuspended() {
        try (LockResource stateLock = new LockResource((Lock)this.mStateLock);){
            boolean bl = this.mSuspended;
            return bl;
        }
    }

    public void suspend() throws IOException {
        try (LockResource stateLock = new LockResource((Lock)this.mStateLock);){
            Preconditions.checkState((!this.mSuspended ? 1 : 0) != 0, (Object)"Already suspended");
            this.mSuspended = true;
            LOG.info("Suspended state machine at sequence: {}", (Object)this.mLastAppliedSequence);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void resume() throws IOException {
        try (LockResource stateLock = new LockResource((Lock)this.mStateLock);){
            Preconditions.checkState((boolean)this.mSuspended, (Object)"Not suspended");
            Preconditions.checkState((!this.mResumeInProgress ? 1 : 0) != 0, (Object)"Resume in progress");
            this.mResumeInProgress = true;
            LOG.info("Resuming state machine from sequence: {}", (Object)this.mLastAppliedSequence);
        }
        this.cancelCatchup();
        try {
            long resumeStartTimeMs = System.currentTimeMillis();
            if (this.mSuspendBuffer.size() <= 100) {
                this.mStateLock.lock();
            }
            while (!this.mSuspendBuffer.isEmpty()) {
                this.applyToMaster(this.mSuspendBuffer.remove());
                boolean lockSubmission = !this.mStateLock.isHeldByCurrentThread() && (this.mSuspendBuffer.size() <= 100 || System.currentTimeMillis() - resumeStartTimeMs > 30000L);
                if (!lockSubmission) continue;
                this.mStateLock.lock();
            }
        }
        finally {
            this.mSuspended = false;
            this.mResumeInProgress = false;
            this.mCatchupThread = null;
            this.mStateLock.unlock();
        }
    }

    private void cancelCatchup() {
        if (this.mCatchupThread != null && this.mCatchupThread.isAlive()) {
            this.mCatchupThread.cancel();
            this.mCatchupThread.waitTermination();
        }
    }

    public CatchupFuture catchup(long sequence) {
        try (LockResource stateLock = new LockResource((Lock)this.mStateLock);){
            Preconditions.checkState((boolean)this.mSuspended, (Object)"Not suspended");
            Preconditions.checkState((!this.mResumeInProgress ? 1 : 0) != 0, (Object)"Resume in progress");
            Preconditions.checkState((this.mCatchupThread == null || !this.mCatchupThread.isAlive() ? 1 : 0) != 0, (Object)"Catch-up task in progress.");
            Preconditions.checkState((sequence >= 0L ? 1 : 0) != 0, (String)"Invalid negative sequence: %d", (long)sequence);
            Preconditions.checkState((this.mLastAppliedSequence <= sequence ? 1 : 0) != 0, (String)"Can't catchup to past. Current: %d, Requested: %d", (long)this.mLastAppliedSequence, (long)sequence);
            LOG.info("Catching up state machine to sequence: {}", (Object)sequence);
            if (this.mLastAppliedSequence == sequence) {
                CatchupFuture catchupFuture = CatchupFuture.completed();
                return catchupFuture;
            }
            this.mCatchupThread = new RaftJournalCatchupThread(sequence);
            this.mCatchupThread.start();
            CatchupFuture catchupFuture = new CatchupFuture(this.mCatchupThread);
            return catchupFuture;
        }
    }

    private void applyToMaster(Journal.JournalEntry entry) {
        String masterName;
        try {
            masterName = JournalEntryAssociation.getMasterForEntry(entry);
        }
        catch (Exception t) {
            ProcessUtils.fatalError(LOG, t, "Unrecognized journal entry: %s", entry);
            throw new IllegalStateException();
        }
        try {
            Journaled master = this.mJournals.get(masterName).getStateMachine();
            LOG.trace("Applying entry to master {}: {} ", (Object)masterName, (Object)entry);
            master.processJournalEntry(entry);
            JournalUtils.sinkAppend(this.mJournalSinks, entry);
        }
        catch (Exception t) {
            JournalUtils.handleJournalReplayFailure(LOG, t, "Failed to apply journal entry to master %s. Entry: %s", masterName, entry);
        }
        this.mLastAppliedSequence = entry.getSequenceNumber();
    }

    public void close() {
        try (LockResource stateLock = new LockResource((Lock)this.mStateLock);){
            this.cancelCatchup();
            this.mSuspendBuffer.clear();
        }
    }

    class RaftJournalCatchupThread
    extends AbstractCatchupThread {
        private long mCatchUpEndSequence;
        private boolean mStopCatchingUp = false;

        public RaftJournalCatchupThread(long sequence) {
            this.mCatchUpEndSequence = sequence;
            this.setName("raft-catchup-thread");
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void cancel() {
            Queue queue = BufferedJournalApplier.this.mSuspendBuffer;
            synchronized (queue) {
                this.mStopCatchingUp = true;
                BufferedJournalApplier.this.mSuspendBuffer.notifyAll();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void runCatchup() {
            while (!this.mStopCatchingUp && BufferedJournalApplier.this.mLastAppliedSequence < this.mCatchUpEndSequence) {
                Queue queue = BufferedJournalApplier.this.mSuspendBuffer;
                synchronized (queue) {
                    while (!this.mStopCatchingUp && BufferedJournalApplier.this.mSuspendBuffer.size() == 0) {
                        try {
                            BufferedJournalApplier.this.mSuspendBuffer.wait();
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new RuntimeException("Interrupted while catching up.");
                        }
                    }
                    while (!BufferedJournalApplier.this.mSuspendBuffer.isEmpty() && BufferedJournalApplier.this.mLastAppliedSequence < this.mCatchUpEndSequence) {
                        BufferedJournalApplier.this.applyToMaster((Journal.JournalEntry)BufferedJournalApplier.this.mSuspendBuffer.remove());
                    }
                }
            }
        }
    }
}

