/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master;

import alluxio.ProcessUtils;
import alluxio.Server;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.master.Master;
import alluxio.master.MasterRegistry;
import alluxio.master.journal.JournalContext;
import alluxio.master.journal.JournalEntryAssociation;
import alluxio.master.journal.JournalEntryStreamReader;
import alluxio.master.journal.JournalUtils;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.proto.journal.Journal;
import alluxio.resource.CloseableIterator;
import alluxio.util.ThreadFactoryUtils;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes the journal state of all registered masters to a gzip-compressed backup stream and
 * restores masters from such a stream.
 *
 * <p>Both directions use a producer task and a consumer task connected by a bounded queue.
 * The producer signals completion by enqueueing a sentinel entry whose sequence number is
 * {@link #TERMINATION_SEQ}.
 */
public class BackupManager {
    private static final Logger LOG = LoggerFactory.getLogger(BackupManager.class);
    /** Sequence number of the sentinel entry that tells the consumer task there is no more data. */
    private static final long TERMINATION_SEQ = -1L;
    /** Format string for backup file names. */
    public static final String BACKUP_FILE_FORMAT = "alluxio-backup-%s-%s.gz";
    /** Pattern for backup file names; group 1 captures the trailing numeric component. */
    public static final Pattern BACKUP_FILE_PATTERN = Pattern.compile("alluxio-backup-[0-9]+-[0-9]+-[0-9]+-([0-9]+)\\.gz");
    private final MasterRegistry mRegistry;
    // The fields below back the gauges registered in the constructor. They are volatile because
    // the gauges are presumably sampled from a thread other than the one running the
    // backup/restore -- TODO confirm against MetricsSystem.
    private volatile long mBackupEntriesCount = -1L;
    private volatile long mRestoreEntriesCount = -1L;
    private volatile long mBackupTimeMs = -1L;
    private volatile long mRestoreTimeMs = -1L;

    /**
     * Creates a backup manager and registers gauges for the last backup/restore entry counts
     * and durations. All gauges report {@code -1} until the corresponding operation has run.
     *
     * @param registry the registry whose masters are backed up and restored
     */
    public BackupManager(MasterRegistry registry) {
        mRegistry = registry;
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_LAST_BACKUP_ENTRIES_COUNT.getName(), () -> mBackupEntriesCount);
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_LAST_BACKUP_RESTORE_COUNT.getName(), () -> mRestoreEntriesCount);
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_LAST_BACKUP_RESTORE_TIME_MS.getName(), () -> mRestoreTimeMs);
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_LAST_BACKUP_TIME_MS.getName(), () -> mBackupTimeMs);
    }

    /**
     * Writes the journal entries of every registered master to the given stream, gzip-compressed.
     *
     * <p>The gzip stream is finished but {@code os} itself is not closed.
     *
     * @param os the stream to write the backup to
     * @param entryCount incremented once for every entry written to the backup
     * @throws IOException if reading master state or writing the backup fails
     */
    public void backup(OutputStream os, AtomicLong entryCount) throws IOException {
        GzipCompressorOutputStream zipStream = new GzipCompressorOutputStream(os);
        ExecutorService executor = Executors.newFixedThreadPool(4, ThreadFactoryUtils.build("master-backup-%d", true));
        try {
            CompletionService<Boolean> completionService = new ExecutorCompletionService<>(executor);
            Set<Future<?>> activeTasks = new HashSet<>();
            LinkedBlockingQueue<Journal.JournalEntry> journalEntryQueue =
                new LinkedBlockingQueue<>(ServerConfiguration.getInt(PropertyKey.MASTER_BACKUP_ENTRY_BUFFER_COUNT));
            AtomicBoolean bufferingActive = new AtomicBoolean(true);
            long startBackupTime = System.currentTimeMillis();
            activeTasks.add(completionService.submit(() -> bufferJournalEntries(journalEntryQueue, bufferingActive)));
            activeTasks.add(completionService.submit(
                () -> writeBufferedEntries(journalEntryQueue, bufferingActive, zipStream, entryCount)));
            safeWaitTasks(activeTasks, completionService);
            mBackupTimeMs = System.currentTimeMillis() - startBackupTime;
            mBackupEntriesCount = entryCount.get();
            zipStream.finish();
            LOG.info("Created backup with {} entries", entryCount.get());
        } finally {
            // Release the worker threads; on failure this also interrupts a task that is still running.
            executor.shutdownNow();
        }
    }

    /**
     * Restores the registered masters from a gzip-compressed backup stream.
     *
     * <p>The restore duration and applied-entry gauges are updated even when the restore fails.
     *
     * @param is the stream to read the backup from
     * @throws IOException if reading the backup or applying its entries fails
     */
    public void initFromBackup(InputStream is) throws IOException {
        try (GzipCompressorInputStream gzIn = new GzipCompressorInputStream(is);
             JournalEntryStreamReader reader = new JournalEntryStreamReader(gzIn)) {
            List<Master> masters = mRegistry.getServers();
            Map<String, Master> mastersByName = Maps.uniqueIndex(masters, Master::getName);
            LinkedBlockingQueue<Journal.JournalEntry> journalEntryQueue =
                new LinkedBlockingQueue<>(ServerConfiguration.getInt(PropertyKey.MASTER_BACKUP_ENTRY_BUFFER_COUNT));
            AtomicBoolean readingActive = new AtomicBoolean(true);
            AtomicLong appliedEntryCount = new AtomicLong(0L);
            ExecutorService executor = Executors.newFixedThreadPool(2, ThreadFactoryUtils.build("master-backup-%d", true));
            ScheduledExecutorService traceExecutor =
                Executors.newScheduledThreadPool(1, ThreadFactoryUtils.build("master-backup-tracer-%d", true));
            long startRestoreTime = System.currentTimeMillis();
            try {
                // Periodically log progress while the restore is running.
                traceExecutor.scheduleAtFixedRate(
                    () -> LOG.info("{} entries from backup applied so far...", appliedEntryCount.get()),
                    30L, 30L, TimeUnit.SECONDS);
                CompletionService<Boolean> completionService = new ExecutorCompletionService<>(executor);
                Set<Future<?>> activeTasks = new HashSet<>();
                activeTasks.add(completionService.submit(() -> readBackupEntries(reader, journalEntryQueue, readingActive)));
                activeTasks.add(completionService.submit(
                    () -> applyBackupEntries(readingActive, journalEntryQueue, masters, mastersByName, appliedEntryCount)));
                safeWaitTasks(activeTasks, completionService);
            } finally {
                mRestoreTimeMs = System.currentTimeMillis() - startRestoreTime;
                mRestoreEntriesCount = appliedEntryCount.get();
                traceExecutor.shutdownNow();
                // Release the worker threads; on failure this also interrupts a task that is still running.
                executor.shutdownNow();
            }
            LOG.info("Restored {} entries from backup", appliedEntryCount.get());
        }
    }

    /**
     * Waits for every task in {@code activeTasks} to complete. If the wait is interrupted or a
     * task fails, all tasks still pending are cancelled (with interruption) before rethrowing.
     *
     * @param activeTasks the futures still outstanding; completed futures are removed from it
     * @param completionService the completion service the tasks were submitted to
     * @throws IOException the failure of a task, wrapped unless it already is an IOException
     * @throws RuntimeException if the calling thread is interrupted while waiting
     */
    private void safeWaitTasks(Set<Future<?>> activeTasks, CompletionService<?> completionService) throws IOException {
        while (!activeTasks.isEmpty()) {
            try {
                Future<?> resultFuture = completionService.take();
                activeTasks.remove(resultFuture);
                resultFuture.get();
            } catch (InterruptedException ie) {
                activeTasks.forEach(future -> future.cancel(true));
                Thread.currentThread().interrupt();
                throw new RuntimeException("Thread interrupted while waiting for backup threads.", ie);
            } catch (ExecutionException ee) {
                activeTasks.forEach(future -> future.cancel(true));
                Throwable cause = ee.getCause();
                if (cause instanceof IOException) {
                    throw (IOException) cause;
                }
                throw new IOException(cause);
            }
        }
    }

    /** Builds the sentinel entry that marks the end of the entry queue. */
    private static Journal.JournalEntry newTerminationEntry() {
        return Journal.JournalEntry.newBuilder().setSequenceNumber(TERMINATION_SEQ).build();
    }

    /**
     * Backup producer: puts the journal entries of every registered master on the queue,
     * followed by the termination entry. Declared {@code throws Exception} because it runs as
     * the body of a {@link java.util.concurrent.Callable}.
     */
    private Boolean bufferJournalEntries(LinkedBlockingQueue<Journal.JournalEntry> journalEntryQueue,
            AtomicBoolean bufferingActive) throws Exception {
        try {
            for (Master master : mRegistry.getServers()) {
                // try-with-resources closes the iterator AND lets any failure propagate, so a
                // broken master aborts the backup instead of being silently skipped.
                try (CloseableIterator<Journal.JournalEntry> it = master.getJournalEntryIterator()) {
                    while (it.get().hasNext()) {
                        journalEntryQueue.put(it.get().next());
                        if (Thread.interrupted()) {
                            throw new InterruptedException();
                        }
                    }
                }
            }
            journalEntryQueue.put(newTerminationEntry());
            return true;
        } catch (InterruptedException ie) {
            LOG.info("Backup reader task interrupted");
            Thread.currentThread().interrupt();
            throw new RuntimeException("Thread interrupted while reading master state.", ie);
        } finally {
            bufferingActive.set(false);
        }
    }

    /**
     * Backup consumer: takes entries off the queue and writes them, length-delimited, to the
     * backup stream until the termination entry is seen. Declared {@code throws Exception}
     * because it runs as the body of a {@link java.util.concurrent.Callable}.
     */
    private static Boolean writeBufferedEntries(LinkedBlockingQueue<Journal.JournalEntry> journalEntryQueue,
            AtomicBoolean bufferingActive, OutputStream zipStream, AtomicLong entryCount) throws Exception {
        try {
            List<Journal.JournalEntry> pendingEntries = new LinkedList<>();
            while (bufferingActive.get() || journalEntryQueue.size() > 0) {
                if (0 == journalEntryQueue.drainTo(pendingEntries)) {
                    // Nothing buffered right now: block until the producer delivers an entry.
                    pendingEntries.add(journalEntryQueue.take());
                }
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                for (Journal.JournalEntry journalEntry : pendingEntries) {
                    if (journalEntry.getSequenceNumber() == TERMINATION_SEQ) {
                        return true;
                    }
                    journalEntry.writeDelimitedTo(zipStream);
                    entryCount.incrementAndGet();
                }
                pendingEntries.clear();
            }
            return true;
        } catch (InterruptedException ie) {
            LOG.info("Backup writer task interrupted");
            Thread.currentThread().interrupt();
            throw new RuntimeException("Thread interrupted while writing to backup stream.", ie);
        }
    }

    /**
     * Restore producer: reads entries from the backup stream onto the queue, followed by the
     * termination entry. Declared {@code throws Exception} because it runs as the body of a
     * {@link java.util.concurrent.Callable}.
     */
    private static Boolean readBackupEntries(JournalEntryStreamReader reader,
            LinkedBlockingQueue<Journal.JournalEntry> journalEntryQueue, AtomicBoolean readingActive) throws Exception {
        try {
            Journal.JournalEntry entry;
            while ((entry = reader.readEntry()) != null) {
                journalEntryQueue.put(entry);
            }
            journalEntryQueue.put(newTerminationEntry());
            return true;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Thread interrupted while reading from backup stream.", ie);
        } finally {
            readingActive.set(false);
        }
    }

    /**
     * Restore consumer: drains entries from the queue in batches and applies each entry to the
     * master it is associated with, until the termination entry is seen. A fresh journal
     * context per master is opened for every batch and closed when the batch ends. Declared
     * {@code throws Exception} because it runs as the body of a
     * {@link java.util.concurrent.Callable}.
     */
    private static Boolean applyBackupEntries(AtomicBoolean readingActive,
            LinkedBlockingQueue<Journal.JournalEntry> journalEntryQueue, List<Master> masters,
            Map<String, Master> mastersByName, AtomicLong appliedEntryCount) throws Exception {
        try {
            while (readingActive.get() || journalEntryQueue.size() > 0) {
                List<Journal.JournalEntry> drainedEntries = new LinkedList<>();
                if (0 == journalEntryQueue.drainTo(drainedEntries)) {
                    // Nothing buffered right now: wait briefly, then re-check the loop condition.
                    Journal.JournalEntry entry = journalEntryQueue.poll(10L, TimeUnit.MILLISECONDS);
                    if (entry == null) {
                        continue;
                    }
                    drainedEntries.add(entry);
                }
                Map<Master, JournalContext> masterJCMap = new HashMap<>();
                try {
                    for (Master master : masters) {
                        masterJCMap.put(master, master.createJournalContext());
                    }
                    for (Journal.JournalEntry entry : drainedEntries) {
                        if (entry.getSequenceNumber() == TERMINATION_SEQ) {
                            return true;
                        }
                        String masterName;
                        try {
                            masterName = JournalEntryAssociation.getMasterForEntry(entry);
                        } catch (IllegalStateException ise) {
                            ProcessUtils.fatalError(LOG, ise, "Unrecognized journal entry: %s", entry);
                            throw ise;
                        }
                        try {
                            Master master = mastersByName.get(masterName);
                            master.applyAndJournal(masterJCMap.get(master), entry);
                            appliedEntryCount.incrementAndGet();
                        } catch (Exception e) {
                            JournalUtils.handleJournalReplayFailure(LOG, e,
                                "Failed to apply journal entry to master %s. Entry: %s", masterName, entry);
                        }
                    }
                } finally {
                    // Close the batch's journal contexts before the next round.
                    for (JournalContext journalContext : masterJCMap.values()) {
                        journalContext.close();
                    }
                }
            }
            return true;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Thread interrupted while applying backup content.", ie);
        }
    }
}

