/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.ConsumerFlowStrategy;
import com.rabbitmq.stream.ConsumerUpdateListener;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.NoOffsetException;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.SubscriptionListener;
import com.rabbitmq.stream.impl.AsyncRetry;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.ConsumersCoordinator;
import com.rabbitmq.stream.impl.StreamConsumerBuilder;
import com.rabbitmq.stream.impl.StreamEnvironment;
import com.rabbitmq.stream.impl.Utils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Consumer} implementation attached to a single stream.
 *
 * <p>The consumer registers itself with the {@link StreamEnvironment} to receive messages and,
 * when tracking is enabled, to get its offset-tracking callbacks. When the subscription
 * properties mark it as a single active consumer (see {@code Utils.isSac}), messages are only
 * dispatched to the user handler while the consumer is active, and a {@link
 * ConsumerUpdateListener} decides which offset to resume from on activation.
 */
class StreamConsumer
implements Consumer {
    /** Response code for which {@link #storedOffset(Supplier)} raises a {@link NoOffsetException}. */
    private static final int RESPONSE_CODE_NO_OFFSET = 19;
    private static final AtomicLong ID_SEQUENCE = new AtomicLong(0L);
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumer.class);
    private final long id;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final String name;
    private final String stream;
    private final StreamEnvironment environment;
    private final LongConsumer trackingCallback;
    private final Runnable initCallback;
    private final ConsumerUpdateListener consumerUpdateListener;
    // set by the init callback; stays null until the consumer has been registered
    private volatile Runnable closingCallback;
    private volatile Client trackingClient;
    private volatile Client subscriptionClient;
    // stays null until the init callback runs (lazy initialization)
    private volatile Status status;
    private volatile long lastRequestedStoredOffset = 0L;
    private final AtomicBoolean nothingStoredYet = new AtomicBoolean(true);
    private volatile boolean sacActive;
    private final boolean sac;
    private final OffsetSpecification initialOffsetSpecification;

    /**
     * Creates the consumer and, unless {@code lazyInit} is set, registers it immediately.
     *
     * @param stream the stream to consume from
     * @param offsetSpecification where to start; {@code null} falls back to
     *     {@code ConsumersCoordinator.DEFAULT_OFFSET_SPECIFICATION} for the offset used by the
     *     default consumer update listeners
     * @param messageHandler the user message handler
     * @param name the consumer name, may be {@code null} (offset tracking then is not possible)
     * @param environment the owning environment
     * @param trackingConfiguration offset tracking settings
     * @param lazyInit when {@code true}, registration is deferred until {@link #start()}
     * @param subscriptionListener passed as is to the environment on registration
     * @param subscriptionProperties subscription properties (single active consumer, filtering)
     * @param consumerUpdateListener user listener for single active consumer updates, may be
     *     {@code null}
     * @param flowStrategy passed as is to the environment on registration
     * @throws IllegalArgumentException if filtering is requested but not supported by the broker
     */
    @SuppressFBWarnings(value={"CT_CONSTRUCTOR_THROW"})
    StreamConsumer(String stream, OffsetSpecification offsetSpecification, MessageHandler messageHandler, String name, StreamEnvironment environment, StreamConsumerBuilder.TrackingConfiguration trackingConfiguration, boolean lazyInit, SubscriptionListener subscriptionListener, Map<String, String> subscriptionProperties, ConsumerUpdateListener consumerUpdateListener, ConsumerFlowStrategy flowStrategy) {
        if (Utils.filteringEnabled(subscriptionProperties) && !environment.filteringSupported()) {
            throw new IllegalArgumentException("Filtering is not supported by the broker (requires RabbitMQ 3.13+ and stream_filtering feature flag activated");
        }
        this.id = ID_SEQUENCE.getAndIncrement();
        try {
            LongSupplier trackingFlushCallback;
            Runnable trackingClosingCallback;
            this.name = name;
            this.stream = stream;
            this.environment = environment;
            this.initialOffsetSpecification = offsetSpecification == null ? ConsumersCoordinator.DEFAULT_OFFSET_SPECIFICATION : offsetSpecification;
            // holds the handler as successive decorations (tracking, SAC) are applied
            AtomicReference<MessageHandler> decoratedMessageHandler = new AtomicReference<MessageHandler>();
            if (trackingConfiguration.enabled()) {
                StreamEnvironment.TrackingConsumerRegistration trackingConsumerRegistration = environment.registerTrackingConsumer(this, trackingConfiguration);
                trackingClosingCallback = trackingConsumerRegistration.closingCallback();
                java.util.function.Consumer<MessageHandler.Context> postMessageProcessingCallback = trackingConsumerRegistration.postMessageProcessingCallback();
                if (postMessageProcessingCallback == null) {
                    decoratedMessageHandler.set(messageHandler);
                } else {
                    // run the tracking hook after each message has been handled
                    decoratedMessageHandler.set((context, message) -> {
                        messageHandler.handle(context, message);
                        postMessageProcessingCallback.accept(context);
                    });
                }
                this.trackingCallback = trackingConsumerRegistration.trackingCallback();
                trackingFlushCallback = trackingConsumerRegistration::flush;
            } else {
                trackingClosingCallback = () -> {};
                this.trackingCallback = Utils.NO_OP_LONG_CONSUMER;
                trackingFlushCallback = Utils.NO_OP_LONG_SUPPLIER;
                decoratedMessageHandler.set(messageHandler);
            }
            this.sacActive = false;
            if (Utils.isSac(subscriptionProperties)) {
                this.sac = true;
                MessageHandler existingMessageHandler = decoratedMessageHandler.get();
                // only used with auto tracking: remembers whether at least one message was dispatched
                AtomicBoolean receivedSomething = new AtomicBoolean(false);
                // messages are dropped while the consumer is not the active one
                MessageHandler messageHandlerWithSac = trackingConfiguration.auto() ? (context, message) -> {
                    if (this.sacActive) {
                        receivedSomething.set(true);
                        existingMessageHandler.handle(context, message);
                    }
                } : (context, message) -> {
                    if (this.sacActive) {
                        existingMessageHandler.handle(context, message);
                    }
                };
                decoratedMessageHandler.set(messageHandlerWithSac);
                if (consumerUpdateListener == null || consumerUpdateListener instanceof Utils.CompositeConsumerUpdateListener) {
                    if (trackingConfiguration.auto()) {
                        LOGGER.debug("Setting default consumer update listener for auto tracking strategy");
                        ConsumerUpdateListener defaultListener = context -> {
                            if (context.isActive()) {
                                LOGGER.debug("Looking up offset (stream {})", this.stream);
                                return this.storedOffsetSpecificationOrInitial(context);
                            }
                            if (receivedSomething.get()) {
                                // going passive: flush the tracked offset and wait until the broker has it
                                LOGGER.debug("Storing offset (consumer {}, stream {}) because going from active to passive", this.id, this.stream);
                                long offset = trackingFlushCallback.getAsLong();
                                LOGGER.debug("Making sure offset {} has been stored (consumer {}, stream {})", offset, this.id, this.stream);
                                this.waitForOffsetToBeStored(offset);
                            }
                            return OffsetSpecification.none();
                        };
                        if (consumerUpdateListener != null) {
                            ((Utils.CompositeConsumerUpdateListener)consumerUpdateListener).add(defaultListener);
                            this.consumerUpdateListener = consumerUpdateListener;
                        } else {
                            this.consumerUpdateListener = defaultListener;
                        }
                    } else if (trackingConfiguration.manual()) {
                        LOGGER.debug("Setting default consumer update listener for manual tracking strategy");
                        // NOTE(review): a supplied CompositeConsumerUpdateListener is not used in this
                        // branch (nor in the no-tracking branch below), unlike the auto branch - confirm
                        // this is intended.
                        this.consumerUpdateListener = context -> {
                            if (!context.isActive()) {
                                return null;
                            }
                            LOGGER.debug("Going from passive to active, looking up offset");
                            return this.storedOffsetSpecificationOrInitial(context);
                        };
                    } else {
                        this.consumerUpdateListener = context -> null;
                    }
                } else {
                    this.consumerUpdateListener = consumerUpdateListener;
                }
            } else {
                this.consumerUpdateListener = null;
                this.sac = false;
            }
            MessageHandler computedMessageHandler = decoratedMessageHandler.get();
            // outermost decoration: stop dispatching as soon as the consumer is closed
            MessageHandler closedAwareMessageHandler = (context, message) -> {
                if (!this.closed.get()) {
                    computedMessageHandler.handle(context, message);
                }
            };
            Runnable init = () -> {
                this.status = Status.INITIALIZING;
                this.closingCallback = environment.registerConsumer(this, stream, offsetSpecification, this.name, subscriptionListener, trackingClosingCallback, closedAwareMessageHandler, Collections.unmodifiableMap(subscriptionProperties), flowStrategy);
                this.status = Status.RUNNING;
            };
            if (lazyInit) {
                this.initCallback = init;
            } else {
                this.initCallback = () -> {};
                init.run();
            }
        }
        catch (RuntimeException e) {
            this.closed.set(true);
            throw e;
        }
    }

    /**
     * Computes the offset specification to resume from when becoming active: the stored offset
     * plus one, or the initial offset specification when no offset has been stored.
     */
    private OffsetSpecification storedOffsetSpecificationOrInitial(ConsumerUpdateListener.Context context) {
        StreamConsumer consumer = (StreamConsumer)context.consumer();
        try {
            long offset = StreamConsumer.getStoredOffsetSafely(consumer, this.environment);
            LOGGER.debug("Stored offset is {}, returning the value + 1 to the server", offset);
            return OffsetSpecification.offset(offset + 1L);
        }
        catch (NoOffsetException e) {
            LOGGER.debug("No stored offset, using initial offset specification: {}", this.initialOffsetSpecification);
            return this.initialOffsetSpecification;
        }
    }

    /**
     * Returns the stored offset of the consumer, retrying asynchronously (and blocking on the
     * result) when the first attempt fails with an {@link IllegalStateException}.
     *
     * @param consumer the consumer to query the offset for
     * @param environment provides the scheduler and the delays used for the retry
     * @return the stored offset
     * @throws StreamException if the offset cannot be retrieved; a {@link StreamException} raised
     *     by a retry attempt (e.g. {@link NoOffsetException}) is propagated as is
     */
    static long getStoredOffsetSafely(StreamConsumer consumer, StreamEnvironment environment) {
        long offset;
        try {
            offset = consumer.storedOffset();
        }
        catch (IllegalStateException e) {
            LOGGER.debug("Leader connection not available to retrieve offset, retrying");
            CompletableFuture<Long> storedOffetRetrievalFuture = AsyncRetry.asyncRetry(() -> consumer.storedOffset(() -> consumer.trackingClient())).description("Stored offset retrieval for '%s' on stream '%s'", consumer.name, consumer.stream).scheduler(environment.scheduledExecutorService()).retry(ex -> ex instanceof IllegalStateException).delayPolicy(BackOffDelayPolicy.fixedWithInitialDelay(environment.recoveryBackOffDelayPolicy().delay(0), environment.recoveryBackOffDelayPolicy().delay(1), environment.recoveryBackOffDelayPolicy().delay(0).multipliedBy(3L))).build();
            try {
                offset = storedOffetRetrievalFuture.get();
            }
            catch (InterruptedException ex2) {
                Thread.currentThread().interrupt();
                throw new StreamException(String.format("Could not get stored offset for '%s' on stream '%s'", consumer.name, consumer.stream), ex2);
            }
            catch (ExecutionException ex3) {
                Throwable cause = ex3.getCause();
                if (cause instanceof StreamException) {
                    // keep the original type (e.g. NoOffsetException) so that callers can react to it
                    throw (StreamException)cause;
                }
                throw new StreamException(String.format("Could not get stored offset for '%s' on stream '%s'", consumer.name, consumer.stream), ex3);
            }
        }
        return offset;
    }

    /** Returns the client currently used for offset tracking, possibly {@code null}. */
    Client trackingClient() {
        return this.trackingClient;
    }

    /**
     * Blocks until the stored offset equals the expected value, for at most 10 seconds.
     *
     * <p>The stored offset is polled through an asynchronous retry. A failure or a timeout is
     * only logged; an interruption restores the interrupt flag and returns.
     *
     * @param expectedStoredOffset the offset that must be reported as stored
     */
    void waitForOffsetToBeStored(long expectedStoredOffset) {
        CompletableFuture<Boolean> storedTask = AsyncRetry.asyncRetry(() -> {
            try {
                long lastStoredOffset = this.storedOffset();
                boolean stored = lastStoredOffset == expectedStoredOffset;
                LOGGER.debug("Last stored offset from consumer {} on {} is {}, expecting {}", this.id, this.stream, lastStoredOffset, expectedStoredOffset);
                if (!stored) {
                    // IllegalStateException is the signal to retry (see retry predicate below)
                    throw new IllegalStateException();
                }
                return true;
            }
            catch (StreamException e) {
                if (e.getCode() == RESPONSE_CODE_NO_OFFSET) {
                    LOGGER.debug("No stored offset for consumer {} on {}, expecting {}", this.id, this.stream, expectedStoredOffset);
                    throw new IllegalStateException();
                }
                throw e;
            }
        }).description("Last stored offset for consumer %s on stream %s must be %d", this.name, this.stream, expectedStoredOffset).delayPolicy(BackOffDelayPolicy.fixedWithInitialDelay(Duration.ofMillis(200L), Duration.ofMillis(200L))).retry(exception -> exception instanceof IllegalStateException).scheduler(this.environment.scheduledExecutorService()).build();
        try {
            storedTask.get(10L, TimeUnit.SECONDS);
            LOGGER.debug("Offset {} stored (consumer {}, stream {})", expectedStoredOffset, this.id, this.stream);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException | TimeoutException e) {
            LOGGER.warn("Error while checking offset has been stored", e);
        }
    }

    /**
     * Runs the initialization callback (a no-op unless the consumer was created with lazy
     * initialization). The consumer is flagged as closed if initialization fails.
     */
    void start() {
        try {
            this.initCallback.run();
        }
        catch (RuntimeException e) {
            this.closed.set(true);
            throw e;
        }
    }

    /**
     * Notifies the tracking callback and, when tracking is possible, sends the offset to the
     * broker if it is after the last requested one or if nothing has been stored yet.
     *
     * <p>Storage is best effort: a failure to send the offset is logged and not propagated.
     *
     * @param offset the offset to store
     * @throws IllegalStateException if the consumer is closed
     */
    @Override
    public void store(long offset) {
        this.checkNotClosed();
        this.trackingCallback.accept(offset);
        if (this.canTrack() && (Utils.offsetBefore(this.lastRequestedStoredOffset, offset) || this.nothingStoredYet.compareAndSet(true, false))) {
            try {
                this.trackingClient.storeOffset(this.name, this.stream, offset);
                this.lastRequestedStoredOffset = offset;
            }
            catch (Exception e) {
                LOGGER.debug("Error while trying to store offset: {}", e.getMessage());
            }
        }
    }

    /**
     * Records the new single active consumer state and asks the consumer update listener for
     * the offset specification to use.
     *
     * @param active whether the consumer becomes active
     * @return the listener result, or {@code null} if the listener failed (the error is logged)
     */
    OffsetSpecification consumerUpdate(boolean active) {
        LOGGER.debug("Consumer {} from stream {} with name {} received consumer update notification, active = {}", this.id, this.stream, this.name, active);
        if (this.sacActive == active) {
            LOGGER.warn("Previous and new status are the same ({}), there should be no consumer update in this case.", active);
        }
        this.sacActive = active;
        DefaultConsumerUpdateContext context = new DefaultConsumerUpdateContext(this, active);
        LOGGER.debug("Calling consumer update listener");
        OffsetSpecification result = null;
        try {
            result = this.consumerUpdateListener.update(context);
            LOGGER.debug("Consumer update listener returned {}", result);
        }
        catch (Exception e) {
            LOGGER.warn("Error in consumer update listener", e);
        }
        return result;
    }

    /** Returns whether this consumer was configured as a single active consumer. */
    boolean isSac() {
        return this.sac;
    }

    /** Returns whether this consumer is currently the active one. */
    boolean sacActive() {
        return this.sacActive;
    }

    /** Tracking requires a name and a consumer that is initializing or running. */
    private boolean canTrack() {
        return (this.status == Status.INITIALIZING || this.status == Status.RUNNING) && this.name != null;
    }

    /** Closes the consumer; only the first call has an effect. */
    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.environment.removeConsumer(this);
            this.closeFromEnvironment();
        }
    }

    /**
     * Performs the closing sequence without removing the consumer from the environment:
     * notifies the update listener if the consumer was active, runs the closing callback and
     * marks the consumer as closed.
     */
    void closeFromEnvironment() {
        this.maybeNotifyActiveToInactiveSac();
        // the callback is only set once the consumer has been registered (see the init callback)
        Runnable callback = this.closingCallback;
        if (callback != null) {
            LOGGER.debug("Calling consumer {} closing callback (stream {})", this.id, this.stream);
            callback.run();
        } else {
            LOGGER.debug("No closing callback for consumer {} (stream {}), consumer not initialized", this.id, this.stream);
        }
        this.closed.set(true);
        this.status = Status.CLOSED;
        LOGGER.debug("Closed consumer successfully");
    }

    /**
     * Closes the consumer without running the closing callback; only the first close has an
     * effect.
     */
    void closeAfterStreamDeletion() {
        if (this.closed.compareAndSet(false, true)) {
            this.maybeNotifyActiveToInactiveSac();
            this.environment.removeConsumer(this);
            this.status = Status.CLOSED;
        }
    }

    /** Returns {@code true} as long as the consumer has not been flagged as closed. */
    boolean isOpen() {
        return !this.closed.get();
    }

    void setTrackingClient(Client client) {
        this.trackingClient = client;
    }

    /**
     * Sets the subscription client. Losing it ({@code null}) makes a single active consumer
     * inactive, after notifying the update listener if it was active.
     */
    void setSubscriptionClient(Client client) {
        this.subscriptionClient = client;
        if (client == null && this.isSac()) {
            this.maybeNotifyActiveToInactiveSac();
            this.sacActive = false;
        }
    }

    /** Triggers an "inactive" consumer update if this is an active single active consumer. */
    private void maybeNotifyActiveToInactiveSac() {
        if (this.isSac() && this.sacActive) {
            LOGGER.debug("Single active consumer {} from stream {} with name {} is unavailable, calling consumer update listener", this.id, this.stream, this.name);
            this.consumerUpdate(false);
        }
    }

    /** Marks the consumer as not available and drops its tracking client. */
    synchronized void unavailable() {
        this.status = Status.NOT_AVAILABLE;
        this.trackingClient = null;
    }

    void running() {
        this.status = Status.RUNNING;
    }

    /**
     * Queries the stored offset with the client provided by the supplier.
     *
     * @param clientSupplier provides the client to send the query with
     * @return the stored offset
     * @throws IllegalStateException if the consumer is closed, if the query cannot be sent, or
     *     if the consumer is not in a state that allows tracking
     * @throws NoOffsetException if the broker reports that no offset is stored
     * @throws StreamException if the broker returns another error code
     * @throws UnsupportedOperationException if the consumer has no name
     */
    long storedOffset(Supplier<Client> clientSupplier) {
        this.checkNotClosed();
        if (this.canTrack()) {
            Client.QueryOffsetResponse response;
            try {
                response = clientSupplier.get().queryOffset(this.name, this.stream);
            }
            catch (Exception e) {
                throw new IllegalStateException(String.format("Not possible to query offset for consumer %s on stream %s for now: %s", this.name, this.stream, e.getMessage()), e);
            }
            if (response.isOk()) {
                return response.getOffset();
            }
            if (response.getResponseCode() == RESPONSE_CODE_NO_OFFSET) {
                throw new NoOffsetException(String.format("No offset stored for consumer %s on stream %s (%s)", this.name, this.stream, Utils.formatConstant(response.getResponseCode())));
            }
            throw new StreamException(String.format("QueryOffset for consumer %s on stream %s returned an error (%s)", this.name, this.stream, Utils.formatConstant(response.getResponseCode())), response.getResponseCode());
        }
        if (this.name == null) {
            throw new UnsupportedOperationException("Not possible to query stored offset for a consumer without a name");
        }
        // the status is still null for a lazily-initialized consumer that has not been started:
        // format the reference itself rather than calling name() on it to avoid a NullPointerException
        throw new IllegalStateException(String.format("Not possible to query offset for consumer %s on stream %s for now, consumer status is %s", this.name, this.stream, this.status));
    }

    /** Queries the stored offset with the current tracking client. */
    @Override
    public long storedOffset() {
        return this.storedOffset(() -> this.trackingClient);
    }

    String stream() {
        return this.stream;
    }

    /** Two consumers are equal when they have the same ID and the same stream. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        StreamConsumer that = (StreamConsumer)o;
        return this.id == that.id && this.stream.equals(that.stream);
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.id, this.stream);
    }

    /** Returns a JSON-like description: ID, stream, and the connection names of both clients. */
    @Override
    public String toString() {
        // read the volatile fields once so the null check and the call see the same value
        Client subscriptionClient = this.subscriptionClient;
        Client trackingClient = this.trackingClient;
        return "{ \"id\" : " + this.id + ",\"stream\" : \"" + this.stream + "\",\"subscription_client\" : " + (subscriptionClient == null ? "null" : "\"" + subscriptionClient.connectionName() + "\"") + ", \"tracking_client\" : " + (trackingClient == null ? "null" : "\"" + trackingClient.connectionName() + "\"") + "}";
    }

    private void checkNotClosed() {
        if (this.status == Status.CLOSED) {
            throw new IllegalStateException("This consumer instance has been closed");
        }
    }

    long id() {
        return this.id;
    }

    /** Returns the subscription client connection name, or a placeholder without client. */
    String subscriptionConnectionName() {
        Client client = this.subscriptionClient;
        if (client == null) {
            return "<no-connection>";
        }
        return client.clientConnectionName();
    }

    /** Lifecycle states of the consumer. */
    enum Status {
        INITIALIZING,
        RUNNING,
        NOT_AVAILABLE,
        CLOSED;

    }

    /** Context handed to the {@link ConsumerUpdateListener}: the consumer and its new state. */
    private static class DefaultConsumerUpdateContext
    implements ConsumerUpdateListener.Context {
        private final StreamConsumer consumer;
        private final boolean active;

        private DefaultConsumerUpdateContext(StreamConsumer consumer, boolean active) {
            this.consumer = consumer;
            this.active = active;
        }

        @Override
        public Consumer consumer() {
            return this.consumer;
        }

        @Override
        public String stream() {
            return this.consumer.stream;
        }

        @Override
        public boolean isActive() {
            return this.active;
        }

        @Override
        public String toString() {
            return "DefaultConsumerUpdateContext{consumer=" + this.consumer + ", active=" + this.active + '}';
        }
    }
}

