/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.journal.ufs;

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.JournalClosedException;
import alluxio.exception.status.CancelledException;
import alluxio.exception.status.UnavailableException;
import alluxio.master.Master;
import alluxio.master.journal.AbstractCatchupThread;
import alluxio.master.journal.AsyncJournalWriter;
import alluxio.master.journal.CatchupFuture;
import alluxio.master.journal.Journal;
import alluxio.master.journal.JournalContext;
import alluxio.master.journal.JournalReader;
import alluxio.master.journal.JournalUtils;
import alluxio.master.journal.MasterJournalContext;
import alluxio.master.journal.sink.JournalSink;
import alluxio.master.journal.ufs.UfsJournalCheckpointThread;
import alluxio.master.journal.ufs.UfsJournalCheckpointWriter;
import alluxio.master.journal.ufs.UfsJournalLogWriter;
import alluxio.master.journal.ufs.UfsJournalReader;
import alluxio.master.journal.ufs.UfsJournalSnapshot;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.proto.journal.Journal;
import alluxio.resource.CloseableResource;
import alluxio.retry.ExponentialTimeBoundedRetry;
import alluxio.underfs.UfsStatus;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.UnderFileSystemConfiguration;
import alluxio.underfs.options.DeleteOptions;
import alluxio.util.URIUtils;
import alluxio.util.UnderFileSystemUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class UfsJournal
implements Journal {
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(UfsJournal.class);
    /** Sentinel sequence number; presumably means "not yet known" - it is not referenced inside this class. */
    public static final long UNKNOWN_SEQUENCE_NUMBER = Long.MAX_VALUE;
    /** Path component appended to the supplied location to form the journal root (see the constructors). */
    public static final String VERSION = "v1";
    /** Name of the sub-directory of the journal root returned by {@link #getLogDir()}. */
    private static final String LOG_DIRNAME = "logs";
    /** Name of the sub-directory of the journal root returned by {@link #getCheckpointDir()}. */
    private static final String CHECKPOINT_DIRNAME = "checkpoints";
    /** Name of the sub-directory of the journal root returned by {@link #getTmpDir()}. */
    private static final String TMP_DIRNAME = ".tmp";
    /** {@code <journal root>/logs}. */
    private final URI mLogDir;
    /** {@code <journal root>/checkpoints}. */
    private final URI mCheckpointDir;
    /** {@code <journal root>/.tmp}. */
    private final URI mTmpDir;
    /** Journal root: the location passed to the constructor with {@link #VERSION} appended. */
    private final URI mLocation;
    /** The master whose state is journaled, replayed and checkpointed through this journal. */
    private final Master mMaster;
    /** Under file system used to list, create and delete journal files. */
    private final UnderFileSystem mUfs;
    /** Quiet period in milliseconds, exposed through {@link #getQuietPeriodMs()}; not interpreted in this class. */
    private final long mQuietPeriodMs;
    /** Log writer; created in {@link #gainPrimacy()}, closed and nulled in {@link #awaitLosePrimacy()} and {@link #close()}. */
    private UfsJournalLogWriter mWriter;
    /** Asynchronous wrapper around {@link #mWriter}; null whenever the journal has no writer. */
    private volatile AsyncJournalWriter mAsyncWriter;
    /** Tailer/checkpoint thread; null while the journal is primary or suspended. */
    private UfsJournalCheckpointThread mTailerThread;
    /** True between {@link #suspend()} and {@link #resume()}. */
    private volatile boolean mSuspended = false;
    /** While suspended: the tailer's next sequence number minus one, advanced by catch-up threads; -1 otherwise. */
    private volatile long mSuspendSequence = -1L;
    /** Most recent catch-up thread started by {@link #catchup(long)}; may be null or already terminated. */
    private volatile AbstractCatchupThread mCatchupThread;
    /** Cancellation flag set by {@code UfsJournalCatchupThread.cancel()} and polled by the catch-up loop. */
    private volatile boolean mStopCatchingUp = false;
    /** Wall-clock time (ms) of the last checkpoint written by {@link #checkpoint()}; -1 if none yet. */
    private long mLastCheckPointTime = -1L;
    /** Number of entries passed to {@code write} since the last {@link #checkpoint()}. */
    private long mEntriesSinceLastCheckPoint = 0L;
    /** Current journal state; starts as STANDBY. */
    private AtomicReference<State> mState = new AtomicReference<State>(State.STANDBY);
    /** Supplies the journal sinks handed to the writer and tailer and used during catch-up replay. */
    private final Supplier<Set<JournalSink>> mJournalSinks;

    /**
     * Builds the under-file-system configuration for the journal: the global server
     * defaults overlaid with the nested properties found under
     * {@code PropertyKey.MASTER_JOURNAL_UFS_OPTION}.
     *
     * @return the journal-specific UFS configuration
     */
    public static UnderFileSystemConfiguration getJournalUfsConf() {
        Map<String, String> journalOptions =
            ServerConfiguration.getNestedProperties(PropertyKey.MASTER_JOURNAL_UFS_OPTION);
        UnderFileSystemConfiguration baseConf =
            UnderFileSystemConfiguration.defaults((AlluxioConfiguration) ServerConfiguration.global());
        return baseConf.createMountSpecificConf(journalOptions);
    }

    /**
     * Creates a journal rooted at {@code location}/{@link #VERSION}, obtaining the under
     * file system from the master's UFS manager.
     *
     * @param location the base journal location
     * @param master the master this journal belongs to
     * @param quietPeriodMs the quiet period in milliseconds
     * @param journalSinks supplier of the journal sinks
     */
    public UfsJournal(URI location, Master master, long quietPeriodMs, Supplier<Set<JournalSink>> journalSinks) {
        // NOTE(review): the resource is closed when this constructor exits while the
        // UnderFileSystem obtained from it is kept in mUfs - confirm the instance stays usable.
        try (CloseableResource<UnderFileSystem> ufsResource =
                master.getMasterContext().getUfsManager().getJournal(location).acquireUfsResource()) {
            URI root = URIUtils.appendPathOrDie(location, VERSION);
            mLocation = root;
            mMaster = master;
            mUfs = ufsResource.get();
            mQuietPeriodMs = quietPeriodMs;
            mLogDir = URIUtils.appendPathOrDie(root, LOG_DIRNAME);
            mCheckpointDir = URIUtils.appendPathOrDie(root, CHECKPOINT_DIRNAME);
            mTmpDir = URIUtils.appendPathOrDie(root, TMP_DIRNAME);
            mJournalSinks = journalSinks;
            init();
        }
    }

    /**
     * Creates a journal rooted at {@code location}/{@link #VERSION} on an explicitly
     * supplied under file system.
     *
     * @param location the base journal location
     * @param master the master this journal belongs to
     * @param ufs the under file system holding the journal
     * @param quietPeriodMs the quiet period in milliseconds
     * @param journalSinks supplier of the journal sinks
     */
    UfsJournal(URI location, Master master, UnderFileSystem ufs, long quietPeriodMs, Supplier<Set<JournalSink>> journalSinks) {
        URI root = URIUtils.appendPathOrDie(location, VERSION);
        mLocation = root;
        mMaster = master;
        mUfs = ufs;
        mQuietPeriodMs = quietPeriodMs;
        mLogDir = URIUtils.appendPathOrDie(root, LOG_DIRNAME);
        mCheckpointDir = URIUtils.appendPathOrDie(root, CHECKPOINT_DIRNAME);
        mTmpDir = URIUtils.appendPathOrDie(root, TMP_DIRNAME);
        mJournalSinks = journalSinks;
        init();
    }

    /**
     * Puts the journal into STANDBY and registers (if absent) two gauges named
     * {@code <metric key name>.<master name>} that report the entries written since the
     * last checkpoint and the last checkpoint time.
     */
    protected void init() {
        mState.set(State.STANDBY);

        String entriesGaugeName =
            MetricKey.MASTER_JOURNAL_ENTRIES_SINCE_CHECKPOINT.getName() + "." + mMaster.getName();
        MetricsSystem.registerGaugeIfAbsent(entriesGaugeName, this::getEntriesSinceLastCheckPoint);

        String checkpointTimeGaugeName =
            MetricKey.MASTER_JOURNAL_LAST_CHECKPOINT_TIME.getName() + "." + mMaster.getName();
        MetricsSystem.registerGaugeIfAbsent(checkpointTimeGaugeName, this::getLastCheckPointTime);
    }

    /**
     * @return the journal root, i.e. the constructor location with {@link #VERSION} appended
     */
    @Override
    public URI getLocation() {
        return mLocation;
    }

    /**
     * @return the number of entries written since the last checkpoint (gauge source)
     */
    private synchronized long getEntriesSinceLastCheckPoint() {
        return mEntriesSinceLastCheckPoint;
    }

    /**
     * @return the wall-clock time in ms of the last checkpoint, or -1 if none was written (gauge source)
     */
    private synchronized long getLastCheckPointTime() {
        return mLastCheckPointTime;
    }

    /**
     * Writes one entry through the log writer and counts it towards the
     * entries-since-checkpoint gauge.
     *
     * @param entry the journal entry to write
     * @throws IOException if the underlying writer fails
     * @throws JournalClosedException if the underlying writer reports the journal closed
     * @throws IllegalStateException if the journal is not in PRIMARY state
     */
    @VisibleForTesting
    synchronized void write(Journal.JournalEntry entry) throws IOException, JournalClosedException {
        UfsJournalLogWriter logWriter = writer();
        logWriter.write(entry);
        // Only counted once the write call has returned without throwing.
        mEntriesSinceLastCheckPoint++;
    }

    /**
     * Flushes the log writer.
     *
     * @throws IOException if the underlying writer fails
     * @throws JournalClosedException if the underlying writer reports the journal closed
     * @throws IllegalStateException if the journal is not in PRIMARY state
     */
    @VisibleForTesting
    public synchronized void flush() throws IOException, JournalClosedException {
        UfsJournalLogWriter logWriter = writer();
        logWriter.flush();
    }

    /**
     * Creates a context for writing to the journal through the asynchronous writer.
     *
     * @return a new journal context
     * @throws UnavailableException if the journal is not PRIMARY or has no asynchronous writer
     */
    @Override
    public synchronized JournalContext createJournalContext() throws UnavailableException {
        State current = mState.get();
        if (current != State.PRIMARY) {
            throw new UnavailableException(
                mMaster.getName() + ": Not allowed to write to journal in state: " + current);
        }
        AsyncJournalWriter asyncWriter = mAsyncWriter;
        if (asyncWriter != null) {
            return new MasterJournalContext(asyncWriter);
        }
        throw new UnavailableException(
            mMaster.getName() + ": Failed to write to journal: journal is shutdown.");
    }

    /**
     * @return the current log writer
     * @throws IllegalStateException if the journal is not in PRIMARY state
     */
    private synchronized UfsJournalLogWriter writer() {
        State current = mState.get();
        Preconditions.checkState(current == State.PRIMARY,
            "Cannot write to the journal in state " + current);
        return mWriter;
    }

    /**
     * Resets the master's state and starts a new tailer/checkpoint thread for it.
     *
     * @throws IOException declared by the signature
     */
    public synchronized void start() throws IOException {
        mMaster.resetState();
        UfsJournalCheckpointThread tailer =
            new UfsJournalCheckpointThread(mMaster, this, mJournalSinks);
        mTailerThread = tailer;
        tailer.start();
    }

    /**
     * Switches the journal from standby to primary: resumes it if suspended, waits for the
     * tailer thread to terminate, replays any remaining entries, then installs the log
     * writer and its asynchronous wrapper before publishing the PRIMARY state.
     *
     * @throws IOException if resuming the journal or creating the writer fails
     * @throws IllegalStateException if a writer already exists, or there is neither a
     *         tailer thread nor a suspension to resume from
     */
    public synchronized void gainPrimacy() throws IOException {
        Preconditions.checkState(mWriter == null, "writer must be null in standby mode");
        boolean tailerAvailable = mSuspended || mTailerThread != null;
        Preconditions.checkState(tailerAvailable, "tailer thread must not be null in standby mode");
        if (mSuspended) {
            // resume() recreates the tailer thread.
            resume();
        }
        UfsJournalCheckpointThread tailer = mTailerThread;
        tailer.awaitTermination(true);
        long tailedUpTo = tailer.getNextSequenceNumber();
        mTailerThread = null;

        // Replay whatever the tailer did not apply; the writer continues from there.
        long firstSequenceToWrite = catchUp(tailedUpTo);
        UfsJournalLogWriter logWriter = new UfsJournalLogWriter(this, firstSequenceToWrite);
        mWriter = logWriter;
        mAsyncWriter = new AsyncJournalWriter(logWriter, mJournalSinks, mMaster.getName());
        mState.set(State.PRIMARY);
        LOG.info("{}: journal switched to primary mode. location: {}", mMaster.getName(), mLocation);
    }

    /**
     * Marks the journal as STANDBY so no further writes are accepted; the writers are
     * torn down separately by {@link #awaitLosePrimacy()}.
     *
     * @throws IllegalStateException if the journal is not currently PRIMARY
     */
    public synchronized void signalLosePrimacy() {
        State current = mState.get();
        Preconditions.checkState(current == State.PRIMARY, "unexpected journal state " + current);
        mState.set(State.STANDBY);
        LOG.info("{}: journal switched to standby mode, starting transition. location: {}",
            mMaster.getName(), mLocation);
    }

    /**
     * Completes the transition to standby: closes the asynchronous writer and then the
     * log writer, resets the master's state and starts a fresh tailer thread.
     *
     * @throws IOException if closing a writer fails
     * @throws IllegalStateException if the state is not STANDBY, no writer exists, or a
     *         tailer thread is already present
     */
    public synchronized void awaitLosePrimacy() throws IOException {
        State current = mState.get();
        Preconditions.checkState(current == State.STANDBY,
            "Should already be set to STANDBY state. unexpected state: " + current);
        Preconditions.checkState(mWriter != null, "writer thread must not be null in primary mode");
        Preconditions.checkState(mTailerThread == null, "tailer thread must be null in primary mode");

        // Close the asynchronous wrapper before the writer it wraps.
        mAsyncWriter.close();
        mAsyncWriter = null;
        mWriter.close();
        mWriter = null;

        mMaster.resetState();
        UfsJournalCheckpointThread tailer =
            new UfsJournalCheckpointThread(mMaster, this, mJournalSinks);
        mTailerThread = tailer;
        tailer.start();
    }

    /**
     * Suspends a standby journal: waits for the tailer thread to terminate, records the
     * sequence number just before its next one as the suspend sequence, and drops the tailer.
     *
     * @throws IOException declared by the signature
     * @throws IllegalStateException if already suspended, not STANDBY, or a suspend
     *         sequence is already recorded
     */
    public synchronized void suspend() throws IOException {
        Preconditions.checkState(!mSuspended, "journal is already suspended");
        State current = mState.get();
        Preconditions.checkState(current == State.STANDBY, "unexpected state " + current);
        Preconditions.checkState(mSuspendSequence == -1L, "suspend sequence already set");

        UfsJournalCheckpointThread tailer = mTailerThread;
        tailer.awaitTermination(false);
        mSuspendSequence = tailer.getNextSequenceNumber() - 1L;
        mTailerThread = null;
        mSuspended = true;
    }

    /**
     * Starts replaying the journal of a suspended standby up to {@code sequence} on a
     * background catch-up thread.
     *
     * @param sequence the last sequence number to apply; must not be below the suspend sequence
     * @return an already-completed future if {@code sequence} equals the suspend sequence,
     *         otherwise a future tracking the new catch-up thread
     * @throws IOException declared by the signature
     * @throws IllegalStateException if the journal is not suspended, not STANDBY, still has
     *         a tailer thread, is asked to go backwards, or a catch-up thread is still alive
     */
    public synchronized CatchupFuture catchup(long sequence) throws IOException {
        Preconditions.checkState(mSuspended, "journal is not suspended");
        Preconditions.checkState(mState.get() == State.STANDBY, "unexpected state " + mState.get());
        Preconditions.checkState(mTailerThread == null, "tailer is not null");
        Preconditions.checkState(sequence >= mSuspendSequence, "can't catch-up before suspend");
        Preconditions.checkState(mCatchupThread == null || !mCatchupThread.isAlive(),
            "Catch-up thread active");
        if (sequence == mSuspendSequence) {
            return CatchupFuture.completed();
        }
        // No catch-up thread is alive here, so a stop request left over from a previously
        // cancelled thread is stale. Without clearing it the new thread would stop at once
        // and the returned future would complete without having caught up.
        mStopCatchingUp = false;
        AbstractCatchupThread catchupThread =
            new UfsJournalCatchupThread(mSuspendSequence + 1L, sequence);
        mCatchupThread = catchupThread;
        catchupThread.start();
        return new CatchupFuture(catchupThread);
    }

    /**
     * Resumes a suspended standby journal: cancels and waits for any live catch-up thread,
     * then restarts the tailer thread right after the suspend sequence and clears the
     * suspension bookkeeping.
     *
     * @throws IOException declared by the signature
     * @throws IllegalStateException if the journal is not suspended, not STANDBY, or
     *         still has a tailer thread
     */
    public synchronized void resume() throws IOException {
        Preconditions.checkState(mSuspended, "journal is not suspended");
        Preconditions.checkState(mState.get() == State.STANDBY, "unexpected state " + mState.get());
        Preconditions.checkState(mTailerThread == null, "tailer is not null");

        AbstractCatchupThread catchupThread = mCatchupThread;
        if (catchupThread != null && catchupThread.isAlive()) {
            catchupThread.cancel();
            catchupThread.waitTermination();
        }
        // Clear the stop request unconditionally. A catch-up thread that was cancelled and has
        // already exited would otherwise leave the flag set, making every later catchUp call
        // (including the one in gainPrimacy) return before applying anything.
        mStopCatchingUp = false;

        UfsJournalCheckpointThread tailer =
            new UfsJournalCheckpointThread(mMaster, this, mSuspendSequence + 1L, mJournalSinks);
        mTailerThread = tailer;
        tailer.start();
        mSuspendSequence = -1L;
        mSuspended = false;
    }

    /**
     * @return the quiet period, in milliseconds, this journal was constructed with
     */
    public long getQuietPeriodMs() {
        return mQuietPeriodMs;
    }

    /**
     * Creates a new reader over this journal.
     *
     * @param readIncompleteLogs forwarded to the {@link UfsJournalReader} constructor
     * @return the new reader
     */
    public UfsJournalReader getReader(boolean readIncompleteLogs) {
        UfsJournalReader reader = new UfsJournalReader(this, readIncompleteLogs);
        return reader;
    }

    /**
     * Creates a checkpoint writer for this journal.
     *
     * @param checkpointSequenceNumber the sequence number passed to the checkpoint writer
     * @return the new checkpoint writer
     * @throws IOException if the writer cannot be created
     */
    public UfsJournalCheckpointWriter getCheckpointWriter(long checkpointSequenceNumber) throws IOException {
        UfsJournalCheckpointWriter checkpointWriter =
            UfsJournalCheckpointWriter.create(this, checkpointSequenceNumber);
        return checkpointWriter;
    }

    /**
     * @return the next sequence number reported by the log writer
     * @throws IllegalStateException if the journal is not in PRIMARY state
     */
    public long getNextSequenceNumberToWrite() {
        UfsJournalLogWriter logWriter = writer();
        return logWriter.getNextSequenceNumber();
    }

    /**
     * @return the next log sequence number to checkpoint, as computed by
     *         {@link UfsJournalSnapshot} for this journal
     * @throws IOException if the snapshot lookup fails
     */
    public long getNextSequenceNumberToCheckpoint() throws IOException {
        long nextToCheckpoint = UfsJournalSnapshot.getNextLogSequenceNumberToCheckpoint(this);
        return nextToCheckpoint;
    }

    /**
     * Checks whether the journal root contains a file whose name starts with the
     * configured format-file prefix.
     *
     * @return true if such a file exists; false if none does or the root cannot be listed
     * @throws IOException if listing the journal root fails
     */
    public boolean isFormatted() throws IOException {
        UfsStatus[] children = mUfs.listStatus(mLocation.toString());
        if (children == null) {
            return false;
        }
        String prefix = ServerConfiguration.get(PropertyKey.MASTER_FORMAT_FILE_PREFIX);
        for (UfsStatus child : children) {
            if (child.getName().startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return true if the journal is currently in PRIMARY state
     */
    public boolean isWritable() {
        State current = mState.get();
        return current == State.PRIMARY;
    }

    /**
     * Formats the journal: deletes every child of the journal root (creating the root if
     * it is not yet a directory) and then touches a marker file named with the configured
     * format-file prefix followed by the current time in milliseconds.
     *
     * @throws IOException if the root cannot be listed or created, or a child cannot be deleted
     */
    public void format() throws IOException {
        URI location = getLocation();
        LOG.info("Formatting {}", location);
        String locationPath = location.toString();
        if (mUfs.isDirectory(locationPath)) {
            UfsStatus[] children = mUfs.listStatus(locationPath);
            if (children == null) {
                // isFormatted() already treats a null listing as possible. Fail explicitly here
                // instead of hitting a NullPointerException, and do not write the format marker
                // while old journal files may still be present.
                throw new IOException(String.format("Failed to list %s", location));
            }
            for (UfsStatus status : children) {
                String childPath = URIUtils.appendPathOrDie(location, status.getName()).toString();
                if (status.isDirectory()
                        && !mUfs.deleteDirectory(childPath, DeleteOptions.defaults().setRecursive(true))) {
                    throw new IOException(String.format("Failed to delete %s", childPath));
                }
                if (status.isFile() && !mUfs.deleteFile(childPath)) {
                    throw new IOException(String.format("Failed to delete %s", childPath));
                }
            }
        } else if (!mUfs.mkdirs(locationPath)) {
            throw new IOException(String.format("Failed to create %s", location));
        }
        // The marker file is what isFormatted() looks for.
        String markerName =
            ServerConfiguration.get(PropertyKey.MASTER_FORMAT_FILE_PREFIX) + System.currentTimeMillis();
        UnderFileSystemUtils.touch(mUfs, URIUtils.appendPathOrDie(location, markerName).toString());
    }

    /**
     * Writes a checkpoint of the master's state at the writer's next sequence number,
     * unless that number equals the next sequence number to checkpoint (nothing new was
     * written). On success the entries-since-checkpoint counter is zeroed and the
     * checkpoint time is updated.
     *
     * @throws IOException if writing the checkpoint fails; a {@link CancelledException}
     *         is thrown if the thread is interrupted (the interrupt flag is restored first)
     * @throws IllegalStateException if the journal is not in PRIMARY state
     */
    public synchronized void checkpoint() throws IOException {
        long checkpointSequence = getNextSequenceNumberToWrite();
        if (checkpointSequence == getNextSequenceNumberToCheckpoint()) {
            LOG.info("{}: No entries have been written since the last checkpoint.", mMaster.getName());
            return;
        }
        try (UfsJournalCheckpointWriter checkpointWriter = getCheckpointWriter(checkpointSequence)) {
            LOG.info("{}: Writing checkpoint [sequence number {}].", mMaster.getName(), checkpointSequence);
            mMaster.writeToCheckpoint(checkpointWriter);
            LOG.info("{}: Finished checkpoint [sequence number {}].", mMaster.getName(), checkpointSequence);
            mEntriesSinceLastCheckPoint = 0L;
            mLastCheckPointTime = System.currentTimeMillis();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new CancelledException(mMaster.getName() + ": Checkpoint is interrupted");
        }
    }

    /**
     * @return the tailer thread's catch-up state, or {@code NOT_STARTED} when there is no
     *         tailer thread
     */
    public synchronized UfsJournalCheckpointThread.CatchupState getCatchupState() {
        UfsJournalCheckpointThread tailer = mTailerThread;
        return tailer == null
            ? UfsJournalCheckpointThread.CatchupState.NOT_STARTED
            : tailer.getCatchupState();
    }

    /**
     * @return the {@code logs} directory under the journal root
     */
    @VisibleForTesting
    public URI getLogDir() {
        return mLogDir;
    }

    /**
     * @return the {@code checkpoints} directory under the journal root
     */
    URI getCheckpointDir() {
        return mCheckpointDir;
    }

    /**
     * @return the {@code .tmp} directory under the journal root
     */
    URI getTmpDir() {
        return mTmpDir;
    }

    /**
     * @return the under file system backing this journal
     */
    UnderFileSystem getUfs() {
        return mUfs;
    }

    /**
     * Replays the journal from {@code nextSequenceNumber} with no upper bound.
     *
     * @param nextSequenceNumber the first sequence number to read
     * @return the reader's next sequence number when replay stops
     */
    private synchronized long catchUp(long nextSequenceNumber) {
        // -1 is the "no limit" value understood by the bounded overloads.
        final long noLimit = -1L;
        return catchUp(nextSequenceNumber, noLimit);
    }

    /**
     * Replays the journal from {@code nextSequenceNumber} using a freshly opened reader,
     * which is closed afterwards; a failure to close it is only logged.
     *
     * @param nextSequenceNumber the first sequence number to read
     * @param endSequenceNumber the last sequence number to apply, or -1 for no limit
     * @return the reader's next sequence number when replay stops
     */
    private long catchUp(long nextSequenceNumber, long endSequenceNumber) {
        UfsJournalReader reader = new UfsJournalReader(this, nextSequenceNumber, true);
        try {
            return catchUp(reader, endSequenceNumber);
        } finally {
            try {
                reader.close();
            } catch (IOException closeFailure) {
                LOG.warn("Failed to close journal reader: {}", closeFailure.toString());
            }
        }
    }

    /**
     * Applies checkpoints and log entries from {@code journalReader} to the master until
     * the reader has nothing more to offer, the sequence limit is passed, or a stop has
     * been requested through {@code mStopCatchingUp}.
     *
     * <p>Read failures ({@link IOException}) are retried with exponential backoff (1s
     * initial sleep, 10s maximum sleep); once the retry policy gives up, a
     * {@link RuntimeException} wrapping the last failure is thrown. Failures while
     * applying an individual entry are handed to
     * {@code JournalUtils.handleJournalReplayFailure}.
     *
     * @param journalReader the reader to consume
     * @param limit the last sequence number to apply, or -1 for no limit
     * @return the reader's next sequence number when replay stops
     */
    private long catchUp(JournalReader journalReader, long limit) {
        ExponentialTimeBoundedRetry retry = ExponentialTimeBoundedRetry.builder()
            .withInitialSleep(Duration.ofSeconds(1L))
            .withMaxSleep(Duration.ofSeconds(10L))
            .withMaxDuration(Duration.ofDays(365L))
            .build();
        while (limit == -1L || journalReader.getNextSequenceNumber() <= limit) {
            if (this.mStopCatchingUp) {
                // Cancellation requested; report how far replay got.
                return journalReader.getNextSequenceNumber();
            }
            try {
                switch (journalReader.advance()) {
                    case CHECKPOINT: {
                        this.mMaster.restoreFromCheckpoint(journalReader.getCheckpoint());
                        break;
                    }
                    case LOG: {
                        Journal.JournalEntry entry = journalReader.getEntry();
                        try {
                            if (this.mMaster.processJournalEntry(entry)) {
                                JournalUtils.sinkAppend(this.mJournalSinks, entry);
                            } else {
                                JournalUtils.handleJournalReplayFailure(LOG, null, "%s: Unrecognized journal entry: %s", this.mMaster.getName(), entry);
                            }
                        } catch (Throwable t) {
                            JournalUtils.handleJournalReplayFailure(LOG, t, "%s: Failed to process journal entry %s", this.mMaster.getName(), entry);
                        }
                        break;
                    }
                    default: {
                        // Nothing further to read.
                        return journalReader.getNextSequenceNumber();
                    }
                }
            } catch (IOException e) {
                LOG.warn("{}: Failed to read from journal: {}", (Object) this.mMaster.getName(), (Object) e);
                if (!retry.attempt()) {
                    throw new RuntimeException(String.format("%s: failed to catch up journal", this.mMaster.getName()), e);
                }
                // Otherwise loop around and try reading again.
            }
        }
        return journalReader.getNextSequenceNumber();
    }

    /**
     * @return {@code UfsJournal(<journal root>)}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("UfsJournal(");
        text.append(mLocation).append(")");
        return text.toString();
    }

    /**
     * Closes the journal: closes the asynchronous writer and the log writer if present,
     * waits for the tailer thread to terminate if present, and marks the journal CLOSED.
     *
     * @throws IOException if closing a writer fails
     */
    @Override
    public synchronized void close() throws IOException {
        AsyncJournalWriter asyncWriter = mAsyncWriter;
        if (asyncWriter != null) {
            asyncWriter.close();
            mAsyncWriter = null;
        }
        UfsJournalLogWriter logWriter = mWriter;
        if (logWriter != null) {
            logWriter.close();
            mWriter = null;
        }
        UfsJournalCheckpointThread tailer = mTailerThread;
        if (tailer != null) {
            tailer.awaitTermination(false);
            mTailerThread = null;
        }
        mState.set(State.CLOSED);
    }

    /**
     * Background thread that replays a fixed range of sequence numbers for the enclosing
     * journal and then stores the last applied sequence number as the suspend sequence.
     */
    class UfsJournalCatchupThread
    extends AbstractCatchupThread {
        /** First sequence number to read. */
        private final long mFirstSequence;
        /** Last sequence number to apply. */
        private final long mLastSequence;

        /**
         * @param start the first sequence number to read
         * @param end the last sequence number to apply
         */
        public UfsJournalCatchupThread(long start, long end) {
            mFirstSequence = start;
            mLastSequence = end;
            String threadName = String.format("ufs-catchup-thread-%s", UfsJournal.this.mMaster.getName());
            setName(threadName);
        }

        /** Requests the replay loop to stop by raising the journal's stop flag. */
        @Override
        public void cancel() {
            UfsJournal.this.mStopCatchingUp = true;
        }

        /** Replays the range and records how far replay got (next sequence number minus one). */
        @Override
        protected void runCatchup() {
            long nextSequence = UfsJournal.this.catchUp(mFirstSequence, mLastSequence);
            UfsJournal.this.mSuspendSequence = nextSequence - 1L;
        }
    }

    private static enum State {
        STANDBY,
        PRIMARY,
        CLOSED;

    }
}

