package com.enioka.jqm.tools;

import com.enioka.jqm.jpamodel.JobInstance;
import com.enioka.jqm.jpamodel.Queue;
import com.enioka.jqm.jpamodel.State;
import java.lang.management.ManagementFactory;
import java.util.Calendar;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.persistence.EntityManager;
import javax.persistence.EntityNotFoundException;
import javax.persistence.LockModeType;
import javax.persistence.LockTimeoutException;
import org.apache.log4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/enioka/jqm/tools/QueuePoller.class */
/**
 * Polls a single {@link Queue} on behalf of a {@link JqmEngine}: on every loop it takes
 * SUBMITTED job instances from the database, marks them ATTRIBUTED to the engine's node and
 * starts one thread per job instance, up to a maximum number of concurrent threads.
 * <p>
 * The poller is also exposed as a JMX bean (see {@link QueuePollerMBean}) when the engine has
 * JMX beans enabled.
 * <p>
 * Threading: {@link #run()} executes on the poller thread, while {@link #stop()}, the setters,
 * the JMX getters and {@link #decreaseNbThread()} are called from other threads. The fields
 * shared between those threads are therefore {@code volatile} or atomic.
 */
public class QueuePoller implements Runnable, QueuePollerMBean {
    private static final Logger jqmlogger = Logger.getLogger(QueuePoller.class);

    /** The queue being polled. */
    private final Queue queue;

    /** The engine owning this poller. */
    private final JqmEngine engine;

    /** Number of job instances currently taken by this poller and not yet reported as ended. */
    private final AtomicInteger actualNbThread = new AtomicInteger(0);

    /** Maximum number of concurrent job instances; 0 pauses the poller. */
    private volatile int maxNbThread;

    /** Maximum wait between two polling loops, in milliseconds. */
    private volatile int pollingInterval;

    /** Loop condition; set to false to request the end of the polling loop. */
    private volatile boolean run = true;

    /** True when the poller is not running (never started, or fully stopped). */
    private volatile boolean hasStopped = true;

    /** Start time of the latest polling loop, or null if no loop has run since the last reset. */
    private volatile Calendar lastLoop = null;

    /** The thread running {@link #run()}, or null if the poller was never started. */
    private volatile Thread localThread = null;

    /** Used as an interruptible wait between loops; released each time a job instance ends. */
    private volatile Semaphore loop;

    /** JMX name of this poller; only set when the engine loads JMX beans. */
    private ObjectName name = null;

    /**
     * Creates a poller ready to be run, and registers it as a JMX bean if the engine asks for it.
     *
     * @param jqmEngine the engine owning the poller
     * @param queue the queue to poll
     * @param nbThreads maximum number of concurrent job instances
     * @param pollingIntervalMs polling interval in milliseconds
     * @throws JqmInitError if the JMX bean cannot be registered
     */
    QueuePoller(JqmEngine jqmEngine, Queue queue, int nbThreads, int pollingIntervalMs) {
        jqmlogger.info("Engine " + jqmEngine.getNode().getName() + " will poll JobInstances on queue " + queue.getName() + " every " + (pollingIntervalMs / 1000) + "s with " + nbThreads + " threads for concurrent instances");
        this.engine = jqmEngine;
        this.queue = queue;
        this.pollingInterval = pollingIntervalMs;
        this.maxNbThread = nbThreads;
        reset();
        registerMBean();
    }

    /**
     * Asks the poller to stop: the loop flag is cleared and the poller thread, if any, is
     * interrupted so that it leaves its wait between two loops immediately.
     */
    @Override
    public void stop() {
        jqmlogger.info("Poller has received a stop order");
        this.run = false;
        Thread pollerThread = this.localThread;
        if (pollerThread != null) {
            pollerThread.interrupt();
        }
    }

    /**
     * Puts a stopped poller back into a state where {@link #run()} can be called again.
     *
     * @throws IllegalStateException if the poller has not stopped
     */
    void reset() {
        if (!this.hasStopped) {
            throw new IllegalStateException("cannot reset a non stopped queue poller");
        }
        this.hasStopped = false;
        this.run = true;
        this.lastLoop = null;
        this.loop = new Semaphore(0);
    }

