/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.journalv0.ufs;

import alluxio.RuntimeConstants;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.ExceptionMessage;
import alluxio.master.journalv0.JournalFormatter;
import alluxio.master.journalv0.JournalOutputStream;
import alluxio.master.journalv0.JournalWriter;
import alluxio.master.journalv0.ufs.UfsCheckpointManager;
import alluxio.master.journalv0.ufs.UfsJournal;
import alluxio.proto.journal.Journal;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.UnderFileSystemConfiguration;
import alluxio.underfs.options.CreateOptions;
import alluxio.util.UnderFileSystemUtils;
import com.google.common.base.Preconditions;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public final class UfsJournalWriter
implements JournalWriter {
    /** Logger used by this writer and by its nested stream classes. */
    private static final Logger LOG = LoggerFactory.getLogger(UfsJournalWriter.class);
    /** Journal that supplies the checkpoint, current-log and completed-log locations. */
    private final UfsJournal mJournal;
    /** Completed-log location reported by the journal; created as a directory on demand. */
    private final URI mCompletedLocation;
    /** The journal's checkpoint location with a ".tmp" suffix; checkpoints are written here first. */
    private final URI mTempCheckpoint;
    /** Under file system created for the journal location. */
    private final UnderFileSystem mUfs;
    /** Number the next completed log will be stored under; numbering starts at 1. */
    private long mNextCompleteLogNumber = 1L;
    /** Checkpoint stream, created lazily by getCheckpointOutputStream; null until then. */
    private CheckpointOutputStream mCheckpointOutputStream = null;
    /** Entry stream, created lazily by the first write; null until then. */
    private EntryOutputStream mEntryOutputStream = null;
    /** Sequence number that the next call to getNextSequenceNumber will return. */
    private long mNextEntrySequenceNumber = 1L;
    /** Receives recover() calls and is handed the temporary checkpoint through update(). */
    private UfsCheckpointManager mCheckpointManager;

    UfsJournalWriter(UfsJournal journal) {
        this.mJournal = (UfsJournal)Preconditions.checkNotNull((Object)journal, (Object)"journal");
        this.mCompletedLocation = this.mJournal.getCompletedLocation();
        try {
            this.mTempCheckpoint = new URI(this.mJournal.getCheckpoint() + ".tmp");
        }
        catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
        this.mUfs = UnderFileSystem.Factory.create((String)this.mJournal.getLocation().toString(), (UnderFileSystemConfiguration)UnderFileSystemConfiguration.defaults((AlluxioConfiguration)ServerConfiguration.global()));
        this.mCheckpointManager = new UfsCheckpointManager(this.mUfs, this.mJournal.getCheckpoint(), this);
    }

    /**
     * Marks every log as complete: finds the first completed-log number that has no file in the
     * under file system, then completes the current log under that number.
     *
     * @throws IOException if an under file system operation fails
     */
    @Override
    public synchronized void completeLogs() throws IOException {
        LOG.info("Marking all logs as complete.");
        // Advance the counter past every completed log that already exists.
        mNextCompleteLogNumber = 1L;
        while (mUfs.isFile(mJournal.getCompletedLog(mNextCompleteLogNumber).toString())) {
            mNextCompleteLogNumber++;
        }
        completeCurrentLog();
    }

    /**
     * Returns the checkpoint output stream, creating it on the first call.
     *
     * <p>On creation this recovers the checkpoint through the checkpoint manager, makes sure the
     * journal folder exists, sets the next entry sequence number to
     * {@code latestSequenceNumber + 1}, removes any existing temporary checkpoint file, and opens
     * a new one. Later calls return the same stream and ignore the argument.
     *
     * @param latestSequenceNumber the sequence number of the latest journal entry
     * @return the checkpoint output stream
     * @throws IOException if an under file system operation fails
     */
    @Override
    public synchronized JournalOutputStream getCheckpointOutputStream(long latestSequenceNumber) throws IOException {
        if (mCheckpointOutputStream != null) {
            return mCheckpointOutputStream;
        }

        mCheckpointManager.recover();
        LOG.info("Creating tmp checkpoint file: {}", mTempCheckpoint);

        final URI journalLocation = mJournal.getLocation();
        final String journalDir = journalLocation.toString();
        if (!mUfs.isDirectory(journalDir)) {
            LOG.info("Creating journal folder: {}", journalLocation);
            mUfs.mkdirs(journalDir);
        }

        mNextEntrySequenceNumber = latestSequenceNumber + 1L;
        LOG.info("Latest journal sequence number: {} Next journal sequence number: {}", latestSequenceNumber, mNextEntrySequenceNumber);

        // Start from a clean temporary file in case a previous attempt left one behind.
        final String tempCheckpointPath = mTempCheckpoint.toString();
        UnderFileSystemUtils.deleteFileIfExists(mUfs, tempCheckpointPath);
        final OutputStream rawCheckpoint = mUfs.create(tempCheckpointPath);
        mCheckpointOutputStream = new CheckpointOutputStream(new DataOutputStream(rawCheckpoint));
        return mCheckpointOutputStream;
    }

    /**
     * Writes an entry to the current log, opening the entry stream on first use.
     *
     * @param entry the journal entry to write
     * @throws IOException if the checkpoint stream has not been created and closed yet, or if
     *         the write fails
     */
    @Override
    public synchronized void write(Journal.JournalEntry entry) throws IOException {
        final boolean checkpointFinished =
            mCheckpointOutputStream != null && mCheckpointOutputStream.isClosed();
        if (!checkpointFinished) {
            throw new IOException("The checkpoint must be written and closed before writing entries.");
        }
        if (mEntryOutputStream == null) {
            mEntryOutputStream = new EntryOutputStream(
                mUfs, mJournal.getCurrentLog(), mJournal.getJournalFormatter(), this);
        }
        mEntryOutputStream.write(entry);
    }

    /**
     * Flushes the entry stream if one has been opened; otherwise does nothing.
     *
     * @throws IOException if the checkpoint stream has not been created and closed yet, or if
     *         the flush fails
     */
    @Override
    public synchronized void flush() throws IOException {
        final boolean checkpointFinished =
            mCheckpointOutputStream != null && mCheckpointOutputStream.isClosed();
        if (!checkpointFinished) {
            throw new IOException("The checkpoint must be written and closed before writing entries.");
        }
        if (mEntryOutputStream != null) {
            mEntryOutputStream.flush();
        }
    }

    /**
     * Returns the next entry sequence number and advances the counter by one.
     *
     * @return the sequence number to assign to the next entry
     */
    @Override
    public synchronized long getNextSequenceNumber() {
        final long assigned = mNextEntrySequenceNumber;
        mNextEntrySequenceNumber = assigned + 1L;
        return assigned;
    }

    /**
     * Closes the checkpoint stream, the entry stream and the under file system, in that order.
     *
     * <p>Every resource is attempted even when an earlier one fails with an {@link IOException},
     * so one failing stream does not leave the remaining resources open. The first
     * {@link IOException} is rethrown once all resources have been attempted; later ones are
     * attached to it as suppressed exceptions.
     *
     * @throws IOException if closing any of the resources fails
     */
    @Override
    public synchronized void close() throws IOException {
        IOException failure = null;

        if (mCheckpointOutputStream != null) {
            try {
                mCheckpointOutputStream.close();
            } catch (IOException e) {
                failure = e;
            }
        }

        if (mEntryOutputStream != null) {
            try {
                mEntryOutputStream.close();
            } catch (IOException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }

        try {
            mUfs.close();
        } catch (IOException e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Delegates checkpoint recovery to the checkpoint manager.
     */
    @Override
    public void recover() {
        mCheckpointManager.recover();
    }

    /**
     * Deletes every completed log file, from the highest-numbered down to the first, and resets
     * the completed-log counter.
     *
     * <p>Completed logs are numbered from 1, so the deletion loop stops at 1; there is no
     * completed log 0 to delete.
     *
     * @throws IOException if an under file system operation fails
     */
    @Override
    public synchronized void deleteCompletedLogs() throws IOException {
        LOG.info("Deleting all completed log files...");
        final long firstLogNumber = 1L;

        // Find the first log number that has no completed log file.
        long firstMissing = firstLogNumber;
        while (mUfs.isFile(mJournal.getCompletedLog(firstMissing).toString())) {
            firstMissing++;
        }

        // Delete from the newest log backwards, stopping at the first valid log number.
        for (long i = firstMissing - 1L; i >= firstLogNumber; i--) {
            URI log = mJournal.getCompletedLog(i);
            LOG.info("Deleting completed log: {}", log);
            mUfs.deleteFile(log.toString());
        }
        LOG.info("Finished deleting all completed log files.");

        // All completed logs are gone, so numbering starts over.
        mNextCompleteLogNumber = firstLogNumber;
    }

    /**
     * Renames the current log, if it exists, to the next completed log and advances the
     * completed-log counter. Creates the completed-log directory first when it is missing.
     *
     * @throws IOException if an under file system operation fails
     */
    @Override
    public synchronized void completeCurrentLog() throws IOException {
        final URI currentLog = mJournal.getCurrentLog();
        final String currentLogPath = currentLog.toString();
        if (!mUfs.isFile(currentLogPath)) {
            // No current log file exists, so there is nothing to complete.
            return;
        }

        final String completedDir = mCompletedLocation.toString();
        if (!mUfs.isDirectory(completedDir)) {
            mUfs.mkdirs(completedDir);
        }

        final URI completedLog = mJournal.getCompletedLog(mNextCompleteLogNumber);
        mUfs.renameFile(currentLogPath, completedLog.toString());
        LOG.info("Completed current log: {} to completed log: {}", currentLog, completedLog);
        mNextCompleteLogNumber++;
    }

    /**
     * Output stream that serializes journal entries into the current log file.
     *
     * <p>The log is rotated (closed, completed through the owning writer, and reopened) at the
     * start of the next write after a write or flush failure, after a flush that leaves the log
     * at or above the configured maximum size, or after any flush when the under file system
     * reports that it does not support flush.
     */
    @ThreadSafe
    protected static class EntryOutputStream
    implements JournalOutputStream {
        /** Under file system holding the log file. */
        private final UnderFileSystem mUfs;
        /** Location of the current log file. */
        private final URI mCurrentLog;
        /** Formatter used to serialize entries. */
        private final JournalFormatter mJournalFormatter;
        /** Owning writer; supplies sequence numbers and completes logs on rotation. */
        private final UfsJournalWriter mJournalWriter;
        /** Size threshold, in bytes, at which a flush schedules a rotation. */
        private final long mMaxLogSize;
        /** Stream returned by the under file system for the current log. */
        private OutputStream mRawOutputStream;
        /** Data stream wrapping the raw stream; its size() is the number of bytes written. */
        private DataOutputStream mDataOutputStream;
        /** Whether close() has completed. */
        private boolean mIsClosed = false;
        /** Whether the next write must rotate the log before writing. */
        private boolean mRotateLogForNextWrite = false;

        /**
         * Opens the current log file for writing.
         *
         * @param ufs the under file system holding the log
         * @param log the location of the current log
         * @param journalFormatter the formatter used to serialize entries
         * @param journalWriter the owning journal writer
         * @throws IOException if the log file cannot be created
         */
        public EntryOutputStream(UnderFileSystem ufs, URI log, JournalFormatter journalFormatter, UfsJournalWriter journalWriter) throws IOException {
            mUfs = ufs;
            mCurrentLog = log;
            mJournalFormatter = journalFormatter;
            mJournalWriter = journalWriter;
            mMaxLogSize = ServerConfiguration.getBytes(PropertyKey.MASTER_JOURNAL_LOG_SIZE_BYTES_MAX);
            openCurrentLog();
        }

        /**
         * Serializes the entry, stamped with the next sequence number, into the current log,
         * rotating the log first when a rotation is pending.
         *
         * @param entry the journal entry to write
         * @throws IOException if the stream is closed, rotation fails, or serialization fails
         */
        @Override
        public synchronized void write(Journal.JournalEntry entry) throws IOException {
            if (mIsClosed) {
                throw new IOException(ExceptionMessage.JOURNAL_WRITE_AFTER_CLOSE.getMessage(new Object[0]));
            }
            if (mRotateLogForNextWrite) {
                rotateLog();
                mRotateLogForNextWrite = false;
            }
            final Journal.JournalEntry stamped =
                entry.toBuilder().setSequenceNumber(mJournalWriter.getNextSequenceNumber()).build();
            try {
                mJournalFormatter.serialize(stamped, mDataOutputStream);
            } catch (IOException e) {
                // Start a fresh log on the next write after a failed one.
                mRotateLogForNextWrite = true;
                throw new IOException(ExceptionMessage.JOURNAL_WRITE_FAILURE.getMessageWithUrl(RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, new Object[]{mCurrentLog, e.getMessage()}), e);
            }
        }

        /**
         * Closes the underlying data stream. Calling it again after a successful close is a
         * no-op.
         *
         * @throws IOException if closing the data stream fails
         */
        @Override
        public synchronized void close() throws IOException {
            if (!mIsClosed) {
                if (mDataOutputStream != null) {
                    mDataOutputStream.close();
                }
                mIsClosed = true;
            }
        }

        /**
         * Flushes the data stream and decides whether the next write must rotate the log.
         * Does nothing when the stream is closed or nothing has been written to the current log.
         *
         * @throws IOException if the flush fails
         */
        @Override
        public synchronized void flush() throws IOException {
            if (mIsClosed || mDataOutputStream.size() == 0) {
                return;
            }
            try {
                mDataOutputStream.flush();
            } catch (IOException e) {
                mRotateLogForNextWrite = true;
                throw new IOException(ExceptionMessage.JOURNAL_FLUSH_FAILURE.getMessageWithUrl(RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, new Object[]{mCurrentLog, e.getMessage()}), e);
            }
            final boolean overSize = (long) mDataOutputStream.size() >= mMaxLogSize;
            if (overSize) {
                LOG.info("Rotating log file. size: {} maxSize: {}", mDataOutputStream.size(), mMaxLogSize);
                mRotateLogForNextWrite = true;
            } else if (!mUfs.supportsFlush()) {
                mRotateLogForNextWrite = true;
            }
        }

        /**
         * Closes the current log, completes it through the owning writer, and opens a new one.
         */
        private void rotateLog() throws IOException {
            mDataOutputStream.close();
            mJournalWriter.completeCurrentLog();
            openCurrentLog();
        }

        /**
         * Creates the current log file in the under file system and wraps it in a data stream.
         */
        private void openCurrentLog() throws IOException {
            mRawOutputStream = mUfs.create(mCurrentLog.toString(), CreateOptions.defaults((AlluxioConfiguration) ServerConfiguration.global()).setEnsureAtomic(false).setCreateParent(true));
            LOG.info("Opened current log file: {}", mCurrentLog);
            mDataOutputStream = new DataOutputStream(mRawOutputStream);
        }
    }

    /**
     * Output stream for the temporary checkpoint file. Closing it hands the temporary checkpoint
     * to the checkpoint manager and completes the current log of the enclosing writer.
     */
    private class CheckpointOutputStream
    implements JournalOutputStream {
        /** Stream writing to the temporary checkpoint file. */
        private final DataOutputStream mOutputStream;
        /** Whether close() has completed. */
        private boolean mIsClosed = false;

        /**
         * @param outputStream the stream writing to the temporary checkpoint file
         */
        CheckpointOutputStream(DataOutputStream outputStream) {
            mOutputStream = outputStream;
        }

        /**
         * @return whether this stream has been closed
         */
        boolean isClosed() {
            return mIsClosed;
        }

        /**
         * Serializes the entry, stamped with the writer's next sequence number, into the
         * checkpoint.
         *
         * @param entry the journal entry to write
         * @throws IOException if the stream is closed or serialization fails
         */
        @Override
        public synchronized void write(Journal.JournalEntry entry) throws IOException {
            if (mIsClosed) {
                throw new IOException(ExceptionMessage.JOURNAL_WRITE_AFTER_CLOSE.getMessage(new Object[0]));
            }
            final UfsJournalWriter writer = UfsJournalWriter.this;
            final JournalFormatter formatter = writer.mJournal.getJournalFormatter();
            final Journal.JournalEntry stamped =
                entry.toBuilder().setSequenceNumber(writer.getNextSequenceNumber()).build();
            formatter.serialize(stamped, mOutputStream);
        }

        /**
         * Flushes and closes the checkpoint file, passes the temporary checkpoint to the
         * checkpoint manager, and completes the current log. The stream is marked closed only
         * after all of these steps succeed; later calls are then no-ops.
         *
         * @throws IOException if any step fails
         */
        @Override
        public synchronized void close() throws IOException {
            if (mIsClosed) {
                return;
            }
            final UfsJournalWriter writer = UfsJournalWriter.this;
            mOutputStream.flush();
            mOutputStream.close();
            LOG.info("Successfully created tmp checkpoint file: {}", writer.mTempCheckpoint);
            writer.mCheckpointManager.update(writer.mTempCheckpoint);
            writer.completeCurrentLog();
            mIsClosed = true;
        }

        /**
         * Flushes the checkpoint stream unless it has already been closed.
         *
         * @throws IOException if the flush fails
         */
        @Override
        public synchronized void flush() throws IOException {
            if (!mIsClosed) {
                mOutputStream.flush();
            }
        }
    }
}

