package com.azure.spring.cloud.stream.binder.servicebus;

import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.spring.cloud.core.implementation.util.AzurePropertiesUtils;
import com.azure.spring.cloud.core.properties.AzureProperties;
import com.azure.spring.cloud.stream.binder.servicebus.config.ServiceBusProcessorFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.servicebus.config.ServiceBusProducerFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusConsumerProperties;
import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusExtendedBindingProperties;
import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusProducerProperties;
import com.azure.spring.cloud.stream.binder.servicebus.core.provisioning.ServiceBusChannelProvisioner;
import com.azure.spring.integration.core.handler.DefaultMessageHandler;
import com.azure.spring.integration.core.implementation.instrumentation.DefaultInstrumentation;
import com.azure.spring.integration.core.implementation.instrumentation.DefaultInstrumentationManager;
import com.azure.spring.integration.core.implementation.instrumentation.InstrumentationSendCallback;
import com.azure.spring.integration.core.instrumentation.Instrumentation;
import com.azure.spring.integration.core.instrumentation.InstrumentationManager;
import com.azure.spring.integration.servicebus.implementation.health.ServiceBusProcessorInstrumentation;
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProcessorFactory;
import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProducerFactory;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.NamespaceProperties;
import com.azure.spring.messaging.servicebus.core.properties.ProcessorProperties;
import com.azure.spring.messaging.servicebus.core.properties.ProducerProperties;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import com.azure.spring.messaging.servicebus.implementation.properties.merger.ProcessorPropertiesMerger;
import com.azure.spring.messaging.servicebus.support.converter.ServiceBusMessageConverter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/azure/spring/cloud/stream/binder/servicebus/ServiceBusMessageChannelBinder.class */
/**
 * Spring Cloud Stream binder that connects message channels to Azure Service Bus entities.
 *
 * <p>Producer bindings are served by a {@link DefaultMessageHandler} backed by a lazily created
 * {@link ServiceBusTemplate}; consumer bindings are served by a {@link ServiceBusInboundChannelAdapter}
 * backed by a {@link ServiceBusMessageListenerContainer}. Both lazily created factories register
 * listeners that publish health instrumentation to the binder's {@link InstrumentationManager}.
 *
 * <p>NOTE(review): the lazy initialisation of the template and the processor factory is not
 * synchronised; this class appears to rely on bindings being created from a single thread.
 */
