package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.InstructionResult;
import io.grpc.stub.StreamObserver;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/* loaded from: input_file:io/axoniq/axonserver/connector/impl/ForwardingReplyChannel.class */
public class ForwardingReplyChannel<T> implements ReplyChannel<T> {
    private final String instructionId;
    private final String clientId;
    private final StreamObserver<T> stream;
    private final Function<InstructionResult, Optional<T>> resultBuilder;
    private final Runnable onConsumed;
    private final AtomicBoolean resultSent = new AtomicBoolean(false);
    private final AtomicBoolean completed = new AtomicBoolean();

    public ForwardingReplyChannel(String str, String str2, StreamObserver<T> streamObserver, Function<InstructionResult, Optional<T>> function, Runnable runnable) {
        this.instructionId = str;
        this.clientId = str2;
        this.stream = streamObserver;
        this.resultBuilder = function;
        this.onConsumed = runnable;
    }

    @Override // io.axoniq.axonserver.connector.ReplyChannel
    public void send(T t) {
        this.stream.onNext(t);
    }

    private void ackSuccess() {
        if (this.instructionId == null || this.instructionId.isEmpty() || !this.resultSent.compareAndSet(false, true)) {
            return;
        }
        Optional<T> apply = this.resultBuilder.apply(InstructionResult.newBuilder().setInstructionId(this.instructionId).setSuccess(true).m154build());
        StreamObserver<T> streamObserver = this.stream;
        streamObserver.getClass();
        apply.ifPresent(streamObserver::onNext);
    }

    @Override // io.axoniq.axonserver.connector.ReplyChannel
    public void complete() {
        ackSuccess();
        markConsumed();
    }

    @Override // io.axoniq.axonserver.connector.ReplyChannel
    public void completeWithError(ErrorMessage errorMessage) {
        ackFailure(errorMessage);
        markConsumed();
    }

    private void ackFailure(ErrorMessage errorMessage) {
        if (this.instructionId == null || this.instructionId.isEmpty() || !this.resultSent.compareAndSet(false, true)) {
            return;
        }
        if (errorMessage.getLocation().isEmpty()) {
            errorMessage = errorMessage.m62toBuilder().setLocation(this.clientId).m79build();
        }
        Optional<T> apply = this.resultBuilder.apply(InstructionResult.newBuilder().setInstructionId(this.instructionId).setError(errorMessage == null ? ErrorMessage.getDefaultInstance() : errorMessage).setSuccess(false).m154build());
        StreamObserver<T> streamObserver = this.stream;
        streamObserver.getClass();
        apply.ifPresent(streamObserver::onNext);
    }

    @Override // io.axoniq.axonserver.connector.ReplyChannel
    public void completeWithError(ErrorCategory errorCategory, String str) {
        completeWithError(ErrorMessage.newBuilder().setErrorCode(errorCategory.errorCode()).setLocation(this.clientId).setMessage(str).m79build());
    }

    private void markConsumed() {
        if (this.completed.compareAndSet(false, true)) {
            this.onConsumed.run();
        }
    }
}
