package io.quarkus.reactivemessaging.http.runtime;

import io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase;
import io.quarkus.reactivemessaging.http.runtime.config.ReactiveHttpConfig;
import io.quarkus.reactivemessaging.http.runtime.config.WebSocketStreamConfig;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.ext.web.RoutingContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.Collection;
import org.jboss.logging.Logger;

/**
 * Handler bean that turns incoming requests into WebSocket connections and forwards every
 * buffer received on the socket to the {@link MultiEmitter} supplied by
 * {@link ReactiveHandlerBeanBase}.
 *
 * <p>Text frames this class writes back to the client:
 * <ul>
 *   <li>{@code "ACK"} - written by the first callback handed to the {@link WebSocketMessage}
 *       (presumably invoked when the message is acknowledged);</li>
 *   <li>{@code "BUFFER_OVERFLOW"} - written when the {@link StrictQueueSizeGuard} refuses
 *       the message.</li>
 * </ul>
 * On an unexpected error the problem is logged and the socket is closed with status code
 * {@code 3500}.
 */
@Singleton
/* loaded from: input_file:io/quarkus/reactivemessaging/http/runtime/ReactiveWebSocketHandlerBean.class */
public class ReactiveWebSocketHandlerBean extends ReactiveHandlerBeanBase<WebSocketStreamConfig, WebSocketMessage<?>> {
    private static final Logger log = Logger.getLogger(ReactiveWebSocketHandlerBean.class);

    /** Status code sent to the client when the socket is closed because of an unexpected error. */
    private static final short UNEXPECTED_ERROR_CLOSE_CODE = 3500;

    /** Close reason sent to the client together with {@link #UNEXPECTED_ERROR_CLOSE_CODE}. */
    private static final String UNEXPECTED_ERROR_CLOSE_REASON = "Unexpected error while processing the message";

    @Inject
    ReactiveHttpConfig config;

    /**
     * Converts the request to a WebSocket and registers a handler that forwards each received
     * buffer to {@code multiEmitter}. A failed conversion is only logged.
     *
     * <p>For every received buffer, in this order:
     * <ol>
     *   <li>if {@code multiEmitter} is {@code null}, the situation is logged and the socket is
     *       closed;</li>
     *   <li>if {@code strictQueueSizeGuard.prepareToEmit()} returns {@code true}, a
     *       {@link WebSocketMessage} is emitted; should {@code emit} throw,
     *       {@code strictQueueSizeGuard.dequeue()} is called (presumably undoing the
     *       reservation made by {@code prepareToEmit()}) and the socket is closed;</li>
     *   <li>otherwise {@code "BUFFER_OVERFLOW"} is written back and nothing is emitted.</li>
     * </ol>
     *
     * @param routingContext the routing context whose request becomes the WebSocket; also used
     *        to build the {@link RequestMetadata} attached to each message
     * @param multiEmitter sink for incoming messages; may be {@code null}
     * @param strictQueueSizeGuard consulted before every emit
     * @param str the endpoint path; used only in the "no consumer" log message
     */
    @Override // io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase
    protected void handleRequest(RoutingContext routingContext, MultiEmitter<? super WebSocketMessage<?>> multiEmitter, StrictQueueSizeGuard strictQueueSizeGuard, String str) {
        routingContext.request().toWebSocket(upgrade -> {
            if (upgrade.failed()) {
                log.error("failed to connect web socket", upgrade.cause());
                return;
            }
            ServerWebSocket serverWebSocket = upgrade.result();
            serverWebSocket.handler(payload -> {
                if (multiEmitter == null) {
                    onUnexpectedError(serverWebSocket, null, "No consumer subscribed for messages sent to Reactive Messaging WebSocket endpoint on path: " + str);
                } else if (strictQueueSizeGuard.prepareToEmit()) {
                    try {
                        // Diamond instead of a raw type so the constructor keeps its generic signature.
                        multiEmitter.emit(new WebSocketMessage<>(payload, new RequestMetadata(routingContext), () -> {
                            serverWebSocket.write(Buffer.buffer("ACK"));
                        }, failure -> {
                            onUnexpectedError(serverWebSocket, failure, "Failed to process incoming web socket message.");
                        }));
                    } catch (Exception e) {
                        // The message never reached the emitter, so hand the guard its slot back.
                        strictQueueSizeGuard.dequeue();
                        onUnexpectedError(serverWebSocket, e, "Emitting message failed");
                    }
                } else {
                    serverWebSocket.write(Buffer.buffer("BUFFER_OVERFLOW"));
                }
            });
        });
    }

    /**
     * Builds a human-readable description of a stream configuration.
     *
     * <p>Declared {@code public} here; the decompiler reports that the original access was
     * {@code protected}.
     *
     * @param webSocketStreamConfig the configuration to describe
     * @return the text {@code "path "} followed by the configured path
     */
    @Override // io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase
    public String description(WebSocketStreamConfig webSocketStreamConfig) {
        return String.format("path %s", webSocketStreamConfig.path);
    }

    /**
     * Returns the key identifying a stream configuration, which is its path.
     *
     * <p>Declared {@code public} here; the decompiler reports that the original access was
     * {@code protected}.
     *
     * @param webSocketStreamConfig the configuration to derive the key from
     * @return the configured path
     */
    @Override // io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase
    public String key(WebSocketStreamConfig webSocketStreamConfig) {
        return webSocketStreamConfig.path;
    }

    /**
     * Returns the key identifying an incoming request, which is the path of the route that is
     * currently handling it.
     *
     * @param routingContext the routing context of the incoming request
     * @return the path of the current route
     */
    @Override // io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase
    protected String key(RoutingContext routingContext) {
        return routingContext.currentRoute().getPath();
    }

    /**
     * Supplies the WebSocket stream configurations taken from the injected
     * {@link ReactiveHttpConfig}.
     *
     * @return the configured WebSocket streams
     */
    @Override // io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase
    protected Collection<WebSocketStreamConfig> configs() {
        return this.config.getWebSocketConfigs();
    }

    /**
     * Logs the problem at error level and closes the socket with
     * {@link #UNEXPECTED_ERROR_CLOSE_CODE}.
     *
     * @param serverWebSocket the socket to close
     * @param cause the failure to log; may be {@code null} when there is no exception
     * @param message the log message
     */
    private void onUnexpectedError(ServerWebSocket serverWebSocket, Throwable cause, String message) {
        log.error(message, cause);
        serverWebSocket.close(UNEXPECTED_ERROR_CLOSE_CODE, UNEXPECTED_ERROR_CLOSE_REASON);
    }

    /**
     * Looks up the message stream registered under the given path.
     *
     * <p>Declared {@code public} here; the decompiler reports that the original access was
     * package-private.
     *
     * @param str the path the stream was registered under
     * @return the stream of incoming WebSocket messages for that path
     * @throws IllegalStateException if nothing is registered under {@code str}
     */
    public Multi<WebSocketMessage<?>> getProcessor(String str) {
        // NOTE(review): Bundle is referenced as a raw type (decompiler output), so the return
        // below is an unchecked conversion. Parameterize it once Bundle's declaration in
        // ReactiveHandlerBeanBase has been confirmed.
        ReactiveHandlerBeanBase.Bundle bundle = (ReactiveHandlerBeanBase.Bundle) this.processors.get(str);
        if (bundle == null) {
            throw new IllegalStateException("No incoming stream defined for path " + str);
        }
        return bundle.getProcessor();
    }
}