public class ServiceBusMessageChannelBinder
        extends AbstractMessageChannelBinder<ExtendedConsumerProperties<ServiceBusConsumerProperties>,
                ExtendedProducerProperties<ServiceBusProducerProperties>, ServiceBusChannelProvisioner>
        implements ExtendedPropertiesBinder<MessageChannel, ServiceBusConsumerProperties, ServiceBusProducerProperties> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusMessageChannelBinder.class);
    private static final DefaultErrorMessageStrategy DEFAULT_ERROR_MESSAGE_STRATEGY = new DefaultErrorMessageStrategy();

    /** Value passed as the dead-letter reason by the consumer error handler. */
    private static final String EXCEPTION_MESSAGE = "exception-message";

    /** Message header under which the received Service Bus message context is looked up. */
    private static final String RECEIVED_MESSAGE_CONTEXT_HEADER = "azure_service_bus_received_message_context";

    /**
     * Duration handed to every {@link ServiceBusProcessorInstrumentation} created by this binder.
     * NOTE(review): the exact meaning of this duration is defined by the instrumentation class.
     */
    private static final Duration PROCESSOR_INSTRUMENTATION_DURATION = Duration.ofMinutes(2L);

    private ServiceBusExtendedBindingProperties bindingProperties;
    private NamespaceProperties namespaceProperties;
    private ServiceBusTemplate serviceBusTemplate;
    private ServiceBusProcessorFactory processorFactory;
    private ServiceBusMessageConverter messageConverter;
    private final List<ServiceBusMessageListenerContainer> serviceBusMessageListenerContainers;
    private final InstrumentationManager instrumentationManager;
    private final Map<String, ExtendedProducerProperties<ServiceBusProducerProperties>> extendedProducerPropertiesMap;
    private final List<ServiceBusProducerFactoryCustomizer> producerFactoryCustomizers;
    private final List<ServiceBusProcessorFactoryCustomizer> processorFactoryCustomizers;

    /**
     * Creates a binder with default binding properties, a default message converter and a default
     * instrumentation manager.
     *
     * @param headersToEmbed forwarded unchanged to the {@link AbstractMessageChannelBinder} constructor
     * @param provisioningProvider forwarded unchanged to the {@link AbstractMessageChannelBinder} constructor
     */
    public ServiceBusMessageChannelBinder(String[] headersToEmbed, ServiceBusChannelProvisioner provisioningProvider) {
        super(headersToEmbed, provisioningProvider);
        this.bindingProperties = new ServiceBusExtendedBindingProperties();
        this.messageConverter = new ServiceBusMessageConverter();
        this.serviceBusMessageListenerContainers = new ArrayList<>();
        this.instrumentationManager = new DefaultInstrumentationManager();
        this.extendedProducerPropertiesMap = new ConcurrentHashMap<>();
        this.producerFactoryCustomizers = new ArrayList<>();
        this.processorFactoryCustomizers = new ArrayList<>();
    }

    /**
     * Builds the outbound handler for a producer binding.
     *
     * <p>The producer properties are remembered per destination name so that the producer
     * properties supplier can resolve them later.
     *
     * @param destination the producer destination; its name is used as the Service Bus entity name
     * @param producerProperties the binding's producer properties
     * @param errorChannel channel installed as the handler's send-failure channel
     * @return the configured message handler
     */
    public MessageHandler createProducerMessageHandler(ProducerDestination destination,
            ExtendedProducerProperties<ServiceBusProducerProperties> producerProperties,
            MessageChannel errorChannel) {
        Assert.notNull(getServiceBusTemplate(), "ServiceBusTemplate can't be null when create a producer");
        String destinationName = destination.getName();
        this.extendedProducerPropertiesMap.put(destinationName, producerProperties);

        DefaultMessageHandler handler = new DefaultMessageHandler(destinationName, this.serviceBusTemplate);
        ServiceBusProducerProperties extension = producerProperties.getExtension();
        handler.setBeanFactory(getBeanFactory());
        handler.setSync(extension.isSync());
        handler.setSendTimeout(extension.getSendTimeout().toMillis());
        handler.setSendFailureChannel(errorChannel);
        handler.setSendCallback(new InstrumentationSendCallback(
                Instrumentation.buildId(Instrumentation.Type.PRODUCER, destinationName), this.instrumentationManager));

        if (producerProperties.isPartitioned()) {
            handler.setPartitionKeyExpressionString("'partitionKey-' + headers['scst_partition']");
        } else {
            // Non-partitioned bindings derive the partition key from the payload's hashCode().
            handler.setPartitionKeyExpression(
                    new FunctionExpression<Message<?>>(message -> message.getPayload().hashCode()));
        }
        return handler;
    }

    /**
     * Builds the inbound adapter for a consumer binding and registers its error infrastructure.
     *
     * @param destination the consumer destination; its name is used as the Service Bus entity name
     * @param group the consumer group, used as the subscription name; may be {@code null}
     * @param properties the binding's consumer properties
     * @return the configured inbound channel adapter
     */
    public MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
            ExtendedConsumerProperties<ServiceBusConsumerProperties> properties) {
        ServiceBusMessageListenerContainer listenerContainer = new ServiceBusMessageListenerContainer(
                getProcessorFactory(), createContainerProperties(destination, group, properties));
        this.serviceBusMessageListenerContainers.add(listenerContainer);

        ServiceBusInboundChannelAdapter inboundAdapter = new ServiceBusInboundChannelAdapter(listenerContainer);
        String instrumentationId = Instrumentation.buildId(
                Instrumentation.Type.CONSUMER, destination.getName() + "/" + getGroup(group));
        AbstractMessageChannelBinder.ErrorInfrastructure errorInfrastructure =
                registerErrorInfrastructure(destination, getGroup(group), properties);

        inboundAdapter.setBeanFactory(getBeanFactory());
        inboundAdapter.setInstrumentationManager(this.instrumentationManager);
        inboundAdapter.setInstrumentationId(instrumentationId);
        inboundAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
        inboundAdapter.setMessageConverter(this.messageConverter);
        return inboundAdapter;
    }

    /**
     * Assembles the listener container properties for a consumer binding by copying values from
     * the binding's extension via {@code AzurePropertiesUtils.copyAzureCommonProperties} and
     * {@code ProcessorPropertiesMerger.copyProcessorPropertiesIfNotNull}, then setting the entity
     * and subscription names.
     *
     * @param destination the consumer destination; its name becomes the entity name
     * @param group the consumer group; set as the subscription name as-is (may be {@code null})
     * @param properties the binding's consumer properties
     * @return the populated container properties
     */
    ServiceBusContainerProperties createContainerProperties(ConsumerDestination destination, String group,
            ExtendedConsumerProperties<ServiceBusConsumerProperties> properties) {
        ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
        AzurePropertiesUtils.copyAzureCommonProperties((AzureProperties) properties.getExtension(), containerProperties);
        ProcessorPropertiesMerger.copyProcessorPropertiesIfNotNull(
                (ProcessorProperties) properties.getExtension(), containerProperties);
        containerProperties.setEntityName(destination.getName());
        containerProperties.setSubscriptionName(group);
        return containerProperties;
    }

    /**
     * Returns the handler invoked for failed consumer messages.
     *
     * <p>The handler requires an {@link ErrorMessage}. If the error carries no original message it
     * only logs an error. Otherwise the original message is dead-lettered when the binding's
     * {@code requeueRejected} flag is set, and abandoned when it is not.
     *
     * @param destination the consumer destination the handler is created for
     * @param group the consumer group (not used by the handler)
     * @param properties the binding's consumer properties
     * @return the error message handler
     */
    public MessageHandler getErrorMessageHandler(ConsumerDestination destination, String group,
            ExtendedConsumerProperties<ServiceBusConsumerProperties> properties) {
        return message -> {
            // Supplier overload: the description (including message.toString()) is only built on failure.
            Assert.state(message instanceof ErrorMessage,
                    () -> "Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " + message);
            ErrorMessage errorMessage = (ErrorMessage) message;
            Message<?> originalMessage = errorMessage.getOriginalMessage();
            if (originalMessage == null) {
                this.logger.error("No raw message header in " + message);
                return;
            }
            if (properties.getExtension().isRequeueRejected()) {
                Throwable failure = errorMessage.getPayload();
                Throwable cause = failure.getCause();
                // Prefer the root cause's message when the failure wraps another exception.
                String description = cause != null ? cause.getMessage() : failure.getMessage();
                deadLetter(destination.getName(), originalMessage, EXCEPTION_MESSAGE, description);
            } else {
                abandon(destination.getName(), originalMessage);
            }
        };
    }

    /**
     * Dead-letters the Service Bus message that produced {@code message}, if the message carries
     * a received-message context header; otherwise does nothing.
     *
     * <p>NOTE(review): {@code deadLetterReason} and {@code deadLetterErrorDescription} are accepted
     * but currently not forwarded to Service Bus; the context's no-argument {@code deadLetter()}
     * is invoked.
     *
     * @param destination the destination name; must contain text
     * @param message the message whose headers hold the received-message context
     * @param deadLetterReason the dead-letter reason (currently unused)
     * @param deadLetterErrorDescription the dead-letter description (currently unused)
     * @param <T> the payload type
     * @throws IllegalArgumentException if {@code destination} has no text
     */
    public <T> void deadLetter(String destination, Message<T> message, String deadLetterReason,
            String deadLetterErrorDescription) {
        Assert.hasText(destination, "destination can't be null or empty");
        ServiceBusReceivedMessageContext messageContext = getReceivedMessageContext(message);
        if (messageContext != null) {
            messageContext.deadLetter();
        }
    }

    /**
     * Abandons the Service Bus message that produced {@code message}, if the message carries a
     * received-message context header; otherwise does nothing.
     *
     * @param destination the destination name; must contain text
     * @param message the message whose headers hold the received-message context
     * @param <T> the payload type
     * @throws IllegalArgumentException if {@code destination} has no text
     */
    public <T> void abandon(String destination, Message<T> message) {
        Assert.hasText(destination, "destination can't be null or empty");
        ServiceBusReceivedMessageContext messageContext = getReceivedMessageContext(message);
        if (messageContext != null) {
            messageContext.abandon();
        }
    }

    /** Reads the received-message context from the message headers; {@code null} when absent. */
    private static ServiceBusReceivedMessageContext getReceivedMessageContext(Message<?> message) {
        return (ServiceBusReceivedMessageContext) message.getHeaders().get(RECEIVED_MESSAGE_CONTEXT_HEADER);
    }

    /**
     * Returns the extended consumer properties configured for a binding.
     *
     * @param channelName the binding (channel) name
     * @return the consumer properties resolved by the binding properties
     */
    public ServiceBusConsumerProperties getExtendedConsumerProperties(String channelName) {
        return (ServiceBusConsumerProperties) this.bindingProperties.getExtendedConsumerProperties(channelName);
    }

    /**
     * Returns the extended producer properties configured for a binding.
     *
     * @param channelName the binding (channel) name
     * @return the producer properties resolved by the binding properties
     */
    public ServiceBusProducerProperties getExtendedProducerProperties(String channelName) {
        return (ServiceBusProducerProperties) this.bindingProperties.getExtendedProducerProperties(channelName);
    }

    /**
     * Decompiler-generated alias of {@link #getExtendedConsumerProperties(String)}, kept so that
     * existing callers of this name keep working.
     *
     * @param str the binding (channel) name
     * @return the consumer properties resolved by the binding properties
     */
    public ServiceBusConsumerProperties m2getExtendedConsumerProperties(String str) {
        return getExtendedConsumerProperties(str);
    }

    /**
     * Decompiler-generated alias of {@link #getExtendedProducerProperties(String)}, kept so that
     * existing callers of this name keep working.
     *
     * @param str the binding (channel) name
     * @return the producer properties resolved by the binding properties
     */
    public ServiceBusProducerProperties m1getExtendedProducerProperties(String str) {
        return getExtendedProducerProperties(str);
    }

    /**
     * @return the defaults prefix reported by the binding properties
     */
    public String getDefaultsPrefix() {
        return this.bindingProperties.getDefaultsPrefix();
    }

    /**
     * @return the extended properties entry class reported by the binding properties
     */
    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.bindingProperties.getExtendedPropertiesEntryClass();
    }

    /**
     * @return the shared {@link DefaultErrorMessageStrategy} instance used by this binder
     */
    protected ErrorMessageStrategy getErrorMessageStrategy() {
        return DEFAULT_ERROR_MESSAGE_STRATEGY;
    }

    /**
     * Replaces the extended binding properties used to resolve per-binding settings.
     *
     * @param bindingProperties the binding properties to use
     */
    public void setBindingProperties(ServiceBusExtendedBindingProperties bindingProperties) {
        this.bindingProperties = bindingProperties;
    }

    /**
     * Returns the template used by producer handlers, creating it on first use.
     *
     * <p>On creation, every registered producer factory customizer is applied and a listener is
     * added that records an {@code UP} producer instrumentation for each reported sender.
     */
    private ServiceBusTemplate getServiceBusTemplate() {
        if (this.serviceBusTemplate == null) {
            DefaultServiceBusNamespaceProducerFactory producerFactory =
                    new DefaultServiceBusNamespaceProducerFactory(this.namespaceProperties, getProducerPropertiesSupplier());
            this.producerFactoryCustomizers.forEach(customizer -> customizer.customize(producerFactory));
            producerFactory.addListener((name, senderClient) -> {
                DefaultInstrumentation instrumentation = new DefaultInstrumentation(name, Instrumentation.Type.PRODUCER);
                instrumentation.setStatus(Instrumentation.Status.UP);
                this.instrumentationManager.addHealthInstrumentation(instrumentation);
            });
            this.serviceBusTemplate = new ServiceBusTemplate(producerFactory);
        }
        return this.serviceBusTemplate;
    }

    /**
     * Returns the processor factory used by listener containers, creating it on first use.
     *
     * <p>The factory is fully configured in a local variable and only then stored in the field,
     * so a customizer that throws does not leave a half-configured factory cached.
     */
    private ServiceBusProcessorFactory getProcessorFactory() {
        if (this.processorFactory == null) {
            ServiceBusProcessorFactory factory = new DefaultServiceBusNamespaceProcessorFactory(this.namespaceProperties);
            this.processorFactoryCustomizers.forEach(customizer -> customizer.customize(factory));
            factory.addListener((name, processorClient) -> {
                String subscriptionName = processorClient.getSubscriptionName();
                // A processor with a subscription reads from a topic; otherwise it reads from a queue.
                String entityName = StringUtils.hasText(subscriptionName)
                        ? processorClient.getTopicName()
                        : processorClient.getQueueName();
                ServiceBusProcessorInstrumentation instrumentation = new ServiceBusProcessorInstrumentation(
                        entityName + "/" + getGroup(subscriptionName),
                        Instrumentation.Type.CONSUMER,
                        PROCESSOR_INSTRUMENTATION_DURATION);
                instrumentation.setStatus(Instrumentation.Status.UP);
                this.instrumentationManager.addHealthInstrumentation(instrumentation);
            });
            this.processorFactory = factory;
        }
        return this.processorFactory;
    }

    /**
     * Returns a supplier that resolves producer properties by entity name from the properties
     * remembered in {@link #createProducerMessageHandler}. The supplier sets the entity name on
     * the resolved properties before returning them, and returns {@code null} (after a debug log)
     * when nothing was registered for the name.
     */
    private PropertiesSupplier<String, ProducerProperties> getProducerPropertiesSupplier() {
        return entityName -> {
            ExtendedProducerProperties<ServiceBusProducerProperties> producerProperties =
                    this.extendedProducerPropertiesMap.get(entityName);
            if (producerProperties == null) {
                LOGGER.debug("Can't find extended properties for {}", entityName);
                return null;
            }
            ServiceBusProducerProperties extension = producerProperties.getExtension();
            extension.setEntityName(entityName);
            return extension;
        };
    }

    /**
     * Sets the namespace properties passed to the producer and processor factories when they are
     * created.
     *
     * @param namespaceProperties the namespace properties to use
     */
    public void setNamespaceProperties(NamespaceProperties namespaceProperties) {
        this.namespaceProperties = namespaceProperties;
    }

    /**
     * Sets the converter installed on inbound channel adapters created after this call.
     *
     * @param messageConverter the message converter to use
     */
    public void setMessageConverter(ServiceBusMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    /**
     * @return the instrumentation manager that collects this binder's health instrumentation
     */
    public InstrumentationManager getInstrumentationManager() {
        return this.instrumentationManager;
    }

    /** Normalises a possibly {@code null} group name to an empty string. */
    private String getGroup(String group) {
        return group != null ? group : "";
    }

    /**
     * Registers a customizer applied to the producer factory when it is created; {@code null} is
     * ignored.
     *
     * @param customizer the customizer to add
     */
    public void addProducerFactoryCustomizer(ServiceBusProducerFactoryCustomizer customizer) {
        if (customizer != null) {
            this.producerFactoryCustomizers.add(customizer);
        }
    }

    /**
     * Registers a customizer applied to the processor factory when it is created; {@code null} is
     * ignored.
     *
     * @param customizer the customizer to add
     */
    public void addProcessorFactoryCustomizer(ServiceBusProcessorFactoryCustomizer customizer) {
        if (customizer != null) {
            this.processorFactoryCustomizers.add(customizer);
        }
    }
}
