/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.implementation.WindowedSubscriber;
import com.azure.core.util.IterableStream;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsConsumerInstrumentation;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

/**
 * Package-private helper that exposes partition events from an {@link EventHubConsumerAsyncClient}
 * as an {@link IterableStream}.
 *
 * <p>Each call to {@link #receive(String, EventPosition, ReceiveOptions, int, Duration)} creates its own
 * {@link WindowedSubscriber}, subscribes it to the flux obtained from the current {@link Receiver}, and
 * returns the subscriber's window flux wrapped in an {@link IterableStream}.</p>
 *
 * <p>After {@link #dispose()} the current receiver is replaced with {@link Receiver#DISPOSED}, so any later
 * {@code receive} call is fed by a flux that terminates with a {@link RuntimeException} carrying
 * {@link #TERMINAL_MESSAGE}.</p>
 */
final class SynchronousPartitionReceiver {
    private static final String TERMINAL_MESSAGE = "The receiver client is terminated. Re-create the client to continue receive attempt.";
    private final EventHubsConsumerInstrumentation instrumentation;
    // Holds the strategy used to obtain events; swapped to Receiver.DISPOSED by dispose().
    private final AtomicReference<Receiver> receiver = new AtomicReference<>(null);

    /**
     * Creates a receiver that delegates to the given asynchronous client.
     *
     * @param client the asynchronous consumer client used to receive events and to obtain the
     *     instrumentation applied to each receive window.
     * @throws NullPointerException if {@code client} is null.
     */
    SynchronousPartitionReceiver(EventHubConsumerAsyncClient client) {
        Objects.requireNonNull(client, "'client' cannot be null.");
        this.receiver.set(new DelegatingReceiver(client));
        this.instrumentation = client.getInstrumentation();
    }

    /**
     * Receives events from a partition and exposes them as an {@link IterableStream}.
     *
     * @param partitionId identifier of the partition to receive from.
     * @param startingPosition position in the partition to start receiving from.
     * @param receiveOptions options forwarded to the underlying receiver.
     * @param maxEvents window size, passed through to
     *     {@link WindowedSubscriber#enqueueRequestFlux(int, Duration)}.
     * @param maxWaitTime window timeout, passed through to
     *     {@link WindowedSubscriber#enqueueRequestFlux(int, Duration)}.
     * @return a stream over the events emitted by the subscriber's window flux.
     * @throws NullPointerException if {@code partitionId}, {@code startingPosition} or
     *     {@code receiveOptions} is null.
     */
    IterableStream<PartitionEvent> receive(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions, int maxEvents, Duration maxWaitTime) {
        Objects.requireNonNull(partitionId, "'partitionId' cannot be null.");
        Objects.requireNonNull(startingPosition, "'startingPosition' cannot be null.");
        Objects.requireNonNull(receiveOptions, "'receiveOptions' cannot be null.");

        // A dedicated subscriber is created for every receive call.
        final WindowedSubscriber<PartitionEvent> subscriber = this.createSubscriber(partitionId);
        final Flux<PartitionEvent> upstream = this.receiver.get().receive(partitionId, startingPosition, receiveOptions);
        upstream.subscribeWith(subscriber);

        final Flux<PartitionEvent> windowFlux = subscriber.enqueueRequestFlux(maxEvents, maxWaitTime);
        // Cancel the per-call subscriber as soon as the window flux terminates, normally or with an error.
        return new IterableStream<>(windowFlux
            .doOnComplete(() -> subscriber.cancel())
            .doOnError(error -> subscriber.cancel()));
    }

    /**
     * Marks this receiver as disposed by replacing the current receiver with {@link Receiver#DISPOSED}.
     * This method only swaps the reference; it does not interact with subscribers created by earlier
     * {@code receive} calls.
     */
    void dispose() {
        this.receiver.set(Receiver.DISPOSED);
    }

    /**
     * Builds the subscriber used by a single receive call.
     *
     * @param partitionId identifier of the partition; used as the {@code "partitionId"} entry of the map
     *     handed to the subscriber and as the argument to the instrumentation decorator.
     * @return a new subscriber whose windows are decorated via
     *     {@code EventHubsConsumerInstrumentation#syncReceive}.
     */
    private WindowedSubscriber<PartitionEvent> createSubscriber(String partitionId) {
        final WindowedSubscriber.WindowedSubscriberOptions<PartitionEvent> options = new WindowedSubscriber.WindowedSubscriberOptions<>();
        options.setWindowDecorator(toDecorate -> this.instrumentation.syncReceive(toDecorate, partitionId));
        return new WindowedSubscriber<>(Collections.singletonMap("partitionId", partitionId), TERMINAL_MESSAGE, options);
    }

    /**
     * {@link Receiver} that forwards to {@link EventHubConsumerAsyncClient#receiveFromPartition}.
     */
    private static final class DelegatingReceiver
    implements Receiver {
        private final EventHubConsumerAsyncClient client;

        /**
         * @param client the asynchronous client to delegate to.
         * @throws NullPointerException if {@code client} is null.
         */
        DelegatingReceiver(EventHubConsumerAsyncClient client) {
            this.client = Objects.requireNonNull(client, "'client' cannot be null.");
        }

        @Override
        public Flux<PartitionEvent> receive(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions) {
            // Sanity check only; evaluated solely when JVM assertions are enabled.
            assert (this.client.isV2());
            return this.client.receiveFromPartition(partitionId, startingPosition, receiveOptions);
        }
    }

    /**
     * Strategy for obtaining the flux of events for a partition.
     */
    @FunctionalInterface
    private interface Receiver {
        /**
         * Receiver installed by {@link SynchronousPartitionReceiver#dispose()}; every call returns a flux
         * that terminates with a {@link RuntimeException} carrying the terminal message.
         */
        Receiver DISPOSED = (partitionId, startingPosition, receiveOptions) -> Flux.error(new RuntimeException(SynchronousPartitionReceiver.TERMINAL_MESSAGE));

        /**
         * @param partitionId identifier of the partition to receive from.
         * @param startingPosition position in the partition to start receiving from.
         * @param receiveOptions options for the receive operation.
         * @return a flux of events for the partition.
         */
        Flux<PartitionEvent> receive(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions);
    }
}

