package io.axoniq.axonserver.connector.event.transformation.impl.grpc;

import com.google.protobuf.Empty;
import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.admin.AdminChannel;
import io.axoniq.axonserver.connector.event.transformation.EventTransformation;
import io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.FutureListStreamObserver;
import io.axoniq.axonserver.connector.impl.FutureStreamObserver;
import io.axoniq.axonserver.grpc.event.ApplyTransformationRequest;
import io.axoniq.axonserver.grpc.event.CompactionRequest;
import io.axoniq.axonserver.grpc.event.EventTransformationServiceGrpc;
import io.axoniq.axonserver.grpc.event.StartTransformationRequest;
import io.axoniq.axonserver.grpc.event.TransformRequest;
import io.axoniq.axonserver.grpc.event.TransformRequestAck;
import io.axoniq.axonserver.grpc.event.TransformationId;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonserver/connector/event/transformation/impl/grpc/GrpcEventTransformationService.class */
public class GrpcEventTransformationService implements EventTransformationService {
    private final Logger logger;
    private final EventTransformationServiceGrpc.EventTransformationServiceStub stub;

    public GrpcEventTransformationService(AxonServerManagedChannel axonServerManagedChannel) {
        this(EventTransformationServiceGrpc.newStub(axonServerManagedChannel));
    }

    public GrpcEventTransformationService(EventTransformationServiceGrpc.EventTransformationServiceStub eventTransformationServiceStub) {
        this.logger = LoggerFactory.getLogger(GrpcEventTransformationService.class);
        this.stub = eventTransformationServiceStub;
    }

    @Nonnull
    private static ApplyTransformationRequest applyTransformationRequest(String str, Long l) {
        return ApplyTransformationRequest.newBuilder().setTransformationId(TransformationId.newBuilder().setId(str)).setLastSequence(l.longValue()).m3283build();
    }

    @Override // io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService
    public CompletableFuture<Iterable<EventTransformation>> transformations() {
        FutureListStreamObserver futureListStreamObserver = new FutureListStreamObserver();
        this.stub.transformations(Empty.newBuilder().build(), futureListStreamObserver);
        return futureListStreamObserver.thenApply(list -> {
            return (List) list.stream().map(GrpcEventTransformation::new).collect(Collectors.toList());
        });
    }

    @Override // io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService
    public CompletableFuture<EventTransformation> transformationById(String str) {
        return transformations().thenApply(iterable -> {
            return (EventTransformation) StreamSupport.stream(iterable.spliterator(), false).filter(eventTransformation -> {
                return str.equals(eventTransformation.id());
            }).findFirst().orElseThrow(IllegalStateException::new);
        });
    }

    @Override // io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService
    public CompletableFuture<String> newTransformation(String str) {
        FutureStreamObserver futureStreamObserver = new FutureStreamObserver((Throwable) new AxonServerException(ErrorCategory.OTHER, "An unknown error occurred while starting transformation. No response received from Server.", AdminChannel.CHANNEL_CONTEXT));
        this.stub.startTransformation(StartTransformationRequest.newBuilder().setDescription(str).m4428build(), futureStreamObserver);
        return futureStreamObserver.thenApply((v0) -> {
            return v0.getId();
        });
    }

    @Override // io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService
    public EventTransformationService.TransformationStream transformationStream(String str) {
        AtomicReference atomicReference = new AtomicReference(l -> {
        });
        AtomicReference atomicReference2 = new AtomicReference(th -> {
        });
        StreamObserver<TransformRequest> transformEvents = this.stub.transformEvents(responseObserver(l2 -> {
            ((Consumer) atomicReference.get()).accept(l2);
        }, th2 -> {
            ((Consumer) atomicReference2.get()).accept(th2);
        }));
        atomicReference.getClass();
        Consumer consumer = (v1) -> {
            r4.set(v1);
        };
        atomicReference2.getClass();
        return new GrpcTransformationStream(str, transformEvents, consumer, (v1) -> {
            r5.set(v1);
        });
    }

    private StreamObserver<TransformRequestAck> responseObserver(final Consumer<Long> consumer, final Consumer<Throwable> consumer2) {
        return new StreamObserver<TransformRequestAck>() { // from class: io.axoniq.axonserver.connector.event.transformation.impl.grpc.GrpcEventTransformationService.1
            public void onNext(TransformRequestAck transformRequestAck) {
                consumer.accept(Long.valueOf(transformRequestAck.getSequence()));
            }

            public void onError(Throwable th) {
                GrpcEventTransformationService.this.logger.warn("Transformation failed by server", th);
                consumer2.accept(th);
            }

            public void onCompleted() {
                NonTransientException nonTransientException = new NonTransientException("The server unexpectedly completed the transformation stream");
                GrpcEventTransformationService.this.logger.warn("The server unexpectedly completed the transformation stream", nonTransientException);
                consumer2.accept(nonTransientException);
            }
        };
    }

    @Override // io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService
    public CompletableFuture<Void> startApplying(String str, Long l) {
        FutureStreamObserver futureStreamObserver = new FutureStreamObserver(Empty.getDefaultInstance());
        this.stub.applyTransformation(applyTransformationRequest(str, l), futureStreamObserver);
        return futureStreamObserver.thenAccept(empty -> {
        });
    }

    @Override // io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService
    public CompletableFuture<Void> cancel(String str) {
        FutureStreamObserver futureStreamObserver = new FutureStreamObserver(Empty.getDefaultInstance());
        this.stub.cancelTransformation(TransformationId.newBuilder().setId(str).m4665build(), futureStreamObserver);
        return futureStreamObserver.thenAccept(empty -> {
        });
    }

    @Override // io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService
    public CompletableFuture<Void> startCompacting() {
        FutureStreamObserver futureStreamObserver = new FutureStreamObserver(Empty.getDefaultInstance());
        this.stub.compact(CompactionRequest.newBuilder().m3425build(), futureStreamObserver);
        return futureStreamObserver.thenAccept(empty -> {
        });
    }
}
