package com.azure.messaging.eventhubs;

import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.UncheckedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/eventhubs/EventDataAggregator.class */
public class EventDataAggregator extends FluxOperator<EventData, EventDataBatch> {
    private volatile EventDataAggregatorMain downstreamSubscription;
    private final Supplier<EventDataBatch> batchSupplier;
    private final String namespace;
    private final EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions options;
    private final String partitionId;
    private static final ClientLogger LOGGER = new ClientLogger(EventDataAggregator.class);
    private static final AtomicReferenceFieldUpdater<EventDataAggregator, EventDataAggregatorMain> DOWNSTREAM_SUBSCRIPTION = AtomicReferenceFieldUpdater.newUpdater(EventDataAggregator.class, EventDataAggregatorMain.class, "downstreamSubscription");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/azure/messaging/eventhubs/EventDataAggregator$EventDataAggregatorMain.class */
    public static class EventDataAggregatorMain implements Subscription, CoreSubscriber<EventData> {
        private volatile long requested;
        private static final AtomicLongFieldUpdater<EventDataAggregatorMain> REQUESTED = AtomicLongFieldUpdater.newUpdater(EventDataAggregatorMain.class, "requested");
        private final Disposable disposable;
        private final CoreSubscriber<? super EventDataBatch> downstream;
        private final String partitionId;
        private final ClientLogger logger;
        private final Supplier<EventDataBatch> batchSupplier;
        private final String namespace;
        private Subscription subscription;
        private EventDataBatch currentBatch;
        private volatile Throwable lastError;
        private final AtomicBoolean isCompleted = new AtomicBoolean(false);
        private final Object lock = new Object();
        private final Sinks.Many<Long> eventSink = Sinks.many().unicast().onBackpressureError();

        EventDataAggregatorMain(CoreSubscriber<? super EventDataBatch> coreSubscriber, String str, EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions bufferedProducerClientOptions, Supplier<EventDataBatch> supplier, String str2, ClientLogger clientLogger) {
            this.namespace = str;
            this.downstream = coreSubscriber;
            this.partitionId = str2;
            this.logger = clientLogger;
            this.batchSupplier = supplier;
            this.currentBatch = supplier.get();
            this.disposable = Flux.switchOnNext(this.eventSink.asFlux().map(l -> {
                return Flux.interval(bufferedProducerClientOptions.getMaxWaitTime()).takeUntil(l -> {
                    return this.isCompleted.get();
                });
            })).subscribe(l2 -> {
                clientLogger.atVerbose().addKeyValue(ClientConstants.PARTITION_ID_KEY, str2).log("Time elapsed. Attempt to publish downstream.");
                updateOrPublishBatch(null, true);
            });
        }

        public int getNumberOfEventsInCurrentBatch() {
            EventDataBatch eventDataBatch;
            synchronized (this.lock) {
                eventDataBatch = this.currentBatch;
            }
            if (eventDataBatch != null) {
                return eventDataBatch.getCount();
            }
            return 0;
        }

        public void request(long j) {
            if (Operators.validate(j)) {
                Operators.addCap(REQUESTED, this, j);
                this.subscription.request(j);
            }
        }

        public void cancel() {
            if (this.isCompleted.compareAndSet(false, true)) {
                this.logger.atVerbose().addKeyValue(ClientConstants.PARTITION_ID_KEY, this.partitionId).log("Disposing of aggregator.");
                this.subscription.cancel();
                updateOrPublishBatch(null, true);
                this.downstream.onComplete();
                this.disposable.dispose();
            }
        }

        public void onSubscribe(Subscription subscription) {
            if (this.subscription != null) {
                this.logger.warning("Subscription was already set. Cancelling existing subscription.");
                this.subscription.cancel();
            } else {
                this.subscription = subscription;
                this.downstream.onSubscribe(this);
            }
        }

        public void onNext(EventData eventData) {
            updateOrPublishBatch(eventData, false);
            this.eventSink.emitNext(1L, Sinks.EmitFailureHandler.FAIL_FAST);
            if (REQUESTED.get(this) > 0) {
                this.subscription.request(1L);
            }
        }

        public void onError(Throwable th) {
            if (!this.isCompleted.compareAndSet(false, true)) {
                Operators.onErrorDropped(th, this.downstream.currentContext());
            } else {
                updateOrPublishBatch(null, true);
                this.downstream.onError(th);
            }
        }

