/*
 * Decompiled with CFR 0.152.
 */
package com.google.adk.web.websocket;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.adk.agents.LiveRequest;
import com.google.adk.agents.LiveRequestQueue;
import com.google.adk.agents.RunConfig;
import com.google.adk.runner.Runner;
import com.google.adk.sessions.BaseSessionService;
import com.google.adk.sessions.Session;
import com.google.adk.web.service.RunnerService;
import com.google.common.collect.ImmutableList;
import com.google.genai.types.Blob;
import com.google.genai.types.Content;
import com.google.genai.types.Modality;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.io.IOException;
import java.net.URI;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.server.ResponseStatusException;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.util.UriComponentsBuilder;

@Component
public class LiveWebSocketHandler
extends TextWebSocketHandler {
    private static final Logger log = LoggerFactory.getLogger(LiveWebSocketHandler.class);
    private static final String LIVE_REQUEST_QUEUE_ATTR = "liveRequestQueue";
    private static final String LIVE_SUBSCRIPTION_ATTR = "liveSubscription";
    private static final int WEBSOCKET_MAX_BYTES_FOR_REASON = 123;
    private static final int WEBSOCKET_PROTOCOL_ERROR = 1002;
    private static final int WEBSOCKET_INTERNAL_SERVER_ERROR = 1011;
    private final ObjectMapper objectMapper;
    private final BaseSessionService sessionService;
    private final RunnerService runnerService;

    @Autowired
    public LiveWebSocketHandler(ObjectMapper objectMapper, BaseSessionService sessionService, RunnerService runnerService) {
        this.objectMapper = objectMapper;
        this.sessionService = sessionService;
        this.runnerService = runnerService;
    }

    public void afterConnectionEstablished(WebSocketSession wsSession) throws Exception {
        Runner runner;
        Session session;
        URI uri = wsSession.getUri();
        if (uri == null) {
            log.warn("WebSocket session URI is null, cannot establish connection.");
            wsSession.close(CloseStatus.SERVER_ERROR.withReason("Invalid URI"));
            return;
        }
        String path = uri.getPath();
        log.info("WebSocket connection established: {} from {}", (Object)wsSession.getId(), (Object)uri);
        MultiValueMap queryParams = UriComponentsBuilder.fromUri((URI)uri).build().getQueryParams();
        String appName = (String)queryParams.getFirst((Object)"app_name");
        String userId = (String)queryParams.getFirst((Object)"user_id");
        String sessionId = (String)queryParams.getFirst((Object)"session_id");
        if (appName == null || appName.trim().isEmpty()) {
            log.warn("WebSocket connection for session {} rejected: app_name query parameter is required and cannot be empty. URI: {}", (Object)wsSession.getId(), (Object)uri);
            wsSession.close(CloseStatus.POLICY_VIOLATION.withReason("app_name query parameter is required and cannot be empty"));
            return;
        }
        if (sessionId == null || sessionId.trim().isEmpty()) {
            log.warn("WebSocket connection for session {} rejected: session_id query parameter is required and cannot be empty. URI: {}", (Object)wsSession.getId(), (Object)uri);
            wsSession.close(CloseStatus.POLICY_VIOLATION.withReason("session_id query parameter is required and cannot be empty"));
            return;
        }
        log.debug("Extracted params for WebSocket session {}: appName={}, userId={}, sessionId={},", new Object[]{wsSession.getId(), appName, userId, sessionId});
        RunConfig runConfig = RunConfig.builder().setResponseModalities((Iterable)ImmutableList.of((Object)new Modality(Modality.Known.AUDIO))).setStreamingMode(RunConfig.StreamingMode.BIDI).build();
        try {
            session = (Session)this.sessionService.getSession(appName, userId, sessionId, Optional.empty()).blockingGet();
            if (session == null) {
                log.warn("Session not found for WebSocket: app={}, user={}, id={}. Closing connection.", new Object[]{appName, userId, sessionId});
                wsSession.close(new CloseStatus(1002, "Session not found"));
                return;
            }
        }
        catch (Exception e) {
            log.error("Error retrieving session for WebSocket: app={}, user={}, id={}", new Object[]{appName, userId, sessionId, e});
            wsSession.close(CloseStatus.SERVER_ERROR.withReason("Failed to retrieve session"));
            return;
        }
        LiveRequestQueue liveRequestQueue = new LiveRequestQueue();
        wsSession.getAttributes().put(LIVE_REQUEST_QUEUE_ATTR, liveRequestQueue);
        try {
            runner = this.runnerService.getRunner(appName);
        }
        catch (ResponseStatusException e) {
            log.error("Failed to get runner for app {} during WebSocket connection: {}", (Object)appName, (Object)e.getMessage());
            wsSession.close(CloseStatus.SERVER_ERROR.withReason("Runner unavailable: " + e.getReason()));
            return;
        }
        Flowable eventStream = runner.runLive(session, liveRequestQueue, runConfig);
        Disposable disposable = eventStream.subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe(event -> {
            try {
                String jsonEvent = this.objectMapper.writeValueAsString(event);
                log.debug("Sending event via WebSocket session {}: {}", (Object)wsSession.getId(), (Object)jsonEvent);
                wsSession.sendMessage((WebSocketMessage)new TextMessage((CharSequence)jsonEvent));
            }
            catch (JsonProcessingException e) {
                log.error("Error serializing event to JSON for WebSocket session {}", (Object)wsSession.getId(), (Object)e);
            }
            catch (IOException e) {
                log.error("IOException sending message via WebSocket session {}", (Object)wsSession.getId(), (Object)e);
                try {
                    wsSession.close(CloseStatus.SERVER_ERROR.withReason("Error sending message"));
                }
                catch (IOException closeException) {
                    log.warn("Failed to close WebSocket connection after send error: {}", (Object)closeException.getMessage());
                }
            }
        }, error -> {
            log.error("Error in run_live stream for WebSocket session {}: {}", new Object[]{wsSession.getId(), error.getMessage(), error});
            String reason = error.getMessage() != null ? error.getMessage() : "Unknown error";
            try {
                wsSession.close(new CloseStatus(1011, reason.substring(0, Math.min(reason.length(), 123))));
            }
            catch (IOException closeException) {
                log.warn("Failed to close WebSocket connection after stream error: {}", (Object)closeException.getMessage());
            }
        }, () -> {
            log.debug("run_live stream completed for WebSocket session {}", (Object)wsSession.getId());
            try {
                wsSession.close(CloseStatus.NORMAL);
            }
            catch (IOException closeException) {
                log.warn("Failed to close WebSocket connection normally: {}", (Object)closeException.getMessage());
            }
        });
        wsSession.getAttributes().put(LIVE_SUBSCRIPTION_ATTR, disposable);
        log.debug("Live run started for WebSocket session {}", (Object)wsSession.getId());
    }

    protected void handleTextMessage(WebSocketSession wsSession, TextMessage message) throws Exception {
        LiveRequestQueue liveRequestQueue = (LiveRequestQueue)wsSession.getAttributes().get(LIVE_REQUEST_QUEUE_ATTR);
        if (liveRequestQueue == null) {
            log.warn("Received message on WebSocket session {} but LiveRequestQueue is not available (null). Message: {}", (Object)wsSession.getId(), message.getPayload());
            return;
        }
        try {
            String payload = (String)message.getPayload();
            log.debug("Received text message on WebSocket session {}: {}", (Object)wsSession.getId(), (Object)payload);
            JsonNode rootNode = this.objectMapper.readTree(payload);
            LiveRequest.Builder liveRequestBuilder = LiveRequest.builder();
            if (rootNode.has("content")) {
                Content content = (Content)this.objectMapper.treeToValue((TreeNode)rootNode.get("content"), Content.class);
                liveRequestBuilder.content(content);
            }
            if (rootNode.has("blob")) {
                String mimeType;
                JsonNode blobNode = rootNode.get("blob");
                Blob.Builder blobBuilder = Blob.builder();
                if (blobNode.has("displayName")) {
                    blobBuilder.displayName(blobNode.get("displayName").asText());
                }
                if (blobNode.has("data")) {
                    blobBuilder.data(blobNode.get("data").binaryValue());
                }
                String string = blobNode.has("mimeType") ? blobNode.get("mimeType").asText() : (mimeType = blobNode.has("mime_type") ? blobNode.get("mime_type").asText() : null);
                if (mimeType != null) {
                    blobBuilder.mimeType(mimeType);
                }
                liveRequestBuilder.blob(blobBuilder.build());
            }
            LiveRequest liveRequest = liveRequestBuilder.build();
            liveRequestQueue.send(liveRequest);
        }
        catch (JsonProcessingException e) {
            log.error("Error deserializing LiveRequest from WebSocket message for session {}: {}", new Object[]{wsSession.getId(), message.getPayload(), e});
            wsSession.sendMessage((WebSocketMessage)new TextMessage((CharSequence)("{\"error\":\"Invalid JSON format for LiveRequest\", \"details\":\"" + e.getMessage() + "\"}")));
        }
        catch (Exception e) {
            log.error("Unexpected error processing text message for WebSocket session {}: {}", new Object[]{wsSession.getId(), message.getPayload(), e});
            String reason = e.getMessage() != null ? e.getMessage() : "Error processing message";
            wsSession.close(new CloseStatus(1011, reason.substring(0, Math.min(reason.length(), 123))));
        }
    }

    public void handleTransportError(WebSocketSession wsSession, Throwable exception) throws Exception {
        log.error("WebSocket transport error for session {}: {}", new Object[]{wsSession.getId(), exception.getMessage(), exception});
        this.cleanupSession(wsSession);
        if (wsSession.isOpen()) {
            String reason = exception.getMessage() != null ? exception.getMessage() : "Transport error";
            wsSession.close(CloseStatus.PROTOCOL_ERROR.withReason(reason.substring(0, Math.min(reason.length(), 123))));
        }
    }

    public void afterConnectionClosed(WebSocketSession wsSession, CloseStatus status) throws Exception {
        log.info("WebSocket connection closed: {} with status {}", (Object)wsSession.getId(), (Object)status.toString());
        this.cleanupSession(wsSession);
    }

    private void cleanupSession(WebSocketSession wsSession) {
        Disposable disposable;
        LiveRequestQueue liveRequestQueue = (LiveRequestQueue)wsSession.getAttributes().remove(LIVE_REQUEST_QUEUE_ATTR);
        if (liveRequestQueue != null) {
            liveRequestQueue.close();
            log.debug("Called close() on LiveRequestQueue for session {}", (Object)wsSession.getId());
        }
        if ((disposable = (Disposable)wsSession.getAttributes().remove(LIVE_SUBSCRIPTION_ATTR)) != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
        log.debug("Cleaned up resources for WebSocket session {}", (Object)wsSession.getId());
    }
}