    /**
     * Registers this poller on the platform MBean server when the engine loads JMX beans. A bean
     * already registered under the same name is replaced.
     *
     * @throws JqmInitError on any registration failure
     */
    private void registerMBean() {
        try {
            if (this.engine.loadJmxBeans) {
                MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
                this.name = new ObjectName("com.enioka.jqm:type=Node.Queue,Node=" + this.engine.getNode().getName() + ",name=" + this.queue.getName());
                try {
                    // getMBeanInfo throws InstanceNotFoundException when nothing is registered under that name.
                    mbs.getMBeanInfo(this.name);
                    mbs.unregisterMBean(this.name);
                } catch (InstanceNotFoundException ignored) {
                    // Nothing registered under that name yet: nothing to replace.
                }
                mbs.registerMBean(this, this.name);
            }
        } catch (Exception e) {
            throw new JqmInitError("Could not create JMX beans", e);
        }
    }

    /**
     * Tries to take one job instance from the queue for this node.
     * <p>
     * The first SUBMITTED job instances of the queue (ordered by internal position) are read,
     * then each candidate is locked with a pessimistic write lock and re-checked. The first
     * candidate still SUBMITTED that is either not a Highlander job, or a Highlander job that
     * {@link #highlanderPollingMode(JobInstance, EntityManager)} allows to run, is updated to
     * ATTRIBUTED (with this node and the attribution date) and returned after commit.
     * <p>
     * Note that the returned object is not refreshed after the bulk UPDATE.
     *
     * @param entityManager the entity manager to use; its transaction must not be active
     * @param i number of additional candidates to read beyond the maximum thread count. It is
     *        set on recursive calls to the number of Highlander candidates that had to be skipped
     * @return the attributed job instance, or null if the poller is full, if nothing can be
     *         taken, or if a lock could not be obtained in time
     */
    protected JobInstance dequeue(EntityManager entityManager, int i) {
        int maxThreads = this.maxNbThread;
        if (this.actualNbThread.get() >= maxThreads) {
            return null;
        }

        List<JobInstance> candidates = entityManager
                .createQuery("SELECT j FROM JobInstance j LEFT JOIN FETCH j.jd WHERE j.queue = :q AND j.state = :s ORDER BY j.internalPosition ASC", JobInstance.class)
                .setParameter("q", this.queue)
                .setParameter("s", State.SUBMITTED)
                .setMaxResults(maxThreads + i)
                .getResultList();

        entityManager.getTransaction().begin();
        int skippedHighlanders = 0;
        for (JobInstance candidate : candidates) {
            try {
                // Lock the row, and reload it: another poller may have taken it since the SELECT.
                entityManager.refresh(candidate, LockModeType.PESSIMISTIC_WRITE);
                if (!candidate.getState().equals(State.SUBMITTED)) {
                    continue;
                }
                if (candidate.getJd().isHighlander() && !highlanderPollingMode(candidate, entityManager)) {
                    // Another instance of this Highlander job is already attributed or running.
                    skippedHighlanders++;
                    continue;
                }
                entityManager.createQuery("UPDATE JobInstance j SET j.state = 'ATTRIBUTED', j.node = :n, j.attributionDate = current_timestamp() WHERE id=:i")
                        .setParameter("i", Integer.valueOf(candidate.getId()))
                        .setParameter("n", this.engine.getNode())
                        .executeUpdate();
                entityManager.getTransaction().commit();
                return candidate;
            } catch (EntityNotFoundException e) {
                // The candidate disappeared between the SELECT and the lock: start over.
                entityManager.getTransaction().rollback();
                return dequeue(entityManager, skippedHighlanders);
            } catch (LockTimeoutException e) {
                // Could not lock the row in time: give up for this loop.
                entityManager.getTransaction().rollback();
                return null;
            }
        }

        // Nothing was taken: release the locks.
        entityManager.getTransaction().rollback();
        if (skippedHighlanders > i) {
            // Skipped Highlanders used up candidate slots: retry with a larger window.
            return dequeue(entityManager, skippedHighlanders);
        }
        return null;
    }

