package com.azure.messaging.eventhubs;

import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseChannelClosedException;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.io.Closeable;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

@ServiceClient(builder = EventHubClientBuilder.class, isAsync = true)
/* loaded from: input_file:com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.class */
public class EventHubConsumerAsyncClient implements Closeable {
    private static final String RECEIVER_ENTITY_PATH_FORMAT = "%s/ConsumerGroups/%s/Partitions/%s";
    private static final ClientLogger LOGGER = new ClientLogger(EventHubConsumerAsyncClient.class);
    private final String fullyQualifiedNamespace;
    private final String eventHubName;
    private final EventHubConnectionProcessor connectionProcessor;
    private final MessageSerializer messageSerializer;
    private final String consumerGroup;
    private final int prefetchCount;
    private final boolean isSharedConnection;
    private final Runnable onClientClosed;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final ReceiveOptions defaultReceiveOptions = new ReceiveOptions();
    private final ConcurrentHashMap<String, EventHubPartitionAsyncConsumer> openPartitionConsumers = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventHubConsumerAsyncClient(String str, String str2, EventHubConnectionProcessor eventHubConnectionProcessor, MessageSerializer messageSerializer, String str3, int i, boolean z, Runnable runnable) {
        this.fullyQualifiedNamespace = str;
        this.eventHubName = str2;
        this.connectionProcessor = eventHubConnectionProcessor;
        this.messageSerializer = messageSerializer;
        this.consumerGroup = str3;
        this.prefetchCount = i;
        this.isSharedConnection = z;
        this.onClientClosed = runnable;
    }

    public String getFullyQualifiedNamespace() {
        return this.fullyQualifiedNamespace;
    }

    public String getEventHubName() {
        return this.eventHubName;
    }

    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<EventHubProperties> getEventHubProperties() {
        return this.connectionProcessor.flatMap(eventHubAmqpConnection -> {
            return eventHubAmqpConnection.getManagementNode();
        }).flatMap((v0) -> {
            return v0.getEventHubProperties();
        });
    }

    @ServiceMethod(returns = ReturnType.COLLECTION)
    public Flux<String> getPartitionIds() {
        return getEventHubProperties().flatMapMany(eventHubProperties -> {
            return Flux.fromIterable(eventHubProperties.getPartitionIds());
        });
    }

    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<PartitionProperties> getPartitionProperties(String str) {
        return Objects.isNull(str) ? FluxUtil.monoError(LOGGER, new NullPointerException("'partitionId' cannot be null.")) : str.isEmpty() ? FluxUtil.monoError(LOGGER, new IllegalArgumentException("'partitionId' cannot be an empty string.")) : this.connectionProcessor.flatMap(eventHubAmqpConnection -> {
            return eventHubAmqpConnection.getManagementNode();
        }).flatMap(eventHubManagementNode -> {
            return eventHubManagementNode.getPartitionProperties(str);
        });
    }

    @ServiceMethod(returns = ReturnType.COLLECTION)
    public Flux<PartitionEvent> receiveFromPartition(String str, EventPosition eventPosition) {
        return receiveFromPartition(str, eventPosition, this.defaultReceiveOptions);
    }

    @ServiceMethod(returns = ReturnType.COLLECTION)
    public Flux<PartitionEvent> receiveFromPartition(String str, EventPosition eventPosition, ReceiveOptions receiveOptions) {
        return Objects.isNull(str) ? FluxUtil.fluxError(LOGGER, new NullPointerException("'partitionId' cannot be null.")) : str.isEmpty() ? FluxUtil.fluxError(LOGGER, new IllegalArgumentException("'partitionId' cannot be an empty string.")) : Objects.isNull(eventPosition) ? FluxUtil.fluxError(LOGGER, new NullPointerException("'startingPosition' cannot be null.")) : createConsumer(StringUtil.getRandomString(str), str, eventPosition, receiveOptions);
    }

    @ServiceMethod(returns = ReturnType.COLLECTION)
    public Flux<PartitionEvent> receive() {
        return receive(true, this.defaultReceiveOptions);
    }

