package com.azure.spring.messaging.eventhubs.core;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.spring.messaging.converter.AzureMessageConverter;
import com.azure.spring.messaging.core.SendOperation;
import com.azure.spring.messaging.eventhubs.implementation.support.converter.EventHubsMessageConverter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/spring/messaging/eventhubs/core/EventHubsTemplate.class */
/**
 * Template for sending Spring {@link Message}s to an Event Hubs destination.
 *
 * <p>Each message is converted to an {@link EventData} with the configured
 * {@link AzureMessageConverter}. The resulting events are packed into {@link EventDataBatch}es and
 * sent with the {@link EventHubProducerAsyncClient} that the {@link EventHubsProducerFactory}
 * returns for the destination name.</p>
 *
 * <p>The {@code sendAsync} methods convert the messages when they are called, but batches are only
 * created and sent once the returned {@link Mono} is subscribed to. The {@code send} methods
 * subscribe and block until the send completes.</p>
 */
public class EventHubsTemplate implements SendOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsTemplate.class);

    /** Message header whose value, when present, is used as the partition key. */
    private static final String PARTITION_KEY_HEADER = "azure_partition_key";

    /** Message header whose value, when present, is used as the partition id. */
    private static final String PARTITION_ID_HEADER = "azure_partition_id";

    private final EventHubsProducerFactory producerFactory;
    private AzureMessageConverter<EventData, EventData> messageConverter = new EventHubsMessageConverter();

    /**
     * Creates a template that obtains its producers from the given factory.
     *
     * @param eventHubsProducerFactory factory asked for a producer on every send
     */
    public EventHubsTemplate(EventHubsProducerFactory eventHubsProducerFactory) {
        this.producerFactory = eventHubsProducerFactory;
    }

    /**
     * Converts the messages to events and sends them in batches to the given destination.
     *
     * @param str name of the destination, passed to the producer factory
     * @param collection messages to send
     * @param partitionSupplier source of the partition id / partition key for the batches; may be {@code null}
     * @param <T> payload type of the messages
     * @return a {@link Mono} that completes when the last batch has been sent
     */
    public <T> Mono<Void> sendAsync(String str, Collection<Message<T>> collection, PartitionSupplier partitionSupplier) {
        List<EventData> events = collection.stream()
            .map(message -> this.messageConverter.fromMessage(message, EventData.class))
            .collect(Collectors.toList());
        return doSend(str, events, partitionSupplier);
    }

    /**
     * Same as {@link #sendAsync(String, Collection, PartitionSupplier)} without a partition supplier.
     *
     * @param str name of the destination
     * @param collection messages to send
     * @param <T> payload type of the messages
     * @return a {@link Mono} that completes when the last batch has been sent
     */
    public <T> Mono<Void> sendAsync(String str, Collection<Message<T>> collection) {
        return sendAsync(str, collection, null);
    }

    /**
     * Sends the messages and blocks until {@link #sendAsync(String, Collection, PartitionSupplier)} completes.
     *
     * @param str name of the destination
     * @param collection messages to send
     * @param partitionSupplier source of the partition id / partition key for the batches; may be {@code null}
     * @param <T> payload type of the messages
     */
    public <T> void send(String str, Collection<Message<T>> collection, PartitionSupplier partitionSupplier) {
        sendAsync(str, collection, partitionSupplier).block();
    }

    /**
     * Same as {@link #send(String, Collection, PartitionSupplier)} without a partition supplier.
     *
     * @param str name of the destination
     * @param collection messages to send
     * @param <T> payload type of the messages
     */
    public <T> void send(String str, Collection<Message<T>> collection) {
        send(str, collection, null);
    }

    /**
     * Sends a single message, taking the partition id / key from the message headers
     * (see {@link #buildPartitionSupplier(Message)}).
     *
     * @param str name of the destination
     * @param message message to send
     * @param <T> payload type of the message
     * @return a {@link Mono} that completes when the message has been sent
     */
    public <T> Mono<Void> sendAsync(String str, Message<T> message) {
        return sendAsync(str, Collections.singleton(message), buildPartitionSupplier(message));
    }

    /**
     * Builds the non-blocking send pipeline: create a batch, add the events one at a time
     * (flushing and replacing the batch whenever it is full), then send the last batch.
     *
     * <p>The producer is obtained immediately; everything else runs on subscription. The batch
     * holder is created per subscription so repeated subscriptions do not share state.</p>
     *
     * @param str name of the destination, passed to the producer factory
     * @param list events to send, in order
     * @param partitionSupplier source of the partition id / partition key; may be {@code null}
     * @return a {@link Mono} that completes when the last batch has been sent, or fails with the
     *         first error raised while creating or sending a batch
     */
    private Mono<Void> doSend(String str, List<EventData> list, PartitionSupplier partitionSupplier) {
        EventHubProducerAsyncClient producer = this.producerFactory.createProducer(str);
        CreateBatchOptions batchOptions = buildCreateBatchOptions(partitionSupplier);

        Mono<AtomicReference<EventDataBatch>> filledBatch = producer.createBatch(batchOptions)
            .flatMap(firstBatch -> {
                AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>(firstBatch);
                // concatMap (not flatMap): the next event must not be examined until a pending
                // flush has stored the replacement batch, otherwise it would be offered to the
                // batch that is already being sent.
                return Flux.fromIterable(list)
                    .concatMap(eventData -> addOrFlush(producer, batchOptions, currentBatch, eventData))
                    .then(Mono.just(currentBatch));
            })
            .doOnError(e -> LOGGER.error("EventDataBatch create error.", e));

        return filledBatch.flatMap(currentBatch -> producer.send(currentBatch.get()));
    }

    /**
     * Adds one event to the current batch. When the batch rejects the event, the batch is sent and
     * a new batch is created, stored in {@code currentBatch} and offered the event. An event that
     * does not fit into the new batch either is logged and skipped.
     *
     * @param producer client used to send the full batch and to create its replacement
     * @param batchOptions options for the replacement batch
     * @param currentBatch holder of the batch that is currently being filled
     * @param eventData event to add
     * @return an empty {@link Mono} when the event was added or skipped without a flush; otherwise
     *         a {@link Mono} that completes when both the send and the replacement have completed
     */
    private Mono<Void> addOrFlush(EventHubProducerAsyncClient producer, CreateBatchOptions batchOptions,
                                  AtomicReference<EventDataBatch> currentBatch, EventData eventData) {
        EventDataBatch fullBatch = currentBatch.get();
        try {
            if (fullBatch.tryAdd(eventData)) {
                return Mono.empty();
            }
        } catch (AmqpException e) {
            LOGGER.error("Event is larger than maximum allowed size.", e);
            return Mono.empty();
        }

        LOGGER.warn("EventDataBatch is full in the collect process or the first event is too large to fit in an empty batch! Max size: {}", fullBatch.getMaxSizeInBytes());
        Mono<EventDataBatch> replacement = producer.createBatch(batchOptions).map(newBatch -> {
            currentBatch.set(newBatch);
            try {
                if (!newBatch.tryAdd(eventData)) {
                    LOGGER.error("Event was too large to fit in an empty batch. Max size:{} ", newBatch.getMaxSizeInBytes());
                }
            } catch (AmqpException e) {
                LOGGER.error("Event was too large to fit in an empty batch. Max size:{}", newBatch.getMaxSizeInBytes(), e);
            }
            return newBatch;
        });
        return Mono.when(producer.send(fullBatch), replacement);
    }

    /**
     * Creates batch options carrying the supplier's partition id and key, or {@code null} for both
     * when no supplier is given.
     *
     * @param partitionSupplier source of the partition id / partition key; may be {@code null}
     * @return the options used for every batch of one send
     */
    private CreateBatchOptions buildCreateBatchOptions(PartitionSupplier partitionSupplier) {
        String partitionId = partitionSupplier != null ? partitionSupplier.getPartitionId() : null;
        String partitionKey = partitionSupplier != null ? partitionSupplier.getPartitionKey() : null;
        return new CreateBatchOptions().setPartitionId(partitionId).setPartitionKey(partitionKey);
    }

    /**
     * Builds a partition supplier from the {@code azure_partition_key} and
     * {@code azure_partition_id} headers of the message. A header that is absent leaves the
     * corresponding property unset; a present value is converted with {@link String#valueOf(Object)}.
     *
     * @param message message whose headers are read
     * @param <T> payload type of the message
     * @return a new partition supplier, never {@code null}
     */
    <T> PartitionSupplier buildPartitionSupplier(Message<T> message) {
        PartitionSupplier partitionSupplier = new PartitionSupplier();
        Optional.ofNullable(message.getHeaders().get(PARTITION_KEY_HEADER))
            .ifPresent(key -> partitionSupplier.setPartitionKey(String.valueOf(key)));
        Optional.ofNullable(message.getHeaders().get(PARTITION_ID_HEADER))
            .ifPresent(id -> partitionSupplier.setPartitionId(String.valueOf(id)));
        return partitionSupplier;
    }

    /**
     * Replaces the converter used to turn messages into {@link EventData}.
     *
     * @param azureMessageConverter converter used by subsequent sends
     */
    public void setMessageConverter(AzureMessageConverter<EventData, EventData> azureMessageConverter) {
        this.messageConverter = azureMessageConverter;
    }
}
