/*
 * Decompiled with CFR 0.152.
 */
package io.sealights.onpremise.agents.commons.queues;

import io.sealights.onpremise.agentevents.eventservice.proxy.api.types.AgentEventCode;
import io.sealights.onpremise.agents.commons.OutgoingMessageQueue;
import io.sealights.onpremise.agents.commons.configuration.ConfigurationListener;
import io.sealights.onpremise.agents.commons.configuration.ServiceProxyCfgListener;
import io.sealights.onpremise.agents.commons.functions.Procedure;
import io.sealights.onpremise.agents.commons.queues.QueueDataSendMonitor;
import io.sealights.onpremise.agents.commons.watchdog.Watchdog;
import io.sealights.onpremise.agents.infra.configuration.SLAgentConfiguration;
import io.sealights.onpremise.agents.infra.logging.LogFactory;
import io.sealights.onpremise.agents.infra.utils.threads.ThreadUtils;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.slf4j.Logger;

public abstract class QueueSender<T, Q extends OutgoingMessageQueue<T>, C extends SLAgentConfiguration, S extends ServiceProxyCfgListener<C>>
extends Procedure
implements ConfigurationListener<C> {
    private static Logger LOG = LogFactory.getLogger(QueueSender.class);
    protected final int RETRY_DELAY_IN_MILLISECONDS = 5000;
    protected final int REQUEUE_DELAY_IN_MILLISECONDS = 2000;
    private final int LOG_CTR = 10;
    private final int SLEEP_INTERVAL_100_MILLIS = 100;
    private Q queue;
    private S serviceProxy;
    private Watchdog watchdog;
    private AtomicBoolean isSending = new AtomicBoolean(false);
    private int waitForAllDataToBeSentLogsCounter = 0;
    private QueueDataSendMonitor dataSendMonitor;

    public QueueSender(Q queue, S serviceProxy, Watchdog watchDog, String queueItemName, AgentEventCode shutdownErrorCode) {
        this.queue = queue;
        this.serviceProxy = serviceProxy;
        this.watchdog = watchDog;
        this.dataSendMonitor = new QueueDataSendMonitor(queueItemName, shutdownErrorCode);
        this.watchdog.addOnDoWorkHandler(this);
    }

    public void start() {
        LOG.info("{} started.", (Object)this.getClass().getSimpleName());
        this.watchdog.start();
    }

    public void shutdown() {
        LOG.info("EventQueueSender.shutdown - Flushing events and sending all data. ");
        this.dataSendMonitor.setShuttingDown();
        this.watchdog.stop();
        this.execute();
        this.dataSendMonitor.notifyWorkSummary();
        this.waitForAllDataToBeSentBeforeShutdown();
    }

    @Override
    public void execute() {
        boolean isAlreadySending = this.getIsSending().getAndSet(true);
        if (isAlreadySending) {
            LOG.info("Already sending. No need to send another set of items.");
            return;
        }
        try {
            this.sendData();
        }
        finally {
            this.getIsSending().set(false);
        }
    }

    @Override
    public void onConfigurationChanged(C configuration) {
        ((ServiceProxyCfgListener)this.serviceProxy).onConfigurationChanged(configuration);
    }

    public void onExecutionEnded(String testStage) {
        this.dataSendMonitor.notifyErrorsOnEndExecution(testStage);
    }

    protected void handleSendFailure(List<T> data2) {
        if (((OutgoingMessageQueue)this.queue).isQueueFull()) {
            this.dataSendMonitor.notifyCurrentError(true);
            return;
        }
        this.dataSendMonitor.notifyCurrentError(false);
        if (!this.dataSendMonitor.isShuttingDown()) {
            LOG.error("Waiting {} milliseconds and then re-queuing data after failed submission", (Object)2000);
            ThreadUtils.sleepMillis(2000L);
            ((OutgoingMessageQueue)this.queue).requeueMessages(data2);
        } else {
            LOG.error("Not re-queuing data after failed submissions -- system is in the process of shutting down. Waiting {} seconds and then trying to send the same footprints one last time.", (Object)5000);
            ThreadUtils.sleepMillis(5000L);
            this.resendDataOnShutdown(data2);
        }
    }

    protected void waitForAllDataToBeSentBeforeShutdown() {
        while (true) {
            if (!this.isSending.get()) {
                LOG.info("waitForAllDataToBeSentBeforeShutdown - No need to wait since the sending thread is done.");
                return;
            }
            if (this.waitForAllDataToBeSentLogsCounter % 10 == 0) {
                LOG.info("waitForAllDataToBeSentBeforeShutdown - Sending previous data. Wait and loop.");
            }
            ++this.waitForAllDataToBeSentLogsCounter;
            ThreadUtils.sleepMillis(100L);
        }
    }

    protected abstract void sendData();

    protected abstract void resendDataOnShutdown(List<T> var1);

    @Generated
    public int getRETRY_DELAY_IN_MILLISECONDS() {
        return this.RETRY_DELAY_IN_MILLISECONDS;
    }

    @Generated
    public int getREQUEUE_DELAY_IN_MILLISECONDS() {
        return this.REQUEUE_DELAY_IN_MILLISECONDS;
    }

    @Generated
    public int getLOG_CTR() {
        return this.LOG_CTR;
    }

    @Generated
    public int getSLEEP_INTERVAL_100_MILLIS() {
        return this.SLEEP_INTERVAL_100_MILLIS;
    }

    @Generated
    public Q getQueue() {
        return this.queue;
    }

    @Generated
    public S getServiceProxy() {
        return this.serviceProxy;
    }

    @Generated
    public Watchdog getWatchdog() {
        return this.watchdog;
    }

    @Generated
    public AtomicBoolean getIsSending() {
        return this.isSending;
    }

    @Generated
    public int getWaitForAllDataToBeSentLogsCounter() {
        return this.waitForAllDataToBeSentLogsCounter;
    }

    @Generated
    public QueueDataSendMonitor getDataSendMonitor() {
        return this.dataSendMonitor;
    }

    @Generated
    public void setQueue(Q queue) {
        this.queue = queue;
    }

    @Generated
    public void setServiceProxy(S serviceProxy) {
        this.serviceProxy = serviceProxy;
    }

    @Generated
    public void setWatchdog(Watchdog watchdog) {
        this.watchdog = watchdog;
    }

    @Generated
    public void setIsSending(AtomicBoolean isSending) {
        this.isSending = isSending;
    }

    @Generated
    public void setWaitForAllDataToBeSentLogsCounter(int waitForAllDataToBeSentLogsCounter) {
        this.waitForAllDataToBeSentLogsCounter = waitForAllDataToBeSentLogsCounter;
    }

    @Generated
    public void setDataSendMonitor(QueueDataSendMonitor dataSendMonitor) {
        this.dataSendMonitor = dataSendMonitor;
    }

    @Generated
    public String toString() {
        return "QueueSender(RETRY_DELAY_IN_MILLISECONDS=" + this.getRETRY_DELAY_IN_MILLISECONDS() + ", REQUEUE_DELAY_IN_MILLISECONDS=" + this.getREQUEUE_DELAY_IN_MILLISECONDS() + ", LOG_CTR=" + this.getLOG_CTR() + ", SLEEP_INTERVAL_100_MILLIS=" + this.getSLEEP_INTERVAL_100_MILLIS() + ", queue=" + this.getQueue() + ", serviceProxy=" + this.getServiceProxy() + ", watchdog=" + this.getWatchdog() + ", isSending=" + this.getIsSending() + ", waitForAllDataToBeSentLogsCounter=" + this.getWaitForAllDataToBeSentLogsCounter() + ", dataSendMonitor=" + this.getDataSendMonitor() + ")";
    }

    @Generated
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof QueueSender)) {
            return false;
        }
        QueueSender other = (QueueSender)o;
        if (!other.canEqual(this)) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        if (this.getRETRY_DELAY_IN_MILLISECONDS() != other.getRETRY_DELAY_IN_MILLISECONDS()) {
            return false;
        }
        if (this.getREQUEUE_DELAY_IN_MILLISECONDS() != other.getREQUEUE_DELAY_IN_MILLISECONDS()) {
            return false;
        }
        if (this.getLOG_CTR() != other.getLOG_CTR()) {
            return false;
        }
        if (this.getSLEEP_INTERVAL_100_MILLIS() != other.getSLEEP_INTERVAL_100_MILLIS()) {
            return false;
        }
        Q this$queue = this.getQueue();
        Q other$queue = other.getQueue();
        if (this$queue == null ? other$queue != null : !this$queue.equals(other$queue)) {
            return false;
        }
        S this$serviceProxy = this.getServiceProxy();
        S other$serviceProxy = other.getServiceProxy();
        if (this$serviceProxy == null ? other$serviceProxy != null : !this$serviceProxy.equals(other$serviceProxy)) {
            return false;
        }
        Watchdog this$watchdog = this.getWatchdog();
        Watchdog other$watchdog = other.getWatchdog();
        if (this$watchdog == null ? other$watchdog != null : !this$watchdog.equals(other$watchdog)) {
            return false;
        }
        AtomicBoolean this$isSending = this.getIsSending();
        AtomicBoolean other$isSending = other.getIsSending();
        if (this$isSending == null ? other$isSending != null : !this$isSending.equals(other$isSending)) {
            return false;
        }
        if (this.getWaitForAllDataToBeSentLogsCounter() != other.getWaitForAllDataToBeSentLogsCounter()) {
            return false;
        }
        QueueDataSendMonitor this$dataSendMonitor = this.getDataSendMonitor();
        QueueDataSendMonitor other$dataSendMonitor = other.getDataSendMonitor();
        return !(this$dataSendMonitor == null ? other$dataSendMonitor != null : !this$dataSendMonitor.equals(other$dataSendMonitor));
    }

    @Generated
    protected boolean canEqual(Object other) {
        return other instanceof QueueSender;
    }

    @Generated
    public int hashCode() {
        int PRIME = 59;
        int result = 1;
        result = result * 59 + super.hashCode();
        result = result * 59 + this.getRETRY_DELAY_IN_MILLISECONDS();
        result = result * 59 + this.getREQUEUE_DELAY_IN_MILLISECONDS();
        result = result * 59 + this.getLOG_CTR();
        result = result * 59 + this.getSLEEP_INTERVAL_100_MILLIS();
        Q $queue = this.getQueue();
        result = result * 59 + ($queue == null ? 43 : $queue.hashCode());
        S $serviceProxy = this.getServiceProxy();
        result = result * 59 + ($serviceProxy == null ? 43 : $serviceProxy.hashCode());
        Watchdog $watchdog = this.getWatchdog();
        result = result * 59 + ($watchdog == null ? 43 : $watchdog.hashCode());
        AtomicBoolean $isSending = this.getIsSending();
        result = result * 59 + ($isSending == null ? 43 : $isSending.hashCode());
        result = result * 59 + this.getWaitForAllDataToBeSentLogsCounter();
        QueueDataSendMonitor $dataSendMonitor = this.getDataSendMonitor();
        result = result * 59 + ($dataSendMonitor == null ? 43 : $dataSendMonitor.hashCode());
        return result;
    }
}

