package com.azure.messaging.eventhubs;

import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.class */
/**
 * Receives AMQP messages for a single partition from an {@link AmqpReceiveLinkProcessor}, converts each one
 * into a {@link PartitionEvent} and republishes the events through an {@link EmitterProcessor}.
 *
 * <p>While events flow, the offset of the most recently received event is remembered. The position supplier
 * handed to the constructor is replaced with one that reports that offset (or the initial position when no
 * event has been seen yet).</p>
 */
public class EventHubPartitionAsyncConsumer implements AutoCloseable {
    private final ClientLogger logger = new ClientLogger(EventHubPartitionAsyncConsumer.class);
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicReference<LastEnqueuedEventProperties> lastEnqueuedEventProperties = new AtomicReference<>();
    private final AmqpReceiveLinkProcessor amqpReceiveLinkProcessor;
    private final MessageSerializer messageSerializer;
    private final String fullyQualifiedNamespace;
    private final String eventHubName;
    private final String consumerGroup;
    private final String partitionId;
    private final boolean trackLastEnqueuedEventProperties;
    private final Scheduler scheduler;
    private final EmitterProcessor<PartitionEvent> emitterProcessor;
    private final EventPosition initialPosition;
    // Written by the receive pipeline and read by the position supplier installed in the constructor.
    private volatile Long currentOffset;

    /**
     * Creates the consumer and immediately subscribes to {@code amqpReceiveLinkProcessor}.
     *
     * <p>Decompiler note: this constructor was package-private in the original class file.</p>
     *
     * @param amqpReceiveLinkProcessor Source of AMQP messages; cancelled when this consumer is closed.
     * @param messageSerializer Deserializes each message into {@link EventData} and, when tracking is enabled,
     *     into {@link LastEnqueuedEventProperties}.
     * @param fullyQualifiedNamespace Namespace placed in the {@link PartitionContext} of every emitted event.
     * @param eventHubName Event hub name placed in the {@link PartitionContext} of every emitted event.
     * @param consumerGroup Consumer group placed in the {@link PartitionContext} of every emitted event.
     * @param partitionId Partition id placed in the {@link PartitionContext} of every emitted event.
     * @param currentEventPosition Holder whose current supplier yields the initial position. Its content is
     *     replaced with a supplier backed by this consumer's last seen offset.
     * @param trackLastEnqueuedEventProperties Whether last-enqueued-event properties are read from each message.
     * @param scheduler Scheduler that events returned by {@link #receive()} are published on.
     * @throws NullPointerException if the position yielded by {@code currentEventPosition} or
     *     {@code scheduler} is null.
     */
    public EventHubPartitionAsyncConsumer(AmqpReceiveLinkProcessor amqpReceiveLinkProcessor,
            MessageSerializer messageSerializer, String fullyQualifiedNamespace, String eventHubName,
            String consumerGroup, String partitionId,
            AtomicReference<Supplier<EventPosition>> currentEventPosition,
            boolean trackLastEnqueuedEventProperties, Scheduler scheduler) {
        final EventPosition startingPosition = currentEventPosition.get().get();
        this.initialPosition = Objects.requireNonNull(startingPosition,
            "'currentEventPosition.get().get()' cannot be null.");

        this.amqpReceiveLinkProcessor = amqpReceiveLinkProcessor;
        this.messageSerializer = messageSerializer;
        this.fullyQualifiedNamespace = fullyQualifiedNamespace;
        this.eventHubName = eventHubName;
        this.consumerGroup = consumerGroup;
        this.partitionId = partitionId;
        this.trackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties;
        this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null.");

        if (trackLastEnqueuedEventProperties) {
            // Seed with an all-null snapshot so emitted events carry a non-null properties object
            // even before the first message has supplied real values.
            this.lastEnqueuedEventProperties.set(new LastEnqueuedEventProperties(null, null, null, null));
        }

        // From here on the shared holder reports this consumer's progress instead of the original position.
        currentEventPosition.set(this::resolveCurrentPosition);

        final Flux<PartitionEvent> events = amqpReceiveLinkProcessor
            .map(this::onMessageReceived)
            .doOnNext(this::recordOffset);
        this.emitterProcessor = events.subscribeWith(EmitterProcessor.create(false));
    }

    /**
     * Completes the downstream processor and then cancels the AMQP receive link processor.
     * Only the first invocation has any effect; later calls return immediately.
     */
    @Override
    public void close() {
        final boolean alreadyDisposed = this.isDisposed.getAndSet(true);
        if (!alreadyDisposed) {
            this.emitterProcessor.onComplete();
            this.amqpReceiveLinkProcessor.cancel();
        }
    }

    /**
     * Gets the stream of events for this partition, published on the scheduler given at construction.
     *
     * <p>Decompiler note: this method was package-private in the original class file.</p>
     *
     * @return A {@link Flux} of {@link PartitionEvent} backed by this consumer's emitter processor.
     */
    public Flux<PartitionEvent> receive() {
        return this.emitterProcessor.publishOn(this.scheduler);
    }

    /**
     * Supplies the position this consumer has reached: the last recorded offset when one exists,
     * otherwise the initial position captured at construction.
     */
    private EventPosition resolveCurrentPosition() {
        final Long lastOffset = this.currentOffset;
        if (lastOffset != null) {
            return EventPosition.fromOffset(lastOffset);
        }
        return this.initialPosition;
    }

    /**
     * Remembers the offset of a received event, or logs a warning when the event carries no offset.
     */
    private void recordOffset(PartitionEvent event) {
        final Long offset = event.getData().getOffset();
        if (offset == null) {
            this.logger.warning(
                "Offset for received event should not be null. Partition Id: {}. Consumer group: {}. Data: {}",
                event.getPartitionContext().getPartitionId(),
                event.getPartitionContext().getConsumerGroup(),
                event.getData().getBodyAsString());
            return;
        }
        this.currentOffset = offset;
    }

    /**
     * Converts an AMQP message into a {@link PartitionEvent}, refreshing the cached last-enqueued-event
     * properties first when tracking is enabled and the message yields them.
     */
    private PartitionEvent onMessageReceived(Message message) {
        final EventData event = this.messageSerializer.deserialize(message, EventData.class);

        if (this.trackLastEnqueuedEventProperties) {
            final LastEnqueuedEventProperties received =
                this.messageSerializer.deserialize(message, LastEnqueuedEventProperties.class);
            if (received != null) {
                // Store a fresh copy rather than the deserialized instance itself.
                final LastEnqueuedEventProperties snapshot = new LastEnqueuedEventProperties(
                    received.getSequenceNumber(),
                    received.getOffset(),
                    received.getEnqueuedTime(),
                    received.getRetrievalTime());
                this.lastEnqueuedEventProperties.set(snapshot);
            }
        }

        final PartitionContext context = new PartitionContext(
            this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroup, this.partitionId);
        return new PartitionEvent(context, event, this.lastEnqueuedEventProperties.get());
    }
}
