package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.UncheckedExecutionException;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.messaging.eventhubs.models.SendBatchFailedContext;
import com.azure.messaging.eventhubs.models.SendBatchSucceededContext;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.class */
public class EventHubBufferedPartitionProducer implements Closeable {
    private static final ClientLogger LOGGER = new ClientLogger(EventHubBufferedPartitionProducer.class);
    private final AmqpRetryOptions retryOptions;
    private final EventHubProducerAsyncClient client;
    private final String partitionId;
    private final AmqpErrorContext errorContext;
    private final Disposable publishSubscription;
    private final Sinks.Many<EventData> eventSink;
    private final CreateBatchOptions createBatchOptions;
    private final Queue<EventData> eventQueue;
    private final PublishResultSubscriber publishResultSubscriber;
    private final EventHubsTracer tracer;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);
    private final Semaphore flushSemaphore = new Semaphore(1);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer$PublishResult.class */
    public static class PublishResult {
        private final EventDataBatch batch;
        private final Throwable error;

        PublishResult(EventDataBatch eventDataBatch, Throwable th) {
            this.batch = eventDataBatch;
            this.error = th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer$PublishResultSubscriber.class */
    public static class PublishResultSubscriber extends BaseSubscriber<PublishResult> {
        private final String partitionId;
        private final Consumer<SendBatchSucceededContext> onSucceed;
        private final Consumer<SendBatchFailedContext> onFailed;
        private final Queue<EventData> dataQueue;
        private final Duration operationTimeout;
        private final ClientLogger logger;
        private final AtomicBoolean isFlushing;
        private final Semaphore flushSemaphore;
        private MonoSink<Void> flushSink;

        PublishResultSubscriber(String str, Consumer<SendBatchSucceededContext> consumer, Consumer<SendBatchFailedContext> consumer2, Queue<EventData> queue, Semaphore semaphore, AtomicBoolean atomicBoolean, Duration duration, ClientLogger clientLogger) {
            this.partitionId = str;
            this.onSucceed = consumer;
            this.onFailed = consumer2;
            this.dataQueue = queue;
            this.flushSemaphore = semaphore;
            this.isFlushing = atomicBoolean;
            this.operationTimeout = duration;
            this.logger = clientLogger;
        }

        protected void hookOnSubscribe(Subscription subscription) {
            requestUnbounded();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void hookOnNext(PublishResult publishResult) {
            if (publishResult.error == null) {
                this.onSucceed.accept(new SendBatchSucceededContext(publishResult.batch.getEvents(), this.partitionId));
            } else {
                this.onFailed.accept(new SendBatchFailedContext(publishResult.batch.getEvents(), this.partitionId, publishResult.error));
            }
            tryCompleteFlush();
        }

        protected void hookOnError(Throwable th) {
            this.logger.atError().addKeyValue(ClientConstants.PARTITION_ID_KEY, this.partitionId).log("Publishing subscription completed and ended in an error.", new Object[]{th});
            this.onFailed.accept(new SendBatchFailedContext(null, this.partitionId, th));
            tryCompleteFlush();
        }

        protected void hookOnComplete() {
            this.logger.atInfo().addKeyValue(ClientConstants.PARTITION_ID_KEY, this.partitionId).log("Publishing subscription completed. Clearing rest of queue.");
            ArrayList arrayList = new ArrayList(this.dataQueue);
            this.dataQueue.clear();
            this.onFailed.accept(new SendBatchFailedContext(arrayList, this.partitionId, null));
            tryCompleteFlush();
        }

        Mono<Void> startFlush() {
            return Mono.create(monoSink -> {
                if (!this.isFlushing.compareAndSet(false, true)) {
                    this.logger.atInfo().addKeyValue(ClientConstants.PARTITION_ID_KEY, this.partitionId).log("Flush operation already in progress.");
                    monoSink.success();
                    return;
                }
                this.flushSink = monoSink;
                try {
                    if (!this.flushSemaphore.tryAcquire(this.operationTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                        monoSink.error(new TimeoutException("Unable to acquire flush semaphore to begin timeout operation."));
                    }
                    tryCompleteFlush();
                } catch (InterruptedException e) {
                    this.logger.atWarning().addKeyValue(ClientConstants.PARTITION_ID_KEY, this.partitionId).log("Unable to acquire flush semaphore.");
                    monoSink.error(e);
                }
            });
        }

        private void tryCompleteFlush() {
            if (this.isFlushing.get()) {
                if (!this.dataQueue.isEmpty()) {
                    this.logger.atVerbose().addKeyValue(ClientConstants.PARTITION_ID_KEY, this.partitionId).log("Data queue is not empty. Not completing flush.");
                    return;
                }
                this.logger.atVerbose().addKeyValue(ClientConstants.PARTITION_ID_KEY, this.partitionId).log("Completing flush operation.");
                if (this.flushSemaphore != null) {
                    this.flushSemaphore.release();
                }
                this.isFlushing.compareAndSet(true, false);
                this.flushSink.success();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventHubBufferedPartitionProducer(EventHubProducerAsyncClient eventHubProducerAsyncClient, String str, EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions bufferedProducerClientOptions, AmqpRetryOptions amqpRetryOptions, Sinks.Many<EventData> many, Queue<EventData> queue, Tracer tracer) {
        this.client = eventHubProducerAsyncClient;
        this.partitionId = str;
        this.errorContext = new AmqpErrorContext(eventHubProducerAsyncClient.getFullyQualifiedNamespace());
        this.createBatchOptions = new CreateBatchOptions().setPartitionId(str);
        this.retryOptions = amqpRetryOptions;
        this.eventSink = many;
        this.eventQueue = queue;
        EventDataAggregator eventDataAggregator = new EventDataAggregator(many.asFlux(), this::createNewBatch, eventHubProducerAsyncClient.getFullyQualifiedNamespace(), bufferedProducerClientOptions, str);
        this.publishResultSubscriber = new PublishResultSubscriber(str, bufferedProducerClientOptions.getSendSucceededContext(), bufferedProducerClientOptions.getSendFailedContext(), queue, this.flushSemaphore, this.isFlushing, amqpRetryOptions.getTryTimeout(), LOGGER);
        this.publishSubscription = publishEvents(eventDataAggregator).publishOn(Schedulers.boundedElastic(), 1).subscribeWith(this.publishResultSubscriber);
        this.tracer = new EventHubsTracer(tracer, eventHubProducerAsyncClient.getFullyQualifiedNamespace(), eventHubProducerAsyncClient.getEventHubName());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> enqueueEvent(EventData eventData) {
        return RetryUtil.withRetry(Mono.create(monoSink -> {
            if (this.isClosed.get()) {
                monoSink.error(new IllegalStateException(String.format("Partition publisher id[%s] is already closed. Cannot enqueue more events.", this.partitionId)));
                return;
            }
            try {
                if (this.isFlushing.get() && !this.flushSemaphore.tryAcquire(this.retryOptions.getTryTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
                    monoSink.error(new TimeoutException("Timed out waiting for flush operation to complete."));
                    return;
                }
                if (this.isClosed.get()) {
                    monoSink.error(new IllegalStateException(String.format("Partition publisher id[%s] was closed between flushing events and now. Cannot enqueue events.", this.partitionId)));
                    return;
                }
                this.tracer.reportMessageSpan(eventData, eventData.getContext());
                Sinks.EmitResult tryEmitNext = this.eventSink.tryEmitNext(eventData);
                if (tryEmitNext.isSuccess()) {
                    monoSink.success();
                    return;
                }
                if (tryEmitNext == Sinks.EmitResult.FAIL_NON_SERIALIZED || tryEmitNext == Sinks.EmitResult.FAIL_OVERFLOW) {
                    LOGGER.atInfo().addKeyValue(ClientConstants.PARTITION_ID_KEY, this.partitionId).addKeyValue(ClientConstants.EMIT_RESULT_KEY, tryEmitNext).log("Event could not be published downstream. Applying retry.");
                    monoSink.error(new AmqpException(true, tryEmitNext + " occurred.", this.errorContext));
                } else {
                    LOGGER.atWarning().addKeyValue(ClientConstants.EMIT_RESULT_KEY, tryEmitNext).log("Event could not be published downstream. Not retrying.", new Object[]{tryEmitNext});
                    monoSink.error(new AmqpException(false, "Unable to buffer message for partition: " + getPartitionId(), this.errorContext));
                }
            } catch (InterruptedException e) {
                monoSink.error(new TimeoutException("Unable to acquire flush semaphore due to interrupted exception."));
            }
        }), this.retryOptions, "Timed out trying to enqueue event data.", true).onErrorMap(IllegalStateException.class, illegalStateException -> {
            return new AmqpException(true, "Retries exhausted.", illegalStateException, this.errorContext);
        });
    }

    String getPartitionId() {
        return this.partitionId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getBufferedEventCount() {
        return this.eventQueue.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> flush() {
        return this.publishResultSubscriber.startFlush();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.isClosed.getAndSet(true)) {
            return;
        }
        try {
            this.publishResultSubscriber.startFlush().block(this.retryOptions.getTryTimeout());
        } catch (IllegalStateException e) {
            LOGGER.info("Timed out waiting for flush to complete.", new Object[]{e});
        } finally {
            this.publishSubscription.dispose();
            this.client.close();
        }
    }

    private Flux<PublishResult> publishEvents(Flux<EventDataBatch> flux) {
        return flux.flatMap(eventDataBatch -> {
            return this.client.send(eventDataBatch).thenReturn(new PublishResult(eventDataBatch, null)).onErrorResume(th -> {
                return Mono.just(new PublishResult(eventDataBatch, th));
            });
        }, 1, 1);
    }

    private EventDataBatch createNewBatch() {
        try {
            return (EventDataBatch) this.client.createBatch(this.createBatchOptions).toFuture().get();
        } catch (InterruptedException e) {
            throw LOGGER.logExceptionAsError(new UncheckedExecutionException(e));
        } catch (ExecutionException e2) {
            throw LOGGER.logExceptionAsError(new UncheckedExecutionException(e2));
        }
    }
}
