package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.UncheckedExecutionException;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.messaging.eventhubs.models.SendBatchFailedContext;
import com.azure.messaging.eventhubs.models.SendBatchSucceededContext;
import java.io.Closeable;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.Scannable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.class */
public class EventHubBufferedPartitionProducer implements Closeable {
    private final ClientLogger logger;
    private final AmqpRetryOptions retryOptions;
    private final EventHubProducerAsyncClient client;
    private final String partitionId;
    private final AmqpErrorContext errorContext;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final Disposable publishSubscription;
    private final Sinks.Many<EventData> eventSink;
    private final CreateBatchOptions createBatchOptions;
    private final PublishResultSubscriber publishResultSubscriber;
    private final EventHubsTracer tracer;
    private final EventDataAggregator eventDataAggregator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer$PublishResult.class */
    public static class PublishResult {
        private static final PublishResult EMPTY = new PublishResult();
        private final List<EventData> events;
        private final Throwable error;

        PublishResult(List<EventData> list, Throwable th) {
            this.events = (List) Objects.requireNonNull(list);
            this.error = th;
        }

        private PublishResult() {
            this.events = null;
            this.error = null;
        }
    }

    /* loaded from: input_file:com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer$PublishResultSubscriber.class */
    private static class PublishResultSubscriber extends BaseSubscriber<PublishResult> {
        private final String partitionId;
        private final Sinks.Many<EventData> eventSink;
        private final Consumer<SendBatchSucceededContext> onSucceed;
        private final Consumer<SendBatchFailedContext> onFailed;
        private final Duration operationTimeout;
        private final ClientLogger logger;
        private final AtomicReference<FlushSignal> pendingFlushSignal = new AtomicReference<>(null);
        private final Semaphore flushSemaphore = new Semaphore(1);
        private final AtomicBoolean terminated = new AtomicBoolean(false);

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer$PublishResultSubscriber$FlushCompletionOrigin.class */
        public enum FlushCompletionOrigin {
            TERMINAL_COMPLETION,
            TERMINAL_ERROR,
            ON_NEXT_EMPTY,
            ON_NEXT
        }

        PublishResultSubscriber(String str, Sinks.Many<EventData> many, Consumer<SendBatchSucceededContext> consumer, Consumer<SendBatchFailedContext> consumer2, Duration duration, ClientLogger clientLogger) {
            this.partitionId = str;
            this.eventSink = many;
            this.onSucceed = consumer;
            this.onFailed = consumer2;
            this.operationTimeout = duration;
            this.logger = clientLogger;
        }

