/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.journal.raft;

import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.JournalClosedException;
import alluxio.master.journal.JournalWriter;
import alluxio.master.journal.raft.RaftJournalAppender;
import alluxio.master.journal.raft.RaftJournalSystem;
import alluxio.proto.journal.Journal;
import alluxio.util.FormatUtils;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class RaftJournalWriter
implements JournalWriter {
    private static final Logger LOG = LoggerFactory.getLogger(RaftJournalWriter.class);
    private final long mWriteTimeoutMs;
    private final long mEntrySizeMax;
    private final long mFlushBatchBytes;
    private final AtomicLong mNextSequenceNumberToWrite;
    private final AtomicLong mLastSubmittedSequenceNumber;
    private final AtomicLong mLastCommittedSequenceNumber;
    private final RaftJournalAppender mClient;
    private volatile boolean mClosed;
    private Journal.JournalEntry.Builder mJournalEntryBuilder;
    private final AtomicLong mCurrentJournalEntrySize;

    public RaftJournalWriter(long nextSequenceNumberToWrite, RaftJournalAppender client) {
        LOG.debug("Journal writer created starting at SN#{}", (Object)nextSequenceNumberToWrite);
        this.mNextSequenceNumberToWrite = new AtomicLong(nextSequenceNumberToWrite);
        this.mLastSubmittedSequenceNumber = new AtomicLong(-1L);
        this.mLastCommittedSequenceNumber = new AtomicLong(-1L);
        this.mCurrentJournalEntrySize = new AtomicLong(0L);
        this.mClient = client;
        this.mClosed = false;
        this.mWriteTimeoutMs = ServerConfiguration.getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_WRITE_TIMEOUT);
        this.mEntrySizeMax = ServerConfiguration.getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_ENTRY_SIZE_MAX);
        this.mFlushBatchBytes = this.mEntrySizeMax / 3L;
    }

    @Override
    public void write(Journal.JournalEntry entry) throws IOException, JournalClosedException {
        if (this.mClosed) {
            throw new JournalClosedException("Cannot write to journal. Journal writer has been closed");
        }
        Preconditions.checkState((entry.getAllFields().size() <= 2 ? 1 : 0) != 0, (String)"Raft journal entries should never set multiple fields, but found %s", (Object)entry);
        if (this.mCurrentJournalEntrySize.get() > this.mFlushBatchBytes) {
            this.flush();
        }
        if (this.mJournalEntryBuilder == null) {
            this.mJournalEntryBuilder = Journal.JournalEntry.newBuilder();
            this.mCurrentJournalEntrySize.set(0L);
        }
        LOG.trace("Writing entry {}: {}", (Object)this.mNextSequenceNumberToWrite, (Object)entry);
        this.mJournalEntryBuilder.addJournalEntries(entry.toBuilder().setSequenceNumber(this.mNextSequenceNumberToWrite.getAndIncrement()).build());
        long size = entry.getSerializedSize();
        if (size > this.mEntrySizeMax) {
            LOG.error("Journal entry size ({}) is bigger than the max allowed size ({}) defined by {}", new Object[]{FormatUtils.getSizeFromBytes((long)size), FormatUtils.getSizeFromBytes((long)this.mEntrySizeMax), PropertyKey.MASTER_EMBEDDED_JOURNAL_ENTRY_SIZE_MAX.getName()});
        }
        this.mCurrentJournalEntrySize.addAndGet(size);
    }

    @Override
    public void flush() throws IOException, JournalClosedException {
        if (this.mClosed) {
            throw new JournalClosedException("Cannot flush. Journal writer has been closed");
        }
        if (this.mJournalEntryBuilder != null) {
            long flushSN = this.mNextSequenceNumberToWrite.get() - 1L;
            try {
                Journal.JournalEntry entry = this.mJournalEntryBuilder.build();
                Message message = RaftJournalSystem.toRaftMessage(entry);
                this.mLastSubmittedSequenceNumber.set(flushSN);
                LOG.trace("Flushing entry {} ({})", (Object)entry, (Object)message);
                RaftClientReply reply = this.mClient.sendAsync(message, TimeDuration.valueOf((long)this.mWriteTimeoutMs, (TimeUnit)TimeUnit.MILLISECONDS)).get(this.mWriteTimeoutMs, TimeUnit.MILLISECONDS);
                this.mLastCommittedSequenceNumber.set(flushSN);
                if (reply.getException() != null) {
                    throw reply.getException();
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
            catch (ExecutionException e) {
                throw new IOException(e.getCause());
            }
            catch (TimeoutException e) {
                throw new IOException(String.format("Timed out after waiting %s milliseconds for journal entries to be processed", this.mWriteTimeoutMs), e);
            }
            this.mJournalEntryBuilder = null;
        }
    }

    @Override
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        this.mClosed = true;
        LOG.info("Closing journal writer. Last sequence numbers written/submitted/committed: {}/{}/{}", new Object[]{this.mNextSequenceNumberToWrite.get() - 1L, this.mLastSubmittedSequenceNumber.get(), this.mLastCommittedSequenceNumber.get()});
        this.closeClient();
    }

    public long getNextSequenceNumberToWrite() {
        return this.mNextSequenceNumberToWrite.get();
    }

    private void closeClient() {
        try {
            this.mClient.close();
        }
        catch (IOException e) {
            LOG.warn("Failed to close raft client: {}", (Object)e.toString());
        }
    }
}

