package org.springframework.rabbit.stream.listener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.ConsumerBuilder;
import com.rabbitmq.stream.Environment;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.MicrometerHolder;
import org.springframework.amqp.rabbit.listener.ObservableListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.rabbit.stream.micrometer.RabbitStreamListenerObservation;
import org.springframework.rabbit.stream.micrometer.RabbitStreamListenerObservationConvention;
import org.springframework.rabbit.stream.micrometer.RabbitStreamMessageReceiverContext;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.rabbit.stream.support.converter.DefaultStreamMessageConverter;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/rabbit/stream/listener/StreamListenerContainer.class */
/**
 * A listener container that consumes from a RabbitMQ stream, or super stream, through
 * {@link Consumer}s created from the {@link ConsumerBuilder} of an {@link Environment}.
 *
 * <p>Exactly one of {@link #setQueueNames(String...)} or
 * {@link #superStream(String, String, int)} may be used to select the source; the two are
 * mutually exclusive. Received messages are dispatched either to a
 * {@link StreamMessageListener} (native stream message) or, after conversion with the
 * configured {@link StreamMessageConverter}, to a Spring AMQP {@link MessageListener}.
 *
 * <p>Methods that touch the consumer builder or the collection of running consumers are
 * {@code synchronized} on the container instance.
 */
public class StreamListenerContainer extends ObservableListenerContainer {

    protected LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

    private final ConsumerBuilder builder;

    /** Consumers that are currently open; empty while the container is stopped. */
    private final Collection<Consumer> consumers = new ArrayList<>();

    private StreamMessageConverter streamConverter;

    /** No-op by default; invoked with the listener id and the builder on every start. */
    private ConsumerCustomizer consumerCustomizer = (id, consumerBuilder) -> {
    };

    private boolean simpleStream;

    private boolean superStream;

    /** Number of consumers built on start when consuming from a super stream. */
    private int concurrency = 1;

    private boolean autoStartup = true;

    private MessageListener messageListener;

    /** Non-null only when the configured listener is a {@link StreamMessageListener}. */
    private StreamMessageListener streamListener;

    private Advice[] adviceChain;

    private String streamName;

    @Nullable
    private RabbitStreamListenerObservationConvention observationConvention;

    /**
     * Create an instance using the supplied environment and no codec.
     * @param environment the environment; must not be null.
     */
    public StreamListenerContainer(Environment environment) {
        this(environment, null);
    }

    /**
     * Create an instance using the supplied environment; the codec is handed to the
     * {@link DefaultStreamMessageConverter} used as the initial stream converter.
     * @param environment the environment; must not be null.
     * @param codec the codec for the default converter; may be null.
     */
    public StreamListenerContainer(Environment environment, @Nullable Codec codec) {
        Assert.notNull(environment, "'environment' cannot be null");
        this.builder = environment.consumerBuilder();
        this.streamConverter = new DefaultStreamMessageConverter(codec);
    }

    /**
     * Select the single stream to consume from. Mutually exclusive with
     * {@link #superStream(String, String)}.
     * @param queueNames exactly one stream name.
     */
    public synchronized void setQueueNames(String... queueNames) {
        Assert.isTrue(!this.superStream, "setQueueNames() and superStream() are mutually exclusive");
        Assert.isTrue(queueNames != null && queueNames.length == 1, "Only one stream is supported");
        this.builder.stream(queueNames[0]);
        this.simpleStream = true;
        this.streamName = queueNames[0];
    }

    /**
     * Consume from a super stream with a single consumer. Mutually exclusive with
     * {@link #setQueueNames(String...)}.
     * @param superStreamName the super stream name; must not be null.
     * @param name the consumer name passed to the builder.
     */
    public void superStream(String superStreamName, String name) {
        superStream(superStreamName, name, 1);
    }

    /**
     * Consume from a super stream, enabling single-active-consumer on the builder and
     * building {@code consumerCount} consumers when the container starts. Mutually
     * exclusive with {@link #setQueueNames(String...)}.
     * @param superStreamName the super stream name; must not be null.
     * @param name the consumer name passed to the builder.
     * @param consumerCount the number of consumers to build; must be greater than zero.
     */
    public synchronized void superStream(String superStreamName, String name, int consumerCount) {
        // Validate every argument before touching any state so that a rejected call
        // leaves the container exactly as it was.
        Assert.isTrue(consumerCount > 0,
                () -> "'concurrency' must be greater than zero, not " + consumerCount);
        Assert.isTrue(!this.simpleStream, "setQueueNames() and superStream() are mutually exclusive");
        Assert.notNull(superStreamName, "'superStream' cannot be null");
        this.concurrency = consumerCount;
        this.builder.superStream(superStreamName).singleActiveConsumer().name(name);
        this.superStream = true;
        this.streamName = superStreamName;
    }

