/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.notification.spool;

import com.google.common.annotations.VisibleForTesting;
import java.io.DataInput;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.spool.IndexManagement;
import org.apache.atlas.notification.spool.SpoolConfiguration;
import org.apache.atlas.notification.spool.SpoolUtils;
import org.apache.atlas.notification.spool.models.IndexRecord;
import org.apache.atlas.notification.spool.utils.local.FileLockedReadWrite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable that reads spooled messages from the files tracked by {@link IndexManagement}
 * and forwards them, in batches, to an {@link AbstractNotification} handler.
 *
 * <p>The loop in {@link #run()} keeps going until {@link #setDrain()} is called, the thread is
 * interrupted, or an unexpected exception escapes. While the handler reports that it is not
 * ready, the loop sleeps for the configured retry interval instead of reading the spool.
 *
 * <p>{@link #setDrain()} and {@link #setDestinationDown()} are public and are meant to be called
 * while {@link #run()} is looping, so the two flags they write are {@code volatile}.
 */
public class Publisher implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(Publisher.class);

    private final SpoolConfiguration configuration;
    private final IndexManagement indexManagement;
    private final AbstractNotification notificationHandler;
    private final String notificationHandlerName;
    private final int retryDestinationMS;
    private final int messageBatchSize;

    /** Source name used in log messages and for the spool-file lock; assigned when {@link #run()} starts. */
    private String source;

    // Both flags are polled by the run() loop and written through public setters, i.e.
    // potentially from another thread; volatile guarantees the loop sees the update.
    private volatile boolean isDrain;
    private volatile boolean isDestDown;

    /**
     * Creates a publisher.
     *
     * @param configuration       supplies the source name, retry interval, batch size and pause settings
     * @param indexManagement     supplies the records to publish and receives progress updates
     * @param notificationHandler destination the messages are sent to; must not be {@code null}
     *                            (its class name is read here for logging)
     */
    public Publisher(SpoolConfiguration configuration, IndexManagement indexManagement, AbstractNotification notificationHandler) {
        this.configuration = configuration;
        this.indexManagement = indexManagement;
        this.notificationHandler = notificationHandler;
        this.notificationHandlerName = notificationHandler.getClass().getSimpleName();
        this.retryDestinationMS = configuration.getRetryDestinationMS();
        this.messageBatchSize = configuration.getMessageBatchSize();
    }

    /**
     * Publishing loop: wait while the destination is down, stop when draining, otherwise fetch
     * the next index record and dispatch its file. A record that was fully dispatched is marked
     * done; a record that failed is kept and retried on the next iteration.
     */
    @Override
    public void run() {
        source = configuration.getSourceName();

        LOG.info("Publisher.run(source={}): starting publisher {}", source, notificationHandlerName);

        try {
            IndexRecord record = null;

            // An interrupt raised while a batch was being sent is reported by
            // processAndDispatch() through the interrupt flag, so it is checked here.
            while (!Thread.currentThread().isInterrupted()) {
                checkAndWaitIfDestinationDown();

                if (isDrain) {
                    break;
                }

                if (isDestDown) {
                    continue;
                }

                record = fetchNext(record);

                if (record != null && processAndDispatch(record)) {
                    indexManagement.removeAsDone(record);
                    record = null;
                } else {
                    indexManagement.rolloverSpoolFileIfNeeded();
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so whoever owns this thread can still observe the interrupt.
            Thread.currentThread().interrupt();
            LOG.error("Publisher.run(source={}): {}: Publisher: Shutdown might be in progress!", source, notificationHandlerName);
        } catch (Exception e) {
            LOG.error("Publisher.run(source={}): {}: Publisher: Exception in destination writing!", source, notificationHandlerName, e);
        }

        LOG.info("Publisher.run(source={}): publisher {} exited!", source, notificationHandlerName);
    }

    /** Marks the destination as down and records a failed attempt with the index manager. */
    public void setDestinationDown() {
        isDestDown = true;
        indexManagement.updateFailedAttempt();
    }

    /** Requests that the {@link #run()} loop stop at its next iteration. */
    public void setDrain() {
        isDrain = true;
    }

    /**
     * @return {@code true} if the destination was last observed to be down
     */
    public boolean isDestinationDown() {
        return isDestDown;
    }

    /**
     * Refreshes the destination-down flag from the handler's readiness check and, when the
     * destination is down, sleeps for the configured retry interval.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void checkAndWaitIfDestinationDown() throws InterruptedException {
        isDestDown = !notificationHandler.isReady(NotificationInterface.NotificationType.HOOK);

        if (isDestDown) {
            LOG.info("Publisher.waitIfDestinationDown(source={}): {}: Destination is down. Sleeping for: {} ms. Queue: {} items", source, notificationHandlerName, retryDestinationMS, indexManagement.getQueueSize());
            Thread.sleep(retryDestinationMS);
        }
    }

    /**
     * Returns the record to work on: the given one if it is still pending, otherwise the next
     * record from the index manager.
     *
     * @param record record left over from a previous failed attempt, or {@code null}
     * @return the record to process, or {@code null} if none could be obtained
     */
    private IndexRecord fetchNext(IndexRecord record) {
        if (record == null) {
            try {
                record = indexManagement.next();
            } catch (Exception e) {
                LOG.error("Publisher.fetchNext(source={}): failed!. publisher={}", source, notificationHandlerName, e);
            }
        }

        return record;
    }

    /**
     * Reads the spool file of {@code record} and dispatches its lines in batches of
     * {@code messageBatchSize}, skipping lines numbered below {@code record.getLine()}.
     *
     * <p>Failures while reading or sending are logged and reported through the return value so
     * that the caller keeps the record and retries it; they are not propagated. An interrupt is
     * additionally reported by setting the thread's interrupt flag.
     *
     * <p>NOTE(review): the resume check is inclusive of {@code record.getLine()}; whether that
     * re-sends the last line of a previously dispatched batch depends on how {@code IndexRecord}
     * relates {@code setCurrentLine} to {@code getLine} - confirm.
     *
     * @param record index record naming the spool file and the line to resume from
     * @return {@code true} if the file was fully dispatched or does not exist (the record can be
     *         retired); {@code false} if processing failed and should be retried
     * @throws IOException declared for compatibility with existing callers; failures are
     *                     reported through the return value instead
     */
    @VisibleForTesting
    boolean processAndDispatch(IndexRecord record) throws IOException {
        if (!SpoolUtils.fileExists(record)) {
            LOG.error("Publisher.processAndDispatch(source={}): publisher={}: file '{}' not found!", source, notificationHandlerName, record.getPath());

            // Nothing left to publish for this record, so let the caller retire it.
            return true;
        }

        try (FileLockedReadWrite fileLockedRead = new FileLockedReadWrite(source)) {
            DataInput dataInput = fileLockedRead.getInput(new File(record.getPath()));
            int lineInSpoolFile = 0;
            List<String> messages = new ArrayList<>();

            for (String message = dataInput.readLine(); message != null; message = dataInput.readLine()) {
                lineInSpoolFile++;

                if (lineInSpoolFile < record.getLine()) {
                    continue;
                }

                messages.add(message);

                if (messages.size() == messageBatchSize) {
                    dispatch(record, lineInSpoolFile, messages);
                }
            }

            // Flush the trailing partial batch. The list is emptied after every send, so it is
            // empty here when the file ended exactly on a batch boundary; skipping the call in
            // that case avoids a misleading "error sending logs" entry.
            if (!messages.isEmpty()) {
                dispatch(record, lineInSpoolFile, messages);
            }

            LOG.info("Publisher.processAndDispatch(source={}): consumer={}: done reading file {}", source, notificationHandlerName, record.getPath());

            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Publisher.processAndDispatch(source={}): consumer={}: interrupted while processing file {}", source, notificationHandlerName, record.getPath());

            return false;
        } catch (Exception e) {
            LOG.error("Publisher.processAndDispatch(source={}): consumer={}: failed to process file {}", source, notificationHandlerName, record.getPath(), e);

            return false;
        }
    }

    /**
     * Sends one batch and, on success, records the progress on the index record.
     *
     * @param record          record whose current line is advanced after a successful send
     * @param lineInSpoolFile number of the last line contained in {@code messages}
     * @param messages        batch to send; emptied by the send whether or not it succeeds
     * @throws Exception if the send fails or the thread is interrupted
     */
    private void dispatch(IndexRecord record, int lineInSpoolFile, List<String> messages) throws Exception {
        if (notificationHandler == null || messages == null || messages.isEmpty()) {
            LOG.error("Publisher.dispatch(source={}): consumer={}: error sending logs", source, notificationHandlerName);
        } else {
            dispatch(record.getPath(), messages);

            record.setCurrentLine(lineInSpoolFile);
            indexManagement.update(record);

            isDestDown = false;
        }
    }

    /**
     * Pauses if configured, then hands the batch to the notification handler.
     *
     * @param filePath spool file the batch came from (used for logging only)
     * @param messages batch to send; always cleared before this method returns
     * @throws InterruptedException  if the thread is interrupted during the pause
     * @throws NotificationException if the handler fails; the destination is marked down first
     */
    private void dispatch(String filePath, List<String> messages) throws Exception {
        try {
            pauseBeforeSend();

            notificationHandler.sendInternal(NotificationInterface.NotificationType.HOOK, messages);

            if (isDestDown) {
                LOG.info("Publisher.dispatch(source={}): consumer={}: destination is now up. file={}", source, notificationHandlerName, filePath);
            }
        } catch (InterruptedException interrupted) {
            // An interrupt is a stop request, not a destination failure: do not count it as a
            // failed attempt, and keep the interrupt flag set for the callers.
            Thread.currentThread().interrupt();

            throw interrupted;
        } catch (Exception exception) {
            setDestinationDown();

            LOG.error("Publisher.dispatch(source={}): consumer={}: error while sending logs to consumer", source, notificationHandlerName, exception);

            throw new NotificationException(exception, String.format("%s: %s: Publisher: Destination down!", source, notificationHandlerName));
        } finally {
            messages.clear();
        }
    }

    /**
     * Sleeps for the configured number of seconds before a send, unless the configuration
     * reports a Hive metastore source.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void pauseBeforeSend() throws InterruptedException {
        if (!configuration.isHiveMetaStore()) {
            // Multiply in long so a large configured value cannot overflow int.
            long waitMs = configuration.getPauseBeforeSendSec() * 1000L;

            LOG.info("Waiting before dispatch: {}", waitMs);
            Thread.sleep(waitMs);
        }
    }
}