    /**
     * Checks whether a Highlander job instance may be taken.
     *
     * @param jobInstance the candidate job instance
     * @param entityManager the entity manager to use
     * @return true if no other job instance of the same job definition is RUNNING or ATTRIBUTED
     */
    protected boolean highlanderPollingMode(JobInstance jobInstance, EntityManager entityManager) {
        List<JobInstance> others = entityManager
                .createQuery("SELECT j FROM JobInstance j WHERE j IS NOT :refid AND j.jd = :jd AND (j.state = 'RUNNING' OR j.state = 'ATTRIBUTED')", JobInstance.class)
                .setParameter("refid", jobInstance)
                .setParameter("jd", jobInstance.getJd())
                .getResultList();
        return others.isEmpty();
    }

    /**
     * Main polling loop. Each iteration takes as many job instances as allowed, then waits for
     * the polling interval or for the end of a job instance, whichever comes first.
     * <p>
     * The loop ends in one of two ways:
     * <ul>
     * <li>normal stop ({@link #stop()} was called or the wait was interrupted): the poller waits
     * for its running job instances, unregisters its JMX bean and notifies the engine;</li>
     * <li>database failure (as detected by {@code Helpers.testDbFailure}): the engine is told
     * that a restart is needed and the poller is marked as stopped without the graceful
     * shutdown steps, so that it can be {@link #reset()} and run again.</li>
     * </ul>
     * Any other runtime exception is rethrown.
     */
    @Override
    public void run() {
        Thread current = Thread.currentThread();
        this.localThread = current;
        current.setName("QUEUE_POLLER;polling;" + this.queue.getName());

        do {
            this.lastLoop = Calendar.getInstance();
            try {
                pollOnce();
                waitForNextLoop();
            } catch (RuntimeException e) {
                if (!Helpers.testDbFailure(e)) {
                    throw e;
                }
                jqmlogger.error("connection to database lost - stopping poller");
                jqmlogger.trace("connection error was:", e.getCause());
                this.engine.pollerRestartNeeded(this);
                // Leave the loop with run still true: this marks an abnormal exit below.
                // Looping again would hammer a dead database without any wait.
                break;
            }
        } while (this.run);

        if (this.run) {
            // Abnormal exit (database lost): no graceful shutdown, only mark the poller as stopped.
            this.run = false;
            this.hasStopped = true;
            return;
        }

        jqmlogger.info("Poller loop on queue " + this.queue.getName() + " is stopping [engine " + this.engine.getNode().getName() + "]");
        waitForAllThreads(60000L);
        jqmlogger.info("Poller on queue " + this.queue.getName() + " has ended normally");
        if (this.engine.loadJmxBeans) {
            try {
                ManagementFactory.getPlatformMBeanServer().unregisterMBean(this.name);
            } catch (Exception e) {
                jqmlogger.error("Could not unregister JMX beans", e);
            }
        }
        this.hasStopped = true;
        this.engine.checkEngineEnd();
    }

    /** Takes and launches job instances until {@link #dequeue(EntityManager, int)} returns nothing. */
    private void pollOnce() {
        EntityManager em = null;
        try {
            em = Helpers.getNewEm();
            JobInstance ji = dequeue(em, 0);
            while (ji != null) {
                jqmlogger.trace("JI number " + ji.getId() + " will be run by this poller this loop (already " + this.actualNbThread + "/" + this.maxNbThread + " on " + this.queue.getName() + ")");
                // Count the job instance before its thread starts, so the next dequeue sees it.
                this.actualNbThread.incrementAndGet();
                if (ji.getJd().isExternal()) {
                    new Thread(new LoaderExternal(em, ji, this)).start();
                } else {
                    new Thread(new Loader(ji, this.engine.getCache(), this)).start();
                }
                ji = dequeue(em, 0);
            }
        } finally {
            Helpers.closeQuietly(em);
        }
    }

