package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ErrorContextProvider;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseChannelClosedException;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.Messages;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusSenderInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.CreateMessageBatchOptions;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.StreamSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@ServiceClient(builder = ServiceBusClientBuilder.class, isAsync = true)
/* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.class */
public final class ServiceBusSenderAsyncClient implements AutoCloseable {
    // Fallback size in bytes (256 * 1024) used in this class whenever a link or the batch options report a
    // non-positive size.
    static final int MAX_MESSAGE_LENGTH_BYTES = 262144;
    // Name passed to createSession(...) by the create/commit/rollback transaction operations.
    private static final String TRANSACTION_LINK_NAME = "coordinator";
    // Options used by the no-argument createMessageBatch() overload.
    private static final CreateMessageBatchOptions DEFAULT_BATCH_OPTIONS = new CreateMessageBatchOptions();
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusSenderAsyncClient.class);
    // Name of the first send link obtained through getSendLink(); stays null until a link has been emitted.
    private final AtomicReference<String> linkName = new AtomicReference<>();
    // Flipped to true by close(); public operations check it and fail with IllegalStateException once set.
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final MessageSerializer messageSerializer;
    private final AmqpRetryOptions retryOptions;
    // Derived from retryOptions in the constructor.
    private final AmqpRetryPolicy retryPolicy;
    private final MessagingEntityType entityType;
    // Callback run exactly once, on the first call to close().
    private final Runnable onClientClose;
    private final String entityName;
    private final ServiceBusConnectionProcessor connectionProcessor;
    // When non-empty, getSendLink() creates a "VIA-" prefixed link on this entity instead of on entityName.
    private final String viaEntityName;
    private final String identifier;
    private final ServiceBusSenderInstrumentation instrumentation;
    // Obtained from instrumentation.getTracer() in the constructor.
    private final ServiceBusTracer tracer;

    /* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusSenderAsyncClient$AmqpMessageCollector.class */
    /**
     * Collector that packs {@link ServiceBusMessage}s into {@link ServiceBusMessageBatch} instances.
     *
     * <p>Messages are added to the batch currently being filled. When that batch rejects a message, a fresh batch is
     * started and the previous one is appended to the result list. If {@code maxNumberOfBatches} is non-null and the
     * result list already holds that many batches at that point, an {@link AmqpException} is thrown instead.</p>
     */
    private static class AmqpMessageCollector implements Collector<ServiceBusMessage, List<ServiceBusMessageBatch>, List<ServiceBusMessageBatch>> {
        private final int maxMessageSize;
        private final Integer maxNumberOfBatches;
        private final ErrorContextProvider contextProvider;
        private final ServiceBusTracer tracer;
        private final MessageSerializer serializer;
        // Batch being filled; it only reaches the result list when it is replaced or when finisher() runs.
        private volatile ServiceBusMessageBatch currentBatch;

        /**
         * Creates the collector and its first, empty batch.
         *
         * @param options Supplies the maximum batch size; a non-positive value falls back to
         *     {@link ServiceBusSenderAsyncClient#MAX_MESSAGE_LENGTH_BYTES}.
         * @param batchLimit Optional limit checked against the number of completed batches; {@code null} disables it.
         * @param errorContextProvider Error context provider handed to every batch and used for the limit exception.
         * @param serviceBusTracer Tracer handed to every batch.
         * @param messageSerializer Serializer handed to every batch.
         */
        AmqpMessageCollector(CreateMessageBatchOptions options, Integer batchLimit, ErrorContextProvider errorContextProvider, ServiceBusTracer serviceBusTracer, MessageSerializer messageSerializer) {
            final int requestedSize = options.getMaximumSizeInBytes();

            this.maxNumberOfBatches = batchLimit;
            this.maxMessageSize = requestedSize > 0 ? requestedSize : ServiceBusSenderAsyncClient.MAX_MESSAGE_LENGTH_BYTES;
            this.contextProvider = errorContextProvider;
            this.tracer = serviceBusTracer;
            this.serializer = messageSerializer;
            this.currentBatch = newBatch();
        }

        /** Returns a supplier of the mutable list that accumulates completed batches. */
        @Override
        public Supplier<List<ServiceBusMessageBatch>> supplier() {
            return ArrayList::new;
        }

        /** Returns the function that adds one message, rolling over to a new batch when the current one rejects it. */
        @Override
        public BiConsumer<List<ServiceBusMessageBatch>, ServiceBusMessage> accumulator() {
            return (completed, message) -> {
                final ServiceBusMessageBatch active = currentBatch;
                if (!active.tryAddMessage(message)) {
                    if (maxNumberOfBatches != null && completed.size() == maxNumberOfBatches.intValue()) {
                        throw new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, String.format(Locale.US, "EventData does not fit into maximum number of batches. '%s'", maxNumberOfBatches), contextProvider.getErrorContext());
                    }

                    // Start the replacement batch first, then record the one that just filled up.
                    currentBatch = newBatch();
                    currentBatch.tryAddMessage(message);
                    completed.add(active);
                }
            };
        }

        /** Returns the function that merges two partial results by appending the second list to the first. */
        @Override
        public BinaryOperator<List<ServiceBusMessageBatch>> combiner() {
            return (left, right) -> {
                left.addAll(right);
                return left;
            };
        }

        /** Returns the function that appends the batch still being filled (if any) and clears it. */
        @Override
        public Function<List<ServiceBusMessageBatch>, List<ServiceBusMessageBatch>> finisher() {
            return completed -> {
                final ServiceBusMessageBatch pending = currentBatch;
                currentBatch = null;
                if (pending != null) {
                    completed.add(pending);
                }
                return completed;
            };
        }

        /** Reports no collector characteristics. */
        @Override
        public Set<Collector.Characteristics> characteristics() {
            return Collections.emptySet();
        }

        /** Builds an empty batch configured with this collector's size limit and collaborators. */
        private ServiceBusMessageBatch newBatch() {
            return new ServiceBusMessageBatch(maxMessageSize, contextProvider, tracer, serializer);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a sender for the given entity.
     *
     * @param str Entity path; must not be null.
     * @param messagingEntityType Type of the entity, passed on when obtaining the management node.
     * @param serviceBusConnectionProcessor Source of AMQP connections; must not be null.
     * @param amqpRetryOptions Retry options; must not be null. Also used to derive the retry policy.
     * @param serviceBusSenderInstrumentation Instrumentation; must not be null. Its tracer is retained.
     * @param messageSerializer Serializer for outgoing messages; must not be null.
     * @param runnable Callback run the first time {@link #close()} is called.
     * @param str2 Optional "via" entity name used when creating the send link.
     * @param str3 Client identifier.
     * @throws NullPointerException if any of the arguments documented as non-null is null.
     */
    public ServiceBusSenderAsyncClient(String str, MessagingEntityType messagingEntityType, ServiceBusConnectionProcessor serviceBusConnectionProcessor, AmqpRetryOptions amqpRetryOptions, ServiceBusSenderInstrumentation serviceBusSenderInstrumentation, MessageSerializer messageSerializer, Runnable runnable, String str2, String str3) {
        // Null checks run in this exact order so the first offending argument is the one reported.
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.retryOptions = Objects.requireNonNull(amqpRetryOptions, "'retryOptions' cannot be null.");
        this.entityName = Objects.requireNonNull(str, "'entityPath' cannot be null.");
        this.connectionProcessor = Objects.requireNonNull(serviceBusConnectionProcessor, "'connectionProcessor' cannot be null.");
        this.instrumentation = Objects.requireNonNull(serviceBusSenderInstrumentation, "'instrumentation' cannot be null.");

        // Values derived from the validated arguments.
        this.tracer = this.instrumentation.getTracer();
        this.retryPolicy = RetryUtil.getRetryPolicy(this.retryOptions);

        // Arguments stored as-is, without validation.
        this.entityType = messagingEntityType;
        this.viaEntityName = str2;
        this.onClientClose = runnable;
        this.identifier = str3;
    }

    /**
     * Gets the fully qualified namespace as reported by the connection processor.
     *
     * @return The fully qualified namespace.
     */
    public String getFullyQualifiedNamespace() {
        final String namespace = connectionProcessor.getFullyQualifiedNamespace();
        return namespace;
    }

    /**
     * Gets the entity path this sender was created with.
     *
     * @return The entity path.
     */
    public String getEntityPath() {
        return entityName;
    }

    /**
     * Gets the identifier this sender was created with.
     *
     * @return The client identifier.
     */
    public String getIdentifier() {
        return identifier;
    }

    /**
     * Sends a single message outside of any transaction.
     *
     * @param serviceBusMessage Message to send.
     * @return A Mono that completes when the send finishes, or errors with {@link NullPointerException} when
     *     {@code serviceBusMessage} is null.
     */
    public Mono<Void> sendMessage(ServiceBusMessage serviceBusMessage) {
        if (serviceBusMessage == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'message' cannot be null."));
        }
        return sendInternal(Flux.just(serviceBusMessage), (ServiceBusTransactionContext) null);
    }

    /**
     * Sends a single message as part of the given transaction.
     *
     * @param serviceBusMessage Message to send.
     * @param serviceBusTransactionContext Transaction to send the message in; it and its transaction id must be
     *     non-null.
     * @return A Mono that completes when the send finishes. It errors with {@link NullPointerException} when the
     *     transaction context, its transaction id, or the message is null.
     */
    public Mono<Void> sendMessage(ServiceBusMessage serviceBusMessage, ServiceBusTransactionContext serviceBusTransactionContext) {
        if (Objects.isNull(serviceBusTransactionContext)) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(serviceBusTransactionContext.getTransactionId())) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        // Flux.just(null) throws synchronously; report a null message through the returned Mono instead, the same
        // way the single-argument overload does.
        if (Objects.isNull(serviceBusMessage)) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'message' cannot be null."));
        }
        return sendInternal(Flux.just(serviceBusMessage), serviceBusTransactionContext);
    }

    /**
     * Sends the given messages as part of the given transaction.
     *
     * @param iterable Messages to send.
     * @param serviceBusTransactionContext Transaction to send in; it and its transaction id must be non-null.
     * @return A Mono that completes when the send finishes, or errors with {@link NullPointerException} when the
     *     transaction context or its transaction id is null.
     */
    public Mono<Void> sendMessages(Iterable<ServiceBusMessage> iterable, ServiceBusTransactionContext serviceBusTransactionContext) {
        if (serviceBusTransactionContext == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext' cannot be null."));
        }
        if (serviceBusTransactionContext.getTransactionId() == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return sendIterable(iterable, serviceBusTransactionContext);
    }

    /**
     * Sends the given messages outside of any transaction.
     *
     * @param iterable Messages to send.
     * @return A Mono that completes when the send finishes.
     */
    public Mono<Void> sendMessages(Iterable<ServiceBusMessage> iterable) {
        final ServiceBusTransactionContext noTransaction = null;
        return sendIterable(iterable, noTransaction);
    }

    /**
     * Sends a pre-built batch outside of any transaction.
     *
     * @param serviceBusMessageBatch Batch to send.
     * @return A Mono that completes when the send finishes.
     */
    public Mono<Void> sendMessages(ServiceBusMessageBatch serviceBusMessageBatch) {
        final ServiceBusTransactionContext noTransaction = null;
        return sendInternal(serviceBusMessageBatch, noTransaction);
    }

    /**
     * Sends a pre-built batch as part of the given transaction.
     *
     * @param serviceBusMessageBatch Batch to send.
     * @param serviceBusTransactionContext Transaction to send in; it and its transaction id must be non-null.
     * @return A Mono that completes when the send finishes, or errors with {@link NullPointerException} when the
     *     transaction context or its transaction id is null.
     */
    public Mono<Void> sendMessages(ServiceBusMessageBatch serviceBusMessageBatch, ServiceBusTransactionContext serviceBusTransactionContext) {
        if (serviceBusTransactionContext == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext' cannot be null."));
        }
        if (serviceBusTransactionContext.getTransactionId() == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return sendInternal(serviceBusMessageBatch, serviceBusTransactionContext);
    }

    /**
     * Creates an empty batch using the default batch options.
     *
     * @return A Mono emitting the new batch.
     */
    public Mono<ServiceBusMessageBatch> createMessageBatch() {
        final CreateMessageBatchOptions options = DEFAULT_BATCH_OPTIONS;
        return createMessageBatch(options);
    }

    /**
     * Creates an empty batch sized according to the given options and the send link's size.
     *
     * <p>The link size is used when the options do not request a positive size; a non-positive link size falls
     * back to {@link #MAX_MESSAGE_LENGTH_BYTES}. Obtaining the link is retried according to the retry options.</p>
     *
     * @param createMessageBatchOptions Options carrying the requested maximum batch size.
     * @return A Mono emitting the new batch. It errors with {@link IllegalStateException} when the sender is
     *     closed, {@link NullPointerException} when the options are null, and otherwise with a
     *     {@link ServiceBusException} (for example when the requested size is larger than the link size).
     */
    public Mono<ServiceBusMessageBatch> createMessageBatch(CreateMessageBatchOptions createMessageBatchOptions) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "createMessageBatch")));
        }
        if (createMessageBatchOptions == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'options' cannot be null."));
        }

        final int requestedSize = createMessageBatchOptions.getMaximumSizeInBytes();
        return RetryUtil.withRetry(
            getSendLink()
                .flatMap(link -> link.getLinkSize().flatMap(linkSize -> {
                    final int maxLinkSize = linkSize.intValue() > 0 ? linkSize.intValue() : MAX_MESSAGE_LENGTH_BYTES;
                    if (requestedSize > maxLinkSize) {
                        return FluxUtil.monoError(LOGGER, new IllegalArgumentException(String.format(Locale.US, "CreateMessageBatchOptions.getMaximumSizeInBytes (%s bytes) is larger than the link size (%s bytes).", requestedSize, maxLinkSize)));
                    }

                    final int batchSize = requestedSize > 0 ? requestedSize : maxLinkSize;
                    return Mono.just(new ServiceBusMessageBatch(batchSize, link::getErrorContext, tracer, messageSerializer));
                }))
                // Re-wrap a closed request/response channel as a transient AmqpException (first argument true).
                .onErrorMap(RequestResponseChannelClosedException.class,
                    closed -> new AmqpException(true, closed.getMessage(), closed, (AmqpErrorContext) null)),
            retryOptions,
            String.format("entityPath[%s]: Creating batch timed out.", entityName))
            .onErrorMap(this::mapError);
    }

    /**
     * Schedules a single message as part of the given transaction.
     *
     * @param serviceBusMessage Message to schedule.
     * @param offsetDateTime Time at which the message should be enqueued.
     * @param serviceBusTransactionContext Transaction to schedule in; it and its transaction id must be non-null.
     * @return A Mono emitting the value produced by the scheduling operation, or erroring with
     *     {@link NullPointerException} when the transaction context or its transaction id is null.
     */
    public Mono<Long> scheduleMessage(ServiceBusMessage serviceBusMessage, OffsetDateTime offsetDateTime, ServiceBusTransactionContext serviceBusTransactionContext) {
        if (serviceBusTransactionContext == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext' cannot be null."));
        }
        if (serviceBusTransactionContext.getTransactionId() == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return scheduleMessageInternal(serviceBusMessage, offsetDateTime, serviceBusTransactionContext);
    }

    /**
     * Schedules a single message outside of any transaction.
     *
     * @param serviceBusMessage Message to schedule.
     * @param offsetDateTime Time at which the message should be enqueued.
     * @return A Mono emitting the value produced by the scheduling operation.
     */
    public Mono<Long> scheduleMessage(ServiceBusMessage serviceBusMessage, OffsetDateTime offsetDateTime) {
        final ServiceBusTransactionContext noTransaction = null;
        return scheduleMessageInternal(serviceBusMessage, offsetDateTime, noTransaction);
    }

    /**
     * Schedules the given messages outside of any transaction.
     *
     * @param iterable Messages to schedule.
     * @param offsetDateTime Time at which the messages should be enqueued.
     * @return A Flux emitting the values produced by the scheduling operation.
     */
    public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> iterable, OffsetDateTime offsetDateTime) {
        final ServiceBusTransactionContext noTransaction = null;
        return scheduleMessages(iterable, offsetDateTime, noTransaction);
    }

    /**
     * Schedules the given messages, optionally as part of a transaction.
     *
     * <p>All messages are first packed into one batch created with the default options; the batch contents are
     * then scheduled through the entity's management node.</p>
     *
     * @param iterable Messages to schedule; must not be null.
     * @param offsetDateTime Time at which the messages should be enqueued; must not be null.
     * @param serviceBusTransactionContext Transaction context handed to the management node; passed through as-is.
     * @return A Flux emitting the values produced by the scheduling operation. It errors with
     *     {@link IllegalStateException} when the sender is closed, {@link NullPointerException} when
     *     {@code iterable} or {@code offsetDateTime} is null, and otherwise with a {@link ServiceBusException}
     *     (for example when a message does not fit into the batch).
     */
    public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> iterable, OffsetDateTime offsetDateTime, ServiceBusTransactionContext serviceBusTransactionContext) {
        if (isDisposed.get()) {
            return FluxUtil.fluxError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "scheduleMessages")));
        }
        if (iterable == null) {
            return FluxUtil.fluxError(LOGGER, new NullPointerException("'messages' cannot be null."));
        }
        if (offsetDateTime == null) {
            return FluxUtil.fluxError(LOGGER, new NullPointerException("'scheduledEnqueueTime' cannot be null."));
        }

        return createMessageBatch()
            .map(batch -> {
                // Every message must fit into this single batch; the index identifies the first one that does not.
                int index = 0;
                for (ServiceBusMessage message : iterable) {
                    if (!batch.tryAddMessage(message)) {
                        throw LOGGER.logExceptionAsError(new IllegalArgumentException(String.format(Locale.US, "Messages exceed max allowed size for all the messages together. Failed to add message at index '%s'.", index)));
                    }
                    index++;
                }
                return batch;
            })
            .flatMapMany(batch -> tracer.traceScheduleFlux(
                "ServiceBus.scheduleMessages",
                connectionProcessor
                    .flatMap(connection -> connection.getManagementNode(entityName, entityType))
                    .flatMapMany(managementNode -> managementNode.schedule(batch.getMessages(), offsetDateTime, batch.getMaxSizeInBytes(), linkName.get(), serviceBusTransactionContext)),
                batch.getMessages()))
            .onErrorMap(this::mapError);
    }

    /**
     * Cancels a single scheduled message.
     *
     * @param j Sequence number of the scheduled message; must not be negative.
     * @return A Mono that completes when the cancellation finishes. It errors with {@link IllegalStateException}
     *     when the sender is closed, {@link IllegalArgumentException} when {@code j} is negative, and otherwise
     *     with a {@link ServiceBusException}.
     */
    public Mono<Void> cancelScheduledMessage(long j) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "cancelScheduledMessage")));
        }
        if (j < 0) {
            return FluxUtil.monoError(LOGGER, new IllegalArgumentException("'sequenceNumber' cannot be negative."));
        }

        return tracer.traceMono(
            "ServiceBus.cancelScheduledMessage",
            connectionProcessor
                .flatMap(connection -> connection.getManagementNode(entityName, entityType))
                .flatMap(managementNode -> managementNode.cancelScheduledMessages(Collections.singletonList(Long.valueOf(j)), linkName.get())))
            .onErrorMap(this::mapError);
    }

    /**
     * Cancels several scheduled messages.
     *
     * @param iterable Sequence numbers of the scheduled messages; must not be null.
     * @return A Mono that completes when the cancellation finishes. It errors with {@link IllegalStateException}
     *     when the sender is closed, {@link NullPointerException} when {@code iterable} is null, and otherwise
     *     with a {@link ServiceBusException}.
     */
    public Mono<Void> cancelScheduledMessages(Iterable<Long> iterable) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "cancelScheduledMessages")));
        }
        if (iterable == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'messages' cannot be null."));
        }

        return tracer.traceMono(
            "ServiceBus.cancelScheduledMessages",
            connectionProcessor
                .flatMap(connection -> connection.getManagementNode(entityName, entityType))
                .flatMap(managementNode -> managementNode.cancelScheduledMessages(iterable, linkName.get())))
            .onErrorMap(this::mapError);
    }

    /**
     * Starts a new transaction on a session named by {@code TRANSACTION_LINK_NAME}.
     *
     * @return A Mono emitting a context wrapping the new transaction's id. It errors with
     *     {@link IllegalStateException} when the sender is closed and otherwise with a {@link ServiceBusException}.
     */
    public Mono<ServiceBusTransactionContext> createTransaction() {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "createTransaction")));
        }

        return tracer.traceMono(
            "ServiceBus.createTransaction",
            connectionProcessor
                .flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
                .flatMap(session -> session.createTransaction())
                .map(transaction -> new ServiceBusTransactionContext(transaction.getTransactionId())))
            .onErrorMap(this::mapError);
    }

    /**
     * Commits the given transaction on a session named by {@code TRANSACTION_LINK_NAME}.
     *
     * @param serviceBusTransactionContext Transaction to commit; it and its transaction id must be non-null.
     * @return A Mono that completes when the commit finishes. It errors with {@link IllegalStateException} when
     *     the sender is closed, {@link NullPointerException} when the context or its transaction id is null, and
     *     otherwise with a {@link ServiceBusException}.
     */
    public Mono<Void> commitTransaction(ServiceBusTransactionContext serviceBusTransactionContext) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "commitTransaction")));
        }
        if (serviceBusTransactionContext == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext' cannot be null."));
        }
        if (serviceBusTransactionContext.getTransactionId() == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }

        return tracer.traceMono(
            "ServiceBus.commitTransaction",
            connectionProcessor
                .flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
                .flatMap(session -> session.commitTransaction(new AmqpTransaction(serviceBusTransactionContext.getTransactionId()))))
            .onErrorMap(this::mapError);
    }

    /**
     * Rolls back the given transaction on a session named by {@code TRANSACTION_LINK_NAME}.
     *
     * @param serviceBusTransactionContext Transaction to roll back; it and its transaction id must be non-null.
     * @return A Mono that completes when the rollback finishes. It errors with {@link IllegalStateException} when
     *     the sender is closed, {@link NullPointerException} when the context or its transaction id is null, and
     *     otherwise with a {@link ServiceBusException}.
     */
    public Mono<Void> rollbackTransaction(ServiceBusTransactionContext serviceBusTransactionContext) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "rollbackTransaction")));
        }
        if (serviceBusTransactionContext == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext' cannot be null."));
        }
        if (serviceBusTransactionContext.getTransactionId() == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }

        return tracer.traceMono(
            "ServiceBus.rollbackTransaction",
            connectionProcessor
                .flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
                .flatMap(session -> session.rollbackTransaction(new AmqpTransaction(serviceBusTransactionContext.getTransactionId()))))
            .onErrorMap(this::mapError);
    }

    /**
     * Marks the sender as disposed and runs the close callback. Only the first call runs the callback; later
     * calls do nothing.
     */
    @Override
    public void close() {
        final boolean alreadyDisposed = isDisposed.getAndSet(true);
        if (!alreadyDisposed) {
            onClientClose.run();
        }
    }

    /**
     * Packs the given messages into one batch created with the default options and sends that batch.
     *
     * <p>Every message must fit into the single batch. When one is rejected by the batch the operation fails
     * instead of silently dropping it, matching how {@code scheduleMessages} treats the same situation.</p>
     *
     * @param iterable Messages to send; must not be null.
     * @param serviceBusTransactionContext Transaction to send in, or null to send outside of a transaction.
     * @return A Mono that completes when the batch has been sent. It errors with {@link NullPointerException} when
     *     {@code iterable} is null, and otherwise with a {@link ServiceBusException}; when a message does not fit
     *     into the batch the cause is an {@link IllegalArgumentException} naming its index.
     */
    private Mono<Void> sendIterable(Iterable<ServiceBusMessage> iterable, ServiceBusTransactionContext serviceBusTransactionContext) {
        if (Objects.isNull(iterable)) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'messages' cannot be null."));
        }

        return createMessageBatch().flatMap(batch -> {
            int index = 0;
            for (ServiceBusMessage message : iterable) {
                // tryAddMessage returns false when the message is rejected; ignoring that would lose the message.
                if (!batch.tryAddMessage(message)) {
                    return FluxUtil.<Void>monoError(LOGGER, new IllegalArgumentException(String.format(Locale.US, "Messages exceed max allowed size for all the messages together. Failed to add message at index '%s'.", index)));
                }
                index++;
            }
            return sendInternal(batch, serviceBusTransactionContext);
        }).onErrorMap(this::mapError);
    }

    /**
     * Schedules one message through the entity's management node, using the send link's size and name.
     *
     * @param serviceBusMessage Message to schedule; must not be null.
     * @param offsetDateTime Time at which the message should be enqueued; must not be null.
     * @param serviceBusTransactionContext Transaction context handed to the management node; may be null.
     * @return A Mono emitting the first value produced by the scheduling operation. It errors with
     *     {@link IllegalStateException} when the sender is closed, {@link NullPointerException} when the message
     *     or the enqueue time is null, and otherwise with a {@link ServiceBusException}.
     */
    private Mono<Long> scheduleMessageInternal(ServiceBusMessage serviceBusMessage, OffsetDateTime offsetDateTime, ServiceBusTransactionContext serviceBusTransactionContext) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "scheduleMessage")));
        }
        if (serviceBusMessage == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'message' cannot be null."));
        }
        if (offsetDateTime == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'scheduledEnqueueTime' cannot be null."));
        }

        return tracer.traceScheduleMono(
            "ServiceBus.scheduleMessage",
            getSendLink().flatMap(link -> link.getLinkSize().flatMap(linkSize -> {
                // A non-positive link size falls back to the class-wide default.
                final int maxSize = linkSize.intValue() > 0 ? linkSize.intValue() : MAX_MESSAGE_LENGTH_BYTES;
                return connectionProcessor
                    .flatMap(connection -> connection.getManagementNode(entityName, entityType))
                    .flatMap(managementNode -> managementNode.schedule(Arrays.asList(serviceBusMessage), offsetDateTime, maxSize, link.getLinkName(), serviceBusTransactionContext).next());
            })),
            serviceBusMessage,
            serviceBusMessage.getContext())
            .onErrorMap(this::mapError);
    }

    /**
     * Serializes the messages of a batch and sends them over the send link, optionally inside a transaction.
     *
     * <p>Messages are serialized eagerly, before the returned Mono is subscribed. A batch holding exactly one
     * message is sent with the single-message {@code send} overload; otherwise the list overload is used.</p>
     *
     * @param serviceBusMessageBatch Batch to send; must not be null.
     * @param serviceBusTransactionContext Transaction to send in; when it or its transaction id is null the
     *     messages are sent without a transactional state.
     * @return A Mono that completes when the send finishes (immediately for an empty batch). It errors with
     *     {@link IllegalStateException} when the sender is closed, {@link NullPointerException} when the batch is
     *     null, and otherwise with a {@link ServiceBusException}.
     */
    private Mono<Void> sendInternal(ServiceBusMessageBatch serviceBusMessageBatch, ServiceBusTransactionContext serviceBusTransactionContext) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "sendMessages")));
        }
        if (serviceBusMessageBatch == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'batch' cannot be null."));
        }
        if (serviceBusMessageBatch.getMessages().isEmpty()) {
            LOGGER.info("Cannot send an EventBatch that is empty.");
            return Mono.empty();
        }

        LOGGER.atInfo().addKeyValue("batchSize", serviceBusMessageBatch.getCount()).log("Sending batch.");

        final List<Message> amqpMessages = Collections.synchronizedList(new ArrayList<>());
        for (ServiceBusMessage message : serviceBusMessageBatch.getMessages()) {
            final Message amqpMessage = messageSerializer.serialize(message);
            // Guarantee that every outgoing message carries a (possibly empty) annotations section.
            final MessageAnnotations annotations = amqpMessage.getMessageAnnotations() == null
                ? new MessageAnnotations(new HashMap())
                : amqpMessage.getMessageAnnotations();
            amqpMessage.setMessageAnnotations(annotations);
            amqpMessages.add(amqpMessage);
        }

        final Mono<Void> sendWithRetry = RetryUtil.withRetry(
            getSendLink()
                .flatMap(link -> {
                    final boolean transactional = serviceBusTransactionContext != null
                        && serviceBusTransactionContext.getTransactionId() != null;
                    if (!transactional) {
                        return amqpMessages.size() == 1 ? link.send(amqpMessages.get(0)) : link.send(amqpMessages);
                    }

                    final TransactionalState deliveryState = new TransactionalState();
                    deliveryState.setTxnId(Binary.create(serviceBusTransactionContext.getTransactionId()));
                    return amqpMessages.size() == 1
                        ? link.send(amqpMessages.get(0), deliveryState)
                        : link.send(amqpMessages, deliveryState);
                })
                // Re-wrap a closed request/response channel as a transient AmqpException (first argument true).
                .onErrorMap(RequestResponseChannelClosedException.class,
                    closed -> new AmqpException(true, closed.getMessage(), closed, (AmqpErrorContext) null)),
            retryOptions,
            String.format("entityPath[%s], messages-count[%s]: Sending messages timed out.", entityName, serviceBusMessageBatch.getCount()))
            .onErrorMap(this::mapError);

        return instrumentation.instrumentSendBatch("ServiceBus.send", sendWithRetry, serviceBusMessageBatch.getMessages());
    }

    /**
     * Collects a stream of messages into batches sized by the send link and sends each batch.
     *
     * @param flux Messages to send.
     * @param serviceBusTransactionContext Transaction to send in, or null to send outside of a transaction.
     * @return A Mono that completes when all batches are sent. It errors with {@link IllegalStateException} when
     *     the sender is closed and otherwise with a {@link ServiceBusException}.
     */
    private Mono<Void> sendInternal(Flux<ServiceBusMessage> flux, ServiceBusTransactionContext serviceBusTransactionContext) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_SENDER, "sendMessage")));
        }

        return RetryUtil.withRetry(getSendLink(), retryOptions, "Failed to create send link " + linkName)
            .flatMap(link -> link.getLinkSize()
                .flatMap(linkSize -> {
                    // A non-positive link size falls back to the class-wide default.
                    final int batchSize = linkSize.intValue() > 0 ? linkSize.intValue() : MAX_MESSAGE_LENGTH_BYTES;
                    final CreateMessageBatchOptions batchOptions = new CreateMessageBatchOptions().setMaximumSizeInBytes(batchSize);
                    return flux.collect(new AmqpMessageCollector(batchOptions, 1, link::getErrorContext, tracer, messageSerializer));
                })
                .flatMap(batches -> sendInternalBatch(Flux.fromIterable(batches), serviceBusTransactionContext)))
            .onErrorMap(this::mapError);
    }

    /**
     * Sends every batch emitted by the given stream and logs any failure.
     *
     * @param flux Batches to send.
     * @param serviceBusTransactionContext Transaction to send in, or null to send outside of a transaction.
     * @return A Mono that completes once all batches have been sent, or errors with the first failure.
     */
    private Mono<Void> sendInternalBatch(Flux<ServiceBusMessageBatch> flux, ServiceBusTransactionContext serviceBusTransactionContext) {
        final Mono<Void> allSent = flux
            .flatMap(batch -> sendInternal(batch, serviceBusTransactionContext))
            .then();
        return allSent.doOnError(error -> LOGGER.error("Error sending batch.", new Object[]{error}));
    }

    /**
     * Obtains the send link for this sender and remembers the name of the first link emitted.
     *
     * <p>When a "via" entity name is configured, the link is created on that entity under a {@code "VIA-"}
     * prefixed name with this sender's entity passed as the fourth argument; otherwise the link is created
     * directly on this sender's entity.</p>
     *
     * @return A Mono emitting the send link.
     */
    private Mono<AmqpSendLink> getSendLink() {
        return connectionProcessor
            .flatMap(connection -> {
                if (CoreUtils.isNullOrEmpty(viaEntityName)) {
                    return connection.createSendLink(entityName, entityName, retryOptions, null, identifier);
                }
                return connection.createSendLink("VIA-".concat(viaEntityName), viaEntityName, retryOptions, entityName, identifier);
            })
            .doOnNext(link -> {
                // Only the first link name is recorded; later links leave it unchanged.
                linkName.compareAndSet(null, link.getLinkName());
            });
    }

    /**
     * Ensures an error is surfaced as a {@link ServiceBusException}.
     *
     * @param th Error to map.
     * @return {@code th} itself when it already is a {@link ServiceBusException}; otherwise a new
     *     {@link ServiceBusException} wrapping it with the {@code SEND} error source.
     */
    private Throwable mapError(Throwable th) {
        if (th instanceof ServiceBusException) {
            return th;
        }
        return new ServiceBusException(th, ServiceBusErrorSource.SEND);
    }
}
