/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.CreditFlowMode;
import com.azure.core.amqp.implementation.MessageFlux;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseChannelClosedException;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.ConnectionCacheWrapper;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubPartitionAsyncConsumer;
import com.azure.messaging.eventhubs.EventHubProperties;
import com.azure.messaging.eventhubs.MessageFluxWrapper;
import com.azure.messaging.eventhubs.PartitionProperties;
import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubManagementNode;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsConsumerInstrumentation;
import com.azure.messaging.eventhubs.implementation.instrumentation.InstrumentedMessageFlux;
import com.azure.messaging.eventhubs.implementation.instrumentation.OperationName;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.io.Closeable;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

/**
 * Asynchronous client that exposes Event Hub metadata queries and reactive streams of
 * {@link PartitionEvent}s read from one partition or from every partition of an Event Hub, in the
 * context of a single consumer group.
 *
 * <p>Each call to a {@code receive*} method registers an {@link EventHubPartitionAsyncConsumer} under a
 * freshly generated link name; the consumer is removed and closed when its stream terminates, or when
 * {@link #close()} is called.</p>
 */
@ServiceClient(builder=EventHubClientBuilder.class, isAsync=true)
public class EventHubConsumerAsyncClient implements Closeable {
    /** Entity path template: event hub name, consumer group, partition id. */
    private static final String RECEIVER_ENTITY_PATH_FORMAT = "%s/ConsumerGroups/%s/Partitions/%s";
    private static final ClientLogger LOGGER = new ClientLogger(EventHubConsumerAsyncClient.class);

    /** Flipped to {@code true} by the first {@link #close()} call so later calls become no-ops. */
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    /** Options used by the overloads that do not take a {@link ReceiveOptions} argument. */
    private final ReceiveOptions defaultReceiveOptions = new ReceiveOptions();
    private final String fullyQualifiedNamespace;
    private final String eventHubName;
    private final ConnectionCacheWrapper connectionProcessor;
    private final MessageSerializer messageSerializer;
    private final String consumerGroup;
    private final int prefetchCount;
    /** Decides, in {@link #close()}, whether the connection is disposed or {@code onClientClosed} is run. */
    private final boolean isSharedConnection;
    private final Runnable onClientClosed;
    private final String identifier;
    private final EventHubsConsumerInstrumentation instrumentation;
    /** Consumers that are currently registered, keyed by link name. */
    private final ConcurrentHashMap<String, EventHubPartitionAsyncConsumer> openPartitionConsumers
        = new ConcurrentHashMap<>();

    /**
     * Creates a consumer client. All arguments are stored as given; none are validated here.
     *
     * @param fullyQualifiedNamespace Value returned by {@link #getFullyQualifiedNamespace()}.
     * @param eventHubName Value returned by {@link #getEventHubName()}; also used to build receiver
     *     entity paths.
     * @param connectionProcessor Source of connections, management nodes and retry options.
     * @param messageSerializer Serializer handed to every partition consumer.
     * @param consumerGroup Value returned by {@link #getConsumerGroup()}; also used to build receiver
     *     entity paths.
     * @param prefetchCount Prefetch value handed to the link message processor.
     * @param isSharedConnection When {@code true}, {@link #close()} runs {@code onClientClosed} instead
     *     of disposing {@code connectionProcessor}.
     * @param onClientClosed Callback run by {@link #close()} for a shared connection.
     * @param identifier Value returned by {@link #getIdentifier()}; passed along when receive links are
     *     created.
     * @param instrumentation Instrumentation applied to management calls and message processing.
     */
    EventHubConsumerAsyncClient(String fullyQualifiedNamespace, String eventHubName,
        ConnectionCacheWrapper connectionProcessor, MessageSerializer messageSerializer, String consumerGroup,
        int prefetchCount, boolean isSharedConnection, Runnable onClientClosed, String identifier,
        EventHubsConsumerInstrumentation instrumentation) {
        this.fullyQualifiedNamespace = fullyQualifiedNamespace;
        this.eventHubName = eventHubName;
        this.connectionProcessor = connectionProcessor;
        this.messageSerializer = messageSerializer;
        this.consumerGroup = consumerGroup;
        this.prefetchCount = prefetchCount;
        this.isSharedConnection = isSharedConnection;
        this.onClientClosed = onClientClosed;
        this.identifier = identifier;
        this.instrumentation = instrumentation;
    }

    /**
     * Gets the fully qualified namespace this client was created with.
     *
     * @return The fully qualified namespace.
     */
    public String getFullyQualifiedNamespace() {
        return this.fullyQualifiedNamespace;
    }