    @ServiceMethod(returns = ReturnType.COLLECTION)
    public Flux<PartitionEvent> receive(boolean z) {
        return receive(z, this.defaultReceiveOptions);
    }

    @ServiceMethod(returns = ReturnType.COLLECTION)
    public Flux<PartitionEvent> receive(boolean z, ReceiveOptions receiveOptions) {
        if (Objects.isNull(receiveOptions)) {
            return FluxUtil.fluxError(LOGGER, new NullPointerException("'receiveOptions' cannot be null."));
        }
        EventPosition earliest = z ? EventPosition.earliest() : EventPosition.latest();
        String randomString = StringUtil.getRandomString("all");
        return Flux.merge(new Publisher[]{getPartitionIds().flatMap(str -> {
            return createConsumer(randomString + "-" + str, str, earliest, receiveOptions);
        })});
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        this.openPartitionConsumers.forEach((str, eventHubPartitionAsyncConsumer) -> {
            eventHubPartitionAsyncConsumer.close();
        });
        this.openPartitionConsumers.clear();
        if (this.isSharedConnection) {
            this.onClientClosed.run();
        } else {
            this.connectionProcessor.dispose();
        }
    }

    private Flux<PartitionEvent> createConsumer(String str, String str2, EventPosition eventPosition, ReceiveOptions receiveOptions) {
        return this.openPartitionConsumers.computeIfAbsent(str, str3 -> {
            return createPartitionConsumer(str3, str2, eventPosition, receiveOptions);
        }).receive().doFinally(signalType -> {
            removeLink(str, str2, signalType);
        });
    }

    private void removeLink(String str, String str2, SignalType signalType) {
        LOGGER.atInfo().addKeyValue(ClientConstants.LINK_NAME_KEY, str).addKeyValue(ClientConstants.PARTITION_ID_KEY, str2).addKeyValue(ClientConstants.SIGNAL_TYPE_KEY, signalType).log("Receiving completed.");
        EventHubPartitionAsyncConsumer remove = this.openPartitionConsumers.remove(str);
        if (remove != null) {
            remove.close();
        }
    }

    private EventHubPartitionAsyncConsumer createPartitionConsumer(String str, String str2, EventPosition eventPosition, ReceiveOptions receiveOptions) {
        String format = String.format(Locale.US, RECEIVER_ENTITY_PATH_FORMAT, getEventHubName(), this.consumerGroup, str2);
        AtomicReference atomicReference = new AtomicReference(() -> {
            return eventPosition;
        });
        return new EventHubPartitionAsyncConsumer(RetryUtil.withRetry(this.connectionProcessor.flatMap(eventHubAmqpConnection -> {
            LOGGER.atInfo().addKeyValue(ClientConstants.LINK_NAME_KEY, str).addKeyValue(ClientConstants.PARTITION_ID_KEY, str2).addKeyValue(ClientConstants.CONNECTION_ID_KEY, eventHubAmqpConnection.getId()).log("Creating receive consumer for partition.");
            return eventHubAmqpConnection.createReceiveLink(str, format, (EventPosition) ((Supplier) atomicReference.get()).get(), receiveOptions);
        }).onErrorMap(RequestResponseChannelClosedException.class, requestResponseChannelClosedException -> {
            return new AmqpException(true, requestResponseChannelClosedException.getMessage(), requestResponseChannelClosedException, (AmqpErrorContext) null);
        }), this.connectionProcessor.getRetryOptions(), "Failed to create receive link " + str, true).repeat().subscribeWith(new AmqpReceiveLinkProcessor(format, this.prefetchCount, this.connectionProcessor)), this.messageSerializer, getFullyQualifiedNamespace(), getEventHubName(), this.consumerGroup, str2, atomicReference, receiveOptions.getTrackLastEnqueuedEventProperties());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isConnectionClosed() {
        return this.connectionProcessor.isChannelClosed();
    }
}
