package com.azure.messaging.eventhubs;

import com.azure.core.amqp.implementation.WindowedSubscriber;
import com.azure.core.util.IterableStream;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsConsumerInstrumentation;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

/* loaded from: input_file:com/azure/messaging/eventhubs/SynchronousPartitionReceiver.class */
final class SynchronousPartitionReceiver {
    private static final String TERMINAL_MESSAGE = "The receiver client is terminated. Re-create the client to continue receive attempt.";
    private final EventHubsConsumerInstrumentation instrumentation;
    private final AtomicReference<Receiver> receiver = new AtomicReference<>(null);

    /* loaded from: input_file:com/azure/messaging/eventhubs/SynchronousPartitionReceiver$DelegatingReceiver.class */
    private static final class DelegatingReceiver implements Receiver {
        private final EventHubConsumerAsyncClient client;
        static final /* synthetic */ boolean $assertionsDisabled;

        DelegatingReceiver(EventHubConsumerAsyncClient eventHubConsumerAsyncClient) {
            this.client = (EventHubConsumerAsyncClient) Objects.requireNonNull(eventHubConsumerAsyncClient, "'client' cannot be null.");
        }

        @Override // com.azure.messaging.eventhubs.SynchronousPartitionReceiver.Receiver
        public Flux<PartitionEvent> receive(String str, EventPosition eventPosition, ReceiveOptions receiveOptions) {
            if ($assertionsDisabled || this.client.isV2()) {
                return this.client.receiveFromPartition(str, eventPosition, receiveOptions);
            }
            throw new AssertionError();
        }

        static {
            $assertionsDisabled = !SynchronousPartitionReceiver.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:com/azure/messaging/eventhubs/SynchronousPartitionReceiver$Receiver.class */
    private interface Receiver {
        public static final Receiver DISPOSED = (str, eventPosition, receiveOptions) -> {
            return Flux.error(new RuntimeException(SynchronousPartitionReceiver.TERMINAL_MESSAGE));
        };

        Flux<PartitionEvent> receive(String str, EventPosition eventPosition, ReceiveOptions receiveOptions);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SynchronousPartitionReceiver(EventHubConsumerAsyncClient eventHubConsumerAsyncClient) {
        Objects.requireNonNull(eventHubConsumerAsyncClient, "'client' cannot be null.");
        this.receiver.set(new DelegatingReceiver(eventHubConsumerAsyncClient));
        this.instrumentation = eventHubConsumerAsyncClient.getInstrumentation();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public IterableStream<PartitionEvent> receive(String str, EventPosition eventPosition, ReceiveOptions receiveOptions, int i, Duration duration) {
        Objects.requireNonNull(str, "'partitionId' cannot be null.");
        Objects.requireNonNull(eventPosition, "'startingPosition' cannot be null.");
        Objects.requireNonNull(receiveOptions, "'receiveOptions' cannot be null.");
        WindowedSubscriber<PartitionEvent> createSubscriber = createSubscriber(str);
        this.receiver.get().receive(str, eventPosition, receiveOptions).subscribeWith(createSubscriber);
        Flux enqueueRequestFlux = createSubscriber.enqueueRequestFlux(i, duration);
        Objects.requireNonNull(createSubscriber);
        return new IterableStream<>(enqueueRequestFlux.doOnComplete(createSubscriber::cancel).doOnError(th -> {
            createSubscriber.cancel();
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void dispose() {
        this.receiver.set(Receiver.DISPOSED);
    }

    private WindowedSubscriber<PartitionEvent> createSubscriber(String str) {
        WindowedSubscriber.WindowedSubscriberOptions windowedSubscriberOptions = new WindowedSubscriber.WindowedSubscriberOptions();
        windowedSubscriberOptions.setWindowDecorator(flux -> {
            return this.instrumentation.syncReceive(flux, str);
        });
        return new WindowedSubscriber<>(Collections.singletonMap(ClientConstants.PARTITION_ID_KEY, str), TERMINAL_MESSAGE, windowedSubscriberOptions);
    }
}
