/*
 * Decompiled with CFR 0.152.
 */
package com.azure.spring.messaging.eventhubs.core;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.spring.messaging.converter.AzureMessageConverter;
import com.azure.spring.messaging.core.SendOperation;
import com.azure.spring.messaging.eventhubs.core.EventHubsProducerFactory;
import com.azure.spring.messaging.eventhubs.core.PartitionSupplier;
import com.azure.spring.messaging.eventhubs.implementation.support.converter.EventHubsMessageConverter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class EventHubsTemplate
implements SendOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsTemplate.class);
    private final EventHubsProducerFactory producerFactory;
    private AzureMessageConverter<EventData, EventData> messageConverter = new EventHubsMessageConverter();

    public EventHubsTemplate(EventHubsProducerFactory producerFactory) {
        this.producerFactory = producerFactory;
    }

    public <T> Mono<Void> sendAsync(String destination, Collection<Message<T>> messages, PartitionSupplier partitionSupplier) {
        List<EventData> eventData = messages.stream().map(m -> (EventData)this.messageConverter.fromMessage(m, EventData.class)).collect(Collectors.toList());
        return this.doSend(destination, eventData, partitionSupplier);
    }

    public <T> Mono<Void> sendAsync(String destination, Collection<Message<T>> messages) {
        return this.sendAsync(destination, messages, null);
    }

    public <T> void send(String destination, Collection<Message<T>> messages, PartitionSupplier partitionSupplier) {
        this.sendAsync(destination, messages, partitionSupplier).block();
    }

    public <T> void send(String destination, Collection<Message<T>> messages) {
        this.send(destination, messages, null);
    }

    public <T> Mono<Void> sendAsync(String destination, Message<T> message) {
        return this.sendAsync(destination, Collections.singleton(message), this.buildPartitionSupplier(message));
    }

    private Mono<Void> doSend(String destination, List<EventData> events, PartitionSupplier partitionSupplier) {
        EventHubProducerAsyncClient producer = this.producerFactory.createProducer(destination);
        CreateBatchOptions options = this.buildCreateBatchOptions(partitionSupplier);
        AtomicReference currentBatch = new AtomicReference();
        return producer.createBatch(options).doOnSuccess(eventDataBatch -> currentBatch.set(eventDataBatch)).then(Flux.fromIterable(events).flatMap(event -> {
            EventDataBatch batch = (EventDataBatch)currentBatch.get();
            try {
                if (batch.tryAdd(event)) {
                    return Mono.empty();
                }
                LOGGER.warn("EventDataBatch is full in the collect process or the first event is too large to fit in an empty batch! Max size: {}", (Object)batch.getMaxSizeInBytes());
            }
            catch (AmqpException e) {
                LOGGER.error("Event is larger than maximum allowed size.", (Throwable)e);
                return Mono.empty();
            }
            return Mono.when((Publisher[])new Publisher[]{producer.send(batch), producer.createBatch(options).map(newBatch -> {
                currentBatch.set(newBatch);
                try {
                    if (!newBatch.tryAdd(event)) {
                        LOGGER.error("Event was too large to fit in an empty batch. Max size:{} ", (Object)newBatch.getMaxSizeInBytes());
                    }
                }
                catch (AmqpException e) {
                    LOGGER.error("Event was too large to fit in an empty batch. Max size:{}", (Object)newBatch.getMaxSizeInBytes(), (Object)e);
                }
                return newBatch;
            })});
        }).then(Mono.defer(() -> producer.send((EventDataBatch)currentBatch.getAndSet(null)))));
    }

    private CreateBatchOptions buildCreateBatchOptions(PartitionSupplier partitionSupplier) {
        return new CreateBatchOptions().setPartitionId(partitionSupplier != null ? partitionSupplier.getPartitionId() : null).setPartitionKey(partitionSupplier != null ? partitionSupplier.getPartitionKey() : null);
    }

    <T> PartitionSupplier buildPartitionSupplier(Message<T> message) {
        PartitionSupplier partitionSupplier = new PartitionSupplier();
        Optional.ofNullable(message.getHeaders().get((Object)"azure_partition_key")).ifPresent(s -> partitionSupplier.setPartitionKey(String.valueOf(s)));
        Optional.ofNullable(message.getHeaders().get((Object)"azure_partition_id")).ifPresent(s -> partitionSupplier.setPartitionId(String.valueOf(s)));
        return partitionSupplier;
    }

    public void setMessageConverter(AzureMessageConverter<EventData, EventData> messageConverter) {
        this.messageConverter = messageConverter;
    }
}

