/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.impl.StreamConsumer;
import com.rabbitmq.stream.impl.StreamConsumerBuilder;
import com.rabbitmq.stream.impl.StreamEnvironment;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Consumer} for a super stream: it looks up the partitions of the super stream,
 * builds one sub-consumer per partition from a duplicate of the given builder, and starts
 * them all. Closing this consumer closes every sub-consumer.
 *
 * <p>{@link #store(long)} is not supported; offsets are stored through
 * {@code MessageHandler.Context#storeOffset()}.
 */
class SuperStreamConsumer implements Consumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SuperStreamConsumer.class);
    private final String superStream;
    /** Sub-consumers keyed by partition (stream) name. */
    private final Map<String, Consumer> consumers = new ConcurrentHashMap<>();

    /**
     * Creates and starts one sub-consumer per partition of {@code superStream}.
     *
     * @param builder template builder; it is duplicated for every partition
     * @param superStream name of the super stream to consume from
     * @param environment environment used to look up the partitions
     * @param trackingConfiguration offset tracking settings (manual, auto, or disabled)
     * @throws RuntimeException whatever the partition lookup, {@code build()} or
     *     {@code start()} throws; sub-consumers created so far are closed before rethrowing
     */
    SuperStreamConsumer(StreamConsumerBuilder builder, String superStream, StreamEnvironment environment, StreamConsumerBuilder.TrackingConfiguration trackingConfiguration) {
        this.superStream = superStream;
        List<String> partitions = environment.locatorOperation(c -> c.partitions(superStream));

        // Per-partition state, only consulted by the manual offset tracking handler.
        ConsumerState[] states = new ConsumerState[partitions.size()];
        Map<String, ConsumerState> partitionToStates = new HashMap<>(partitions.size());
        for (int i = 0; i < partitions.size(); ++i) {
            ConsumerState state = new ConsumerState();
            states[i] = state;
            partitionToStates.put(partitions.get(i), state);
        }

        boolean manualTracking = trackingConfiguration.enabled() && trackingConfiguration.manual();
        boolean autoTracking = trackingConfiguration.enabled() && trackingConfiguration.auto();

        try {
            for (String partition : partitions) {
                ConsumerState state = partitionToStates.get(partition);
                MessageHandler messageHandler = manualTracking
                        ? new ManualOffsetTrackingMessageHandler(builder.messageHandler(), states, state)
                        : builder.messageHandler();
                StreamConsumerBuilder subConsumerBuilder = builder.duplicate();
                if (autoTracking) {
                    // The configured count is split across the partitions. Integer division
                    // truncates to 0 when there are more partitions than the configured count,
                    // so clamp to 1.
                    // NOTE(review): assumes a zero threshold is invalid for the strategy - confirm.
                    subConsumerBuilder = (StreamConsumerBuilder) subConsumerBuilder
                            .autoTrackingStrategy()
                            .messageCountBeforeStorage(Math.max(1, trackingConfiguration.autoMessageCountBeforeStorage() / partitions.size()))
                            .builder();
                }
                // Lazy init: the sub-consumers are only started once all of them exist.
                Consumer consumer = subConsumerBuilder
                        .lazyInit(true)
                        .superStream(null)
                        .messageHandler(messageHandler)
                        .stream(partition)
                        .build();
                this.consumers.put(partition, consumer);
                state.consumer = consumer;
                LOGGER.debug("Created consumer on stream '{}' for super stream '{}'", partition, superStream);
            }
            for (Consumer consumer : this.consumers.values()) {
                ((StreamConsumer) consumer).start();
            }
        } catch (RuntimeException e) {
            // The caller never gets a reference to this instance when construction fails,
            // so release the sub-consumers created so far instead of leaking them.
            closeConsumers();
            throw e;
        }
    }

    /**
     * Always fails: storing an offset directly is not supported for super streams.
     *
     * @param offset ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public void store(long offset) {
        throw new UnsupportedOperationException("Consumer#store(long) does not work for super streams, use MessageHandler.Context#storeOffset() instead");
    }

    /** Closes every sub-consumer; a failure on one is logged and does not stop the others. */
    @Override
    public void close() {
        closeConsumers();
    }

    /** Best-effort close of all registered sub-consumers. */
    private void closeConsumers() {
        for (Map.Entry<String, Consumer> entry : this.consumers.entrySet()) {
            LOGGER.debug("Closing consumer for partition '{}' of super stream {}", entry.getKey(), this.superStream);
            try {
                entry.getValue().close();
            } catch (Exception e) {
                LOGGER.info("Error while closing consumer for partition {} of super stream {}: {}", entry.getKey(), this.superStream, e.getMessage());
            }
        }
    }

    /**
     * Wraps the application handler so that {@code storeOffset()} on the context stores the
     * current offset for this partition and the last handled offset of every other partition.
     */
    private static final class ManualOffsetTrackingMessageHandler implements MessageHandler {
        private final MessageHandler delegate;
        /** States of all partitions of the super stream, this one included. */
        private final ConsumerState[] consumerStates;
        /** State of the partition this handler is attached to. */
        private final ConsumerState consumerState;

        private ManualOffsetTrackingMessageHandler(MessageHandler delegate, ConsumerState[] consumerStates, ConsumerState consumerState) {
            this.delegate = delegate;
            this.consumerStates = consumerStates;
            this.consumerState = consumerState;
        }

        /**
         * Passes the message to the delegate with a wrapping context, then records the
         * message offset as the last handled offset of this partition.
         */
        @Override
        public void handle(final MessageHandler.Context context, Message message) {
            MessageHandler.Context ctx = new MessageHandler.Context() {

                @Override
                public long offset() {
                    return context.offset();
                }

                @Override
                public long timestamp() {
                    return context.timestamp();
                }

                @Override
                public void storeOffset() {
                    for (ConsumerState state : ManualOffsetTrackingMessageHandler.this.consumerStates) {
                        if (ManualOffsetTrackingMessageHandler.this.consumerState == state) {
                            context.storeOffset();
                            continue;
                        }
                        // Read the volatile once so the check and the stored value agree.
                        long lastHandled = state.offset;
                        if (lastHandled != ConsumerState.NO_OFFSET) {
                            state.consumer.store(lastHandled);
                        }
                    }
                }

                @Override
                public Consumer consumer() {
                    return context.consumer();
                }
            };
            this.delegate.handle(ctx, message);
            // Recorded only after the delegate returns without throwing.
            this.consumerState.offset = context.offset();
        }
    }

    /**
     * Mutable per-partition state shared between the handlers of all partitions.
     * Fields are volatile; presumably handlers of different partitions run on different
     * threads - verify against the dispatching code.
     */
    private static final class ConsumerState {
        /**
         * Marks "no message handled yet".
         * NOTE(review): a dedicated sentinel is used on the assumption that 0 is a valid
         * message offset - confirm.
         */
        private static final long NO_OFFSET = -1L;

        private volatile long offset = NO_OFFSET;
        private volatile Consumer consumer;

        private ConsumerState() {
        }
    }
}