        protected void hookOnSubscribe(Subscription subscription) {
            requestUnbounded();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void hookOnNext(PublishResult publishResult) {
            if (publishResult != PublishResult.EMPTY) {
                if (publishResult.error == null) {
                    this.onSucceed.accept(new SendBatchSucceededContext(publishResult.events, this.partitionId));
                } else {
                    this.onFailed.accept(new SendBatchFailedContext(publishResult.events, this.partitionId, publishResult.error));
                }
            }
            tryCompleteFlush(publishResult == PublishResult.EMPTY ? FlushCompletionOrigin.ON_NEXT_EMPTY : FlushCompletionOrigin.ON_NEXT);
        }

        protected void hookOnError(Throwable th) {
            if (this.terminated.getAndSet(true)) {
                return;
            }
            this.logger.atError().log("Publishing-subscription terminated with an error.", new Object[]{th});
            this.onFailed.accept(new SendBatchFailedContext(null, this.partitionId, th));
            tryCompleteFlush(FlushCompletionOrigin.TERMINAL_ERROR);
        }

        protected void hookOnComplete() {
            if (this.terminated.getAndSet(true)) {
                return;
            }
            this.logger.atInfo().log("Publishing-subscription terminated.");
            tryCompleteFlush(FlushCompletionOrigin.TERMINAL_COMPLETION);
        }

        TimeoutException awaitPendingFlush() {
            if (this.pendingFlushSignal.get() == null) {
                return null;
            }
            try {
                if (!this.flushSemaphore.tryAcquire(this.operationTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                    return new TimeoutException("Timed out waiting for flush operation to complete.");
                }
                this.flushSemaphore.release();
                return null;
            } catch (InterruptedException e) {
                return new TimeoutException("Unable to acquire flush-semaphore due to interrupted exception.");
            }
        }

        Mono<Void> startFlush() {
            return Mono.create(monoSink -> {
                if (this.terminated.get()) {
                    this.logger.atInfo().log("Nothing to flush as publishing-subscription is terminated.");
                    monoSink.success();
                    return;
                }
                FlushSignal flushSignal = new FlushSignal(monoSink);
                if (!this.pendingFlushSignal.compareAndSet(null, flushSignal)) {
                    this.logger.atInfo().log("Another flush operation is already in progress.");
                    monoSink.success();
                    return;
                }
                try {
                    if (!this.flushSemaphore.tryAcquire(this.operationTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                        this.pendingFlushSignal.set(null);
                        monoSink.error(new TimeoutException("timout waiting for acquiring flush-semaphore."));
                        return;
                    }
                    this.logger.atVerbose().addKeyValue("signal-id", flushSignal.getId()).addKeyValue("permits", this.flushSemaphore.availablePermits()).log("Enqueuing flush.");
                    Sinks.EmitResult tryEmitNext = this.eventSink.tryEmitNext(flushSignal);
                    if (tryEmitNext != Sinks.EmitResult.OK) {
                        this.pendingFlushSignal.set(null);
                        this.flushSemaphore.release();
                        monoSink.error(new RuntimeException("Unable to enqueue flush: id" + flushSignal.getId() + " (" + tryEmitNext + ")"));
                    }
                } catch (InterruptedException e) {
                    this.pendingFlushSignal.set(null);
                    this.logger.atWarning().log("Unable to acquire flush-semaphore.");
                    monoSink.error(e);
                }
            });
        }

        private void tryCompleteFlush(FlushCompletionOrigin flushCompletionOrigin) {
            FlushSignal andSet = this.pendingFlushSignal.getAndSet(null);
            if (andSet != null) {
                this.logger.atVerbose().addKeyValue("signal-id", andSet.getId()).addKeyValue("permits", this.flushSemaphore.availablePermits()).addKeyValue("completion-origin", flushCompletionOrigin).log("Completing flush.");
                this.flushSemaphore.release();
                andSet.flushed();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventHubBufferedPartitionProducer(EventHubProducerAsyncClient eventHubProducerAsyncClient, String str, EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions bufferedProducerClientOptions, AmqpRetryOptions amqpRetryOptions, Sinks.Many<EventData> many, Tracer tracer) {
        HashMap hashMap = new HashMap();
        hashMap.put(ClientConstants.PARTITION_ID_KEY, str);
        this.logger = new ClientLogger(EventHubBufferedPartitionProducer.class, hashMap);
        this.client = eventHubProducerAsyncClient;
        this.partitionId = str;
        this.errorContext = new AmqpErrorContext(eventHubProducerAsyncClient.getFullyQualifiedNamespace());
        this.createBatchOptions = new CreateBatchOptions().setPartitionId(str);
        this.retryOptions = amqpRetryOptions;
        this.eventSink = many;
        this.eventDataAggregator = new EventDataAggregator(this.eventSink.asFlux(), this::createNewBatch, eventHubProducerAsyncClient.getFullyQualifiedNamespace(), bufferedProducerClientOptions, str);
        this.publishResultSubscriber = new PublishResultSubscriber(str, this.eventSink, bufferedProducerClientOptions.getSendSucceededContext(), bufferedProducerClientOptions.getSendFailedContext(), amqpRetryOptions.getTryTimeout(), this.logger);
        this.publishSubscription = publishEvents(this.eventDataAggregator).publishOn(Schedulers.boundedElastic(), 1).subscribeWith(this.publishResultSubscriber);
        this.tracer = new EventHubsTracer(tracer, eventHubProducerAsyncClient.getFullyQualifiedNamespace(), eventHubProducerAsyncClient.getEventHubName(), null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> enqueueEvent(EventData eventData) {
        return RetryUtil.withRetry(Mono.create(monoSink -> {
            if (this.isClosed.get()) {
                monoSink.error(new IllegalStateException(String.format("Partition publisher id[%s] is already closed. Cannot enqueue more events.", this.partitionId)));
                return;
            }
            TimeoutException awaitPendingFlush = this.publishResultSubscriber.awaitPendingFlush();
            if (awaitPendingFlush != null) {
                monoSink.error(awaitPendingFlush);
                return;
            }
            if (this.isClosed.get()) {
                monoSink.error(new IllegalStateException(String.format("Partition publisher id[%s] was closed between flushing events and now. Cannot enqueue events.", this.partitionId)));
                return;
            }
            this.tracer.reportMessageSpan(eventData, eventData.getContext());
            Sinks.EmitResult tryEmitNext = this.eventSink.tryEmitNext(eventData);
            if (tryEmitNext.isSuccess()) {
                monoSink.success();
                return;
            }
            if (tryEmitNext == Sinks.EmitResult.FAIL_NON_SERIALIZED || tryEmitNext == Sinks.EmitResult.FAIL_OVERFLOW) {
                this.logger.atInfo().addKeyValue(ClientConstants.EMIT_RESULT_KEY, tryEmitNext).log("Event could not be published downstream. Applying retry.");
                monoSink.error(new AmqpException(true, tryEmitNext + " occurred.", this.errorContext));
            } else {
                this.logger.atWarning().addKeyValue(ClientConstants.EMIT_RESULT_KEY, tryEmitNext).log("Event could not be published downstream. Not retrying.", new Object[]{tryEmitNext});
                monoSink.error(new AmqpException(false, "Unable to buffer message for partition: " + getPartitionId(), this.errorContext));
            }
        }), this.retryOptions, "Timed out trying to enqueue event data.", true).onErrorMap(IllegalStateException.class, illegalStateException -> {
            return new AmqpException(true, "Retries exhausted.", illegalStateException, this.errorContext);
        });
    }

    String getPartitionId() {
        return this.partitionId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getBufferedEventCount() {
        return ((Integer) this.eventSink.scanOrDefault(Scannable.Attr.BUFFERED, 0)).intValue() + this.eventDataAggregator.getNumberOfEvents();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> flush() {
        return this.publishResultSubscriber.startFlush();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.isClosed.getAndSet(true)) {
            return;
        }
        try {
            this.publishResultSubscriber.startFlush().block(this.retryOptions.getTryTimeout());
        } catch (IllegalStateException e) {
            this.logger.info("Timed out waiting for flush to complete.", new Object[]{e});
        } finally {
            this.publishSubscription.dispose();
            this.client.close();
        }
    }

    private Flux<PublishResult> publishEvents(Flux<EventDataBatch> flux) {
        return flux.flatMap(eventDataBatch -> {
            return eventDataBatch == EventDataBatch.EMPTY ? Mono.just(PublishResult.EMPTY) : this.client.send(eventDataBatch).thenReturn(new PublishResult(eventDataBatch.getEvents(), null)).onErrorResume(th -> {
                return Mono.just(new PublishResult(eventDataBatch.getEvents(), th));
            });
        }, 1, 1);
    }

    private EventDataBatch createNewBatch() {
        try {
            return (EventDataBatch) this.client.createBatch(this.createBatchOptions).toFuture().get();
        } catch (InterruptedException e) {
            throw this.logger.logExceptionAsError(new UncheckedExecutionException(e));
        } catch (ExecutionException e2) {
            throw this.logger.logExceptionAsError(new UncheckedExecutionException(e2));
        }
    }
}
