package io.axoniq.axonserver.connector.event.impl;

import io.axoniq.axonserver.connector.event.PersistentStreamSegment;
import io.axoniq.axonserver.connector.impl.AbstractBufferedStream;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.event.EventWithToken;
import io.axoniq.axonserver.grpc.streams.PersistentStreamEvent;
import io.axoniq.axonserver.grpc.streams.Requests;
import io.axoniq.axonserver.grpc.streams.StreamRequest;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonserver/connector/event/impl/BufferedPersistentStreamSegment.class */
public class BufferedPersistentStreamSegment extends AbstractBufferedStream<PersistentStreamEvent, StreamRequest> implements PersistentStreamSegment {
    private static final Logger logger = LoggerFactory.getLogger(BufferedPersistentStreamSegment.class);
    private static final PersistentStreamEvent TERMINAL_MESSAGE = PersistentStreamEvent.newBuilder().setEvent(EventWithToken.newBuilder().setToken(-1729).m2015build()).m3173build();
    private final Set<Runnable> onSegmentClosedCallbacks;
    private final String streamId;
    private final int segment;
    private final LongConsumer progressCallback;
    private final Consumer<String> errorCallback;
    private final AtomicBoolean closed;
    private Runnable localOnAvailableCallback;

    public BufferedPersistentStreamSegment(String str, int i, int i2, int i3, LongConsumer longConsumer, Consumer<String> consumer) {
        super("ignoredClientId", i2, i3);
        this.onSegmentClosedCallbacks = new CopyOnWriteArraySet();
        this.closed = new AtomicBoolean();
        this.localOnAvailableCallback = () -> {
        };
        this.streamId = str;
        this.segment = i;
        this.progressCallback = longConsumer;
        this.errorCallback = consumer;
    }

    @Override // io.axoniq.axonserver.connector.event.PersistentStreamSegment
    public void onSegmentClosed(Runnable runnable) {
        this.onSegmentClosedCallbacks.add(runnable);
    }

    @Override // io.axoniq.axonserver.connector.impl.AbstractBufferedStream, io.axoniq.axonserver.connector.impl.FlowControlledBuffer
    public void onCompleted() {
        super.onCompleted();
        this.closed.set(true);
        this.onSegmentClosedCallbacks.forEach((v0) -> {
            v0.run();
        });
    }

    @Override // io.axoniq.axonserver.connector.event.PersistentStreamSegment
    public void acknowledge(long j) {
        if (this.closed.get()) {
            logger.debug("{}: Acknowledging position {} for segment {} after closing the segment", new Object[]{this.streamId, Long.valueOf(j), Integer.valueOf(this.segment)});
        }
        this.progressCallback.accept(j);
    }

    @Override // io.axoniq.axonserver.connector.event.PersistentStreamSegment
    public void error(String str) {
        this.errorCallback.accept(str);
    }

    @Override // io.axoniq.axonserver.connector.impl.AbstractBufferedStream, io.axoniq.axonserver.connector.impl.FlowControlledBuffer, io.axoniq.axonserver.connector.ResultStream
    public boolean isClosed() {
        return this.closed.get();
    }

    @Override // io.axoniq.axonserver.connector.event.PersistentStreamSegment
    public int segment() {
        return this.segment;
    }

    @Override // io.axoniq.axonserver.connector.impl.AbstractBufferedStream, io.axoniq.axonserver.connector.impl.FlowControlledBuffer, io.axoniq.axonserver.connector.ResultStream, java.lang.AutoCloseable
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            logger.info("{}: Close segment {}", this.streamId, Integer.valueOf(this.segment));
            this.localOnAvailableCallback.run();
        }
    }

    @Override // io.axoniq.axonserver.connector.impl.AbstractBufferedStream, io.axoniq.axonserver.connector.ResultStream
    public void onAvailable(Runnable runnable) {
        super.onAvailable(runnable);
        this.localOnAvailableCallback = runnable;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.axoniq.axonserver.connector.impl.FlowControlledBuffer
    public PersistentStreamEvent terminalMessage() {
        return TERMINAL_MESSAGE;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.axoniq.axonserver.connector.impl.FlowControlledStream
    public StreamRequest buildFlowControlMessage(FlowControl flowControl) {
        return StreamRequest.newBuilder().setRequests(Requests.newBuilder().setSegment(this.segment).setRequests((int) flowControl.getPermits())).m3384build();
    }

    public String toString() {
        return this.streamId + "[" + this.segment + "]";
    }
}
