/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.journal.ufs;

import alluxio.RuntimeConstants;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.JournalClosedException;
import alluxio.master.journal.JournalEntryStreamReader;
import alluxio.master.journal.JournalWriter;
import alluxio.master.journal.ufs.UfsJournal;
import alluxio.master.journal.ufs.UfsJournalFile;
import alluxio.master.journal.ufs.UfsJournalGarbageCollector;
import alluxio.master.journal.ufs.UfsJournalSnapshot;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.proto.journal.Journal;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.CreateOptions;
import alluxio.underfs.options.OpenOptions;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes journal entries to log files stored in an under file system (UFS).
 *
 * <p>Entries are appended to a "current" log file. The current log is rotated (closed,
 * completed and replaced by a new file) on the first write and whenever {@link #flush()}
 * requests it. Entries written since the last successful flush are remembered so that they
 * can be replayed into a fresh log file after a UFS write or flush failure.
 *
 * <p>The public methods of this class are {@code synchronized}.
 */
@ThreadSafe
final class UfsJournalLogWriter
implements JournalWriter {
    private static final Logger LOG = LoggerFactory.getLogger(UfsJournalLogWriter.class);
    /** The journal this writer appends to. */
    private final UfsJournal mJournal;
    /** The under file system of {@link #mJournal}, obtained via {@code getUfs()}. */
    private final UnderFileSystem mUfs;
    /** Size threshold read from {@code MASTER_JOURNAL_LOG_SIZE_BYTES_MAX}; reaching it in flush() requests a rotation. */
    private final long mMaxLogSize;
    /** Sequence number that will be assigned to the next entry passed to write(). */
    private long mNextSequenceNumber;
    /** When true, the next write closes the current stream (if any) and starts a new log file. */
    private boolean mRotateLogForNextWrite;
    /** Stream for the current log; may be null (no current log at construction, or dropped after a flush failure). */
    private JournalOutputStream mJournalOutputStream;
    /** Garbage collector created for this journal in the constructor and closed in close(). */
    private UfsJournalGarbageCollector mGarbageCollector;
    /** Set to true once close() has completed; checked by every JournalOutputStream operation. */
    private boolean mClosed;
    /** Set when a write or flush hits an IOException; cleared after a successful recovery. */
    private boolean mNeedsRecovery = false;
    /** Entries written since the last successful flush; replayed during failure recovery. */
    private Queue<Journal.JournalEntry> mEntriesToFlush;

    /**
     * Creates a writer for the given journal.
     *
     * <p>The writer starts out requesting a log rotation, so the first write always opens a
     * new log file. If the journal already has a current log, it is wrapped around a
     * discarding stream so that this first rotation closes it and runs the normal
     * log-completion logic on it.
     *
     * @param journal the journal to write to; must not be null
     * @param nextSequenceNumber the sequence number to assign to the first entry written
     * @throws IOException if setting up the writer fails
     */
    UfsJournalLogWriter(UfsJournal journal, long nextSequenceNumber) throws IOException {
        mJournal = Preconditions.checkNotNull(journal, "journal");
        mUfs = mJournal.getUfs();
        mNextSequenceNumber = nextSequenceNumber;
        mMaxLogSize = ServerConfiguration.getBytes(PropertyKey.MASTER_JOURNAL_LOG_SIZE_BYTES_MAX);
        // Force the very first write to start a fresh log file.
        mRotateLogForNextWrite = true;

        UfsJournalFile existingLog = UfsJournalSnapshot.getCurrentLog(mJournal);
        if (existingLog != null) {
            // Nothing is ever written to this stream; it only exists so that closing it
            // completes the pre-existing log.
            mJournalOutputStream =
                new JournalOutputStream(existingLog, ByteStreams.nullOutputStream());
        }

        mGarbageCollector = new UfsJournalGarbageCollector(mJournal);
        mEntriesToFlush = new ArrayDeque<>();
    }

    /**
     * Appends one entry to the current log, stamping it with the next sequence number.
     *
     * <p>Before writing, any pending failure recovery is performed and the log is rotated if a
     * rotation was requested. A successfully written entry is also queued so it can be
     * replayed if a later flush fails.
     *
     * @param entry the entry to write; its sequence number is overwritten
     * @throws IOException if the write fails; the writer is then marked as needing recovery
     * @throws JournalClosedException if the journal is not writable or the writer is closed
     */
    @Override
    public synchronized void write(Journal.JournalEntry entry) throws IOException, JournalClosedException {
        checkIsWritable();
        try {
            maybeRecoverFromUfsFailures();
            maybeRotateLog();
        } catch (JournalClosedException.IOJournalClosedException closed) {
            throw closed.toJournalClosedException();
        }

        try {
            Journal.JournalEntry stamped =
                entry.toBuilder().setSequenceNumber(mNextSequenceNumber).build();
            stamped.writeDelimitedTo(mJournalOutputStream);
            LOG.debug("Adding journal entry (seq={}) to retryList with {} entries. currentLog: {}",
                stamped.getSequenceNumber(), mEntriesToFlush.size(), currentLogName());
            mEntriesToFlush.add(stamped);
            mNextSequenceNumber++;
        } catch (JournalClosedException.IOJournalClosedException closed) {
            throw closed.toJournalClosedException();
        } catch (IOException failure) {
            // Remember the failure so the next write/flush runs the recovery path first.
            mNeedsRecovery = true;
            String message = ExceptionMessage.JOURNAL_WRITE_FAILURE.getMessageWithUrl(
                RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL,
                new Object[]{mJournalOutputStream.currentLog(), failure.getMessage()});
            throw new IOException(message, failure);
        }
    }

    /**
     * Recovers from an earlier UFS write/flush failure, if one was recorded.
     *
     * <p>Recovery finds the last persisted sequence number, opens a new log file starting
     * right after it, and re-writes every queued entry with a higher sequence number. The
     * recovery flag is cleared only when all of this succeeds. The whole procedure is timed
     * with the journal failure-recovery timer metric.
     *
     * @throws IOException if re-writing an entry or a UFS operation fails
     * @throws JournalClosedException if the journal is not writable or the writer is closed
     * @throws RuntimeException if no persisted entry can be found, if queued entries do not
     *         connect to the persisted ones, or if not all queued entries could be replayed
     */
    private void maybeRecoverFromUfsFailures() throws IOException, JournalClosedException {
        checkIsWritable();
        if (mNeedsRecovery) {
            try (Timer.Context ctx = MetricsSystem.timer(
                    MetricKey.MASTER_UFS_JOURNAL_FAILURE_RECOVER_TIMER.getName()).time()) {
                final long lastPersisted = recoverLastPersistedJournalEntry();
                if (lastPersisted == -1L) {
                    throw new RuntimeException(
                        "Cannot find any journal entry to recover. location: " + mJournal.getLocation());
                }
                final long firstToRetry = lastPersisted + 1L;
                createNewLogFile(firstToRetry);

                // The queue is an ArrayDeque, so peek() yields null exactly when it is empty.
                Journal.JournalEntry head = mEntriesToFlush.peek();
                if (head != null) {
                    if (head.getSequenceNumber() > firstToRetry) {
                        // There is a gap between what is persisted and what we can replay.
                        throw new RuntimeException(ExceptionMessage.JOURNAL_ENTRY_MISSING.getMessageWithUrl(
                            RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL,
                            new Object[]{firstToRetry, head.getSequenceNumber()}));
                    }
                    long lastRetried = lastPersisted;
                    LOG.info("Retry writing unwritten journal entries from seq {} to currentLog {}",
                        firstToRetry, currentLogName());
                    for (Journal.JournalEntry pending : mEntriesToFlush) {
                        // Entries at or below the persisted sequence number are already in the UFS.
                        if (pending.getSequenceNumber() > lastPersisted) {
                            try {
                                pending.toBuilder().build().writeDelimitedTo(mJournalOutputStream);
                                lastRetried = pending.getSequenceNumber();
                            } catch (JournalClosedException.IOJournalClosedException closed) {
                                throw closed.toJournalClosedException();
                            } catch (IOException failure) {
                                throw new IOException(ExceptionMessage.JOURNAL_WRITE_FAILURE.getMessageWithUrl(
                                    RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL,
                                    new Object[]{mJournalOutputStream.currentLog(), failure.getMessage()}),
                                    failure);
                            }
                        }
                    }
                    LOG.info("Finished writing unwritten journal entries from {} to {}. currentLog: {}",
                        firstToRetry, lastRetried, currentLogName());
                    final long expectedLast = mNextSequenceNumber - 1L;
                    if (lastRetried != expectedLast) {
                        throw new RuntimeException("Failed to recover all entries to flush, expecting "
                            + expectedLast + " but only found entry " + lastRetried
                            + " currentLog: " + currentLogName());
                    }
                }
            }
            mNeedsRecovery = false;
        }
    }

    /**
     * Determines the sequence number of the last journal entry persisted in the UFS.
     *
     * <p>The current (incomplete) log is scanned first; if it contains at least one entry it
     * is completed and the highest sequence number seen is returned. Otherwise the most
     * recent complete log is consulted and its end sequence number minus one is returned.
     *
     * @return the last persisted sequence number, or -1 if none could be found
     * @throws IOException if reading from the UFS fails
     */
    private long recoverLastPersistedJournalEntry() throws IOException {
        // NOTE(review): the result of this first snapshot lookup is never read; the call is
        // kept because it may perform UFS I/O or throw - confirm whether it can be dropped.
        UfsJournalSnapshot.getSnapshot(mJournal);

        long lastSeq = -1L;
        UfsJournalFile incompleteLog = UfsJournalSnapshot.getCurrentLog(mJournal);
        if (incompleteLog != null) {
            LOG.info("Recovering from previous UFS journal write failure. Scanning for the last persisted journal entry. currentLog: " + incompleteLog.toString());
            try (JournalEntryStreamReader reader = new JournalEntryStreamReader(
                    mUfs.open(incompleteLog.getLocation().toString(),
                        OpenOptions.defaults().setRecoverFailedOpen(true)))) {
                for (Journal.JournalEntry read = reader.readEntry(); read != null;
                        read = reader.readEntry()) {
                    lastSeq = Math.max(lastSeq, read.getSequenceNumber());
                }
            }
            if (lastSeq != -1L) {
                completeLog(incompleteLog, lastSeq + 1L);
            }
        }

        if (lastSeq < 0L) {
            // Nothing usable in the incomplete log: fall back to the newest complete log.
            List<UfsJournalFile> logs = UfsJournalSnapshot.getSnapshot(mJournal).getLogs();
            for (int idx = logs.size() - 1; idx >= 0; idx--) {
                UfsJournalFile candidate = logs.get(idx);
                if (!candidate.isIncompleteLog()) {
                    lastSeq = candidate.getEnd() - 1L;
                    LOG.info("Found last persisted journal entry with seq {} in {}.",
                        lastSeq, candidate.getLocation().toString());
                    break;
                }
            }
        }
        return lastSeq;
    }

    /**
     * Rotates the log if a rotation has been requested: closes the current stream (which
     * completes its log file) and opens a new log file starting at the next sequence number.
     *
     * @throws IOException if closing the old log or creating the new one fails
     * @throws JournalClosedException if the journal is not writable
     */
    private void maybeRotateLog() throws IOException, JournalClosedException {
        checkIsWritable();
        if (mRotateLogForNextWrite) {
            if (mJournalOutputStream != null) {
                mJournalOutputStream.close();
                mJournalOutputStream = null;
            }
            createNewLogFile(mNextSequenceNumber);
            // Only clear the request once the new log file actually exists.
            mRotateLogForNextWrite = false;
        }
    }

    /**
     * Creates a new, still incomplete log file in the UFS and makes it the current output
     * stream. The file's end sequence number is encoded as {@code Long.MAX_VALUE}.
     *
     * @param startSequenceNumber sequence number of the first entry the new log will hold
     * @throws IOException if the file cannot be created in the UFS
     * @throws JournalClosedException if the journal is not writable
     */
    private void createNewLogFile(long startSequenceNumber) throws IOException, JournalClosedException {
        checkIsWritable();
        final long openEnded = Long.MAX_VALUE;
        URI location = UfsJournalFile.encodeLogFileLocation(mJournal, startSequenceNumber, openEnded);
        UfsJournalFile logFile = UfsJournalFile.createLogFile(location, startSequenceNumber, openEnded);
        OutputStream ufsStream = mUfs.create(
            logFile.getLocation().toString(),
            CreateOptions.defaults((AlluxioConfiguration) ServerConfiguration.global())
                .setEnsureAtomic(false)
                .setCreateParent(true));
        mJournalOutputStream = new JournalOutputStream(logFile, ufsStream);
        LOG.info("Created current log file: {}", logFile);
    }

    /**
     * Completes the given log by renaming it so that its name encodes the range
     * {@code [currentLog.getStart(), nextSequenceNumber)}.
     *
     * <p>If the journal is not writable, a warning is logged and nothing is done. A log that
     * holds no entries ({@code nextSequenceNumber <= start}) is deleted instead of renamed.
     * If the rename reports failure but the completed file exists, a leftover current file
     * is deleted on a best-effort basis.
     *
     * @param currentLog the incomplete log to complete
     * @param nextSequenceNumber one past the sequence number of the last entry in the log
     * @throws IOException if the rename fails and the completed file does not exist
     */
    private void completeLog(UfsJournalFile currentLog, long nextSequenceNumber) throws IOException {
        try {
            checkIsWritable();
        } catch (JournalClosedException notWritable) {
            LOG.warn("Skipping completeLog() since journal is not writable. error: {}",
                notWritable.toString());
            return;
        }

        final String source = currentLog.getLocation().toString();
        if (nextSequenceNumber <= currentLog.getStart()) {
            LOG.info("No journal entry found in current journal file {}. Deleting it", source);
            boolean deleted = mUfs.deleteFile(source);
            if (!deleted) {
                LOG.warn("Failed to delete empty journal file {}", source);
            }
            return;
        }

        final String target = UfsJournalFile
            .encodeLogFileLocation(mJournal, currentLog.getStart(), nextSequenceNumber)
            .toString();
        // Writability is re-checked right before touching the UFS.
        try {
            checkIsWritable();
        } catch (JournalClosedException notWritable) {
            LOG.warn("Skipping completeLog() since journal is not writable. error: {}",
                notWritable.toString());
            return;
        }

        LOG.info(String.format("Completing log %s with next sequence number %d", source, nextSequenceNumber));
        if (mUfs.renameFile(source, target)) {
            return;
        }
        // The rename reported failure: it is only an error if the completed file is missing.
        if (!mUfs.exists(target)) {
            throw new IOException(
                String.format("Failed to rename journal log from %s to %s", source, target));
        }
        if (mUfs.exists(source)) {
            LOG.info("Deleting current log {}", source);
            if (!mUfs.deleteFile(source)) {
                LOG.warn("Failed to delete current log file {}", source);
            }
        }
    }

    /**
     * Flushes the current log stream to the UFS.
     *
     * <p>Any pending failure recovery runs first. On a successful flush the queue of entries
     * awaiting flush is cleared. On failure the writer is marked as needing recovery and the
     * current stream is dropped. After a successful flush, a rotation is requested for the
     * next write if the log has reached the maximum size or the UFS does not support flush.
     *
     * @throws IOException if the flush fails
     * @throws JournalClosedException if the journal is not writable or the writer is closed
     */
    @Override
    public synchronized void flush() throws IOException, JournalClosedException {
        checkIsWritable();
        maybeRecoverFromUfsFailures();

        boolean nothingToFlush =
            mJournalOutputStream == null || mJournalOutputStream.bytesWritten() == 0L;
        if (nothingToFlush) {
            return;
        }

        try {
            mJournalOutputStream.flush();
            // Everything queued so far is now persisted and no longer needs replaying.
            mEntriesToFlush.clear();
        } catch (JournalClosedException.IOJournalClosedException closed) {
            throw closed.toJournalClosedException();
        } catch (IOException failure) {
            mNeedsRecovery = true;
            UfsJournalFile failedLog = mJournalOutputStream.currentLog();
            mJournalOutputStream = null;
            String message = ExceptionMessage.JOURNAL_FLUSH_FAILURE.getMessageWithUrl(
                RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL,
                new Object[]{failedLog, failure.getMessage()});
            throw new IOException(message, failure);
        }

        final long written = mJournalOutputStream.bytesWritten();
        final boolean overSize = written >= mMaxLogSize;
        if (!overSize && mUfs.supportsFlush()) {
            // Log is small enough and the UFS can flush in place: keep appending to it.
            return;
        }
        if (overSize) {
            LOG.info("Rotating log file {}. size: {} maxSize: {}",
                currentLogName(), written, mMaxLogSize);
        }
        mRotateLogForNextWrite = true;
    }

    /**
     * Closes the current log stream (if any) and the garbage collector, then marks this
     * writer as closed.
     *
     * <p>Calling this method on a writer that has already been closed successfully has no
     * effect. Without this guard a second call would try to close the log stream again,
     * which rejects every operation once the writer is marked closed and would therefore
     * throw.
     *
     * <p>If closing a resource fails, the writer is not marked closed, so the call may be
     * retried.
     *
     * @throws IOException if closing the log stream or the garbage collector fails
     */
    @Override
    public synchronized void close() throws IOException {
        if (mClosed) {
            // Already closed: closing again must be a no-op rather than an error.
            return;
        }
        Closer closer = Closer.create();
        if (mJournalOutputStream != null) {
            closer.register((Closeable) mJournalOutputStream);
        }
        closer.register((Closeable) mGarbageCollector);
        closer.close();
        mClosed = true;
    }

    /**
     * Returns the given stream as a {@link DataOutputStream}, wrapping it only if it is not
     * one already.
     *
     * @param stream the stream to adapt
     * @return {@code stream} itself if it is a {@code DataOutputStream}, otherwise a new
     *         {@code DataOutputStream} around it
     */
    private static DataOutputStream wrapDataOutputStream(OutputStream stream) {
        return stream instanceof DataOutputStream
            ? (DataOutputStream) stream
            : new DataOutputStream(stream);
    }

    /**
     * @return the sequence number that will be assigned to the next written entry
     */
    public synchronized long getNextSequenceNumber() {
        return mNextSequenceNumber;
    }

    /**
     * @return the stream for the current log, or null if there is none at the moment
     */
    @VisibleForTesting
    synchronized JournalOutputStream getJournalOutputStream() {
        return mJournalOutputStream;
    }

    /**
     * Verifies that the underlying journal still accepts writes.
     *
     * @throws JournalClosedException if the journal reports that it is not writable
     */
    private void checkIsWritable() throws JournalClosedException {
        if (mJournal.isWritable()) {
            return;
        }
        String message = String.format(
            "writer not allowed to write (no longer primary). location: %s currentLog: %s",
            mJournal.getLocation(), currentLogName());
        throw new JournalClosedException(message);
    }

    /**
     * @return a printable description of the current log, or a placeholder when there is no
     *         current output stream
     */
    private String currentLogName() {
        JournalOutputStream stream = mJournalOutputStream;
        return stream == null ? "(null output stream)" : stream.currentLog().toString();
    }

    /**
     * Output stream for a single journal log file.
     *
     * <p>Every operation first verifies that the enclosing writer has not been closed.
     * Closing the stream closes the underlying stream and then completes the log file using
     * the enclosing writer's next sequence number.
     */
    private class JournalOutputStream
    extends OutputStream {
        /** Underlying stream; its {@code size()} provides the number of bytes written. */
        private final DataOutputStream mOutputStream;
        /** The log file this stream writes to. */
        private final UfsJournalFile mCurrentLog;

        /**
         * @param currentLog the log file backing this stream
         * @param stream the raw stream to write to; wrapped in a DataOutputStream if needed
         */
        JournalOutputStream(UfsJournalFile currentLog, OutputStream stream) throws IOException {
            mCurrentLog = currentLog;
            mOutputStream = wrapDataOutputStream(stream);
        }

        /**
         * @return the byte count reported by the underlying stream, or 0 if there is none
         */
        long bytesWritten() {
            return mOutputStream == null ? 0L : mOutputStream.size();
        }

        /**
         * @return the log file this stream writes to
         */
        UfsJournalFile currentLog() {
            return mCurrentLog;
        }

        @Override
        public void write(int b) throws IOException {
            ensureWriterOpen();
            mOutputStream.write(b);
        }

        @Override
        public void write(byte[] b) throws IOException {
            ensureWriterOpen();
            mOutputStream.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            ensureWriterOpen();
            mOutputStream.write(b, off, len);
        }

        @Override
        public void flush() throws IOException {
            ensureWriterOpen();
            mOutputStream.flush();
        }

        /**
         * Closes the underlying stream and completes the log file so that it covers the
         * entries up to (excluding) the writer's next sequence number.
         */
        @Override
        public void close() throws IOException {
            ensureWriterOpen();
            mOutputStream.close();
            LOG.info("Marking {} as complete with log entries within [{}, {}).",
                mCurrentLog.getLocation(), mCurrentLog.getStart(),
                UfsJournalLogWriter.this.mNextSequenceNumber);
            completeLog(mCurrentLog, UfsJournalLogWriter.this.mNextSequenceNumber);
        }

        /**
         * Rejects the operation if the enclosing writer has been closed.
         *
         * @throws JournalClosedException.IOJournalClosedException if the writer is closed
         */
        private void ensureWriterOpen() throws JournalClosedException.IOJournalClosedException {
            if (!UfsJournalLogWriter.this.mClosed) {
                return;
            }
            throw new JournalClosedException("Journal writer is closed. currentLog: " + mCurrentLog).toIOException();
        }
    }
}