    /** Waits for the polling interval, or less if a job instance ends or a stop is requested. */
    private void waitForNextLoop() {
        try {
            this.loop.tryAcquire(this.pollingInterval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Interruption is how stop() wakes the poller up. The interrupt flag is deliberately
            // not restored: it would abort the graceful wait done at the end of run().
            this.run = false;
        }
    }

    @Override
    public Integer getCurrentActiveThreadCount() {
        return Integer.valueOf(this.actualNbThread.get());
    }

    /**
     * Called when a job instance taken by this poller has ended: frees its slot, wakes the
     * polling loop up so that a new job instance can be taken at once, and notifies the engine.
     */
    void decreaseNbThread() {
        this.actualNbThread.decrementAndGet();
        this.loop.release(1);
        this.engine.signalEndOfRun();
    }

    /**
     * @return true if the poller has not stopped
     */
    boolean isRunning() {
        return !this.hasStopped;
    }

    /**
     * Waits, in steps of one second, until no job instance is running or the timeout is reached.
     * Returns early if the waiting thread is interrupted.
     *
     * @param j timeout in milliseconds
     */
    private void waitForAllThreads(long j) {
        long waitedMs = 0;
        while (waitedMs <= j) {
            jqmlogger.trace("Waiting the end of " + this.actualNbThread + " job(s)");
            if (this.actualNbThread.get() == 0) {
                break;
            }
            if (waitedMs == 0) {
                jqmlogger.info("Waiting for the end of " + this.actualNbThread + " jobs on queue " + this.queue.getName() + " - timeout is " + j + "ms");
            }
            try {
                Thread.sleep(1000L);
                waitedMs += 1000;
            } catch (InterruptedException e) {
                jqmlogger.warn("Some job instances did not finish in time - wait was interrupted");
                return;
            }
        }
        if (waitedMs > j) {
            jqmlogger.warn("Some job instances did not finish in time - they will be killed for the poller to be able to stop");
        }
    }

    /**
     * @return the queue polled by this poller
     */
    Queue getQueue() {
        return this.queue;
    }

    /**
     * @return the engine owning this poller
     */
    JqmEngine getEngine() {
        return this.engine;
    }

    /**
     * Changes the maximum number of concurrent job instances. Setting it to 0 pauses the poller.
     *
     * @param i the new maximum
     */
    void setMaxThreads(int i) {
        int previous = this.maxNbThread;
        if (previous > 0 && i == 0) {
            jqmlogger.info("Poller is being paused - it won't fetch any new job instances until it is resumed.");
        } else if (previous == 0 && i > 0) {
            jqmlogger.info("Poller is being resumed");
        }
        this.maxNbThread = i;
    }

    /**
     * Changes the polling interval. It is read at the start of each wait between two loops.
     *
     * @param i the new interval in milliseconds
     */
    void setPollingInterval(int i) {
        this.pollingInterval = i;
    }

    @Override
    public long getCumulativeJobInstancesCount() {
        EntityManager em = Helpers.getNewEm();
        try {
            Long count = em.createQuery("SELECT COUNT(i) From History i WHERE i.node = :n AND i.queue = :q", Long.class)
                    .setParameter("n", this.engine.getNode())
                    .setParameter("q", this.queue)
                    .getSingleResult();
            return count.longValue();
        } finally {
            // Always release the entity manager, even when the query fails.
            Helpers.closeQuietly(em);
        }
    }

    @Override
    public float getJobsFinishedPerSecondLastMinute() {
        Calendar oneMinuteAgo = Calendar.getInstance();
        oneMinuteAgo.add(Calendar.MINUTE, -1);

        EntityManager em = Helpers.getNewEm();
        try {
            Long count = em.createQuery("SELECT COUNT(i) From History i WHERE i.endDate >= :d and i.node = :n AND i.queue = :q", Long.class)
                    .setParameter("d", oneMinuteAgo)
                    .setParameter("n", this.engine.getNode())
                    .setParameter("q", this.queue)
                    .getSingleResult();
            return ((float) count.longValue()) / 60.0f;
        } finally {
            // Always release the entity manager, even when the query fails.
            Helpers.closeQuietly(em);
        }
    }

    @Override
    public long getCurrentlyRunningJobCount() {
        return this.actualNbThread.get();
    }

    @Override
    public Integer getPollingIntervalMilliseconds() {
        return Integer.valueOf(this.pollingInterval);
    }

    @Override
    public Integer getMaxConcurrentJobInstanceCount() {
        return Integer.valueOf(this.maxNbThread);
    }

    /**
     * @return true if a polling loop started less than one polling interval (plus 100ms) ago;
     *         false otherwise, including when no loop has run since creation or the last reset
     */
    @Override
    public boolean isActuallyPolling() {
        Calendar latest = this.lastLoop;
        if (latest == null) {
            // No loop has run yet.
            return false;
        }
        return Calendar.getInstance().getTimeInMillis() - latest.getTimeInMillis() <= ((long) (this.pollingInterval + 100));
    }

    @Override
    public boolean isFull() {
        return this.actualNbThread.get() >= this.maxNbThread;
    }
}
