package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.ConsumerUpdateListener;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.NoOffsetException;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.SubscriptionListener;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.StreamConsumerBuilder;
import com.rabbitmq.stream.impl.StreamEnvironment;
import com.rabbitmq.stream.impl.Utils;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/rabbitmq/stream/impl/StreamConsumer.class */
public class StreamConsumer implements Consumer {
    // Source of the per-instance id: read and incremented once for every constructed consumer.
    private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumer.class);
    // Consumer name sent with storeOffset/queryOffset requests; canTrack() is false when it is null.
    private final String name;
    // Name of the stream this consumer is attached to.
    private final String stream;
    // Owning environment: used to register/remove the consumer and to obtain the scheduler.
    private final StreamEnvironment environment;
    // Invoked with every offset passed to store(long), before any attempt to send it to the broker.
    private final LongConsumer trackingCallback;
    // Run by start(): the deferred registration when lazy init was requested, a no-op otherwise.
    private final Runnable initCallback;
    // Invoked by consumerUpdate(boolean); assigned null when the subscription is not single-active-consumer.
    private final ConsumerUpdateListener consumerUpdateListener;
    // Returned by StreamEnvironment.registerConsumer during initialization; run by closeFromEnvironment().
    private volatile Runnable closingCallback;
    // Client used by store(long) and storedOffset(); reset to null by unavailable().
    private volatile Client trackingClient;
    // Set through setSubscriptionClient(Client); within this class it is only read by toString().
    private volatile Client subscriptionClient;
    // Lifecycle state, see Status. Stays null until the initialization callback has started.
    private volatile Status status;
    // Whether this consumer is currently the active one of its single-active-consumer group.
    private volatile boolean sacActive;
    // Whether the subscription properties enabled single active consumer (Utils.isSac).
    private final boolean sac;
    // Fallback offset specification the default update listeners return when no offset is stored.
    private final OffsetSpecification initialOffsetSpecification;
    // Set by the close methods and on construction/start failure; checked before dispatching messages.
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Last offset successfully handed to trackingClient.storeOffset by store(long).
    private volatile long lastRequestedStoredOffset = 0;
    // Lets one store(long) call through even when the offset is not after lastRequestedStoredOffset.
    private final AtomicBoolean nothingStoredYet = new AtomicBoolean(true);
    // Identifier used in log messages, equals(Object), hashCode() and toString().
    private final long id = ID_SEQUENCE.getAndIncrement();

    /* loaded from: input_file:com/rabbitmq/stream/impl/StreamConsumer$DefaultConsumerUpdateContext.class */
    /**
     * Immutable {@link ConsumerUpdateListener.Context} handed to the consumer update listener:
     * it carries the consumer the notification is about and its new active flag.
     */
    private static class DefaultConsumerUpdateContext implements ConsumerUpdateListener.Context {
        private final StreamConsumer consumer;
        private final boolean active;

        /**
         * @param owner the consumer receiving the update
         * @param activeFlag the new active status of that consumer
         */
        private DefaultConsumerUpdateContext(StreamConsumer owner, boolean activeFlag) {
            this.consumer = owner;
            this.active = activeFlag;
        }

        /** Returns the consumer the update applies to. */
        @Override // com.rabbitmq.stream.ConsumerUpdateListener.Context
        public Consumer consumer() {
            return consumer;
        }

        /** Returns the stream name of the underlying consumer. */
        @Override // com.rabbitmq.stream.ConsumerUpdateListener.Context
        public String stream() {
            return consumer.stream;
        }

        /** Returns the active status carried by this notification. */
        @Override // com.rabbitmq.stream.ConsumerUpdateListener.Context
        public boolean isActive() {
            return active;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("DefaultConsumerUpdateContext{consumer=");
            text.append(consumer);
            text.append(", active=");
            text.append(active);
            text.append('}');
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/rabbitmq/stream/impl/StreamConsumer$Status.class */
    /** Lifecycle states of a {@link StreamConsumer}; store/query of offsets is only attempted in the first two. */
    public enum Status {
        // Set at the beginning of the initialization callback, before the consumer is registered.
        INITIALIZING,
        // Set once registration with the environment returned, and by running().
        RUNNING,
        // Set by unavailable(), which also drops the tracking client.
        NOT_AVAILABLE,
        // Set by closeFromEnvironment() and closeAfterStreamDeletion(); makes checkNotClosed() throw.
        CLOSED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a consumer and, unless lazy initialization is requested, registers it with the
     * environment immediately.
     *
     * <p>The user message handler is decorated in up to three layers, from the inside out: an
     * optional post-processing callback supplied by the tracking registration, a
     * single-active-consumer gate that drops messages while the consumer is not active, and a
     * final guard that drops messages once the consumer is closed.
     *
     * @param str name of the stream to consume from
     * @param offsetSpecification requested start offset; when {@code null},
     *     {@code ConsumersCoordinator.DEFAULT_OFFSET_SPECIFICATION} is kept as the fallback used by
     *     the default consumer update listeners. The value passed to {@code registerConsumer} is
     *     this argument as received, not the defaulted one.
     * @param messageHandler user callback invoked for each delivered message
     * @param str2 consumer name (may be {@code null}, in which case offsets cannot be tracked)
     * @param streamEnvironment environment the consumer registers with
     * @param trackingConfiguration offset tracking settings; selects the tracking callbacks and the
     *     default consumer update listener
     * @param z lazy initialization flag: when {@code true} registration is deferred to
     *     {@link #start()}, otherwise it runs before the constructor returns
     * @param subscriptionListener forwarded to {@code StreamEnvironment.registerConsumer}
     * @param map subscription properties; checked with {@code Utils.isSac} and forwarded, wrapped
     *     as an unmodifiable map, to {@code registerConsumer}
     * @param consumerUpdateListener user-provided update listener, or {@code null} to use a default
     *     one in single-active-consumer mode
     * @throws RuntimeException any runtime failure is rethrown after the consumer is flagged closed
     */
    public StreamConsumer(String str, OffsetSpecification offsetSpecification, MessageHandler messageHandler, String str2, StreamEnvironment streamEnvironment, StreamConsumerBuilder.TrackingConfiguration trackingConfiguration, boolean z, SubscriptionListener subscriptionListener, Map<String, String> map, ConsumerUpdateListener consumerUpdateListener) {
        // Closing callback of the tracking registration (no-op when tracking is disabled).
        Runnable runnable;
        // Flush callback of the tracking registration; used by the default auto-tracking listener.
        LongSupplier longSupplier;
        try {
            this.name = str2;
            this.stream = str;
            this.environment = streamEnvironment;
            this.initialOffsetSpecification = offsetSpecification == null ? ConsumersCoordinator.DEFAULT_OFFSET_SPECIFICATION : offsetSpecification;
            // Holds the message handler as successive decorations are applied below.
            AtomicReference atomicReference = new AtomicReference();
            if (trackingConfiguration.enabled()) {
                StreamEnvironment.TrackingConsumerRegistration registerTrackingConsumer = streamEnvironment.registerTrackingConsumer(this, trackingConfiguration);
                runnable = registerTrackingConsumer.closingCallback();
                java.util.function.Consumer<MessageHandler.Context> postMessageProcessingCallback = registerTrackingConsumer.postMessageProcessingCallback();
                if (postMessageProcessingCallback == null) {
                    atomicReference.set(messageHandler);
                } else {
                    // Run the tracking hook after the user handler has processed the message.
                    atomicReference.set((context, message) -> {
                        messageHandler.handle(context, message);
                        postMessageProcessingCallback.accept(context);
                    });
                }
                this.trackingCallback = registerTrackingConsumer.trackingCallback();
                // NOTE(review): looks like the null check the compiler emits for the bound method
                // reference on the next line (decompilation residue) - confirm before removing.
                registerTrackingConsumer.getClass();
                longSupplier = registerTrackingConsumer::flush;
            } else {
                // Tracking disabled: no-op callbacks, handler used as-is.
                runnable = () -> {
                };
                this.trackingCallback = Utils.NO_OP_LONG_CONSUMER;
                longSupplier = Utils.NO_OP_LONG_SUPPLIER;
                atomicReference.set(messageHandler);
            }
            this.sacActive = false;
            if (Utils.isSac(map)) {
                this.sac = true;
                MessageHandler messageHandler2 = (MessageHandler) atomicReference.get();
                // Records whether at least one message was dispatched while active (auto tracking only).
                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                // Gate deliveries on the active flag: messages received while passive are dropped.
                atomicReference.set(trackingConfiguration.auto() ? (context2, message2) -> {
                    if (this.sacActive) {
                        atomicBoolean.set(true);
                        messageHandler2.handle(context2, message2);
                    }
                } : (context3, message3) -> {
                    if (this.sacActive) {
                        messageHandler2.handle(context3, message3);
                    }
                });
                if (consumerUpdateListener != null && !(consumerUpdateListener instanceof Utils.CompositeConsumerUpdateListener)) {
                    // A plain user-provided listener takes precedence over any default.
                    this.consumerUpdateListener = consumerUpdateListener;
                } else if (trackingConfiguration.auto()) {
                    LOGGER.debug("Setting default consumer update listener for auto tracking strategy");
                    LongSupplier longSupplier2 = longSupplier;
                    ConsumerUpdateListener consumerUpdateListener2 = context4 -> {
                        OffsetSpecification offsetSpecification2;
                        if (!context4.isActive()) {
                            // Active -> passive: flush the tracked offset and wait until the broker
                            // reports it, but only if something was actually consumed.
                            if (atomicBoolean.get()) {
                                LOGGER.debug("Storing offset (consumer {}, stream {}) because going from active to passive", Long.valueOf(this.id), this.stream);
                                long asLong = longSupplier2.getAsLong();
                                LOGGER.debug("Making sure offset {} has been stored (consumer {}, stream {})", new Object[]{Long.valueOf(asLong), Long.valueOf(this.id), this.stream});
                                waitForOffsetToBeStored(asLong);
                            }
                            return OffsetSpecification.none();
                        }
                        // Passive -> active: resume right after the last stored offset, or fall back
                        // to the initial specification when nothing has been stored.
                        LOGGER.debug("Looking up offset (stream {})", this.stream);
                        try {
                            long storedOffset = context4.consumer().storedOffset();
                            LOGGER.debug("Stored offset is {}, returning the value + 1 to the server", Long.valueOf(storedOffset));
                            offsetSpecification2 = OffsetSpecification.offset(storedOffset + 1);
                        } catch (NoOffsetException e) {
                            LOGGER.debug("No stored offset, using initial offset specification: {}", this.initialOffsetSpecification);
                            offsetSpecification2 = this.initialOffsetSpecification;
                        }
                        return offsetSpecification2;
                    };
                    if (consumerUpdateListener instanceof Utils.CompositeConsumerUpdateListener) {
                        // Composite listener: add the default behavior to it and keep the composite.
                        ((Utils.CompositeConsumerUpdateListener) consumerUpdateListener).add(consumerUpdateListener2);
                        this.consumerUpdateListener = consumerUpdateListener;
                    } else {
                        this.consumerUpdateListener = consumerUpdateListener2;
                    }
                } else if (trackingConfiguration.manual()) {
                    LOGGER.debug("Setting default consumer update listener for manual tracking strategy");
                    // Manual tracking: only the passive -> active transition looks up the offset;
                    // the listener returns null when going passive.
                    this.consumerUpdateListener = context5 -> {
                        OffsetSpecification offsetSpecification2 = null;
                        if (context5.isActive()) {
                            LOGGER.debug("Going from passive to active, looking up offset");
                            try {
                                long storedOffset = context5.consumer().storedOffset();
                                LOGGER.debug("Stored offset is {}, returning the value + 1 to the server", Long.valueOf(storedOffset));
                                offsetSpecification2 = OffsetSpecification.offset(storedOffset + 1);
                            } catch (NoOffsetException e) {
                                LOGGER.debug("No stored offset, using initial offset specification: {}", this.initialOffsetSpecification);
                                offsetSpecification2 = this.initialOffsetSpecification;
                            }
                        }
                        return offsetSpecification2;
                    };
                } else {
                    // No tracking strategy: nothing to look up, the listener always returns null.
                    this.consumerUpdateListener = context6 -> {
                        return null;
                    };
                }
            } else {
                this.consumerUpdateListener = null;
                this.sac = false;
            }
            MessageHandler messageHandler3 = (MessageHandler) atomicReference.get();
            // Outermost decoration: stop dispatching as soon as the consumer is flagged closed.
            MessageHandler messageHandler4 = (context7, message4) -> {
                if (this.closed.get()) {
                    return;
                }
                messageHandler3.handle(context7, message4);
            };
            Runnable runnable2 = runnable;
            // Registration with the environment; the returned callback is what close() will run.
            Runnable runnable3 = () -> {
                this.status = Status.INITIALIZING;
                this.closingCallback = streamEnvironment.registerConsumer(this, str, offsetSpecification, this.name, subscriptionListener, runnable2, messageHandler4, Collections.unmodifiableMap(map));
                this.status = Status.RUNNING;
            };
            if (z) {
                // Lazy init: start() will run the registration.
                this.initCallback = runnable3;
            } else {
                this.initCallback = () -> {
                };
                runnable3.run();
            }
        } catch (RuntimeException e) {
            // Flag the half-built consumer as closed before propagating the failure.
            this.closed.set(true);
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Blocks, for at most 10 seconds, until the offset stored on the broker for this consumer
     * equals the expected one.
     *
     * <p>The stored offset is polled through an asynchronous retry task (200 ms initial delay,
     * 200 ms between attempts) scheduled on the environment's executor. A mismatch, or the
     * absence of any stored offset, is signalled with an {@link IllegalStateException}, which is
     * the only exception type that triggers a retry.
     *
     * <p>This method never throws on failure: a timeout or a task failure is logged at WARN
     * level, and an interruption restores the thread's interrupt flag. In both cases the retry
     * task is cancelled so that it is not left behind once nobody waits for it.
     *
     * @param j the offset expected to be stored
     */
    public void waitForOffsetToBeStored(long j) {
        // Keep a handle on the task so it can be cancelled when the wait is abandoned.
        Future<?> storedTask = AsyncRetry.asyncRetry(() -> {
            try {
                long lastStoredOffset = storedOffset();
                boolean stored = lastStoredOffset == j;
                LOGGER.debug("Last stored offset from consumer {} on {} is {}, expecting {}", new Object[]{Long.valueOf(this.id), this.stream, Long.valueOf(lastStoredOffset), Long.valueOf(j)});
                if (stored) {
                    return true;
                }
                // Not there yet: IllegalStateException asks the retry policy for another attempt.
                throw new IllegalStateException();
            } catch (StreamException e) {
                // Code 19 is the response code storedOffset() maps to "no offset stored";
                // treat it as "not stored yet" and retry. Anything else is a real failure.
                if (e.getCode() != 19) {
                    throw e;
                }
                LOGGER.debug("No stored offset for consumer {} on {}, expecting {}", new Object[]{Long.valueOf(this.id), this.stream, Long.valueOf(j)});
                throw new IllegalStateException();
            }
        }).description("Last stored offset for consumer %s on stream %s must be %d", this.name, this.stream, Long.valueOf(j)).delayPolicy(BackOffDelayPolicy.fixedWithInitialDelay(Duration.ofMillis(200L), Duration.ofMillis(200L))).retry(exc -> {
            return exc instanceof IllegalStateException;
        }).scheduler(this.environment.scheduledExecutorService()).build();
        try {
            storedTask.get(10L, TimeUnit.SECONDS);
            LOGGER.debug("Offset {} stored (consumer {}, stream {})", new Object[]{Long.valueOf(j), Long.valueOf(this.id), this.stream});
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Nobody is waiting anymore: best-effort cancellation of the pending retries.
            storedTask.cancel(true);
        } catch (ExecutionException | TimeoutException e2) {
            LOGGER.warn("Error while checking offset has been stored", e2);
            // On timeout the task may still be re-scheduling itself; cancel it (no-op if already done).
            storedTask.cancel(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs the initialization callback chosen at construction time (the deferred registration
     * for a lazily-initialized consumer, a no-op otherwise).
     *
     * @throws RuntimeException rethrown as-is after the consumer has been flagged closed
     */
    public void start() {
        try {
            Runnable init = this.initCallback;
            init.run();
        } catch (RuntimeException failure) {
            // A consumer that failed to start must not dispatch messages afterwards.
            this.closed.set(true);
            throw failure;
        }
    }

    /**
     * Records an offset for this consumer.
     *
     * <p>The tracking callback is always notified. The offset is additionally sent to the broker
     * through the tracking client when tracking is currently possible and either the offset is
     * after the last one successfully requested or nothing has been stored yet. Errors while
     * sending are logged at DEBUG level and otherwise ignored.
     *
     * @param j the offset to store
     * @throws IllegalStateException if the consumer is closed
     */
    @Override // com.rabbitmq.stream.Consumer
    public void store(long j) {
        checkNotClosed();
        this.trackingCallback.accept(j);
        if (!canTrack()) {
            return;
        }
        // The compareAndSet is only evaluated when the offset comparison fails.
        boolean shouldSend = Utils.offsetBefore(this.lastRequestedStoredOffset, j) || this.nothingStoredYet.compareAndSet(true, false);
        if (!shouldSend) {
            return;
        }
        try {
            this.trackingClient.storeOffset(this.name, this.stream, j);
            this.lastRequestedStoredOffset = j;
        } catch (Exception failure) {
            LOGGER.debug("Error while trying to store offset: {}", failure.getMessage());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles a consumer update notification: records the new active status and asks the
     * consumer update listener which offset specification to use.
     *
     * <p>A warning is logged when the new status equals the current one. Any exception raised by
     * the listener (including the {@link NullPointerException} that results from a {@code null}
     * listener) is logged at WARN level and not propagated.
     *
     * @param z the new active status of this consumer
     * @return the offset specification returned by the listener, or {@code null} if the listener
     *     returned {@code null} or failed
     */
    public OffsetSpecification consumerUpdate(boolean z) {
        LOGGER.debug("Consumer {} from stream {} with name {} received consumer update notification, active = {}", new Object[]{Long.valueOf(this.id), this.stream, this.name, Boolean.valueOf(z)});
        if (this.sacActive == z) {
            LOGGER.warn("Previous and new status are the same ({}), there should be no consumer update in this case.", Boolean.valueOf(z));
        }
        this.sacActive = z;
        // The context must carry this consumer: listeners call context.consumer() and
        // context.stream(), and the only constructor takes (StreamConsumer, boolean).
        DefaultConsumerUpdateContext defaultConsumerUpdateContext = new DefaultConsumerUpdateContext(this, z);
        LOGGER.debug("Calling consumer update listener");
        OffsetSpecification offsetSpecification = null;
        try {
            offsetSpecification = this.consumerUpdateListener.update(defaultConsumerUpdateContext);
            LOGGER.debug("Consumer update listener returned {}", offsetSpecification);
        } catch (Exception e) {
            LOGGER.warn("Error in consumer update listener", e);
        }
        return offsetSpecification;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Tells whether this consumer was created with single active consumer enabled. */
    public boolean isSac() {
        return sac;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the current value of the single-active-consumer "active" flag. */
    public boolean sacActive() {
        return sacActive;
    }

    /**
     * Tells whether offsets can currently be stored or queried: the consumer must have a name
     * and be in the {@code INITIALIZING} or {@code RUNNING} state.
     */
    private boolean canTrack() {
        if (this.name == null) {
            return false;
        }
        return this.status == Status.INITIALIZING || this.status == Status.RUNNING;
    }

    /**
     * Closes this consumer: removes it from the environment and runs the closing sequence.
     * Only the first call has an effect; later calls return immediately.
     */
    @Override // com.rabbitmq.stream.Consumer, java.lang.AutoCloseable
    public void close() {
        boolean firstClose = this.closed.compareAndSet(false, true);
        if (!firstClose) {
            return;
        }
        this.environment.removeConsumer(this);
        closeFromEnvironment();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closing sequence that does not touch the environment's consumer registry: runs the closing
     * callback obtained at registration, then flags the consumer closed and sets its status to
     * {@code CLOSED}.
     */
    public void closeFromEnvironment() {
        LOGGER.debug("Calling consumer {} closing callback (stream {})", Long.valueOf(this.id), this.stream);
        Runnable callback = this.closingCallback;
        callback.run();
        // State is updated only after the callback returned normally.
        this.closed.set(true);
        this.status = Status.CLOSED;
        LOGGER.debug("Closed consumer successfully");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks the consumer closed and removes it from the environment without running the closing
     * callback. Has no effect if the consumer is already flagged closed.
     */
    public void closeAfterStreamDeletion() {
        boolean firstClose = this.closed.compareAndSet(false, true);
        if (!firstClose) {
            return;
        }
        this.environment.removeConsumer(this);
        this.status = Status.CLOSED;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns {@code true} as long as the closed flag has not been set. */
    public boolean isOpen() {
        boolean alreadyClosed = this.closed.get();
        return !alreadyClosed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Replaces the client used to store and query offsets.
     *
     * @param client the new tracking client
     */
    public synchronized void setTrackingClient(Client client) {
        trackingClient = client;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Replaces the subscription client. When the client is cleared ({@code null}) on a
     * single-active-consumer subscription, the consumer is also marked as not active.
     *
     * @param client the new subscription client, possibly {@code null}
     */
    public void setSubscriptionClient(Client client) {
        this.subscriptionClient = client;
        if (client != null) {
            return;
        }
        if (isSac()) {
            this.sacActive = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Switches the consumer to {@code NOT_AVAILABLE} and drops its tracking client. */
    public synchronized void unavailable() {
        status = Status.NOT_AVAILABLE;
        trackingClient = null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Switches the consumer to the {@code RUNNING} state. */
    public void running() {
        status = Status.RUNNING;
    }

    /**
     * Queries the broker for the last offset stored for this consumer on its stream.
     *
     * @return the stored offset
     * @throws NoOffsetException if the broker reports that no offset is stored (response code 19)
     * @throws StreamException if the broker returns any other error; the response code is carried
     *     by the exception
     * @throws UnsupportedOperationException if the consumer has no name
     * @throws IllegalStateException if the consumer is closed, if tracking is not possible in the
     *     current state, or if the query itself fails (for instance because the tracking client
     *     is not available)
     */
    @Override // com.rabbitmq.stream.Consumer
    public long storedOffset() {
        checkNotClosed();
        if (!canTrack()) {
            if (this.name == null) {
                throw new UnsupportedOperationException("Not possible to query stored offset for a consumer without a name");
            }
            throw new IllegalStateException(String.format("Not possible to query offset for consumer %s on stream %s for now", this.name, this.stream));
        }
        try {
            Client.QueryOffsetResponse queryOffset = this.trackingClient.queryOffset(this.name, this.stream);
            if (queryOffset.isOk()) {
                return queryOffset.getOffset();
            }
            if (queryOffset.getResponseCode() == 19) {
                throw new NoOffsetException(String.format("No offset stored for consumer %s on stream %s (%s)", this.name, this.stream, Utils.formatConstant(queryOffset.getResponseCode())));
            }
            throw new StreamException(String.format("QueryOffset for consumer %s on stream %s returned an error (%s)", this.name, this.stream, Utils.formatConstant(queryOffset.getResponseCode())), queryOffset.getResponseCode());
        } catch (StreamException e) {
            // Broker-reported outcomes (including "no offset") must reach callers unchanged:
            // they catch NoOffsetException / StreamException to decide what to do next.
            throw e;
        } catch (Exception e) {
            // Anything else (e.g. a missing tracking client) means the query could not be performed.
            throw new IllegalStateException(String.format("Not possible to query offset for consumer %s on stream %s for now", this.name, this.stream), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Returns the name of the stream this consumer is attached to. */
    public String stream() {
        return stream;
    }

    /**
     * Two consumers are equal when they are of the same class and share both id and stream name.
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (obj.getClass() != getClass()) {
            return false;
        }
        StreamConsumer other = (StreamConsumer) obj;
        if (this.id != other.id) {
            return false;
        }
        return this.stream.equals(other.stream);
    }

    /** Hash derived from the id and the stream name, the same fields {@code equals} compares. */
    @Override
    public int hashCode() {
        Object[] identity = {this.id, this.stream};
        return Objects.hash(identity);
    }

    /**
     * Renders the consumer as a small JSON-like object with its id, stream and the connection
     * names of its subscription and tracking clients ({@code null} when a client is absent).
     */
    @Override
    public String toString() {
        // Snapshot the volatile fields once so the null check and the call see the same client.
        Client subscription = this.subscriptionClient;
        Client tracking = this.trackingClient;
        StringBuilder json = new StringBuilder();
        json.append("{ \"id\" : ").append(this.id);
        json.append(",\"stream\" : \"").append(this.stream).append("\",\"subscription_client\" : ");
        if (subscription == null) {
            json.append("null");
        } else {
            json.append("\"").append(subscription.connectionName()).append("\"");
        }
        json.append(", \"tracking_client\" : ");
        if (tracking == null) {
            json.append("null");
        } else {
            json.append("\"").append(tracking.connectionName()).append("\"");
        }
        json.append("}");
        return json.toString();
    }

    /**
     * Guards operations that are not allowed on a closed consumer.
     *
     * @throws IllegalStateException if the consumer status is {@code CLOSED}
     */
    private void checkNotClosed() {
        if (this.status == Status.CLOSED) {
            // This class is a consumer; the message previously mentioned a producer.
            throw new IllegalStateException("This consumer instance has been closed");
        }
    }
}
