package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ErrorContextProvider;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.messaging.eventhubs.models.SendOptions;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:com/azure/messaging/eventhubs/EventHubProducerAsyncClient.class */
public class EventHubProducerAsyncClient implements Closeable {
    private static final int MAX_PARTITION_KEY_LENGTH = 128;
    private static final String SENDER_ENTITY_PATH_FORMAT = "%s/Partitions/%s";
    private static final SendOptions DEFAULT_SEND_OPTIONS = new SendOptions();
    private static final CreateBatchOptions DEFAULT_BATCH_OPTIONS = new CreateBatchOptions();
    private final ClientLogger logger = new ClientLogger(EventHubProducerAsyncClient.class);
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final String fullyQualifiedNamespace;
    private final String eventHubName;
    private final EventHubConnectionProcessor connectionProcessor;
    private final AmqpRetryOptions retryOptions;
    private final AmqpRetryPolicy retryPolicy;
    private final TracerProvider tracerProvider;
    private final MessageSerializer messageSerializer;
    private final Scheduler scheduler;
    private final boolean isSharedConnection;
    private final Runnable onClientClose;

    /* loaded from: input_file:com/azure/messaging/eventhubs/EventHubProducerAsyncClient$EventDataCollector.class */
    private static class EventDataCollector implements Collector<EventData, List<EventDataBatch>, List<EventDataBatch>> {
        private final String partitionKey;
        private final String partitionId;
        private final int maxMessageSize;
        private final Integer maxNumberOfBatches;
        private final ErrorContextProvider contextProvider;
        private final TracerProvider tracerProvider;
        private final String entityPath;
        private final String hostname;
        private volatile EventDataBatch currentBatch;

        EventDataCollector(CreateBatchOptions createBatchOptions, Integer num, ErrorContextProvider errorContextProvider, TracerProvider tracerProvider, String str, String str2) {
            this.maxNumberOfBatches = num;
            this.maxMessageSize = createBatchOptions.getMaximumSizeInBytes() > 0 ? createBatchOptions.getMaximumSizeInBytes() : ClientConstants.MAX_MESSAGE_LENGTH_BYTES;
            this.partitionKey = createBatchOptions.getPartitionKey();
            this.partitionId = createBatchOptions.getPartitionId();
            this.contextProvider = errorContextProvider;
            this.tracerProvider = tracerProvider;
            this.entityPath = str;
            this.hostname = str2;
            this.currentBatch = new EventDataBatch(this.maxMessageSize, this.partitionId, this.partitionKey, errorContextProvider, tracerProvider, str, str2);
        }

        @Override // java.util.stream.Collector
        public Supplier<List<EventDataBatch>> supplier() {
            return ArrayList::new;
        }

        @Override // java.util.stream.Collector
        public BiConsumer<List<EventDataBatch>, EventData> accumulator() {
            return (list, eventData) -> {
                EventDataBatch eventDataBatch = this.currentBatch;
                if (eventDataBatch.tryAdd(eventData)) {
                    return;
                }
                if (this.maxNumberOfBatches != null && list.size() == this.maxNumberOfBatches.intValue()) {
                    throw new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, String.format(Locale.US, Messages.EVENT_DATA_DOES_NOT_FIT, this.maxNumberOfBatches), this.contextProvider.getErrorContext());
                }
                this.currentBatch = new EventDataBatch(this.maxMessageSize, this.partitionId, this.partitionKey, this.contextProvider, this.tracerProvider, this.entityPath, this.hostname);
                this.currentBatch.tryAdd(eventData);
                list.add(eventDataBatch);
            };
        }

        @Override // java.util.stream.Collector
        public BinaryOperator<List<EventDataBatch>> combiner() {
            return (list, list2) -> {
                list.addAll(list2);
                return list;
            };
        }

        @Override // java.util.stream.Collector
        public Function<List<EventDataBatch>, List<EventDataBatch>> finisher() {
            return list -> {
                EventDataBatch eventDataBatch = this.currentBatch;
                this.currentBatch = null;
                if (eventDataBatch != null) {
                    list.add(eventDataBatch);
                }
                return list;
            };
        }

