/*
 * Decompiled with CFR 0.152.
 */
package org.jahia.ajax.gwt.commons.server;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.core.cluster.ClusterNode;
import org.apache.jackrabbit.core.journal.JournalException;
import org.apache.jackrabbit.core.journal.Record;
import org.apache.jackrabbit.core.journal.RecordConsumer;
import org.atmosphere.cpr.Broadcaster;
import org.jahia.ajax.gwt.commons.server.BroadcastMessage;
import org.jahia.ajax.gwt.commons.server.JGroupsChannel;
import org.jahia.bin.listeners.JahiaContextLoaderListener;
import org.jahia.services.content.impl.jackrabbit.SpringJackrabbitRepository;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.ReceiverAdapter;
import org.jgroups.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Relays Atmosphere broadcast messages between cluster members over a JGroups channel.
 *
 * <p>Messages received from the cluster are handed to the locally registered
 * {@link Broadcaster} whose ID equals the message topic. A message carrying a revision
 * higher than the local {@link ClusterNode} revision is parked in a queue and released by
 * {@link #setRevision(long)} once the supplied revision has caught up.
 *
 * <p>The class is touched from several threads (the JGroups receiver, the threads calling
 * {@link #send}, {@link #addBroadcaster} and {@link #removeBroadcaster}, and the pool used
 * by {@link CompletableFuture#runAsync}), so all shared collections are thread-safe.
 */
public class JGroupsChannelImpl extends ReceiverAdapter implements JGroupsChannel, RecordConsumer {
    private static final Logger logger = LoggerFactory.getLogger(JGroupsChannelImpl.class);

    /** Upper bound, in seconds, on how long a single local broadcast is awaited. */
    private static final long BROADCAST_TIMEOUT_SECONDS = 30L;

    /**
     * Messages whose revision is ahead of the local cluster node; drained by
     * {@link #setRevision(long)}. Static, hence shared by every instance in the JVM.
     */
    private static final Queue<BroadcastMessage> bcMessages = new ConcurrentLinkedQueue<BroadcastMessage>();

    final JChannel jChannel;
    private final String clusterName;

    /**
     * Local broadcasters keyed by {@link Broadcaster#getID()}. Wrapped in a synchronized
     * view because registrations and lookups happen on different threads; the backing
     * {@link HashMap} is kept so that key handling is the same as before.
     */
    private final Map<String, Broadcaster> broadcasters =
            Collections.synchronizedMap(new HashMap<String, Broadcaster>());

    /**
     * Payloads that were received from the cluster and handed to a local broadcaster.
     * {@link #send} consumes an entry to avoid echoing such a payload back to the cluster.
     */
    private final ConcurrentLinkedQueue<Object> receivedMessages = new ConcurrentLinkedQueue<>();

    private final ClusterNode clusterNode;
    private long revision;

    /**
     * Creates the channel wrapper and registers it as a consumer of the repository journal.
     *
     * @param jChannel             a JGroups channel that is not connected yet
     * @param clusterName          name of the JGroups cluster joined by {@link #init()}
     * @param jackrabbitRepository repository supplying the {@link ClusterNode}
     * @throws IllegalArgumentException if {@code jChannel} is already connected
     */
    public JGroupsChannelImpl(JChannel jChannel, String clusterName, SpringJackrabbitRepository jackrabbitRepository) {
        if (jChannel.isConnected()) {
            throw new IllegalArgumentException("JChannel already connected");
        }
        logger.info("Starting JGroupsChannelImpl");
        this.jChannel = jChannel;
        this.clusterName = clusterName;
        this.clusterNode = jackrabbitRepository.getClusterNode();
        try {
            this.clusterNode.getJournal().register(this);
            this.setRevision(this.clusterNode.getRevision());
        }
        catch (JournalException e) {
            // Construction still succeeds, but the failure must be visible: without the
            // journal registration nothing in this class triggers setRevision again.
            logger.warn("Failed to register journal consumer {}: {}", this.getId(), e.getMessage(), e);
        }
    }

    /**
     * Installs this object as the channel receiver and connects to the cluster.
     *
     * @throws Exception whatever the channel throws while connecting; it is logged and rethrown
     */
    public void init() throws Exception {
        logger.info("Starting Atmosphere JGroups Clustering support with group name {}", this.clusterName);
        try {
            this.jChannel.setReceiver((Receiver)this);
            // Set before connecting so that no own message can be delivered in between.
            this.jChannel.setDiscardOwnMessages(true);
            this.jChannel.connect(this.clusterName);
        }
        catch (Exception e) {
            logger.warn("Failed to connect to cluster: {}", this.clusterName);
            throw e;
        }
    }

    /**
     * Shuts the channel down (falling back to a plain close if that fails) and clears the
     * received-message queue and the broadcaster registry.
     */
    public void destroy() {
        try {
            Util.shutdown((Channel)this.jChannel);
        }
        catch (Exception t) {
            Util.close((Channel)this.jChannel);
            logger.warn("failed to properly shutdown jgroups channel, closing abnormally", (Throwable)t);
        }
        this.receivedMessages.clear();
        this.broadcasters.clear();
    }

    /**
     * Handles a message delivered by JGroups.
     *
     * <p>Payloads that are not {@link BroadcastMessage}s, and any payload arriving before the
     * Jahia context is initialized, are ignored. A broadcast message is delivered immediately
     * when the local cluster node revision is at least the message revision; otherwise it is
     * queued for {@link #setRevision(long)}.
     *
     * @param jgroupMessage the incoming JGroups message
     */
    @Override
    public void receive(Message jgroupMessage) {
        Object payload = jgroupMessage.getObject();
        // instanceof is false for null, so this also covers an empty payload.
        if (!(payload instanceof BroadcastMessage) || !JahiaContextLoaderListener.isContextInitialized()) {
            return;
        }
        BroadcastMessage broadcastMsg = (BroadcastMessage)payload;
        if (this.clusterNode.getRevision() >= broadcastMsg.getRevision()) {
            this.broadcastMessage(broadcastMsg);
        } else {
            bcMessages.add(broadcastMsg);
        }
    }

    /**
     * Delivers a cluster message to the local broadcaster registered for its topic, waiting
     * at most {@link #BROADCAST_TIMEOUT_SECONDS} seconds for the broadcast to complete.
     * Does nothing when no broadcaster is registered for the topic.
     */
    private void broadcastMessage(BroadcastMessage broadcastMsg) {
        // Single lookup: a containsKey/get pair could race with removeBroadcaster.
        Broadcaster target = this.broadcasters.get(broadcastMsg.getTopic());
        if (target == null) {
            // Nothing will be broadcast locally, so no echo can come back through send().
            // Recording the payload here would leave an entry that is never consumed and
            // that could later suppress an unrelated, equal outgoing message.
            return;
        }
        Object origMessage = broadcastMsg.getMessage();
        // Must be recorded before broadcasting: send() looks the payload up to detect echoes.
        this.receivedMessages.offer(origMessage);
        try {
            target.broadcast(origMessage).get(BROADCAST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            logger.warn("Failed to broadcast message received over the JGroups cluster {}", this.clusterName, e);
            Thread.currentThread().interrupt();
        }
        catch (Exception ex) {
            // NOTE(review): the payload stays in receivedMessages on failure/timeout because
            // the broadcast may still complete later and reach send() - confirm this is wanted.
            logger.error("Failed to broadcast message received over the JGroups cluster {}", this.clusterName, ex);
        }
    }

    /**
     * Sends a message to the cluster, stamped with the current cluster node revision.
     *
     * <p>Nothing is sent when the channel is not connected, or when {@code message} is found
     * in (and thereby removed from) the queue of payloads previously received from the
     * cluster. Send failures are logged and not propagated.
     *
     * @param topic   topic the message belongs to
     * @param message payload to distribute
     */
    @Override
    public void send(String topic, Object message) {
        if (!this.jChannel.isConnected() || this.receivedMessages.remove(message)) {
            return;
        }
        try {
            BroadcastMessage broadcastMsg = new BroadcastMessage(topic, message, this.clusterNode.getRevision());
            this.jChannel.send(new Message(null, null, (Object)broadcastMsg));
        }
        catch (Exception e) {
            logger.warn("Failed to send message {}", message, e);
        }
    }

    /**
     * Registers a broadcaster under its ID so cluster messages for that topic reach it.
     *
     * @param broadcaster broadcaster to register; replaces any previous one with the same ID
     */
    public void addBroadcaster(Broadcaster broadcaster) {
        this.broadcasters.put(broadcaster.getID(), broadcaster);
    }

    /**
     * Unregisters the broadcaster stored under the given broadcaster's ID, if any.
     *
     * @param broadcaster broadcaster whose ID is removed from the registry
     */
    public void removeBroadcaster(Broadcaster broadcaster) {
        this.broadcasters.remove(broadcaster.getID());
    }

    /**
     * @return the fixed consumer identifier {@code "ATMOSPHERE_BROADCAST"}
     */
    @Override
    public String getId() {
        return "ATMOSPHERE_BROADCAST";
    }

    /**
     * @return the revision most recently passed to {@link #setRevision(long)}
     */
    @Override
    public long getRevision() {
        return this.revision;
    }

    /**
     * Not supported: this consumer only tracks revisions, so an error is logged and the
     * record is ignored.
     *
     * @param notUsedRecord ignored
     */
    @Override
    public void consume(Record notUsedRecord) {
        logger.error("This consumer can not handle records.");
    }

    /**
     * Stores the new revision after releasing every queued message, from the head of the
     * queue, whose revision is not greater than it. Draining stops at the first message
     * that is still ahead of {@code revision}.
     *
     * <p>Each released message is broadcast on its own asynchronous task, so relative
     * delivery order of the released messages is not guaranteed.
     *
     * @param revision the revision now reached
     */
    @Override
    public void setRevision(long revision) {
        logger.debug("Broadcasting messages previous to revision: {}", revision);
        BroadcastMessage head = bcMessages.peek();
        while (head != null && head.getRevision() <= revision) {
            // poll() rather than remove(): it never throws if the queue was emptied
            // concurrently, and the element actually removed is the one broadcast.
            final BroadcastMessage released = bcMessages.poll();
            if (released == null) {
                break;
            }
            CompletableFuture.runAsync(() -> this.broadcastMessage(released))
                    .exceptionally(failure -> {
                        // Without this, an exception escaping the task would vanish silently.
                        logger.error("Failed to broadcast queued message on the JGroups cluster {}", this.clusterName, failure);
                        return null;
                    });
            head = bcMessages.peek();
        }
        if (logger.isDebugEnabled()) {
            // Read the head once: a separate isEmpty()/peek() pair could observe different states.
            BroadcastMessage next = bcMessages.peek();
            if (next == null) {
                logger.debug("No more message to broadcast");
            } else {
                logger.debug("Still some messages to broadcast. Next revision to broadcast is {}", next.getRevision());
            }
        }
        this.revision = revision;
    }
}