    /**
     * Return the converter used to turn native stream messages into Spring AMQP messages.
     * @return the converter.
     */
    public StreamMessageConverter getStreamConverter() {
        return this.streamConverter;
    }

    /**
     * Replace the stream message converter.
     * @param streamMessageConverter the converter; must not be null.
     */
    public void setStreamConverter(StreamMessageConverter streamMessageConverter) {
        Assert.notNull(streamMessageConverter, "'messageConverter' cannot be null");
        this.streamConverter = streamMessageConverter;
    }

    /**
     * Set a customizer that is given the listener id and the consumer builder each time
     * the container starts, before the consumers are built.
     * @param consumerCustomizer the customizer; must not be null.
     */
    public synchronized void setConsumerCustomizer(ConsumerCustomizer consumerCustomizer) {
        Assert.notNull(consumerCustomizer, "'consumerCustomizer' cannot be null");
        this.consumerCustomizer = consumerCustomizer;
    }

    /**
     * Set the auto-startup flag (true by default).
     * @param autoStartup the flag value.
     */
    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    /**
     * Return the auto-startup flag.
     * @return the flag value.
     */
    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    /**
     * Set advices to wrap around the listener. The array is copied. The advices are
     * applied when {@link #setupMessageListener(MessageListener)} is called, so this
     * must be set beforehand to take effect.
     * @param advices the advices; neither the array nor its elements may be null.
     */
    public void setAdviceChain(Advice... advices) {
        Assert.notNull(advices, "'advices' cannot be null");
        Assert.noNullElements(advices, "'advices' cannot have null elements");
        this.adviceChain = Arrays.copyOf(advices, advices.length);
    }

    /**
     * Return the configured message listener, if any.
     * @return the listener, or null when none has been set up.
     */
    @Nullable
    public Object getMessageListener() {
        return this.messageListener;
    }

    /**
     * Set a custom observation convention; when null the default convention is used.
     * @param observationConvention the convention.
     */
    public void setObservationConvention(RabbitStreamListenerObservationConvention observationConvention) {
        this.observationConvention = observationConvention;
    }

    /**
     * Run the Micrometer and observation checks once properties have been set.
     */
    public void afterPropertiesSet() {
        checkMicrometer();
        checkObservation();
    }

    /**
     * Report whether the container currently holds any open consumer.
     * @return true when at least one consumer has been built and not yet closed.
     */
    public synchronized boolean isRunning() {
        return !this.consumers.isEmpty();
    }

    /**
     * Build the consumer(s) if the container is not already running: one consumer for a
     * simple stream, otherwise as many as the configured concurrency. The consumer
     * customizer is applied to the builder first.
     */
    public synchronized void start() {
        if (!this.consumers.isEmpty()) {
            return;
        }
        this.consumerCustomizer.accept(getListenerId(), this.builder);
        int toBuild = this.simpleStream ? 1 : this.concurrency;
        for (int built = 0; built < toBuild; built++) {
            this.consumers.add(this.builder.build());
        }
    }

    /**
     * Close every open consumer and forget them. A failure to close one consumer is
     * logged and does not prevent the remaining consumers from being closed.
     */
    public synchronized void stop() {
        for (Consumer consumer : this.consumers) {
            try {
                consumer.close();
            }
            catch (RuntimeException ex) {
                this.logger.error(ex, "Failed to close consumer");
            }
        }
        this.consumers.clear();
    }

