package com.arcadedb.server.http.ws;

import com.arcadedb.GlobalConfiguration;
import com.arcadedb.log.LogManager;
import com.arcadedb.server.ArcadeDBServer;
import com.arcadedb.server.http.ws.ChangeEvent;
import io.undertow.websockets.core.WebSocketCallback;
import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.core.WebSockets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;

/* loaded from: input_file:com/arcadedb/server/http/ws/WebSocketEventBus.class */
public class WebSocketEventBus {
    private final ConcurrentHashMap<String, ConcurrentHashMap<UUID, EventWatcherSubscription>> subscribers = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, DatabaseEventWatcherThread> databaseWatchers = new ConcurrentHashMap<>();
    private final ArcadeDBServer arcadeServer;
    public static final String CHANNEL_ID = "ID";

    public WebSocketEventBus(ArcadeDBServer arcadeDBServer) {
        this.arcadeServer = arcadeDBServer;
    }

    public void stop() {
        this.subscribers.values().forEach(concurrentHashMap -> {
            concurrentHashMap.values().forEach(eventWatcherSubscription -> {
                eventWatcherSubscription.close();
            });
        });
        this.subscribers.clear();
        this.databaseWatchers.values().forEach(databaseEventWatcherThread -> {
            databaseEventWatcherThread.shutdown();
        });
        this.databaseWatchers.clear();
    }

    public void subscribe(String str, String str2, Set<ChangeEvent.TYPE> set, WebSocketChannel webSocketChannel) {
        this.subscribers.computeIfAbsent(str, str3 -> {
            return new ConcurrentHashMap();
        }).computeIfAbsent((UUID) webSocketChannel.getAttribute(CHANNEL_ID), uuid -> {
            return new EventWatcherSubscription(str, webSocketChannel);
        }).add(str2, set);
        if (this.databaseWatchers.containsKey(str)) {
            return;
        }
        startDatabaseWatcher(str);
    }

    public void unsubscribe(String str, UUID uuid) {
        ConcurrentHashMap<UUID, EventWatcherSubscription> concurrentHashMap = this.subscribers.get(str);
        if (concurrentHashMap == null) {
            return;
        }
        concurrentHashMap.remove(uuid);
        if (concurrentHashMap.isEmpty()) {
            stopDatabaseWatcher(str);
        }
    }

    public void publish(ChangeEvent changeEvent) {
        String name = changeEvent.getRecord().getDatabase().getName();
        ArrayList arrayList = new ArrayList();
        this.subscribers.get(name).values().forEach(eventWatcherSubscription -> {
            if (eventWatcherSubscription.isMatch(changeEvent)) {
                WebSockets.sendText(changeEvent.toJSON(), eventWatcherSubscription.getChannel(), new WebSocketCallback<Void>() { // from class: com.arcadedb.server.http.ws.WebSocketEventBus.1
                    public void complete(WebSocketChannel webSocketChannel, Void r4) {
                        webSocketChannel.flush();
                    }

                    public void onError(WebSocketChannel webSocketChannel, Void r9, Throwable th) {
                        UUID uuid = (UUID) webSocketChannel.getAttribute(WebSocketEventBus.CHANNEL_ID);
                        if (!(th instanceof IOException)) {
                            LogManager.instance().log(this, Level.SEVERE, "Unexpected error while sending message.", th);
                        } else {
                            LogManager.instance().log(this, Level.FINE, "Closing zombie connection: %s", (Throwable) null, uuid);
                            arrayList.add(uuid);
                        }
                    }
                });
            }
        });
        arrayList.forEach(this::unsubscribeAll);
    }

    public Collection<EventWatcherSubscription> getDatabaseSubscriptions(String str) {
        return this.subscribers.get(str).values();
    }

    public void unsubscribeAll(UUID uuid) {
        this.subscribers.forEach((str, concurrentHashMap) -> {
            concurrentHashMap.remove(uuid);
            if (concurrentHashMap.isEmpty()) {
                stopDatabaseWatcher(str);
            }
        });
    }

    private void startDatabaseWatcher(String str) {
        DatabaseEventWatcherThread databaseEventWatcherThread = new DatabaseEventWatcherThread(this, this.arcadeServer.getDatabase(str), this.arcadeServer.getConfiguration().getValueAsInteger(GlobalConfiguration.SERVER_WS_EVENT_BUS_QUEUE_SIZE));
        databaseEventWatcherThread.start();
        this.databaseWatchers.put(str, databaseEventWatcherThread);
    }

    private void stopDatabaseWatcher(String str) {
        DatabaseEventWatcherThread remove = this.databaseWatchers.remove(str);
        if (remove != null) {
            remove.shutdown();
        }
    }
}
