/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.journal;

import alluxio.annotation.SuppressFBWarnings;
import alluxio.collections.ConcurrentHashSet;
import alluxio.concurrent.ForkJoinPoolHelper;
import alluxio.concurrent.jsr.ForkJoinPool;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.JournalClosedException;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.master.journal.JournalUtils;
import alluxio.master.journal.JournalWriter;
import alluxio.master.journal.sink.JournalSink;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.proto.journal.Journal;
import alluxio.util.logging.SamplingLogger;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Status;
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
@SuppressFBWarnings(value={"RV_RETURN_VALUE_IGNORED"})
public final class AsyncJournalWriter {
    private static final Logger SAMPLING_LOG = new SamplingLogger(LoggerFactory.getLogger(AsyncJournalWriter.class), 30000L);
    private final JournalWriter mJournalWriter;
    private final ConcurrentLinkedQueue<Journal.JournalEntry> mQueue;
    private final AtomicLong mCounter;
    private final AtomicLong mFlushCounter;
    private Long mWriteCounter;
    private final long mFlushBatchTimeNs;
    private final Set<FlushTicket> mTicketSet = new ConcurrentHashSet();
    private String mJournalName = "Raft";
    private Thread mFlushThread = new Thread(this::doFlush, "AsyncJournalWriterThread-" + this.mJournalName);
    private final Semaphore mFlushSemaphore = new Semaphore(0, true);
    private volatile boolean mStopFlushing = false;
    private final Supplier<Set<JournalSink>> mJournalSinks;

    public AsyncJournalWriter(JournalWriter journalWriter, Supplier<Set<JournalSink>> journalSinks) {
        this.mJournalWriter = (JournalWriter)Preconditions.checkNotNull((Object)journalWriter, (Object)"journalWriter");
        this.mQueue = new ConcurrentLinkedQueue();
        this.mCounter = new AtomicLong(0L);
        this.mFlushCounter = new AtomicLong(0L);
        this.mWriteCounter = 0L;
        this.mFlushBatchTimeNs = TimeUnit.NANOSECONDS.convert(ServerConfiguration.getMs(PropertyKey.MASTER_JOURNAL_FLUSH_BATCH_TIME_MS), TimeUnit.MILLISECONDS);
        this.mJournalSinks = journalSinks;
        this.mFlushThread.start();
    }

    public AsyncJournalWriter(JournalWriter journalWriter, Supplier<Set<JournalSink>> journalSinks, String journalName) {
        this(journalWriter, journalSinks);
        this.mJournalName = journalName;
    }

    public long appendEntry(Journal.JournalEntry entry) {
        this.mCounter.incrementAndGet();
        this.mQueue.offer(entry);
        return this.mCounter.get();
    }

    public void close() {
        this.stop();
    }

    @VisibleForTesting
    protected void stop() {
        this.mStopFlushing = true;
        this.mFlushSemaphore.release();
        try {
            this.mFlushThread.join();
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return;
        }
        finally {
            this.mFlushThread = null;
            this.mFlushSemaphore.tryAcquire();
        }
    }

    @VisibleForTesting
    protected void start() {
        if (this.mFlushThread != null) {
            this.close();
        }
        this.mFlushThread = new Thread(this::doFlush);
        this.mStopFlushing = false;
        this.mFlushThread.start();
    }

