package org.apache.camel.component.azure.servicebus;

import com.azure.core.util.BinaryData;
import java.io.File;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.azure.servicebus.client.ServiceBusClientFactory;
import org.apache.camel.component.azure.servicebus.client.ServiceBusSenderAsyncClientWrapper;
import org.apache.camel.component.azure.servicebus.operations.ServiceBusSenderOperations;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/camel/component/azure/servicebus/ServiceBusProducer.class */
public class ServiceBusProducer extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(ServiceBusProducer.class);
    private ServiceBusSenderAsyncClientWrapper senderClientWrapper;
    private ServiceBusConfigurationOptionsProxy configurationOptionsProxy;
    private ServiceBusSenderOperations serviceBusSenderOperations;
    private final Map<ServiceBusProducerOperationDefinition, BiConsumer<Exchange, AsyncCallback>> operationsToExecute;

    public ServiceBusProducer(Endpoint endpoint) {
        super(endpoint);
        this.operationsToExecute = new EnumMap(ServiceBusProducerOperationDefinition.class);
        bind(ServiceBusProducerOperationDefinition.sendMessages, sendMessages());
        bind(ServiceBusProducerOperationDefinition.scheduleMessages, scheduleMessages());
    }

    protected void doInit() throws Exception {
        super.doInit();
        this.configurationOptionsProxy = new ServiceBusConfigurationOptionsProxy(getConfiguration());
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.senderClientWrapper = new ServiceBusSenderAsyncClientWrapper(getConfiguration().getSenderAsyncClient() != null ? getConfiguration().getSenderAsyncClient() : ServiceBusClientFactory.createServiceBusSenderAsyncClient(getConfiguration()));
        this.serviceBusSenderOperations = new ServiceBusSenderOperations(this.senderClientWrapper);
    }

    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        try {
            invokeOperation(this.configurationOptionsProxy.getServiceBusProducerOperationDefinition(exchange), exchange, asyncCallback);
            return false;
        } catch (Exception e) {
            exchange.setException(e);
            asyncCallback.done(true);
            return true;
        }
    }

    protected void doStop() throws Exception {
        if (this.senderClientWrapper != null) {
            this.senderClientWrapper.close();
        }
        super.doStop();
    }

    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    public ServiceBusEndpoint m6getEndpoint() {
        return super.getEndpoint();
    }

    public ServiceBusConfiguration getConfiguration() {
        return m6getEndpoint().getConfiguration();
    }

    private void bind(ServiceBusProducerOperationDefinition serviceBusProducerOperationDefinition, BiConsumer<Exchange, AsyncCallback> biConsumer) {
        this.operationsToExecute.put(serviceBusProducerOperationDefinition, biConsumer);
    }

    private void invokeOperation(ServiceBusProducerOperationDefinition serviceBusProducerOperationDefinition, Exchange exchange, AsyncCallback asyncCallback) {
        ServiceBusProducerOperationDefinition serviceBusProducerOperationDefinition2 = ObjectHelper.isEmpty(serviceBusProducerOperationDefinition) ? ServiceBusProducerOperationDefinition.sendMessages : serviceBusProducerOperationDefinition;
        BiConsumer<Exchange, AsyncCallback> biConsumer = this.operationsToExecute.get(serviceBusProducerOperationDefinition2);
        if (biConsumer == null) {
            throw new RuntimeCamelException("Operation not supported. Value: " + serviceBusProducerOperationDefinition2);
        }
        biConsumer.accept(exchange, asyncCallback);
    }

    private BiConsumer<Exchange, AsyncCallback> sendMessages() {
        return (exchange, asyncCallback) -> {
            Mono<Void> sendMessages;
            Object body = exchange.getMessage().getBody();
            Map<String, Object> map = (Map) exchange.getMessage().getHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.class);
            if (map == null) {
                map = new HashMap();
            }
            propagateHeaders(exchange, map);
            String str = (String) exchange.getMessage().getHeader(ServiceBusConstants.CORRELATION_ID, String.class);
            if (body instanceof Iterable) {
                sendMessages = this.serviceBusSenderOperations.sendMessages((Object) convertBodyToList((Iterable) body), this.configurationOptionsProxy.getServiceBusTransactionContext(exchange), map, str);
            } else {
                sendMessages = this.serviceBusSenderOperations.sendMessages(body instanceof BinaryData ? body : getConfiguration().isBinary() ? convertBodyToBinary(exchange) : exchange.getMessage().getBody(String.class), this.configurationOptionsProxy.getServiceBusTransactionContext(exchange), map, str);
            }
            subscribeToMono(sendMessages, exchange, r1 -> {
            }, asyncCallback);
        };
    }

    private BiConsumer<Exchange, AsyncCallback> scheduleMessages() {
        return (exchange, asyncCallback) -> {
            Mono<List<Long>> scheduleMessages;
            Object body = exchange.getMessage().getBody();
            Map<String, Object> map = (Map) exchange.getMessage().getHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.class);
            if (map == null) {
                map = new HashMap();
            }
            propagateHeaders(exchange, map);
            String str = (String) exchange.getMessage().getHeader(ServiceBusConstants.CORRELATION_ID, String.class);
            if (body instanceof Iterable) {
                scheduleMessages = this.serviceBusSenderOperations.scheduleMessages((Object) convertBodyToList((Iterable) body), this.configurationOptionsProxy.getScheduledEnqueueTime(exchange), this.configurationOptionsProxy.getServiceBusTransactionContext(exchange), map, str);
            } else {
                scheduleMessages = this.serviceBusSenderOperations.scheduleMessages(body instanceof BinaryData ? body : getConfiguration().isBinary() ? convertBodyToBinary(exchange) : exchange.getMessage().getBody(String.class), this.configurationOptionsProxy.getScheduledEnqueueTime(exchange), this.configurationOptionsProxy.getServiceBusTransactionContext(exchange), map, str);
            }
            subscribeToMono(scheduleMessages, exchange, list -> {
                exchange.getMessage().setBody(list);
            }, asyncCallback);
        };
    }

    private List<?> convertBodyToList(Iterable<?> iterable) {
        return StreamSupport.stream(iterable.spliterator(), false).map(this::convertMessageBody).toList();
    }

    private Object convertBodyToBinary(Exchange exchange) {
        Object body = exchange.getMessage().getBody();
        return body instanceof InputStream ? BinaryData.fromStream((InputStream) body) : body instanceof Path ? BinaryData.fromFile((Path) body) : body instanceof File ? BinaryData.fromFile(((File) body).toPath()) : BinaryData.fromBytes((byte[]) exchange.getMessage().getBody(byte[].class));
    }

    private Object convertMessageBody(Object obj) {
        TypeConverter typeConverter = m6getEndpoint().getCamelContext().getTypeConverter();
        return obj instanceof BinaryData ? obj : getConfiguration().isBinary() ? obj instanceof InputStream ? BinaryData.fromStream((InputStream) obj) : obj instanceof Path ? BinaryData.fromFile((Path) obj) : obj instanceof File ? BinaryData.fromFile(((File) obj).toPath()) : typeConverter.convertTo(byte[].class, obj) : typeConverter.convertTo(String.class, obj);
    }

    private void propagateHeaders(Exchange exchange, Map<String, Object> map) {
        HeaderFilterStrategy headerFilterStrategy = getConfiguration().getHeaderFilterStrategy();
        map.putAll((Map) exchange.getMessage().getHeaders().entrySet().stream().filter(entry -> {
            return !headerFilterStrategy.applyFilterToCamelHeaders((String) entry.getKey(), entry.getValue(), exchange);
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        })));
    }

    private <T> void subscribeToMono(Mono<T> mono, Exchange exchange, Consumer<T> consumer, AsyncCallback asyncCallback) {
        mono.subscribe(consumer, th -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Error processing async exchange with error: {}", th.getMessage());
            }
            exchange.setException(th);
            asyncCallback.done(false);
        }, () -> {
            LOG.trace("All events with exchange have been sent successfully.");
            asyncCallback.done(false);
        });
    }
}