    /**
     * Register the listener (advised with the advice chain, if one is configured) and
     * install a message handler on the consumer builder that dispatches to it.
     *
     * <p>Each delivery is wrapped in an {@link Observation}; when a
     * {@link MicrometerHolder} is available a sample is started before the listener is
     * invoked and completed with success or failure afterwards, whichever kind of
     * listener is configured. Exceptions thrown by a
     * {@link ChannelAwareMessageListener} are logged and not propagated; exceptions from
     * the other listener kinds propagate to the caller of the handler.
     * @param messageListener the listener to dispatch to.
     */
    public void setupMessageListener(MessageListener messageListener) {
        adviseIfNeeded(messageListener);
        this.builder.messageHandler((context, streamMessage) -> {
            ObservationRegistry registry = getObservationRegistry();
            MicrometerHolder micrometerHolder = getMicrometerHolder();
            Object sample = micrometerHolder != null ? micrometerHolder.start() : null;
            Observation observation = RabbitStreamListenerObservation.STREAM_LISTENER_OBSERVATION.observation(
                    this.observationConvention,
                    RabbitStreamListenerObservation.DefaultRabbitStreamListenerObservationConvention.INSTANCE,
                    () -> new RabbitStreamMessageReceiverContext(streamMessage, getListenerId(), this.streamName),
                    registry);

            // Read each listener field once so the null check and the call see the same value.
            StreamMessageListener nativeListener = this.streamListener;
            if (nativeListener != null) {
                observation.observe(() -> invokeListener(
                        () -> nativeListener.onStreamMessage(streamMessage, context), micrometerHolder, sample));
                return;
            }

            Message converted = this.streamConverter.toMessage(streamMessage, new StreamMessageProperties(context));
            MessageListener listener = this.messageListener;
            if (!(listener instanceof ChannelAwareMessageListener)) {
                observation.observe(() -> invokeListener(
                        () -> listener.onMessage(converted), micrometerHolder, sample));
                return;
            }

            ChannelAwareMessageListener channelAwareListener = (ChannelAwareMessageListener) listener;
            try {
                // There is no AMQP channel for a stream delivery, hence the null channel.
                observation.observe(() -> invokeListener(
                        () -> channelAwareListener.onMessage(converted, (Channel) null), micrometerHolder, sample));
            }
            catch (Exception ex) {
                this.logger.error(ex, "Listener threw an exception");
            }
        });
    }

    /**
     * Run a listener invocation and complete the Micrometer sample, if one was started,
     * with success or failure. Runtime exceptions are rethrown unchanged; any other
     * exception is converted with {@link RabbitExceptionTranslator} before being thrown.
     * @param invocation the listener call to execute.
     * @param micrometerHolder the holder that produced {@code sample}; only used when
     * {@code sample} is not null.
     * @param sample the started sample, or null when metrics are not being recorded.
     */
    private void invokeListener(ListenerInvocation invocation, @Nullable MicrometerHolder micrometerHolder,
            @Nullable Object sample) {

        try {
            invocation.invoke();
            if (sample != null) {
                micrometerHolder.success(sample, this.streamName);
            }
        }
        catch (RuntimeException ex) {
            if (sample != null) {
                micrometerHolder.failure(sample, this.streamName, ex.getClass().getSimpleName());
            }
            throw ex;
        }
        catch (Exception ex) {
            if (sample != null) {
                micrometerHolder.failure(sample, this.streamName, ex.getClass().getSimpleName());
            }
            throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
        }
    }

    /**
     * Store the listener and, when an advice chain is configured, replace it with a
     * proxy that applies the advices and exposes the interfaces of the listener's class.
     * @param messageListener the listener being set up.
     */
    private void adviseIfNeeded(MessageListener messageListener) {
        this.messageListener = messageListener;
        // Always reassign: a StreamMessageListener left over from an earlier call must
        // not keep receiving messages once a different kind of listener is set up.
        this.streamListener = messageListener instanceof StreamMessageListener
                ? (StreamMessageListener) messageListener
                : null;
        if (this.adviceChain == null || this.adviceChain.length == 0) {
            return;
        }
        ProxyFactory proxyFactory = new ProxyFactory(messageListener);
        for (Advice advice : this.adviceChain) {
            proxyFactory.addAdvisor(new DefaultPointcutAdvisor(advice));
        }
        proxyFactory.setInterfaces(messageListener.getClass().getInterfaces());
        Object proxy = proxyFactory.getProxy(getClass().getClassLoader());
        if (this.streamListener != null) {
            this.streamListener = (StreamMessageListener) proxy;
        }
        else {
            this.messageListener = (MessageListener) proxy;
        }
    }

    /**
     * A listener call that is allowed to throw a checked exception.
     */
    @FunctionalInterface
    private interface ListenerInvocation {

        void invoke() throws Exception;

    }

}
