package io.awspring.cloud.sqs.listener;

import io.awspring.cloud.sqs.ConfigUtils;
import io.awspring.cloud.sqs.LifecycleHandler;
import io.awspring.cloud.sqs.MessageExecutionThread;
import io.awspring.cloud.sqs.MessageExecutionThreadFactory;
import io.awspring.cloud.sqs.UnsupportedThreadFactoryException;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.pipeline.AcknowledgementHandlerExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.AfterProcessingContextInterceptorExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.AfterProcessingInterceptorExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.BeforeProcessingContextInterceptorExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.BeforeProcessingInterceptorExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.ErrorHandlerExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.MessageListenerExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.MessageProcessingConfiguration;
import io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipeline;
import io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder;
import io.awspring.cloud.sqs.listener.sink.MessageProcessingPipelineSink;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import io.awspring.cloud.sqs.listener.source.AcknowledgementProcessingMessageSource;
import io.awspring.cloud.sqs.listener.source.MessageSource;
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.class */
public abstract class AbstractPipelineMessageListenerContainer<T, O extends ContainerOptions<O, B>, B extends ContainerOptionsBuilder<B, O>> extends AbstractMessageListenerContainer<T, O, B> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractPipelineMessageListenerContainer.class);
    private Collection<MessageSource<T>> messageSources;
    private MessageSink<T> messageSink;
    private TaskExecutor componentsTaskExecutor;

    @Nullable
    private TaskExecutor acknowledgementResultTaskExecutor;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractPipelineMessageListenerContainer(O o) {
        super(o);
    }

    @Override // io.awspring.cloud.sqs.listener.AbstractMessageListenerContainer
    protected void doStart() {
        ContainerComponentFactory<T, O> determineComponentFactory = determineComponentFactory();
        this.messageSources = createMessageSources(determineComponentFactory);
        this.messageSink = determineComponentFactory.createMessageSink(getContainerOptions());
        configureComponents(determineComponentFactory);
        LifecycleHandler.get().start(this.messageSink, this.messageSources);
    }

    private ContainerComponentFactory<T, O> determineComponentFactory() {
        return getComponentFactories().stream().filter(containerComponentFactory -> {
            return containerComponentFactory.supports(getQueueNames(), getContainerOptions());
        }).findFirst().orElseThrow(() -> {
            return new IllegalArgumentException("No ContainerComponentFactory found for queues " + getQueueNames());
        });
    }

    private Collection<ContainerComponentFactory<T, O>> getComponentFactories() {
        return !getContainerComponentFactories().isEmpty() ? getContainerComponentFactories() : createDefaultComponentFactories();
    }

    protected abstract Collection<ContainerComponentFactory<T, O>> createDefaultComponentFactories();

    protected Collection<MessageSource<T>> createMessageSources(ContainerComponentFactory<T, O> containerComponentFactory) {
        ArrayList arrayList = new ArrayList(getQueueNames());
        return (Collection) IntStream.range(0, arrayList.size()).mapToObj(i -> {
            return createMessageSource((String) arrayList.get(i), i, containerComponentFactory);
        }).collect(Collectors.toList());
    }

    protected MessageSource<T> createMessageSource(String str, int i, ContainerComponentFactory<T, O> containerComponentFactory) {
        MessageSource<T> createMessageSource = containerComponentFactory.createMessageSource(getContainerOptions());
        ConfigUtils.INSTANCE.acceptIfInstance(createMessageSource, PollingMessageSource.class, pollingMessageSource -> {
            pollingMessageSource.setPollingEndpointName(str);
        }).acceptIfInstance(createMessageSource, IdentifiableContainerComponent.class, identifiableContainerComponent -> {
            identifiableContainerComponent.setId(getId() + "-" + i);
        });
        return createMessageSource;
    }

    private void configureComponents(ContainerComponentFactory<T, O> containerComponentFactory) {
        getContainerOptions().configure(this.messageSources).configure(this.messageSink);
        this.componentsTaskExecutor = resolveComponentsTaskExecutor();
        configureMessageSources(containerComponentFactory);
        configureMessageSink(createMessageProcessingPipeline(containerComponentFactory));
        configureContainerComponents();
    }

    private void verifyThreadType() {
        if (!MessageExecutionThread.class.isAssignableFrom(Thread.currentThread().getClass())) {
            throw new UnsupportedThreadFactoryException("Custom TaskExecutors must use a %s.".formatted(MessageExecutionThreadFactory.class.getSimpleName()));
        }
    }

    protected void configureMessageSources(ContainerComponentFactory<T, O> containerComponentFactory) {
        TaskExecutor createSourcesTaskExecutor = createSourcesTaskExecutor();
        ConfigUtils.INSTANCE.acceptMany(this.messageSources, messageSource -> {
            messageSource.setMessageSink(this.messageSink);
        }).acceptManyIfInstance(this.messageSources, PollingMessageSource.class, pollingMessageSource -> {
            pollingMessageSource.setBackPressureHandler(createBackPressureHandler());
        }).acceptManyIfInstance(this.messageSources, AcknowledgementProcessingMessageSource.class, acknowledgementProcessingMessageSource -> {
            acknowledgementProcessingMessageSource.setAcknowledgementProcessor(containerComponentFactory.createAcknowledgementProcessor(getContainerOptions()));
        }).acceptManyIfInstance(this.messageSources, AcknowledgementProcessingMessageSource.class, acknowledgementProcessingMessageSource2 -> {
            acknowledgementProcessingMessageSource2.setAcknowledgementResultCallback(getAcknowledgementResultCallback());
        }).acceptManyIfInstance(this.messageSources, TaskExecutorAware.class, taskExecutorAware -> {
            taskExecutorAware.setTaskExecutor(createSourcesTaskExecutor);
        });
        doConfigureMessageSources(this.messageSources);
    }

    protected void doConfigureMessageSources(Collection<MessageSource<T>> collection) {
    }

    protected void configureMessageSink(MessageProcessingPipeline<T> messageProcessingPipeline) {
        ConfigUtils.INSTANCE.acceptIfInstance(this.messageSink, IdentifiableContainerComponent.class, identifiableContainerComponent -> {
            identifiableContainerComponent.setId(getId());
        }).acceptIfInstance(this.messageSink, TaskExecutorAware.class, taskExecutorAware -> {
            taskExecutorAware.setTaskExecutor(getComponentsTaskExecutor());
        }).acceptIfInstance(this.messageSink, MessageProcessingPipelineSink.class, messageProcessingPipelineSink -> {
            messageProcessingPipelineSink.setMessagePipeline(messageProcessingPipeline);
        });
        doConfigureMessageSink(this.messageSink);
    }

    protected void doConfigureMessageSink(MessageSink<T> messageSink) {
    }

    protected void configureContainerComponents() {
        ConfigUtils.INSTANCE.acceptManyIfInstance(getMessageInterceptors(), TaskExecutorAware.class, taskExecutorAware -> {
            taskExecutorAware.setTaskExecutor(getComponentsTaskExecutor());
        }).acceptIfInstance(getMessageListener(), TaskExecutorAware.class, taskExecutorAware2 -> {
            taskExecutorAware2.setTaskExecutor(getComponentsTaskExecutor());
        }).acceptIfInstance(getErrorHandler(), TaskExecutorAware.class, taskExecutorAware3 -> {
            taskExecutorAware3.setTaskExecutor(getComponentsTaskExecutor());
        }).acceptIfInstance(getAcknowledgementResultCallback(), TaskExecutorAware.class, taskExecutorAware4 -> {
            taskExecutorAware4.setTaskExecutor(getAcknowledgementResultTaskExecutor());
        });
    }

    protected MessageProcessingPipeline<T> createMessageProcessingPipeline(ContainerComponentFactory<T, O> containerComponentFactory) {
        return MessageProcessingPipelineBuilder.first(BeforeProcessingContextInterceptorExecutionStage::new).then(BeforeProcessingInterceptorExecutionStage::new).then(MessageListenerExecutionStage::new).thenInTheFuture(ErrorHandlerExecutionStage::new).thenInTheFuture(AfterProcessingInterceptorExecutionStage::new).thenInTheFuture(AfterProcessingContextInterceptorExecutionStage::new).thenInTheFuture(AcknowledgementHandlerExecutionStage::new).build(MessageProcessingConfiguration.builder().interceptors(getMessageInterceptors()).messageListener(getMessageListener()).errorHandler(getErrorHandler()).ackHandler(containerComponentFactory.createAcknowledgementHandler(getContainerOptions())).build());
    }

    private TaskExecutor resolveComponentsTaskExecutor() {
        return getContainerOptions().getComponentsTaskExecutor() != null ? validateCustomExecutor(getContainerOptions().getComponentsTaskExecutor()) : createTaskExecutor();
    }

    private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {
        CompletableFuture.runAsync(this::verifyThreadType, taskExecutor).join();
        return taskExecutor;
    }

    protected BackPressureHandler createBackPressureHandler() {
        return SemaphoreBackPressureHandler.builder().batchSize(getContainerOptions().getMaxMessagesPerPoll()).totalPermits(getContainerOptions().getMaxConcurrentMessages()).acquireTimeout(getContainerOptions().getMaxDelayBetweenPolls()).throughputConfiguration(getContainerOptions().getBackPressureMode()).build();
    }

    protected TaskExecutor createSourcesTaskExecutor() {
        SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
        simpleAsyncTaskExecutor.setThreadNamePrefix(getId() + "#message_source-");
        return simpleAsyncTaskExecutor;
    }

    protected TaskExecutor createTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        int maxConcurrentMessages = getContainerOptions().getMaxConcurrentMessages() * this.messageSources.size();
        threadPoolTaskExecutor.setMaxPoolSize(maxConcurrentMessages);
        threadPoolTaskExecutor.setCorePoolSize(getContainerOptions().getMaxMessagesPerPoll());
        threadPoolTaskExecutor.setQueueCapacity(maxConcurrentMessages);
        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
        threadPoolTaskExecutor.setThreadFactory(createThreadFactory());
        threadPoolTaskExecutor.afterPropertiesSet();
        return threadPoolTaskExecutor;
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [io.awspring.cloud.sqs.MessageExecutionThreadFactory, java.util.concurrent.ThreadFactory] */
    protected ThreadFactory createThreadFactory() {
        ?? messageExecutionThreadFactory = new MessageExecutionThreadFactory();
        messageExecutionThreadFactory.setThreadNamePrefix(getId() + "-");
        return messageExecutionThreadFactory;
    }

    @Override // io.awspring.cloud.sqs.listener.AbstractMessageListenerContainer
    protected void doStop() {
        LifecycleHandler.get().stop(this.messageSources, this.messageSink);
        shutdownComponentsTaskExecutor();
        logger.debug("Container {} stopped", getId());
    }

    protected TaskExecutor getComponentsTaskExecutor() {
        return this.componentsTaskExecutor;
    }

    protected TaskExecutor getAcknowledgementResultTaskExecutor() {
        if (this.acknowledgementResultTaskExecutor == null) {
            this.acknowledgementResultTaskExecutor = determineAcknowledgementResultExecutor();
        }
        return this.acknowledgementResultTaskExecutor;
    }

    private TaskExecutor determineAcknowledgementResultExecutor() {
        return getContainerOptions().getAcknowledgementResultTaskExecutor() != null ? validateCustomExecutor(getContainerOptions().getAcknowledgementResultTaskExecutor()) : createTaskExecutor();
    }

    private void shutdownComponentsTaskExecutor() {
        if (!this.componentsTaskExecutor.equals(getContainerOptions().getComponentsTaskExecutor())) {
            LifecycleHandler.get().dispose(getComponentsTaskExecutor());
        }
        if (this.acknowledgementResultTaskExecutor == null || this.acknowledgementResultTaskExecutor.equals(getContainerOptions().getAcknowledgementResultTaskExecutor())) {
            return;
        }
        LifecycleHandler.get().dispose(getAcknowledgementResultTaskExecutor());
    }
}