        public void onComplete() {
            if (this.isCompleted.compareAndSet(false, true)) {
                updateOrPublishBatch(null, true);
                this.downstream.onComplete();
            }
        }

        private void updateOrPublishBatch(EventData eventData, boolean z) {
            boolean isFlushSignal = isFlushSignal(eventData);
            if (z || isFlushSignal) {
                publishDownstream(isFlushSignal);
                return;
            }
            if (eventData == null) {
                return;
            }
            synchronized (this.lock) {
                if (this.currentBatch.tryAdd(eventData)) {
                    return;
                }
                publishDownstream(false);
                boolean tryAdd = this.currentBatch.tryAdd(eventData);
                if (tryAdd) {
                    return;
                }
                onError(new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, "EventData exceeded maximum size.", new AmqpErrorContext(this.namespace)));
            }
        }

        private void publishDownstream(boolean z) {
            try {
                synchronized (this.lock) {
                    EventDataBatch eventDataBatch = this.currentBatch;
                    if (eventDataBatch == null) {
                        this.logger.warning("Batch should not be null, setting a new batch.");
                        this.currentBatch = this.batchSupplier.get();
                        if (z) {
                            this.downstream.onNext(EventDataBatch.EMPTY);
                        }
                        return;
                    }
                    if (eventDataBatch.getEvents().isEmpty()) {
                        if (z) {
                            this.downstream.onNext(EventDataBatch.EMPTY);
                        }
                        return;
                    }
                    this.downstream.onNext(eventDataBatch);
                    this.logger.verbose(eventDataBatch + ": Batch published. Requested batches left: {}", new Object[]{Long.valueOf(REQUESTED.updateAndGet(this, j -> {
                        return j == Long.MAX_VALUE ? j : j - 1;
                    }))});
                    if (this.isCompleted.get()) {
                        this.logger.verbose("Aggregator is completed. Not setting another batch.");
                        this.currentBatch = null;
                    } else {
                        this.currentBatch = this.batchSupplier.get();
                    }
                }
            } catch (UncheckedExecutionException e) {
                this.logger.info("An exception occurred while trying to create a new batch.", new Object[]{e});
                if (this.lastError == null) {
                    this.lastError = e;
                    return;
                }
                this.logger.info("Exception has been set already, terminating EventDataAggregator.");
                Throwable onNextError = Operators.onNextError((Object) null, e, this.downstream.currentContext(), this.subscription);
                if (onNextError != null) {
                    onError(onNextError);
                }
            } catch (Throwable th) {
                Throwable onNextError2 = Operators.onNextError((Object) null, th, this.downstream.currentContext(), this.subscription);
                this.logger.warning("Unable to push batch downstream to publish.", new Object[]{onNextError2});
                if (onNextError2 != null) {
                    onError(onNextError2);
                }
            }
        }

        private static boolean isFlushSignal(EventData eventData) {
            return eventData instanceof FlushSignal;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventDataAggregator(Flux<? extends EventData> flux, Supplier<EventDataBatch> supplier, String str, EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions bufferedProducerClientOptions, String str2) {
        super(flux);
        this.partitionId = str2;
        this.batchSupplier = supplier;
        this.namespace = str;
        this.options = bufferedProducerClientOptions;
    }

    public void subscribe(CoreSubscriber<? super EventDataBatch> coreSubscriber) {
        EventDataAggregatorMain eventDataAggregatorMain = new EventDataAggregatorMain(coreSubscriber, this.namespace, this.options, this.batchSupplier, this.partitionId, LOGGER);
        if (!DOWNSTREAM_SUBSCRIPTION.compareAndSet(this, null, eventDataAggregatorMain)) {
            throw ((IllegalArgumentException) LOGGER.logThrowableAsError(new IllegalArgumentException("Cannot resubscribe to multiple upstreams.")));
        }
        this.source.subscribe(eventDataAggregatorMain);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getNumberOfEvents() {
        EventDataAggregatorMain eventDataAggregatorMain = this.downstreamSubscription;
        if (eventDataAggregatorMain == null) {
            return 0;
        }
        return eventDataAggregatorMain.getNumberOfEventsInCurrentBatch();
    }
}
