/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.journal.ufs;

import alluxio.ProcessUtils;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.master.Master;
import alluxio.master.journal.JournalReader;
import alluxio.master.journal.JournalUtils;
import alluxio.master.journal.sink.JournalSink;
import alluxio.master.journal.ufs.UfsJournal;
import alluxio.master.journal.ufs.UfsJournalCheckpointWriter;
import alluxio.master.journal.ufs.UfsJournalProgressLogger;
import alluxio.master.journal.ufs.UfsJournalReader;
import alluxio.proto.journal.Journal;
import alluxio.retry.ExponentialBackoffRetry;
import alluxio.util.CommonUtils;
import alluxio.util.ExceptionUtils;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public final class UfsJournalCheckpointThread
extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(UfsJournalCheckpointThread.class);
    private final Master mMaster;
    private final UfsJournal mJournal;
    private final long mShutdownQuietWaitTimeMs;
    private final int mJournalCheckpointSleepTimeMs;
    private final long mCheckpointPeriodEntries;
    private final Object mCheckpointingLock = new Object();
    @GuardedBy(value="mCheckpointingLock")
    private boolean mCheckpointing = false;
    private volatile boolean mShutdownInitiated = false;
    private volatile boolean mStopped = false;
    private volatile boolean mWaitQuietPeriod = true;
    private JournalReader mJournalReader;
    private long mNextSequenceNumberToCheckpoint;
    private final Supplier<Set<JournalSink>> mJournalSinks;
    private volatile long mLastAppliedSN;
    private volatile CatchupState mCatchupState = CatchupState.NOT_STARTED;

    public UfsJournalCheckpointThread(Master master, UfsJournal journal, Supplier<Set<JournalSink>> journalSinks) {
        this(master, journal, 0L, journalSinks);
    }

    public UfsJournalCheckpointThread(Master master, UfsJournal journal, long startSequence, Supplier<Set<JournalSink>> journalSinks) {
        this.mMaster = (Master)Preconditions.checkNotNull((Object)master, (Object)"master");
        this.mJournal = (UfsJournal)Preconditions.checkNotNull((Object)journal, (Object)"journal");
        this.mShutdownQuietWaitTimeMs = journal.getQuietPeriodMs();
        this.mJournalCheckpointSleepTimeMs = (int)ServerConfiguration.getMs(PropertyKey.MASTER_JOURNAL_TAILER_SLEEP_TIME_MS);
        this.mJournalReader = new UfsJournalReader(this.mJournal, startSequence, false);
        this.mCheckpointPeriodEntries = ServerConfiguration.getLong(PropertyKey.MASTER_JOURNAL_CHECKPOINT_PERIOD_ENTRIES);
        this.mJournalSinks = journalSinks;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void awaitTermination(boolean waitQuietPeriod) {
        LOG.info("{}: Journal checkpointer shutdown has been initiated.", (Object)this.mMaster.getName());
        this.mWaitQuietPeriod = waitQuietPeriod;
        this.mShutdownInitiated = true;
        Object object = this.mCheckpointingLock;
        synchronized (object) {
            if (this.mCheckpointing) {
                this.interrupt();
            }
        }
        try {
            this.join();
            LOG.info("{}: Journal shutdown complete", (Object)this.mMaster.getName());
        }
        catch (InterruptedException e) {
            LOG.error("{}: journal checkpointer shutdown is interrupted.", (Object)this.mMaster.getName(), (Object)e);
            throw new RuntimeException(e);
        }
        this.mStopped = true;
    }

    public long getNextSequenceNumber() {
        Preconditions.checkState((boolean)this.mStopped);
        return this.mJournalReader.getNextSequenceNumber();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 30000, Integer.MAX_VALUE);
        long start = System.currentTimeMillis();
        OptionalLong finalSN = UfsJournalReader.getLastSN(this.mJournal);
        Thread t = new Thread(() -> {
            UfsJournalProgressLogger progressLogger = new UfsJournalProgressLogger(this.mJournal, finalSN, () -> this.mLastAppliedSN);
            while (!Thread.currentThread().isInterrupted() && retry.attempt()) {
                progressLogger.logProgress();
            }
        });
        try {
            t.start();
            this.runInternal();
        }
        catch (Throwable e) {
            t.interrupt();
            ProcessUtils.fatalError(LOG, e, "%s: Failed to run journal checkpoint thread, crashing.", this.mMaster.getName());
            System.exit(-1);
        }
        finally {
            t.interrupt();
            try {
                t.join();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.warn("interrupted while waiting for journal stats thread to shut down.");
            }
        }
    }

    private void runInternal() {
        LOG.info("{}: Journal checkpoint thread started.", (Object)this.mMaster.getName());
        boolean quietPeriodWaited = false;
        this.mCatchupState = CatchupState.IN_PROGRESS;
        while (true) {
            Journal.JournalEntry entry = null;
            try {
                switch (this.mJournalReader.advance()) {
                    case CHECKPOINT: {
                        LOG.debug("{}: Restoring from checkpoint", (Object)this.mMaster.getName());
                        this.mMaster.restoreFromCheckpoint(this.mJournalReader.getCheckpoint());
                        LOG.debug("{}: Finished restoring from checkpoint", (Object)this.mMaster.getName());
                        break;
                    }
                    case LOG: {
                        entry = this.mJournalReader.getEntry();
                        try {
                            if (!this.mMaster.processJournalEntry(entry)) {
                                JournalUtils.handleJournalReplayFailure(LOG, null, "%s: Unrecognized journal entry: %s", this.mMaster.getName(), entry);
                            } else {
                                JournalUtils.sinkAppend(this.mJournalSinks, entry);
                            }
                        }
                        catch (Throwable t) {
                            JournalUtils.handleJournalReplayFailure(LOG, t, "%s: Failed to read or process journal entry %s.", this.mMaster.getName(), entry);
                        }
                        if (quietPeriodWaited) {
                            LOG.info("Quiet period interrupted by new journal entry");
                            quietPeriodWaited = false;
                        }
                        this.mLastAppliedSN = entry.getSequenceNumber();
                        break;
                    }
                    default: {
                        this.mCatchupState = CatchupState.DONE;
                        break;
                    }
                }
            }
            catch (IOException e) {
                LOG.error("{}: Failed to read or process a journal entry.", (Object)this.mMaster.getName(), (Object)e);
                try {
                    this.mJournalReader.close();
                }
                catch (IOException ee) {
                    LOG.warn("{}: Failed to close the journal reader with error {}.", (Object)this.mMaster.getName(), (Object)ee.toString());
                }
                long nextSequenceNumber = this.mJournalReader.getNextSequenceNumber();
                this.mJournalReader = new UfsJournalReader(this.mJournal, nextSequenceNumber, false);
                quietPeriodWaited = false;
                continue;
            }
            if (entry == null) {
                this.maybeCheckpoint();
                if (this.mShutdownInitiated) {
                    if (quietPeriodWaited || !this.mWaitQuietPeriod) {
                        this.mCatchupState = CatchupState.DONE;
                        LOG.info("{}: Journal checkpoint thread has been shutdown. No new logs have been found during the quiet period.", (Object)this.mMaster.getName());
                        if (this.mJournalReader != null) {
                            try {
                                this.mJournalReader.close();
                            }
                            catch (IOException e) {
                                LOG.warn("{}: Failed to close the journal reader with error {}.", (Object)this.mMaster.getName(), (Object)e.toString());
                            }
                        }
                        return;
                    }
                    CommonUtils.sleepMs((Logger)LOG, (long)this.mShutdownQuietWaitTimeMs);
                    quietPeriodWaited = true;
                } else {
                    CommonUtils.sleepMs((Logger)LOG, (long)this.mJournalCheckpointSleepTimeMs);
                }
            }
            if (Thread.interrupted() && !this.mShutdownInitiated) break;
        }
        LOG.info("{}: Checkpoint thread interrupted, shutting down", (Object)this.mMaster.getName());
    }

    public CatchupState getCatchupState() {
        return this.mCatchupState;
    }

    private void maybeCheckpoint() {
        if (this.mShutdownInitiated) {
            return;
        }
        long nextSequenceNumber = this.mJournalReader.getNextSequenceNumber();
        if (nextSequenceNumber - this.mNextSequenceNumberToCheckpoint < this.mCheckpointPeriodEntries) {
            return;
        }
        try {
            this.mNextSequenceNumberToCheckpoint = this.mJournal.getNextSequenceNumberToCheckpoint();
        }
        catch (IOException e) {
            LOG.warn("{}: Failed to get the next sequence number to checkpoint with error {}.", (Object)this.mMaster.getName(), (Object)e.toString());
            return;
        }
        if (nextSequenceNumber - this.mNextSequenceNumberToCheckpoint < this.mCheckpointPeriodEntries) {
            return;
        }
        this.writeCheckpoint(nextSequenceNumber);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeCheckpoint(long nextSequenceNumber) {
        LOG.info("{}: Writing checkpoint [sequence number {}].", (Object)this.mMaster.getName(), (Object)nextSequenceNumber);
        try {
            UfsJournalCheckpointWriter journalWriter = this.mJournal.getCheckpointWriter(nextSequenceNumber);
            try {
                Object object = this.mCheckpointingLock;
                synchronized (object) {
                    block27: {
                        if (!this.mShutdownInitiated) break block27;
                        journalWriter.cancel();
                        return;
                    }
                    this.mCheckpointing = true;
                }
                this.mMaster.writeToCheckpoint(journalWriter);
            }
            catch (Throwable t) {
                if (ExceptionUtils.containsInterruptedException((Throwable)t)) {
                    Thread.currentThread().interrupt();
                } else {
                    LOG.error("{}: Failed to create checkpoint", (Object)this.mMaster.getName(), (Object)t);
                }
                journalWriter.cancel();
                LOG.info("{}: Cancelled checkpoint [sequence number {}].", (Object)this.mMaster.getName(), (Object)nextSequenceNumber);
                return;
            }
            finally {
                Object object = this.mCheckpointingLock;
                synchronized (object) {
                    this.mCheckpointing = false;
                }
                if (Thread.interrupted() && !this.mShutdownInitiated) {
                    LOG.warn("{}: Checkpoint was interrupted but shutdown has not be initiated", (Object)this.mMaster.getName());
                    Thread.currentThread().interrupt();
                }
                journalWriter.close();
            }
            LOG.info("{}: Finished checkpoint [sequence number {}].", (Object)this.mMaster.getName(), (Object)nextSequenceNumber);
            this.mNextSequenceNumberToCheckpoint = nextSequenceNumber;
        }
        catch (IOException e) {
            LOG.error("{}: Failed to checkpoint.", (Object)this.mMaster.getName(), (Object)e);
        }
    }

    public static enum CatchupState {
        NOT_STARTED,
        IN_PROGRESS,
        DONE;

    }
}