    /**
     * Gets the Event Hub name this client was created with.
     *
     * @return The Event Hub name.
     */
    public String getEventHubName() {
        return this.eventHubName;
    }

    /**
     * Gets the consumer group this client was created with.
     *
     * @return The consumer group name.
     */
    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    /**
     * Reports whether the underlying connection wrapper is in its "V2" mode; the same flag selects the
     * message processor implementation used for new partition consumers.
     *
     * @return The value of {@code connectionProcessor.isV2()}.
     */
    boolean isV2() {
        return this.connectionProcessor.isV2();
    }

    /**
     * Retrieves the Event Hub properties through the management node.
     *
     * @return A {@link Mono} producing the {@link EventHubProperties}.
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<EventHubProperties> getEventHubProperties() {
        return this.instrumentation.instrumentMono(
            this.connectionProcessor.getManagementNodeWithRetries()
                .flatMap(EventHubManagementNode::getEventHubProperties),
            OperationName.GET_EVENT_HUB_PROPERTIES, null);
    }

    /**
     * Retrieves the partition identifiers listed in the Event Hub properties.
     *
     * @return A {@link Flux} emitting each partition id.
     */
    @ServiceMethod(returns=ReturnType.COLLECTION)
    public Flux<String> getPartitionIds() {
        return this.getEventHubProperties()
            .flatMapMany(properties -> Flux.fromIterable(properties.getPartitionIds()));
    }

