/*
 * Decompiled with CFR 0.152.
 */
package com.arcadedb.server.http.ws;

import com.arcadedb.GlobalConfiguration;
import com.arcadedb.database.Database;
import com.arcadedb.log.LogManager;
import com.arcadedb.server.ArcadeDBServer;
import com.arcadedb.server.http.ws.ChangeEvent;
import com.arcadedb.server.http.ws.DatabaseEventWatcherThread;
import com.arcadedb.server.http.ws.EventWatcherSubscription;
import io.undertow.websockets.core.WebSocketCallback;
import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.core.WebSockets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;

public class WebSocketEventBus {
    private final ConcurrentHashMap<String, ConcurrentHashMap<UUID, EventWatcherSubscription>> subscribers = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, DatabaseEventWatcherThread> databaseWatchers = new ConcurrentHashMap();
    private final ArcadeDBServer arcadeServer;
    public static final String CHANNEL_ID = "ID";

    public WebSocketEventBus(ArcadeDBServer server) {
        this.arcadeServer = server;
    }

    public void stop() {
        this.subscribers.values().forEach(x -> x.values().forEach(y -> y.close()));
        this.subscribers.clear();
        this.databaseWatchers.values().forEach(x -> x.shutdown());
        this.databaseWatchers.clear();
    }

    public void subscribe(String databaseName, String type, Set<ChangeEvent.TYPE> changeTypes, WebSocketChannel channel) {
        UUID channelId = (UUID)channel.getAttribute(CHANNEL_ID);
        ConcurrentHashMap databaseSubscribers = this.subscribers.computeIfAbsent(databaseName, k -> new ConcurrentHashMap());
        databaseSubscribers.computeIfAbsent(channelId, k -> new EventWatcherSubscription(databaseName, channel)).add(type, changeTypes);
        if (!this.databaseWatchers.containsKey(databaseName)) {
            this.startDatabaseWatcher(databaseName);
        }
    }

    public void unsubscribe(String databaseName, UUID channelId) {
        ConcurrentHashMap<UUID, EventWatcherSubscription> databaseSubscribers = this.subscribers.get(databaseName);
        if (databaseSubscribers == null) {
            return;
        }
        databaseSubscribers.remove(channelId);
        if (databaseSubscribers.isEmpty()) {
            this.stopDatabaseWatcher(databaseName);
        }
    }

    public void publish(ChangeEvent event) {
        String databaseName = event.getRecord().getDatabase().getName();
        final ArrayList<UUID> zombieConnections = new ArrayList<UUID>();
        this.subscribers.get(databaseName).values().forEach(subscription -> {
            if (subscription.isMatch(event)) {
                WebSockets.sendText((String)event.toJSON(), (WebSocketChannel)subscription.getChannel(), (WebSocketCallback)new WebSocketCallback<Void>(this){

                    public void complete(WebSocketChannel webSocketChannel, Void unused) {
                        webSocketChannel.flush();
                    }

                    public void onError(WebSocketChannel webSocketChannel, Void unused, Throwable throwable) {
                        UUID channelId = (UUID)webSocketChannel.getAttribute(WebSocketEventBus.CHANNEL_ID);
                        if (throwable instanceof IOException) {
                            LogManager.instance().log((Object)this, Level.FINE, "Closing zombie connection: %s", null, (Object)channelId);
                            zombieConnections.add(channelId);
                        } else {
                            LogManager.instance().log((Object)this, Level.SEVERE, "Unexpected error while sending message.", throwable);
                        }
                    }
                });
            }
        });
        zombieConnections.forEach(this::unsubscribeAll);
    }

    public Collection<EventWatcherSubscription> getDatabaseSubscriptions(String database) {
        return this.subscribers.get(database).values();
    }

    public void unsubscribeAll(UUID channelId) {
        this.subscribers.forEach((databaseName, channels) -> {
            channels.remove(channelId);
            if (channels.isEmpty()) {
                this.stopDatabaseWatcher((String)databaseName);
            }
        });
    }

    private void startDatabaseWatcher(String database) {
        int queueSize = this.arcadeServer.getConfiguration().getValueAsInteger(GlobalConfiguration.SERVER_WS_EVENT_BUS_QUEUE_SIZE);
        DatabaseEventWatcherThread watcherThread = new DatabaseEventWatcherThread(this, (Database)this.arcadeServer.getDatabase(database), queueSize);
        watcherThread.start();
        this.databaseWatchers.put(database, watcherThread);
    }

    private void stopDatabaseWatcher(String database) {
        DatabaseEventWatcherThread watcher = this.databaseWatchers.remove(database);
        if (watcher != null) {
            watcher.shutdown();
        }
    }
}

