package io.awspring.cloud.sqs.listener;

import io.awspring.cloud.sqs.ConfigUtils;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementOrdering;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.BatchingAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.ImmediateAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.sink.BatchMessageSink;
import io.awspring.cloud.sqs.listener.sink.FanOutMessageSink;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import io.awspring.cloud.sqs.listener.source.MessageSource;
import io.awspring.cloud.sqs.listener.source.SqsMessageSource;
import java.time.Duration;
import java.util.Collection;
import org.springframework.util.Assert;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/StandardSqsComponentFactory.class */
public class StandardSqsComponentFactory<T> implements ContainerComponentFactory<T> {
    private static final Duration DEFAULT_STANDARD_SQS_ACK_INTERVAL = Duration.ofSeconds(1);
    private static final Integer DEFAULT_STANDARD_SQS_ACK_THRESHOLD = 10;
    private static final AcknowledgementOrdering DEFAULT_STANDARD_SQS_ACK_ORDERING = AcknowledgementOrdering.PARALLEL;

    @Override // io.awspring.cloud.sqs.listener.ContainerComponentFactory
    public boolean supports(Collection<String> collection, ContainerOptions containerOptions) {
        return collection.stream().noneMatch(str -> {
            return str.endsWith(".fifo");
        });
    }

    @Override // io.awspring.cloud.sqs.listener.ContainerComponentFactory
    public MessageSource<T> createMessageSource(ContainerOptions containerOptions) {
        return new SqsMessageSource();
    }

    @Override // io.awspring.cloud.sqs.listener.ContainerComponentFactory
    public MessageSink<T> createMessageSink(ContainerOptions containerOptions) {
        return ListenerMode.SINGLE_MESSAGE.equals(containerOptions.getListenerMode()) ? new FanOutMessageSink() : new BatchMessageSink();
    }

    @Override // io.awspring.cloud.sqs.listener.ContainerComponentFactory
    public AcknowledgementProcessor<T> createAcknowledgementProcessor(ContainerOptions containerOptions) {
        validateAcknowledgementOrdering(containerOptions);
        return (containerOptions.getAcknowledgementInterval() == Duration.ZERO && containerOptions.getAcknowledgementThreshold().intValue() == 0) ? createAndConfigureImmediateProcessor(containerOptions) : createAndConfigureBatchingProcessor(containerOptions);
    }

    private void validateAcknowledgementOrdering(ContainerOptions containerOptions) {
        Assert.isTrue(!AcknowledgementOrdering.ORDERED_BY_GROUP.equals(containerOptions.getAcknowledgementOrdering()), "Standard SQS queues are not compatible with " + AcknowledgementOrdering.ORDERED_BY_GROUP);
    }

    private AcknowledgementProcessor<T> createAndConfigureBatchingProcessor(ContainerOptions containerOptions) {
        return configureBatchingAcknowledgementProcessor(containerOptions, createBatchingProcessorInstance());
    }

    protected ImmediateAcknowledgementProcessor<T> createAndConfigureImmediateProcessor(ContainerOptions containerOptions) {
        return configureImmediateAcknowledgementProcessor(createImmediateProcessorInstance(), containerOptions);
    }

    protected ImmediateAcknowledgementProcessor<T> createImmediateProcessorInstance() {
        return new ImmediateAcknowledgementProcessor<>();
    }

    protected BatchingAcknowledgementProcessor<T> createBatchingProcessorInstance() {
        return new BatchingAcknowledgementProcessor<>();
    }

    protected ImmediateAcknowledgementProcessor<T> configureImmediateAcknowledgementProcessor(ImmediateAcknowledgementProcessor<T> immediateAcknowledgementProcessor, ContainerOptions containerOptions) {
        immediateAcknowledgementProcessor.setMaxAcknowledgementsPerBatch(10);
        ContainerOptions.Builder builder = containerOptions.toBuilder();
        ConfigUtils configUtils = ConfigUtils.INSTANCE;
        builder.getClass();
        configUtils.acceptIfNotNullOrElse(builder::acknowledgementOrdering, containerOptions.getAcknowledgementOrdering(), DEFAULT_STANDARD_SQS_ACK_ORDERING);
        immediateAcknowledgementProcessor.configure(builder.build());
        return immediateAcknowledgementProcessor;
    }

    protected BatchingAcknowledgementProcessor<T> configureBatchingAcknowledgementProcessor(ContainerOptions containerOptions, BatchingAcknowledgementProcessor<T> batchingAcknowledgementProcessor) {
        batchingAcknowledgementProcessor.setMaxAcknowledgementsPerBatch(10);
        ContainerOptions.Builder builder = containerOptions.toBuilder();
        ConfigUtils configUtils = ConfigUtils.INSTANCE;
        builder.getClass();
        ConfigUtils acceptIfNotNullOrElse = configUtils.acceptIfNotNullOrElse(builder::acknowledgementInterval, containerOptions.getAcknowledgementInterval(), DEFAULT_STANDARD_SQS_ACK_INTERVAL);
        builder.getClass();
        ConfigUtils acceptIfNotNullOrElse2 = acceptIfNotNullOrElse.acceptIfNotNullOrElse((v1) -> {
            r1.acknowledgementThreshold(v1);
        }, containerOptions.getAcknowledgementThreshold(), DEFAULT_STANDARD_SQS_ACK_THRESHOLD);
        builder.getClass();
        acceptIfNotNullOrElse2.acceptIfNotNullOrElse(builder::acknowledgementOrdering, containerOptions.getAcknowledgementOrdering(), DEFAULT_STANDARD_SQS_ACK_ORDERING);
        batchingAcknowledgementProcessor.configure(builder.build());
        return batchingAcknowledgementProcessor;
    }
}
