package org.springframework.pulsar.reactive.listener;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipelineBuilder;
import org.apache.pulsar.reactive.client.internal.api.ApiImplementationFactory;
import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.class */
public class DefaultReactivePulsarMessageListenerContainer<T> implements ReactivePulsarMessageListenerContainer<T> {
    private final ReactivePulsarConsumerFactory<T> pulsarConsumerFactory;
    private final ReactivePulsarContainerProperties<T> pulsarContainerProperties;
    private ReactiveMessageConsumerBuilderCustomizer<T> consumerCustomizer;
    private ReactiveMessagePipeline pipeline;
    private final LogAccessor logger = new LogAccessor(getClass());
    private boolean autoStartup = true;
    private final ReentrantLock lifecycleLock = new ReentrantLock();
    private final AtomicBoolean running = new AtomicBoolean(false);

    public DefaultReactivePulsarMessageListenerContainer(ReactivePulsarConsumerFactory<T> reactivePulsarConsumerFactory, ReactivePulsarContainerProperties<T> reactivePulsarContainerProperties) {
        this.pulsarConsumerFactory = reactivePulsarConsumerFactory;
        this.pulsarContainerProperties = reactivePulsarContainerProperties;
    }

    public ReactivePulsarConsumerFactory<T> getReactivePulsarConsumerFactory() {
        return this.pulsarConsumerFactory;
    }

    @Override // org.springframework.pulsar.reactive.listener.ReactivePulsarMessageListenerContainer
    public ReactivePulsarContainerProperties<T> getContainerProperties() {
        return this.pulsarContainerProperties;
    }

    public boolean isRunning() {
        return this.running.get();
    }

    protected void setRunning(boolean z) {
        this.running.set(z);
    }

    @Override // org.springframework.pulsar.reactive.listener.ReactivePulsarMessageListenerContainer
    public void setupMessageHandler(ReactivePulsarMessageHandler reactivePulsarMessageHandler) {
        this.pulsarContainerProperties.setMessageHandler(reactivePulsarMessageHandler);
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void setAutoStartup(boolean z) {
        this.autoStartup = z;
    }

    public ReactiveMessageConsumerBuilderCustomizer<T> getConsumerCustomizer() {
        return this.consumerCustomizer;
    }

    @Override // org.springframework.pulsar.reactive.listener.ReactivePulsarMessageListenerContainer
    public void setConsumerCustomizer(ReactiveMessageConsumerBuilderCustomizer<T> reactiveMessageConsumerBuilderCustomizer) {
        this.consumerCustomizer = reactiveMessageConsumerBuilderCustomizer;
    }

    public final void start() {
        this.lifecycleLock.lock();
        try {
            if (!isRunning()) {
                Objects.requireNonNull(this.pulsarContainerProperties.getMessageHandler(), "A ReactivePulsarMessageHandler must be provided");
                doStart();
            }
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    public void stop() {
        this.lifecycleLock.lock();
        try {
            if (isRunning()) {
                doStop();
            }
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    private void doStart() {
        setRunning(true);
        this.pipeline = startPipeline(this.pulsarContainerProperties);
    }

    public void doStop() {
        try {
            this.logger.info("Closing Pulsar Reactive pipeline.");
            this.pipeline.close();
        } catch (Exception e) {
            this.logger.error(e, () -> {
                return "Error closing Pulsar Reactive pipeline.";
            });
        } finally {
            setRunning(false);
        }
    }

    private ReactiveMessagePipeline startPipeline(ReactivePulsarContainerProperties<T> reactivePulsarContainerProperties) {
        ReactiveMessagePipeline build;
        ReactiveMessageConsumerBuilderCustomizer<T> reactiveMessageConsumerBuilderCustomizer = reactiveMessageConsumerBuilder -> {
            if (reactivePulsarContainerProperties.getSubscriptionType() != null) {
                reactiveMessageConsumerBuilder.subscriptionType(reactivePulsarContainerProperties.getSubscriptionType());
            }
            if (reactivePulsarContainerProperties.getSubscriptionName() != null) {
                reactiveMessageConsumerBuilder.subscriptionName(reactivePulsarContainerProperties.getSubscriptionName());
            }
            if (!CollectionUtils.isEmpty(reactivePulsarContainerProperties.getTopics())) {
                reactiveMessageConsumerBuilder.topics(new ArrayList(reactivePulsarContainerProperties.getTopics()));
            }
            if (reactivePulsarContainerProperties.getTopicsPattern() != null) {
                reactiveMessageConsumerBuilder.topicsPattern(reactivePulsarContainerProperties.getTopicsPattern());
            }
        };
        ArrayList arrayList = new ArrayList();
        arrayList.add(reactiveMessageConsumerBuilderCustomizer);
        if (this.consumerCustomizer != null) {
            arrayList.add(this.consumerCustomizer);
        }
        ReactiveMessagePipelineBuilder createReactiveMessageHandlerPipelineBuilder = ApiImplementationFactory.createReactiveMessageHandlerPipelineBuilder(getReactivePulsarConsumerFactory().createConsumer(reactivePulsarContainerProperties.getSchema(), arrayList));
        ReactivePulsarMessageHandler messageHandler = reactivePulsarContainerProperties.getMessageHandler();
        if (messageHandler instanceof ReactivePulsarStreamingHandler) {
            ReactivePulsarStreamingHandler reactivePulsarStreamingHandler = (ReactivePulsarStreamingHandler) messageHandler;
            Objects.requireNonNull(reactivePulsarStreamingHandler);
            build = createReactiveMessageHandlerPipelineBuilder.streamingMessageHandler(reactivePulsarStreamingHandler::mo4received).build();
        } else {
            ReactivePulsarOneByOneMessageHandler reactivePulsarOneByOneMessageHandler = (ReactivePulsarOneByOneMessageHandler) messageHandler;
            Objects.requireNonNull(reactivePulsarOneByOneMessageHandler);
            ReactiveMessagePipelineBuilder.OneByOneMessagePipelineBuilder handlingTimeout = createReactiveMessageHandlerPipelineBuilder.messageHandler(reactivePulsarOneByOneMessageHandler::received).handlingTimeout(reactivePulsarContainerProperties.getHandlingTimeout());
            if (reactivePulsarContainerProperties.getConcurrency() > 0) {
                ReactiveMessagePipelineBuilder.ConcurrentOneByOneMessagePipelineBuilder concurrency = handlingTimeout.concurrency(reactivePulsarContainerProperties.getConcurrency());
                if (reactivePulsarContainerProperties.isUseKeyOrderedProcessing()) {
                    concurrency.useKeyOrderedProcessing();
                }
                build = concurrency.build();
            } else {
                build = createReactiveMessageHandlerPipelineBuilder.build();
            }
        }
        build.start();
        return build;
    }
}
