package org.jahia.ajax.gwt.commons.server;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.core.cluster.ClusterNode;
import org.apache.jackrabbit.core.journal.JournalException;
import org.apache.jackrabbit.core.journal.Record;
import org.apache.jackrabbit.core.journal.RecordConsumer;
import org.atmosphere.cpr.Broadcaster;
import org.jahia.bin.listeners.JahiaContextLoaderListener;
import org.jahia.services.content.impl.jackrabbit.SpringJackrabbitRepository;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/jahia/ajax/gwt/commons/server/JGroupsChannelImpl.class */
public class JGroupsChannelImpl extends ReceiverAdapter implements JGroupsChannel, RecordConsumer {
    final JChannel jChannel;
    private final String clusterName;
    private final Map<String, Broadcaster> broadcasters = new HashMap();
    private final ConcurrentLinkedQueue<Object> receivedMessages = new ConcurrentLinkedQueue<>();
    private final ClusterNode clusterNode;
    private long revision;
    private static final Logger logger = LoggerFactory.getLogger(JGroupsChannelImpl.class);
    private static final Queue<BroadcastMessage> bcMessages = new ConcurrentLinkedQueue();

    public JGroupsChannelImpl(JChannel jChannel, String str, SpringJackrabbitRepository springJackrabbitRepository) {
        if (jChannel.isConnected()) {
            throw new IllegalArgumentException("JChannel already connected");
        }
        logger.info("Starting JGroupsChannelImpl");
        this.jChannel = jChannel;
        this.clusterName = str;
        this.clusterNode = springJackrabbitRepository.getClusterNode();
        try {
            this.clusterNode.getJournal().register(this);
            setRevision(this.clusterNode.getRevision());
        } catch (JournalException e) {
            logger.debug(e.getMessage(), e);
        }
    }

    public void init() throws Exception {
        logger.info("Starting Atmosphere JGroups Clustering support with group name {}", this.clusterName);
        try {
            this.jChannel.setReceiver(this);
            this.jChannel.connect(this.clusterName);
            this.jChannel.setDiscardOwnMessages(true);
        } catch (Exception e) {
            logger.warn("Failed to connect to cluster: {}", this.clusterName);
            throw e;
        }
    }

    public void destroy() {
        try {
            Util.shutdown(this.jChannel);
        } catch (Exception e) {
            Util.close(this.jChannel);
            logger.warn("failed to properly shutdown jgroups channel, closing abnormally", e);
        }
        this.receivedMessages.clear();
        this.broadcasters.clear();
    }

    public void receive(Message message) {
        Object object = message.getObject();
        if (object != null && JahiaContextLoaderListener.isContextInitialized() && BroadcastMessage.class.isAssignableFrom(object.getClass())) {
            BroadcastMessage broadcastMessage = (BroadcastMessage) object;
            if (this.clusterNode.getRevision() >= broadcastMessage.getRevision()) {
                broadcastMessage(broadcastMessage);
            } else {
                bcMessages.add(broadcastMessage);
            }
        }
    }

    private void broadcastMessage(BroadcastMessage broadcastMessage) {
        Object message = broadcastMessage.getMessage();
        this.receivedMessages.offer(message);
        String topic = broadcastMessage.getTopic();
        if (this.broadcasters.containsKey(topic)) {
            try {
                this.broadcasters.get(topic).broadcast(message).get(30L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                logger.warn("Failed to broadcast message received over the JGroups cluster {}", this.clusterName, e);
                Thread.currentThread().interrupt();
            } catch (Exception e2) {
                logger.error("Failed to broadcast message received over the JGroups cluster {}", this.clusterName, e2);
            }
        }
    }

    @Override // org.jahia.ajax.gwt.commons.server.JGroupsChannel
    public void send(String str, Object obj) {
        if (!this.jChannel.isConnected() || this.receivedMessages.remove(obj)) {
            return;
        }
        try {
            this.jChannel.send(new Message((Address) null, (Address) null, new BroadcastMessage(str, obj, this.clusterNode.getRevision())));
        } catch (Exception e) {
            logger.warn("Failed to send message {}", obj, e);
        }
    }

    public void addBroadcaster(Broadcaster broadcaster) {
        this.broadcasters.put(broadcaster.getID(), broadcaster);
    }

    public void removeBroadcaster(Broadcaster broadcaster) {
        this.broadcasters.remove(broadcaster.getID());
    }

    public String getId() {
        return "ATMOSPHERE_BROADCAST";
    }

    public long getRevision() {
        return this.revision;
    }

    public void consume(Record record) {
        logger.error("This consumer can not handle records.");
    }

    public void setRevision(long j) {
        logger.debug("Broadcasting messages previous to revision: {}", Long.valueOf(j));
        BroadcastMessage peek = bcMessages.peek();
        while (true) {
            BroadcastMessage broadcastMessage = peek;
            if (broadcastMessage == null || broadcastMessage.getRevision() > j) {
                break;
            }
            bcMessages.remove();
            CompletableFuture.runAsync(() -> {
                broadcastMessage(broadcastMessage);
            });
            peek = bcMessages.peek();
        }
        if (logger.isDebugEnabled()) {
            if (bcMessages.isEmpty()) {
                logger.debug("No more message to broadcast");
            } else {
                logger.debug("Still some messages to broadcast. Next revision to broadcast is {}", Long.valueOf(bcMessages.peek().getRevision()));
            }
        }
        this.revision = j;
    }
}
