package io.axoniq.axonserver.connector.event.impl;

import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.event.PersistentStream;
import io.axoniq.axonserver.connector.event.PersistentStreamCallbacks;
import io.axoniq.axonserver.connector.event.PersistentStreamSegment;
import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.streams.InitializationProperties;
import io.axoniq.axonserver.grpc.streams.Open;
import io.axoniq.axonserver.grpc.streams.ProgressAcknowledgement;
import io.axoniq.axonserver.grpc.streams.SegmentError;
import io.axoniq.axonserver.grpc.streams.StreamRequest;
import io.axoniq.axonserver.grpc.streams.StreamSignal;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonserver/connector/event/impl/PersistentStreamImpl.class */
public class PersistentStreamImpl implements PersistentStream, ClientResponseObserver<StreamRequest, StreamSignal> {
    private static final Logger logger = LoggerFactory.getLogger(PersistentStreamImpl.class);
    private static final Consumer<Throwable> NO_OP = th -> {
    };
    private final String streamId;
    private final String clientId;
    private final int bufferSize;
    private final int refillBatch;
    private final Map<Integer, BufferedPersistentStreamSegment> openSegments = new ConcurrentHashMap();
    private final AtomicReference<ClientCallStreamObserver<StreamRequest>> outboundStreamHolder = new AtomicReference<>();
    private final AtomicReference<Consumer<Throwable>> onClosedCallback = new AtomicReference<>(NO_OP);
    private final Set<Consumer<PersistentStreamSegment>> onSegmentOpenedCallbacks = new CopyOnWriteArraySet();
    private final Set<Consumer<PersistentStreamSegment>> segmentOnAvailable = new CopyOnWriteArraySet();
    private final Set<Consumer<PersistentStreamSegment>> segmentOnClose = new CopyOnWriteArraySet();
    private final Set<Integer> closeConfirmationsSent = new CopyOnWriteArraySet();
    private final AtomicBoolean closed = new AtomicBoolean();

    /* loaded from: input_file:io/axoniq/axonserver/connector/event/impl/PersistentStreamImpl$StreamRequestClientCallStreamObserver.class */
    private class StreamRequestClientCallStreamObserver extends ClientCallStreamObserver<StreamRequest> {
        private final ClientCallStreamObserver<StreamRequest> clientCallStreamObserver;

        public StreamRequestClientCallStreamObserver(ClientCallStreamObserver<StreamRequest> clientCallStreamObserver) {
            this.clientCallStreamObserver = clientCallStreamObserver;
        }

        public void cancel(@Nullable String str, @Nullable Throwable th) {
            PersistentStreamImpl.logger.debug("{}: Ignore cancel: {}", new Object[]{PersistentStreamImpl.this.streamId, str, th});
        }

        public boolean isReady() {
            return this.clientCallStreamObserver.isReady();
        }

        public void setOnReadyHandler(Runnable runnable) {
            this.clientCallStreamObserver.setOnReadyHandler(runnable);
        }

        public void request(int i) {
            this.clientCallStreamObserver.request(i);
        }

        public void setMessageCompression(boolean z) {
            this.clientCallStreamObserver.setMessageCompression(z);
        }

        public void disableAutoInboundFlowControl() {
        }

        public void disableAutoRequestWithInitial(int i) {
        }

        public void onNext(StreamRequest streamRequest) {
            synchronized (PersistentStreamImpl.this.outboundStreamHolder) {
                PersistentStreamImpl.logger.trace("Send {}", streamRequest);
                this.clientCallStreamObserver.onNext(streamRequest);
            }
        }

        public void onError(Throwable th) {
            try {
                this.clientCallStreamObserver.onError(th);
            } catch (IllegalStateException e) {
            }
        }

        public void onCompleted() {
            try {
                this.clientCallStreamObserver.onCompleted();
            } catch (IllegalStateException e) {
            }
        }
    }

    public PersistentStreamImpl(ClientIdentification clientIdentification, String str, int i, int i2, PersistentStreamCallbacks persistentStreamCallbacks) {
        this.streamId = str;
        this.clientId = clientIdentification.getClientId();
        this.bufferSize = i;
        this.refillBatch = i2;
        if (persistentStreamCallbacks.onAvailable() != null) {
            this.segmentOnAvailable.add(persistentStreamCallbacks.onAvailable());
        }
        if (persistentStreamCallbacks.onClosed() != null) {
            this.onClosedCallback.set(persistentStreamCallbacks.onClosed());
        }
        if (persistentStreamCallbacks.onSegmentOpened() != null) {
            this.onSegmentOpenedCallbacks.add(persistentStreamCallbacks.onSegmentOpened());
        }
        if (persistentStreamCallbacks.onSegmentClosed() != null) {
            this.segmentOnClose.add(persistentStreamCallbacks.onSegmentClosed());
        }
    }

    public void openConnection() {
        openConnection(null);
    }

    public void openConnection(InitializationProperties initializationProperties) {
        Open.Builder clientId = Open.newBuilder().setStreamId(this.streamId).setClientId(this.clientId);
        if (initializationProperties != null) {
            clientId.setInitializationProperties(initializationProperties);
        }
        this.outboundStreamHolder.get().onNext(StreamRequest.newBuilder().setOpen(clientId.m3123build()).m3384build());
    }