        @Override // java.util.stream.Collector
        public Set<Collector.Characteristics> characteristics() {
            return Collections.emptySet();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventHubProducerAsyncClient(String str, String str2, EventHubConnectionProcessor eventHubConnectionProcessor, AmqpRetryOptions amqpRetryOptions, TracerProvider tracerProvider, MessageSerializer messageSerializer, Scheduler scheduler, boolean z, Runnable runnable) {
        this.fullyQualifiedNamespace = (String) Objects.requireNonNull(str, "'fullyQualifiedNamespace' cannot be null.");
        this.eventHubName = (String) Objects.requireNonNull(str2, "'eventHubName' cannot be null.");
        this.connectionProcessor = (EventHubConnectionProcessor) Objects.requireNonNull(eventHubConnectionProcessor, "'connectionProcessor' cannot be null.");
        this.retryOptions = (AmqpRetryOptions) Objects.requireNonNull(amqpRetryOptions, "'retryOptions' cannot be null.");
        this.tracerProvider = (TracerProvider) Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null.");
        this.messageSerializer = (MessageSerializer) Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.onClientClose = (Runnable) Objects.requireNonNull(runnable, "'onClientClose' cannot be null.");
        this.retryPolicy = RetryUtil.getRetryPolicy(amqpRetryOptions);
        this.scheduler = scheduler;
        this.isSharedConnection = z;
    }

    public String getFullyQualifiedNamespace() {
        return this.fullyQualifiedNamespace;
    }

    public String getEventHubName() {
        return this.eventHubName;
    }

    public Mono<EventHubProperties> getEventHubProperties() {
        return this.connectionProcessor.flatMap(eventHubAmqpConnection -> {
            return eventHubAmqpConnection.getManagementNode();
        }).flatMap((v0) -> {
            return v0.getEventHubProperties();
        });
    }

    public Flux<String> getPartitionIds() {
        return getEventHubProperties().flatMapMany(eventHubProperties -> {
            return Flux.fromIterable(eventHubProperties.getPartitionIds());
        });
    }

    public Mono<PartitionProperties> getPartitionProperties(String str) {
        return this.connectionProcessor.flatMap(eventHubAmqpConnection -> {
            return eventHubAmqpConnection.getManagementNode();
        }).flatMap(eventHubManagementNode -> {
            return eventHubManagementNode.getPartitionProperties(str);
        });
    }

    public Mono<EventDataBatch> createBatch() {
        return createBatch(DEFAULT_BATCH_OPTIONS);
    }

    public Mono<EventDataBatch> createBatch(CreateBatchOptions createBatchOptions) {
        if (createBatchOptions == null) {
            return FluxUtil.monoError(this.logger, new NullPointerException("'options' cannot be null."));
        }
        String partitionKey = createBatchOptions.getPartitionKey();
        String partitionId = createBatchOptions.getPartitionId();
        int maximumSizeInBytes = createBatchOptions.getMaximumSizeInBytes();
        return (CoreUtils.isNullOrEmpty(partitionKey) || CoreUtils.isNullOrEmpty(partitionId)) ? (CoreUtils.isNullOrEmpty(partitionKey) || partitionKey.length() <= MAX_PARTITION_KEY_LENGTH) ? getSendLink(partitionId).flatMap(amqpSendLink -> {
            return amqpSendLink.getLinkSize().flatMap(num -> {
                int intValue = num.intValue() > 0 ? num.intValue() : ClientConstants.MAX_MESSAGE_LENGTH_BYTES;
                if (maximumSizeInBytes > intValue) {
                    return FluxUtil.monoError(this.logger, new IllegalArgumentException(String.format(Locale.US, "BatchOptions.maximumSizeInBytes (%s bytes) is larger than the link size (%s bytes).", Integer.valueOf(maximumSizeInBytes), Integer.valueOf(intValue))));
                }
                int i = maximumSizeInBytes > 0 ? maximumSizeInBytes : intValue;
                Objects.requireNonNull(amqpSendLink);
                return Mono.just(new EventDataBatch(i, partitionId, partitionKey, amqpSendLink::getErrorContext, this.tracerProvider, amqpSendLink.getEntityPath(), amqpSendLink.getHostname()));
            });
        }) : FluxUtil.monoError(this.logger, new IllegalArgumentException(String.format(Locale.US, "Partition key '%s' exceeds the maximum allowed length: '%s'.", partitionKey, Integer.valueOf(MAX_PARTITION_KEY_LENGTH)))) : FluxUtil.monoError(this.logger, new IllegalArgumentException(String.format(Locale.US, "CreateBatchOptions.getPartitionKey() and CreateBatchOptions.getPartitionId() are both set. Only one or the other can be used. partitionKey: '%s'. partitionId: '%s'", partitionKey, partitionId)));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> send(EventData eventData) {
        return eventData == null ? FluxUtil.monoError(this.logger, new NullPointerException("'event' cannot be null.")) : send(Flux.just(eventData));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> send(EventData eventData, SendOptions sendOptions) {
        return eventData == null ? FluxUtil.monoError(this.logger, new NullPointerException("'event' cannot be null.")) : sendOptions == null ? FluxUtil.monoError(this.logger, new NullPointerException("'options' cannot be null.")) : send(Flux.just(eventData), sendOptions);
    }

    public Mono<Void> send(Iterable<EventData> iterable) {
        return iterable == null ? FluxUtil.monoError(this.logger, new NullPointerException("'events' cannot be null.")) : send(Flux.fromIterable(iterable));
    }

    public Mono<Void> send(Iterable<EventData> iterable, SendOptions sendOptions) {
        return iterable == null ? FluxUtil.monoError(this.logger, new NullPointerException("'events' cannot be null.")) : sendOptions == null ? FluxUtil.monoError(this.logger, new NullPointerException("'options' cannot be null.")) : send(Flux.fromIterable(iterable), sendOptions);
    }

    Mono<Void> send(Flux<EventData> flux) {
        return flux == null ? FluxUtil.monoError(this.logger, new NullPointerException("'events' cannot be null.")) : send(flux, DEFAULT_SEND_OPTIONS);
    }

    Mono<Void> send(Flux<EventData> flux, SendOptions sendOptions) {
        return flux == null ? FluxUtil.monoError(this.logger, new NullPointerException("'events' cannot be null.")) : sendOptions == null ? FluxUtil.monoError(this.logger, new NullPointerException("'options' cannot be null.")) : sendInternal(flux, sendOptions).publishOn(this.scheduler);
    }

    public Mono<Void> send(EventDataBatch eventDataBatch) {
        if (eventDataBatch == null) {
            return FluxUtil.monoError(this.logger, new NullPointerException("'batch' cannot be null."));
        }
        if (eventDataBatch.getEvents().isEmpty()) {
            this.logger.warning(Messages.CANNOT_SEND_EVENT_BATCH_EMPTY);
            return Mono.empty();
        }
        if (!CoreUtils.isNullOrEmpty(eventDataBatch.getPartitionId())) {
            this.logger.verbose("Sending batch with size[{}] to partitionId[{}].", new Object[]{Integer.valueOf(eventDataBatch.getCount()), eventDataBatch.getPartitionId()});
        } else if (CoreUtils.isNullOrEmpty(eventDataBatch.getPartitionKey())) {
            this.logger.verbose("Sending batch with size[{}] to be distributed round-robin in service.", new Object[]{Integer.valueOf(eventDataBatch.getCount())});
        } else {
            this.logger.verbose("Sending batch with size[{}] with partitionKey[{}].", new Object[]{Integer.valueOf(eventDataBatch.getCount()), eventDataBatch.getPartitionKey()});
        }
        String partitionKey = eventDataBatch.getPartitionKey();
        boolean isEnabled = this.tracerProvider.isEnabled();
        AtomicReference atomicReference = isEnabled ? new AtomicReference(Context.NONE) : null;
        Context context = null;
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < eventDataBatch.getEvents().size(); i++) {
            EventData eventData = eventDataBatch.getEvents().get(i);
            if (isEnabled) {
                atomicReference.set(eventData.getContext());
                if (i == 0) {
                    context = this.tracerProvider.getSharedSpanBuilder(ClientConstants.AZ_TRACING_SERVICE_NAME, (Context) atomicReference.get());
                }
                this.tracerProvider.addSpanLinks(context.addData("span-context", eventData.getContext()));
            }
            Message serialize = this.messageSerializer.serialize(eventData);
            if (!CoreUtils.isNullOrEmpty(partitionKey)) {
                MessageAnnotations messageAnnotations = serialize.getMessageAnnotations() == null ? new MessageAnnotations(new HashMap()) : serialize.getMessageAnnotations();
                messageAnnotations.getValue().put(AmqpConstants.PARTITION_KEY, partitionKey);
                serialize.setMessageAnnotations(messageAnnotations);
            }
            arrayList.add(serialize);
        }
        if (isEnabled) {
            atomicReference.set(this.tracerProvider.startSpan(ClientConstants.AZ_TRACING_SERVICE_NAME, context == null ? Context.NONE : context.addData("entity-path", this.eventHubName).addData("hostname", this.fullyQualifiedNamespace).addData("az.namespace", ClientConstants.AZ_NAMESPACE_VALUE), ProcessKind.SEND));
        }
        return RetryUtil.withRetry(getSendLink(eventDataBatch.getPartitionId()).flatMap(amqpSendLink -> {
            return arrayList.size() == 1 ? amqpSendLink.send((Message) arrayList.get(0)) : amqpSendLink.send(arrayList);
        }), this.retryOptions, String.format("partitionId[%s]: Sending messages timed out.", eventDataBatch.getPartitionId())).publishOn(this.scheduler).doOnEach(signal -> {
            if (isEnabled) {
                this.tracerProvider.endSpan((Context) atomicReference.get(), signal);
            }
        });
    }

    private Mono<Void> sendInternal(Flux<EventData> flux, SendOptions sendOptions) {
        String partitionKey = sendOptions.getPartitionKey();
        String partitionId = sendOptions.getPartitionId();
        return (CoreUtils.isNullOrEmpty(partitionKey) || CoreUtils.isNullOrEmpty(partitionId)) ? getSendLink(sendOptions.getPartitionId()).flatMap(amqpSendLink -> {
            return amqpSendLink.getLinkSize().flatMap(num -> {
                CreateBatchOptions maximumSizeInBytes = new CreateBatchOptions().setPartitionKey(sendOptions.getPartitionKey()).setPartitionId(sendOptions.getPartitionId()).setMaximumSizeInBytes(num.intValue() > 0 ? num.intValue() : ClientConstants.MAX_MESSAGE_LENGTH_BYTES);
                Objects.requireNonNull(amqpSendLink);
                return flux.collect(new EventDataCollector(maximumSizeInBytes, 1, amqpSendLink::getErrorContext, this.tracerProvider, amqpSendLink.getEntityPath(), amqpSendLink.getHostname()));
            }).flatMap(list -> {
                return sendInternal(Flux.fromIterable(list));
            });
        }) : FluxUtil.monoError(this.logger, new IllegalArgumentException(String.format(Locale.US, "SendOptions.getPartitionKey() and SendOptions.getPartitionId() are both set. Only one or the other can be used. partitionKey: '%s'. partitionId: '%s'", partitionKey, partitionId)));
    }

    private Mono<Void> sendInternal(Flux<EventDataBatch> flux) {
        return flux.flatMap(this::send).then().doOnError(th -> {
            this.logger.error(Messages.ERROR_SENDING_BATCH, new Object[]{th});
        });
    }

    private String getEntityPath(String str) {
        return CoreUtils.isNullOrEmpty(str) ? this.eventHubName : String.format(Locale.US, SENDER_ENTITY_PATH_FORMAT, this.eventHubName, str);
    }

    private Mono<AmqpSendLink> getSendLink(String str) {
        String entityPath = getEntityPath(str);
        String entityPath2 = getEntityPath(str);
        return this.connectionProcessor.flatMap(eventHubAmqpConnection -> {
            return eventHubAmqpConnection.createSendLink(entityPath2, entityPath, this.retryOptions);
        });
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        if (this.isSharedConnection) {
            this.onClientClose.run();
        } else {
            this.connectionProcessor.dispose();
        }
    }
}
