/*
 * Decompiled with CFR 0.152.
 */
package com.google.adk.models;

import com.google.adk.models.BaseLlmConnection;
import com.google.adk.models.LlmResponse;
import com.google.common.collect.ImmutableList;
import com.google.genai.AsyncSession;
import com.google.genai.Client;
import com.google.genai.types.Blob;
import com.google.genai.types.Content;
import com.google.genai.types.FinishReason;
import com.google.genai.types.FunctionCall;
import com.google.genai.types.FunctionResponse;
import com.google.genai.types.LiveConnectConfig;
import com.google.genai.types.LiveSendClientContentParameters;
import com.google.genai.types.LiveSendRealtimeInputParameters;
import com.google.genai.types.LiveSendToolResponseParameters;
import com.google.genai.types.LiveServerContent;
import com.google.genai.types.LiveServerMessage;
import com.google.genai.types.LiveServerToolCall;
import com.google.genai.types.Part;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.processors.PublishProcessor;
import java.net.SocketException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link BaseLlmConnection} backed by a live (bidirectional) session of the Gen AI client.
 *
 * <p>The session is opened asynchronously in the constructor. Server messages are converted to
 * {@link LlmResponse} objects and published through the {@link Flowable} returned by
 * {@link #receive()}. Outgoing messages are chained onto the pending session future, so they may
 * be submitted before the connection has finished opening.
 *
 * <p>The {@code closed} flag is flipped at most once (via compare-and-set); whichever of a
 * connection error, a receive error, or an explicit close wins that race is the one that
 * terminates the response stream.
 */
public final class GeminiLlmConnection
implements BaseLlmConnection {
    private static final Logger logger = LoggerFactory.getLogger(GeminiLlmConnection.class);
    private final Client apiClient;
    private final String modelName;
    private final LiveConnectConfig connectConfig;
    private final CompletableFuture<AsyncSession> sessionFuture;
    private final PublishProcessor<LlmResponse> responseProcessor = PublishProcessor.create();
    private final Flowable<LlmResponse> responseFlowable = this.responseProcessor.serialize();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Starts connecting to the live API and wires up the receive loop once the session is ready.
     *
     * @param apiClient client used to open the live session; must not be null
     * @param modelName name of the model to connect to; must not be null
     * @param connectConfig live connection configuration; must not be null
     * @throws NullPointerException if any argument is null
     */
    GeminiLlmConnection(Client apiClient, String modelName, LiveConnectConfig connectConfig) {
        this.apiClient = Objects.requireNonNull(apiClient);
        this.modelName = Objects.requireNonNull(modelName);
        this.connectConfig = Objects.requireNonNull(connectConfig);
        this.sessionFuture = this.apiClient.async.live.connect(this.modelName, this.connectConfig).whenCompleteAsync((session, throwable) -> {
            if (throwable != null) {
                this.handleConnectionError(throwable);
            } else if (session != null) {
                this.setupReceiver(session);
            } else if (!this.closed.get()) {
                // Neither a session nor an error was delivered: report it as a failure.
                this.handleConnectionError(new SocketException("WebSocket connection failed without explicit error."));
            }
        });
    }

    /**
     * Registers the server-message callback on a freshly opened session, or closes the session
     * right away when this connection was already closed while connecting.
     */
    private void setupReceiver(AsyncSession session) {
        if (this.closed.get()) {
            this.closeSessionIgnoringErrors(session);
            return;
        }
        session.receive(this::handleServerMessage).exceptionally(error -> {
            this.handleReceiveError(error);
            return null;
        });
    }

    /** Converts one server message and publishes the result, unless the connection is closed. */
    private void handleServerMessage(LiveServerMessage message) {
        if (this.closed.get()) {
            return;
        }
        // Only serialize the message when it is actually going to be logged.
        if (logger.isDebugEnabled()) {
            logger.debug("Received server message: {}", message.toJson());
        }
        this.convertToServerResponse(message).ifPresent(this.responseProcessor::onNext);
    }

    /**
     * Maps a server message to an {@link LlmResponse}.
     *
     * @return the converted response, or an empty Optional for messages that are only logged
     *     (usage metadata, tool call cancellation, setup complete)
     */
    private Optional<LlmResponse> convertToServerResponse(LiveServerMessage message) {
        LlmResponse.Builder builder = LlmResponse.builder();
        if (message.serverContent().isPresent()) {
            LiveServerContent serverContent = message.serverContent().get();
            serverContent.modelTurn().ifPresent(builder::content);
            // A response is partial only when the server explicitly reports turnComplete=false.
            builder.partial(serverContent.turnComplete().map(completed -> !completed).orElse(false)).turnComplete(serverContent.turnComplete().orElse(false));
        } else if (message.toolCall().isPresent()) {
            LiveServerToolCall toolCall = message.toolCall().get();
            toolCall.functionCalls().ifPresent(calls -> {
                // Put every function call into a single Content. Setting the content once per
                // call would overwrite the previous one and keep only the last call.
                ImmutableList<Part> parts = calls.stream()
                        .map(call -> Part.builder().functionCall(call).build())
                        .collect(ImmutableList.toImmutableList());
                if (!parts.isEmpty()) {
                    builder.content(Content.builder().parts(parts).build());
                }
            });
            builder.partial(false).turnComplete(false);
        } else {
            if (message.usageMetadata().isPresent()) {
                logger.debug("Received usage metadata: {}", message.usageMetadata().get());
                return Optional.empty();
            }
            if (message.toolCallCancellation().isPresent()) {
                logger.debug("Received tool call cancellation: {}", message.toolCallCancellation().get());
                return Optional.empty();
            }
            if (message.setupComplete().isPresent()) {
                logger.debug("Received setup complete.");
                return Optional.empty();
            }
            logger.warn("Received unknown or empty server message: {}", (Object)message.toJson());
            builder.errorCode(new FinishReason("Unknown server message.")).errorMessage("Received unknown server message.");
        }
        return Optional.of(builder.build());
    }

    /**
     * Marks the connection closed and fails the response stream with the connection error.
     * Does nothing if the connection was already closed.
     */
    private void handleConnectionError(Throwable throwable) {
        if (this.closed.compareAndSet(false, true)) {
            logger.error("WebSocket connection failed", throwable);
            // Unwrap CompletionException, but never hand a null error to the processor.
            Throwable cause = throwable;
            if (throwable instanceof CompletionException && throwable.getCause() != null) {
                cause = throwable.getCause();
            }
            this.responseProcessor.onError(cause);
        }
    }

    /**
     * Marks the connection closed, fails the response stream with the receive error and closes
     * the session. Does nothing if the connection was already closed.
     */
    private void handleReceiveError(Throwable throwable) {
        if (this.closed.compareAndSet(false, true)) {
            logger.error("Error during WebSocket receive operation", throwable);
            this.responseProcessor.onError(throwable);
            this.sessionFuture.thenAccept(this::closeSessionIgnoringErrors).exceptionally(err -> null);
        }
    }

    /**
     * Sends the given conversation history as client content.
     *
     * @param history turns to send; must not be null
     * @return a Completable tied to the send operation
     * @throws NullPointerException if {@code history} is null
     */
    @Override
    public Completable sendHistory(List<Content> history) {
        Objects.requireNonNull(history, "history cannot be null");
        return this.sendClientContentInternal(LiveSendClientContentParameters.builder().turns(history).build());
    }

    /**
     * Sends a single content. If every part of the content is a function response, it is sent
     * as a tool response; otherwise it is sent as a completed client turn.
     *
     * @param content content to send; must not be null
     * @return a Completable tied to the send operation
     * @throws NullPointerException if {@code content} is null
     */
    @Override
    public Completable sendContent(Content content) {
        Objects.requireNonNull(content, "content cannot be null");
        Optional<List<FunctionResponse>> functionResponses = this.extractFunctionResponses(content);
        if (functionResponses.isPresent()) {
            return this.sendToolResponseInternal(LiveSendToolResponseParameters.builder().functionResponses(functionResponses.get()).build());
        }
        return this.sendClientContentInternal(LiveSendClientContentParameters.builder().turns(ImmutableList.of(content)).turnComplete(true).build());
    }

    /**
     * Returns the function responses of the content, but only when the content has at least one
     * part and all of its parts carry a function response; otherwise returns an empty Optional.
     */
    private Optional<List<FunctionResponse>> extractFunctionResponses(Content content) {
        if (content.parts().isEmpty() || content.parts().get().isEmpty()) {
            return Optional.empty();
        }
        List<Part> parts = content.parts().get();
        List<FunctionResponse> responses = parts.stream()
                .map(Part::functionResponse)
                .flatMap(Optional::stream)
                .collect(ImmutableList.toImmutableList());
        // Mixed content (some parts without a function response) is not a tool response.
        if (responses.size() == parts.size()) {
            return Optional.of(responses);
        }
        return Optional.empty();
    }

    /**
     * Sends a media blob as realtime input once the session is available.
     *
     * @param blob media to send
     * @return a Completable tied to the send operation
     */
    @Override
    public Completable sendRealtime(Blob blob) {
        return Completable.fromFuture(this.sessionFuture.thenCompose(session -> session.sendRealtimeInput(LiveSendRealtimeInputParameters.builder().media(blob).build())));
    }

    /** Sends client content once the session is available. */
    private Completable sendClientContentInternal(LiveSendClientContentParameters parameters) {
        return Completable.fromFuture(this.sessionFuture.thenCompose(session -> session.sendClientContent(parameters)));
    }

    /** Sends a tool response once the session is available. */
    private Completable sendToolResponseInternal(LiveSendToolResponseParameters parameters) {
        return Completable.fromFuture(this.sessionFuture.thenCompose(session -> session.sendToolResponse(parameters)));
    }

    /** Returns the serialized stream of responses converted from server messages. */
    @Override
    public Flowable<LlmResponse> receive() {
        return this.responseFlowable;
    }

    /** Closes the connection and completes the response stream normally. */
    @Override
    public void close() {
        this.closeInternal(null);
    }

    /**
     * Closes the connection and terminates the response stream with the given error.
     *
     * @param throwable error to signal to subscribers; must not be null
     * @throws NullPointerException if {@code throwable} is null
     */
    @Override
    public void close(Throwable throwable) {
        Objects.requireNonNull(throwable, "throwable cannot be null for close");
        this.closeInternal(throwable);
    }

    /**
     * Performs the close exactly once: terminates the response stream (normally when
     * {@code throwable} is null, otherwise with that error) and then closes the session if it
     * is already available, or cancels the pending session future if it is not.
     */
    private void closeInternal(Throwable throwable) {
        if (this.closed.compareAndSet(false, true)) {
            logger.debug("Closing GeminiConnection.", throwable);
            if (throwable == null) {
                this.responseProcessor.onComplete();
            } else {
                this.responseProcessor.onError(throwable);
            }
            if (this.sessionFuture.isDone()) {
                this.sessionFuture.thenAccept(this::closeSessionIgnoringErrors).exceptionally(err -> null);
            } else {
                // NOTE(review): this cancels the dependent stage, not the underlying connect
                // call. If the connection is established afterwards, the session may never be
                // closed - confirm, and consider tracking the connect future to close it.
                this.sessionFuture.cancel(false);
            }
        }
    }

    /** Closes the session (if non-null); a failed close is logged and otherwise ignored. */
    private void closeSessionIgnoringErrors(AsyncSession session) {
        if (session != null) {
            session.close().exceptionally(closeError -> {
                logger.warn("Error occurred while closing AsyncSession", closeError);
                return null;
            });
        }
    }
}

