package io.axoniq.axonserver.connector.query.impl;

import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.impl.AbstractBufferedStream;
import io.axoniq.axonserver.connector.impl.FlowControlledStream;
import io.axoniq.axonserver.connector.impl.SynchronizedRequestStream;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryUpdate;
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryRequest;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryResponse;
import io.grpc.stub.ClientCallStreamObserver;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonserver/connector/query/impl/SubscriptionQueryStream.class */
/**
 * Inbound handler for the responses of a single subscription query.
 *
 * <p>Each incoming {@link SubscriptionQueryResponse} is dispatched on its response case: the initial
 * result settles the {@link CompletableFuture} handed to the constructor, while updates and
 * completion signals are forwarded to an internal update buffer that is exposed to consumers via
 * {@link #buffer()}.
 */
public class SubscriptionQueryStream extends FlowControlledStream<SubscriptionQueryResponse, SubscriptionQueryRequest> {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionQueryStream.class);
    private final String subscriptionQueryId;
    private final CompletableFuture<QueryResponse> initialResultFuture;
    private final AbstractBufferedStream<QueryUpdate, SubscriptionQueryRequest> updateBuffer;

    /**
     * Creates the stream together with the buffer that collects its updates.
     *
     * @param str               identifier of the subscription query; used in the unsubscribe request
     *                          sent from {@link #onError(Throwable)} and passed to the update buffer
     * @param completableFuture future that is settled with the initial result, or with the failure
     *                          that prevented it
     * @param str2              forwarded unchanged to the superclass and to the update buffer
     * @param i                 forwarded unchanged to the superclass and to the update buffer
     * @param i2                forwarded unchanged to the superclass and to the update buffer
     */
    public SubscriptionQueryStream(String str, CompletableFuture<QueryResponse> completableFuture, String str2, int i, int i2) {
        super(str2, i, i2);
        this.subscriptionQueryId = str;
        this.initialResultFuture = completableFuture;
        this.updateBuffer = new SubscriptionQueryUpdateBuffer(str2, str, i, i2);
    }

    /**
     * Gives access to the buffered updates of this subscription query.
     *
     * @return the update buffer created in the constructor
     */
    public ResultStream<QueryUpdate> buffer() {
        return this.updateBuffer;
    }

    /**
     * Routes a single response to the initial-result future or to the update buffer, depending on
     * its response case. Responses with any other case are only logged.
     *
     * @param subscriptionQueryResponse the response to dispatch
     */
    public void onNext(SubscriptionQueryResponse subscriptionQueryResponse) {
        switch (subscriptionQueryResponse.getResponseCase()) {
            case INITIAL_RESULT:
                this.initialResultFuture.complete(subscriptionQueryResponse.getInitialResult());
                break;
            case UPDATE:
                this.updateBuffer.onNext(subscriptionQueryResponse.getUpdate());
                break;
            case COMPLETE:
                this.updateBuffer.onCompleted();
                break;
            case COMPLETE_EXCEPTIONALLY:
                AxonServerException failure = toAxonServerException(subscriptionQueryResponse);
                this.updateBuffer.onError(failure);
                // Only fail the initial result when nothing has settled it yet.
                if (!this.initialResultFuture.isDone()) {
                    this.initialResultFuture.completeExceptionally(failure);
                }
                break;
            default:
                logger.info("Received unsupported message from SubscriptionQuery. It doesn't declare one of the expected types");
                break;
        }
    }

    /**
     * Propagates a stream failure to the initial-result future and to the update buffer, then makes
     * a best-effort attempt to send an unsubscribe request and complete the outbound stream.
     *
     * @param th the failure reported for this stream
     */
    public void onError(Throwable th) {
        this.initialResultFuture.completeExceptionally(th);
        this.updateBuffer.onError(th);
        try {
            SubscriptionQueryRequest unsubscribeRequest = buildUnsubscribeRequest();
            outboundStream().onNext(unsubscribeRequest);
            outboundStream().onCompleted();
        } catch (Exception e) {
            // Best effort only: a failure here is logged and not rethrown.
            logger.debug("Cannot complete stream. Already completed.", e);
        }
    }

    /**
     * Marks the update buffer as completed. The initial-result future is not touched here.
     */
    public void onCompleted() {
        this.updateBuffer.onCompleted();
    }

    /**
     * Wraps the given call observer in a {@link SynchronizedRequestStream} and hands that same
     * wrapper to both the superclass and the update buffer.
     *
     * @param clientCallStreamObserver the outbound call observer supplied before the call starts
     */
    @Override // io.axoniq.axonserver.connector.impl.FlowControlledStream
    public void beforeStart(ClientCallStreamObserver<SubscriptionQueryRequest> clientCallStreamObserver) {
        SynchronizedRequestStream sharedOutbound = new SynchronizedRequestStream(clientCallStreamObserver);
        super.beforeStart(sharedOutbound);
        this.updateBuffer.beforeStart(sharedOutbound);
    }

    /**
     * Enables flow control on the update buffer; the superclass implementation is not invoked.
     */
    @Override // io.axoniq.axonserver.connector.impl.FlowControlledStream
    public void enableFlowControl() {
        this.updateBuffer.enableFlowControl();
    }

    /**
     * Always returns {@code null}; this stream builds no flow control message of its own.
     *
     * <p>Decompiler note: this method was originally declared {@code protected}.
     *
     * @param flowControl ignored
     * @return {@code null}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // io.axoniq.axonserver.connector.impl.FlowControlledStream
    public SubscriptionQueryRequest buildFlowControlMessage(FlowControl flowControl) {
        return null;
    }

    /** Builds the request that unsubscribes the subscription query identified by this stream. */
    private SubscriptionQueryRequest buildUnsubscribeRequest() {
        SubscriptionQuery unsubscribe = SubscriptionQuery.newBuilder()
                .setSubscriptionIdentifier(this.subscriptionQueryId)
                .m2651build();
        return SubscriptionQueryRequest.newBuilder()
                .setUnsubscribe(unsubscribe)
                .m2699build();
    }

    /** Converts the "complete exceptionally" payload of a response into an {@link AxonServerException}. */
    private static AxonServerException toAxonServerException(SubscriptionQueryResponse response) {
        return new AxonServerException(
                ErrorCategory.getFromCode(response.getCompleteExceptionally().getErrorCode()),
                response.getCompleteExceptionally().getErrorMessage().getMessage(),
                response.getCompleteExceptionally().getClientId());
    }
}