    /**
     * Retrieves the properties of a single partition through the management node.
     *
     * <p>Argument problems are reported through the returned {@link Mono}: a
     * {@link NullPointerException} when {@code partitionId} is null and an
     * {@link IllegalArgumentException} when it is empty.</p>
     *
     * @param partitionId Identifier of the partition to query.
     * @return A {@link Mono} producing the {@link PartitionProperties}, or an error as described above.
     */
    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<PartitionProperties> getPartitionProperties(String partitionId) {
        if (Objects.isNull(partitionId)) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'partitionId' cannot be null."));
        }
        if (partitionId.isEmpty()) {
            return FluxUtil.monoError(LOGGER,
                new IllegalArgumentException("'partitionId' cannot be an empty string."));
        }
        return this.instrumentation.instrumentMono(
            this.connectionProcessor.getManagementNodeWithRetries()
                .flatMap(node -> node.getPartitionProperties(partitionId)),
            OperationName.GET_PARTITION_PROPERTIES, partitionId);
    }

    /**
     * Receives events from one partition using the client's default {@link ReceiveOptions}.
     *
     * @param partitionId Identifier of the partition to read.
     * @param startingPosition Position in the partition at which reading starts.
     * @return A stream of events from the partition; see
     *     {@link #receiveFromPartition(String, EventPosition, ReceiveOptions)} for the error cases.
     */
    @ServiceMethod(returns=ReturnType.COLLECTION)
    public Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition) {
        return this.receiveFromPartition(partitionId, startingPosition, this.defaultReceiveOptions);
    }

    /**
     * Receives events from one partition.
     *
     * <p>Argument problems are reported through the returned {@link Flux}: a
     * {@link NullPointerException} when {@code partitionId}, {@code startingPosition} or
     * {@code receiveOptions} is null, and an {@link IllegalArgumentException} when {@code partitionId}
     * is empty.</p>
     *
     * @param partitionId Identifier of the partition to read.
     * @param startingPosition Position in the partition at which reading starts.
     * @param receiveOptions Options applied to the receive link and the partition consumer.
     * @return A stream of events from the partition, or an error as described above.
     */
    @ServiceMethod(returns=ReturnType.COLLECTION)
    public Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition,
        ReceiveOptions receiveOptions) {
        if (Objects.isNull(partitionId)) {
            return FluxUtil.fluxError(LOGGER, new NullPointerException("'partitionId' cannot be null."));
        }
        if (partitionId.isEmpty()) {
            return FluxUtil.fluxError(LOGGER,
                new IllegalArgumentException("'partitionId' cannot be an empty string."));
        }
        if (Objects.isNull(startingPosition)) {
            return FluxUtil.fluxError(LOGGER, new NullPointerException("'startingPosition' cannot be null."));
        }
        // Reject null options up front, as receive(boolean, ReceiveOptions) does. Without this guard the
        // null is only dereferenced at the very end of createPartitionConsumer, after the link pipeline
        // has been built, and the exception escapes synchronously instead of through the Flux.
        if (Objects.isNull(receiveOptions)) {
            return FluxUtil.fluxError(LOGGER, new NullPointerException("'receiveOptions' cannot be null."));
        }

        final String linkName = StringUtil.getRandomString(partitionId);
        return this.createConsumer(linkName, partitionId, startingPosition, receiveOptions);
    }

    /**
     * Receives events from every partition, starting at the earliest available position and using the
     * client's default {@link ReceiveOptions}.
     *
     * @return A stream of events from all partitions.
     */
    @ServiceMethod(returns=ReturnType.COLLECTION)
    public Flux<PartitionEvent> receive() {
        return this.receive(true, this.defaultReceiveOptions);
    }

    /**
     * Receives events from every partition using the client's default {@link ReceiveOptions}.
     *
     * @param startReadingAtEarliestEvent {@code true} to start at {@link EventPosition#earliest()};
     *     {@code false} to start at {@link EventPosition#latest()}.
     * @return A stream of events from all partitions.
     */
    @ServiceMethod(returns=ReturnType.COLLECTION)
    public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent) {
        return this.receive(startReadingAtEarliestEvent, this.defaultReceiveOptions);
    }

    /**
     * Receives events from every partition.
     *
     * <p>The partition ids are fetched first; one consumer is then created per partition. All link
     * names of a single call share a random prefix derived from {@code "all"} and end in
     * {@code "-" + partitionId}.</p>
     *
     * @param startReadingAtEarliestEvent {@code true} to start at {@link EventPosition#earliest()};
     *     {@code false} to start at {@link EventPosition#latest()}.
     * @param receiveOptions Options applied to every receive link and partition consumer.
     * @return A stream of events from all partitions, or a {@link Flux} that fails with a
     *     {@link NullPointerException} when {@code receiveOptions} is null.
     */
    @ServiceMethod(returns=ReturnType.COLLECTION)
    public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions) {
        if (Objects.isNull(receiveOptions)) {
            return FluxUtil.fluxError(LOGGER, new NullPointerException("'receiveOptions' cannot be null."));
        }

        final EventPosition startingPosition
            = startReadingAtEarliestEvent ? EventPosition.earliest() : EventPosition.latest();
        final String prefix = StringUtil.getRandomString("all");

        final Flux<PartitionEvent> allPartitionEvents = this.getPartitionIds().flatMap(partitionId -> {
            final String linkName = prefix + "-" + partitionId;
            return this.createConsumer(linkName, partitionId, startingPosition, receiveOptions);
        });
        return Flux.merge(allPartitionEvents);
    }

    /**
     * Closes every partition consumer currently registered with this client and then releases the
     * connection: for a shared connection the {@code onClientClosed} callback is run, otherwise the
     * connection wrapper is disposed. Only the first call has any effect.
     */
    @Override
    public void close() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }

        this.openPartitionConsumers.forEach((key, value) -> value.close());
        this.openPartitionConsumers.clear();

        if (this.isSharedConnection) {
            this.onClientClosed.run();
        } else {
            this.connectionProcessor.dispose();
        }
    }

    /**
     * Returns the event stream of the consumer registered under {@code linkName}, creating and
     * registering that consumer first when none exists. When the returned stream terminates, the
     * consumer is unregistered and closed via {@link #removeLink(String, String, SignalType)}.
     *
     * @param linkName Key under which the consumer is registered; also the receive link's name.
     * @param partitionId Identifier of the partition to read.
     * @param startingPosition Position at which reading starts.
     * @param receiveOptions Options applied to the receive link and the partition consumer.
     * @return The consumer's event stream.
     */
    private Flux<PartitionEvent> createConsumer(String linkName, String partitionId,
        EventPosition startingPosition, ReceiveOptions receiveOptions) {
        return this.openPartitionConsumers
            .computeIfAbsent(linkName,
                name -> this.createPartitionConsumer(name, partitionId, startingPosition, receiveOptions))
            .receive()
            .doFinally(signal -> this.removeLink(linkName, partitionId, signal));
    }

    /**
     * Logs that receiving has finished for a link, then unregisters and closes its consumer if it is
     * still registered.
     *
     * @param linkName Key of the consumer to remove.
     * @param partitionId Partition id, used for logging only.
     * @param signalType Signal that terminated the stream, used for logging only.
     */
    private void removeLink(String linkName, String partitionId, SignalType signalType) {
        LOGGER.atInfo()
            .addKeyValue("linkName", linkName)
            .addKeyValue("partitionId", partitionId)
            .addKeyValue("signalType", signalType)
            .log("Receiving completed.");

        final EventHubPartitionAsyncConsumer consumer = this.openPartitionConsumers.remove(linkName);
        if (consumer != null) {
            consumer.close();
        }
    }

    /**
     * Builds the partition consumer for one receive link.
     *
     * <p>A receive link is requested from the current connection, wrapped with the configured retry
     * options, and re-requested whenever the previous request completes ({@code repeat()}); links that
     * are already disposed are skipped. Depending on {@link #isV2()}, the resulting link stream feeds
     * either an instrumented {@link MessageFlux} or an {@link AmqpReceiveLinkProcessor} that is
     * subscribed immediately.</p>
     *
     * @param linkName Name given to the receive link.
     * @param partitionId Identifier of the partition to read.
     * @param startingPosition Initial position; stored behind an {@link AtomicReference} that is shared
     *     with the returned consumer and re-read each time a link is created.
     * @param receiveOptions Options passed to link creation; also supplies the
     *     track-last-enqueued-event-properties flag of the consumer. Must not be null.
     * @return The new partition consumer.
     */
    private EventHubPartitionAsyncConsumer createPartitionConsumer(String linkName, String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions) {
        // NOTE(review): the raw Mono/Flux types and explicit casts below are kept exactly as they appear
        // in this (decompiled) source. Restoring the generic parameters requires the receive-link type,
        // which is not imported in this file - confirm against the original sources before tightening.
        MessageFluxWrapper linkMessageProcessor;
        String entityPath = String.format(Locale.US, RECEIVER_ENTITY_PATH_FORMAT, this.getEventHubName(), this.consumerGroup, partitionId);
        AtomicReference<Supplier<EventPosition>> initialPosition = new AtomicReference<Supplier<EventPosition>>(() -> startingPosition);
        Mono receiveLinkMono = this.connectionProcessor.getConnection().flatMap(connection -> {
            LOGGER.atInfo().addKeyValue("linkName", linkName).addKeyValue("partitionId", partitionId).addKeyValue("connectionId", connection.getId()).log("Creating receive consumer for partition.");
            return connection.createReceiveLink(linkName, entityPath, (EventPosition)((Supplier)initialPosition.get()).get(), receiveOptions, this.identifier);
        });
        // A closed request-response channel is remapped to an AmqpException constructed with "true" as
        // its first argument (presumably the transient flag, so the retry wrapper treats it as retriable).
        Mono retryableReceiveLinkMono = RetryUtil.withRetry((Mono)receiveLinkMono.onErrorMap(RequestResponseChannelClosedException.class, e -> new AmqpException(true, e.getMessage(), (Throwable)e, null)), (AmqpRetryOptions)this.connectionProcessor.getRetryOptions(), (String)("Failed to create receive link " + linkName), (boolean)true);
        Flux receiveLinkFlux = retryableReceiveLinkMono.repeat().filter(link -> !link.isDisposed());
        if (this.connectionProcessor.isV2()) {
            MessageFlux messageFlux = new MessageFlux(receiveLinkFlux, this.prefetchCount, CreditFlowMode.EmissionDriven, MessageFlux.NULL_RETRY_POLICY);
            linkMessageProcessor = new MessageFluxWrapper(InstrumentedMessageFlux.instrument(messageFlux, partitionId, this.instrumentation));
        } else {
            AmqpReceiveLinkProcessor receiveLinkProcessor = (AmqpReceiveLinkProcessor)receiveLinkFlux.subscribeWith((Subscriber)new AmqpReceiveLinkProcessor(entityPath, this.prefetchCount, partitionId, this.connectionProcessor, this.instrumentation));
            linkMessageProcessor = new MessageFluxWrapper(receiveLinkProcessor);
        }
        return new EventHubPartitionAsyncConsumer(linkMessageProcessor, this.messageSerializer, this.getFullyQualifiedNamespace(), this.getEventHubName(), this.consumerGroup, partitionId, initialPosition, receiveOptions.getTrackLastEnqueuedEventProperties());
    }

    /**
     * Reports whether the connection wrapper considers its channel closed.
     *
     * @return The value of {@code connectionProcessor.isChannelClosed()}.
     */
    boolean isConnectionClosed() {
        return this.connectionProcessor.isChannelClosed();
    }

    /**
     * Gets the instrumentation this client was created with.
     *
     * @return The consumer instrumentation.
     */
    EventHubsConsumerInstrumentation getInstrumentation() {
        return this.instrumentation;
    }

    /**
     * Gets the identifier this client was created with.
     *
     * @return The client identifier.
     */
    public String getIdentifier() {
        return this.identifier;
    }
}