    private void doFlush() {
        while (!this.mStopFlushing) {
            while (this.mQueue.isEmpty() && !this.mStopFlushing) {
                try {
                    if (!this.mFlushSemaphore.tryAcquire(this.mFlushBatchTimeNs, TimeUnit.NANOSECONDS)) continue;
                    break;
                }
                catch (InterruptedException ie) {
                    // empty catch block
                    break;
                }
            }
            try {
                Serializable serializable;
                Journal.JournalEntry entry;
                long startTime = System.nanoTime();
                while (!this.mQueue.isEmpty() && (entry = this.mQueue.peek()) != null) {
                    this.mJournalWriter.write(entry);
                    JournalUtils.sinkAppend(this.mJournalSinks, entry);
                    this.mQueue.poll();
                    serializable = this.mWriteCounter;
                    Long l = this.mWriteCounter = Long.valueOf(this.mWriteCounter + 1L);
                    if (System.nanoTime() - startTime < this.mFlushBatchTimeNs || this.mStopFlushing) continue;
                    break;
                }
                if (this.mFlushCounter.get() < this.mWriteCounter) {
                    Timer.Context ctx = MetricsSystem.timer((String)MetricKey.MASTER_JOURNAL_FLUSH_TIMER.getName()).time();
                    serializable = null;
                    try {
                        this.mJournalWriter.flush();
                    }
                    catch (Throwable throwable) {
                        serializable = throwable;
                        throw throwable;
                    }
                    finally {
                        if (ctx != null) {
                            if (serializable != null) {
                                try {
                                    ctx.close();
                                }
                                catch (Throwable throwable) {
                                    ((Throwable)serializable).addSuppressed(throwable);
                                }
                            } else {
                                ctx.close();
                            }
                        }
                    }
                    JournalUtils.sinkFlush(this.mJournalSinks);
                    this.mFlushCounter.set(this.mWriteCounter);
                }
                Iterator<FlushTicket> ticketIterator = this.mTicketSet.iterator();
                while (ticketIterator.hasNext()) {
                    FlushTicket ticket = ticketIterator.next();
                    if (ticket.getTargetCounter() > this.mFlushCounter.get()) continue;
                    ticket.setCompleted();
                    ticketIterator.remove();
                }
            }
            catch (JournalClosedException | IOException exc) {
                SAMPLING_LOG.warn("Failed to flush journal entry: " + exc.getMessage(), exc);
                Metrics.JOURNAL_FLUSH_FAILURE.inc();
                Iterator<FlushTicket> ticketIterator = this.mTicketSet.iterator();
                while (ticketIterator.hasNext()) {
                    FlushTicket ticket = ticketIterator.next();
                    ticketIterator.remove();
                    if (ticket.getTargetCounter() <= this.mFlushCounter.get()) {
                        ticket.setCompleted();
                        continue;
                    }
                    ticket.setError(exc);
                }
            }
        }
    }

    public void flush(long targetCounter) throws IOException, JournalClosedException {
        if (targetCounter <= this.mFlushCounter.get()) {
            return;
        }
        FlushTicket ticket = new FlushTicket(targetCounter);
        this.mTicketSet.add(ticket);
        try {
            this.mFlushSemaphore.release();
            ticket.waitCompleted();
        }
        catch (InterruptedException ie) {
            throw new AlluxioStatusException(Status.CANCELLED.withCause((Throwable)ie));
        }
        catch (Throwable e) {
            if (e instanceof IOException) {
                throw (IOException)e;
            }
            if (e instanceof JournalClosedException) {
                throw (JournalClosedException)e;
            }
            throw new AlluxioStatusException(Status.INTERNAL.withCause(e));
        }
        finally {
            this.mFlushSemaphore.tryAcquire();
        }
    }

    @ThreadSafe
    private static final class Metrics {
        private static final Counter JOURNAL_FLUSH_FAILURE = MetricsSystem.counter((String)MetricKey.MASTER_JOURNAL_FLUSH_FAILURE.getName());

        private Metrics() {
        }
    }

    private class FlushTicket
    implements ForkJoinPool.ManagedBlocker {
        private final long mTargetCounter;
        private SettableFuture<Void> mIsCompleted;
        private Throwable mError;

        public FlushTicket(long targetCounter) {
            this.mTargetCounter = targetCounter;
            this.mIsCompleted = SettableFuture.create();
            this.mError = null;
        }

        public long getTargetCounter() {
            return this.mTargetCounter;
        }

        public void setCompleted() {
            this.mIsCompleted.set(null);
        }

        public void setError(Throwable exc) {
            this.mIsCompleted.setException(exc);
            this.mError = exc;
        }

        public void waitCompleted() throws Throwable {
            ForkJoinPoolHelper.safeManagedBlock((ForkJoinPool.ManagedBlocker)this);
            if (this.mError != null) {
                throw this.mError;
            }
        }

        public boolean block() throws InterruptedException {
            try {
                this.mIsCompleted.get();
            }
            catch (ExecutionException exc) {
                this.mError = exc.getCause();
            }
            return true;
        }

        public boolean isReleasable() {
            return this.mIsCompleted.isDone() || this.mIsCompleted.isCancelled();
        }
    }
}

