/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.cloud.stream.binder.eventhubs.implementation;

import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.spring.cloud.core.implementation.util.AzurePropertiesUtils;
import com.azure.spring.cloud.core.properties.AzureProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.config.EventHubsProcessorFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.eventhubs.config.EventHubsProducerFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.eventhubs.core.implementation.provisioning.EventHubsChannelProvisioner;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsConsumerProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsExtendedBindingProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsProducerProperties;
import com.azure.spring.integration.core.handler.DefaultMessageHandler;
import com.azure.spring.integration.core.implementation.instrumentation.DefaultInstrumentation;
import com.azure.spring.integration.core.implementation.instrumentation.DefaultInstrumentationManager;
import com.azure.spring.integration.core.implementation.instrumentation.InstrumentationSendCallback;
import com.azure.spring.integration.core.instrumentation.Instrumentation;
import com.azure.spring.integration.core.instrumentation.InstrumentationManager;
import com.azure.spring.integration.eventhubs.implementation.health.EventHubsProcessorInstrumentation;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.ListenerMode;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.core.SendOperation;
import com.azure.spring.messaging.eventhubs.core.DefaultEventHubsNamespaceProcessorFactory;
import com.azure.spring.messaging.eventhubs.core.DefaultEventHubsNamespaceProducerFactory;
import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;
import com.azure.spring.messaging.eventhubs.core.EventHubsProducerFactory;
import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties;
import com.azure.spring.messaging.eventhubs.core.properties.NamespaceProperties;
import com.azure.spring.messaging.eventhubs.core.properties.ProcessorProperties;
import com.azure.spring.messaging.eventhubs.core.properties.ProducerProperties;
import com.azure.spring.messaging.eventhubs.implementation.properties.merger.ProcessorPropertiesMerger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class EventHubsMessageChannelBinder
extends AbstractMessageChannelBinder<ExtendedConsumerProperties<EventHubsConsumerProperties>, ExtendedProducerProperties<EventHubsProducerProperties>, EventHubsChannelProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, EventHubsConsumerProperties, EventHubsProducerProperties> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsMessageChannelBinder.class);
    private static final ExpressionParser EXPRESSION_PARSER = new SpelExpressionParser();
    private NamespaceProperties namespaceProperties;
    private EventHubsTemplate eventHubsTemplate;
    private CheckpointStore checkpointStore;
    private DefaultEventHubsNamespaceProcessorFactory processorFactory;
    private final List<EventHubsMessageListenerContainer> eventHubsMessageListenerContainers = new ArrayList<EventHubsMessageListenerContainer>();
    private final InstrumentationManager instrumentationManager = new DefaultInstrumentationManager();
    private EventHubsExtendedBindingProperties bindingProperties = new EventHubsExtendedBindingProperties();
    private final Map<String, ExtendedProducerProperties<EventHubsProducerProperties>> extendedProducerPropertiesMap = new ConcurrentHashMap<String, ExtendedProducerProperties<EventHubsProducerProperties>>();
    private final List<EventHubsProducerFactoryCustomizer> producerFactoryCustomizers = new ArrayList<EventHubsProducerFactoryCustomizer>();
    private final List<EventHubsProcessorFactoryCustomizer> processorFactoryCustomizers = new ArrayList<EventHubsProcessorFactoryCustomizer>();

    public EventHubsMessageChannelBinder(String[] headersToEmbed, EventHubsChannelProvisioner provisioningProvider) {
        super(headersToEmbed, (ProvisioningProvider)provisioningProvider);
    }

    protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<EventHubsProducerProperties> producerProperties, MessageChannel errorChannel) {
        this.extendedProducerPropertiesMap.put(destination.getName(), producerProperties);
        Assert.notNull((Object)this.getEventHubTemplate(), (String)"eventHubsTemplate can't be null when create a producer");
        DefaultMessageHandler handler = new DefaultMessageHandler(destination.getName(), (SendOperation)this.eventHubsTemplate);
        handler.setBeanFactory((BeanFactory)this.getBeanFactory());
        handler.setSync(((EventHubsProducerProperties)producerProperties.getExtension()).isSync());
        handler.setSendTimeout(((EventHubsProducerProperties)producerProperties.getExtension()).getSendTimeout().toMillis());
        handler.setSendFailureChannel(errorChannel);
        String instrumentationId = Instrumentation.buildId((Instrumentation.Type)Instrumentation.Type.PRODUCER, (String)destination.getName());
        handler.setSendCallback((ListenableFutureCallback)new InstrumentationSendCallback(instrumentationId, this.instrumentationManager));
        if (producerProperties.isPartitioned()) {
            handler.setPartitionIdExpression(EXPRESSION_PARSER.parseExpression("headers['scst_partition']"));
        }
        return handler;
    }

    protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<EventHubsConsumerProperties> properties) {
        boolean anonymous;
        Assert.notNull((Object)this.getProcessorFactory(), (String)"processor factory can't be null when create a consumer");
        boolean bl = anonymous = !StringUtils.hasText((String)group);
        if (anonymous) {
            group = "anonymous." + String.valueOf(UUID.randomUUID());
        }
        EventHubsContainerProperties containerProperties = this.createContainerProperties(destination, (String)group, properties);
        EventHubsMessageListenerContainer listenerContainer = new EventHubsMessageListenerContainer(this.getProcessorFactory(), containerProperties);
        this.eventHubsMessageListenerContainers.add(listenerContainer);
        EventHubsInboundChannelAdapter inboundAdapter = properties.isBatchMode() ? new EventHubsInboundChannelAdapter(listenerContainer, ListenerMode.BATCH) : new EventHubsInboundChannelAdapter(listenerContainer);
        inboundAdapter.setBeanFactory((BeanFactory)this.getBeanFactory());
        String instrumentationId = Instrumentation.buildId((Instrumentation.Type)Instrumentation.Type.CONSUMER, (String)(destination.getName() + "/" + (String)group));
        inboundAdapter.setInstrumentationManager(this.instrumentationManager);
        inboundAdapter.setInstrumentationId(instrumentationId);
        AbstractMessageChannelBinder.ErrorInfrastructure errorInfrastructure = this.registerErrorInfrastructure(destination, (String)group, (ConsumerProperties)properties);
        inboundAdapter.setErrorChannel((MessageChannel)errorInfrastructure.getErrorChannel());
        return inboundAdapter;
    }

    private EventHubsContainerProperties createContainerProperties(ConsumerDestination destination, String group, ExtendedConsumerProperties<EventHubsConsumerProperties> properties) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        AzurePropertiesUtils.copyAzureCommonProperties((AzureProperties)((AzureProperties)properties.getExtension()), (AzureProperties)containerProperties);
        ProcessorPropertiesMerger.copyProcessorPropertiesIfNotNull((ProcessorProperties)((ProcessorProperties)properties.getExtension()), (ProcessorProperties)containerProperties);
        containerProperties.setEventHubName(destination.getName());
        containerProperties.setConsumerGroup(group);
        containerProperties.setCheckpointConfig(((EventHubsConsumerProperties)properties.getExtension()).getCheckpoint());
        return containerProperties;
    }

    public EventHubsConsumerProperties getExtendedConsumerProperties(String destination) {
        return (EventHubsConsumerProperties)this.bindingProperties.getExtendedConsumerProperties(destination);
    }

    public EventHubsProducerProperties getExtendedProducerProperties(String destination) {
        return (EventHubsProducerProperties)this.bindingProperties.getExtendedProducerProperties(destination);
    }

    public String getDefaultsPrefix() {
        return this.bindingProperties.getDefaultsPrefix();
    }

    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.bindingProperties.getExtendedPropertiesEntryClass();
    }

    public void setBindingProperties(EventHubsExtendedBindingProperties bindingProperties) {
        this.bindingProperties = bindingProperties;
    }

    private PropertiesSupplier<String, ProducerProperties> getProducerPropertiesSupplier() {
        return key -> {
            if (this.extendedProducerPropertiesMap.containsKey(key)) {
                EventHubsProducerProperties producerProperties = (EventHubsProducerProperties)this.extendedProducerPropertiesMap.get(key).getExtension();
                producerProperties.setEventHubName(key);
                return producerProperties;
            }
            LOGGER.debug("Can't find extended properties for {}", key);
            return null;
        };
    }

    private EventHubsTemplate getEventHubTemplate() {
        if (this.eventHubsTemplate == null) {
            DefaultEventHubsNamespaceProducerFactory factory = new DefaultEventHubsNamespaceProducerFactory(this.namespaceProperties, this.getProducerPropertiesSupplier());
            this.producerFactoryCustomizers.forEach(customizer -> customizer.customize((EventHubsProducerFactory)factory));
            factory.addListener((name, producerAsyncClient) -> {
                DefaultInstrumentation instrumentation = new DefaultInstrumentation(name, Instrumentation.Type.PRODUCER);
                instrumentation.setStatus(Instrumentation.Status.UP);
                this.instrumentationManager.addHealthInstrumentation((Instrumentation)instrumentation);
            });
            this.eventHubsTemplate = new EventHubsTemplate((EventHubsProducerFactory)factory);
        }
        return this.eventHubsTemplate;
    }

    private EventHubsProcessorFactory getProcessorFactory() {
        if (this.processorFactory == null) {
            this.processorFactory = new DefaultEventHubsNamespaceProcessorFactory(this.checkpointStore, this.namespaceProperties);
            this.processorFactoryCustomizers.forEach(customizer -> customizer.customize((EventHubsProcessorFactory)this.processorFactory));
            this.processorFactory.addListener((name, consumerGroup, processorClient) -> {
                String instrumentationName = name + "/" + consumerGroup;
                EventHubsProcessorInstrumentation instrumentation = new EventHubsProcessorInstrumentation(instrumentationName, Instrumentation.Type.CONSUMER, Duration.ofMinutes(2L));
                instrumentation.setStatus(Instrumentation.Status.UP);
                this.instrumentationManager.addHealthInstrumentation((Instrumentation)instrumentation);
            });
        }
        return this.processorFactory;
    }

    public void setNamespaceProperties(NamespaceProperties namespaceProperties) {
        this.namespaceProperties = namespaceProperties;
    }

    public void setCheckpointStore(CheckpointStore checkpointStore) {
        this.checkpointStore = checkpointStore;
    }

    InstrumentationManager getInstrumentationManager() {
        return this.instrumentationManager;
    }

    public void addProducerFactoryCustomizer(EventHubsProducerFactoryCustomizer producerFactoryCustomizer) {
        if (producerFactoryCustomizer != null) {
            this.producerFactoryCustomizers.add(producerFactoryCustomizer);
        }
    }

    public void addProcessorFactoryCustomizer(EventHubsProcessorFactoryCustomizer processorFactoryCustomizer) {
        if (processorFactoryCustomizer != null) {
            this.processorFactoryCustomizers.add(processorFactoryCustomizer);
        }
    }
}