    public void triggerReconnect() {
        close();
        this.onClosedCallback.get().accept(new AxonServerException(ErrorCategory.OTHER, "Client initiated reconnect", "client"));
    }

    @Override // io.axoniq.axonserver.connector.event.PersistentStream
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            Instant plus = Instant.now().plus((TemporalAmount) Duration.ofSeconds(2L));
            this.openSegments.forEach((num, bufferedPersistentStreamSegment) -> {
                bufferedPersistentStreamSegment.close();
            });
            while (this.closeConfirmationsSent.size() != this.openSegments.size() && Instant.now().isBefore(plus)) {
                try {
                    logger.debug("{}: Waiting for segments to complete {} of {}", new Object[]{this.streamId, Integer.valueOf(this.closeConfirmationsSent.size()), Integer.valueOf(this.openSegments.size())});
                    Thread.sleep(50L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            logger.debug("{}: Waited for segments to complete {}", this.streamId, this.closeConfirmationsSent);
            sendCompleted();
        }
    }

    public void beforeStart(ClientCallStreamObserver<StreamRequest> clientCallStreamObserver) {
        this.outboundStreamHolder.set(new StreamRequestClientCallStreamObserver(clientCallStreamObserver));
    }

    public void onNext(StreamSignal streamSignal) {
        if (streamSignal.hasOpen()) {
            getPersistentStreamSegment(streamSignal.getSegment());
        }
        if (streamSignal.hasEvent()) {
            getPersistentStreamSegment(streamSignal.getSegment()).onNext(streamSignal.getEvent());
        }
        if (streamSignal.getClosed()) {
            logger.debug("Received: {} - closed", Integer.valueOf(streamSignal.getSegment()));
            BufferedPersistentStreamSegment remove = this.openSegments.remove(Integer.valueOf(streamSignal.getSegment()));
            if (remove != null) {
                remove.onCompleted();
            }
        }
    }

    private BufferedPersistentStreamSegment getPersistentStreamSegment(int i) {
        boolean z = !this.openSegments.containsKey(Integer.valueOf(i));
        BufferedPersistentStreamSegment computeIfAbsent = this.openSegments.computeIfAbsent(Integer.valueOf(i), num -> {
            BufferedPersistentStreamSegment bufferedPersistentStreamSegment = new BufferedPersistentStreamSegment(this.streamId, i, this.bufferSize, this.refillBatch, j -> {
                acknowledge(num.intValue(), j);
            }, str -> {
                sendError(num.intValue(), str);
            });
            bufferedPersistentStreamSegment.beforeStart(this.outboundStreamHolder.get());
            bufferedPersistentStreamSegment.enableFlowControl();
            return bufferedPersistentStreamSegment;
        });
        if (z) {
            this.onSegmentOpenedCallbacks.forEach(consumer -> {
                consumer.accept(computeIfAbsent);
            });
            this.segmentOnAvailable.forEach(consumer2 -> {
                computeIfAbsent.onAvailable(() -> {
                    consumer2.accept(computeIfAbsent);
                });
            });
            this.segmentOnClose.forEach(consumer3 -> {
                computeIfAbsent.onSegmentClosed(() -> {
                    consumer3.accept(computeIfAbsent);
                });
            });
            this.closeConfirmationsSent.remove(Integer.valueOf(computeIfAbsent.segment()));
        }
        return computeIfAbsent;
    }

    private void acknowledge(int i, long j) {
        try {
            ObjectUtils.doIfNotNull(this.outboundStreamHolder.get(), clientCallStreamObserver -> {
                clientCallStreamObserver.onNext(StreamRequest.newBuilder().setAcknowledgeProgress(ProgressAcknowledgement.newBuilder().setSegment(i).setPosition(j).m3205build()).m3384build());
            });
        } catch (Exception e) {
            logger.debug("Failed to send acknowledgement.", e);
        }
        if (j == -45) {
            logger.info("{}: Close confirmed for segment {}", this.streamId, Integer.valueOf(i));
            this.closeConfirmationsSent.add(Integer.valueOf(i));
        }
    }

    private void sendError(int i, String str) {
        ObjectUtils.doIfNotNull(this.outboundStreamHolder.get(), clientCallStreamObserver -> {
            clientCallStreamObserver.onNext(StreamRequest.newBuilder().setError(SegmentError.newBuilder().setSegment(i).setError(str).m3307build()).m3384build());
        });
    }

    public void onError(Throwable th) {
        logger.warn("{}: Error on stream: {}", new Object[]{this.streamId, th.getMessage(), th});
        close(th);
    }

    public void onCompleted() {
        sendCompleted();
        close(null);
    }

    private void sendCompleted() {
        try {
            ObjectUtils.doIfNotNull(this.outboundStreamHolder.getAndSet(null), (v0) -> {
                v0.onCompleted();
            });
        } catch (Exception e) {
        }
    }

    private void close(Throwable th) {
        this.closed.set(true);
        this.openSegments.forEach((num, bufferedPersistentStreamSegment) -> {
            try {
                if (th != null) {
                    bufferedPersistentStreamSegment.onError(th);
                } else {
                    bufferedPersistentStreamSegment.onCompleted();
                }
            } catch (Exception e) {
                logger.debug("{}: Exception while completing segment {}", new Object[]{this.streamId, Integer.valueOf(bufferedPersistentStreamSegment.segment()), e});
            }
        });
        this.onClosedCallback.get().accept(th);
    }
}
