package com.azure.messaging.eventhubs.implementation.instrumentation;

import com.azure.core.util.Context;
import com.azure.core.util.tracing.SpanKind;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.util.List;
import java.util.Objects;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/messaging/eventhubs/implementation/instrumentation/InstrumentedCheckpointStore.class */
/**
 * A {@link CheckpointStore} wrapper that forwards every call to an underlying store and
 * additionally runs {@link #updateCheckpoint(Checkpoint)} inside an instrumentation scope
 * (a span started through the tracer plus a callback that reports the checkpoint to the
 * metrics provider).
 *
 * <p>Ownership and checkpoint-listing operations are delegated without any instrumentation.
 * Instances are obtained through
 * {@link #create(CheckpointStore, EventHubsConsumerInstrumentation)}.
 */
public final class InstrumentedCheckpointStore implements CheckpointStore {
    private final CheckpointStore checkpointStore;
    private final EventHubsConsumerInstrumentation instrumentation;
    private final EventHubsTracer tracer;

    /**
     * Wraps {@code checkpointStore}; the tracer is taken from the supplied instrumentation.
     *
     * @param checkpointStore the store every operation is delegated to
     * @param eventHubsConsumerInstrumentation source of instrumentation scopes and of the tracer
     */
    private InstrumentedCheckpointStore(CheckpointStore checkpointStore, EventHubsConsumerInstrumentation eventHubsConsumerInstrumentation) {
        this.checkpointStore = checkpointStore;
        this.instrumentation = eventHubsConsumerInstrumentation;
        this.tracer = eventHubsConsumerInstrumentation.getTracer();
    }

    /**
     * Returns an instrumented view of {@code checkpointStore}, or the store itself when
     * instrumentation is disabled.
     *
     * @param checkpointStore the store to wrap
     * @param eventHubsConsumerInstrumentation instrumentation to use; its {@code isEnabled()}
     *        result decides whether wrapping happens
     * @return {@code checkpointStore} unchanged if instrumentation is disabled, otherwise a new
     *         {@code InstrumentedCheckpointStore} around it
     */
    public static CheckpointStore create(CheckpointStore checkpointStore, EventHubsConsumerInstrumentation eventHubsConsumerInstrumentation) {
        if (!eventHubsConsumerInstrumentation.isEnabled()) {
            // Nothing to record, so avoid the extra layer entirely.
            return checkpointStore;
        }
        return new InstrumentedCheckpointStore(checkpointStore, eventHubsConsumerInstrumentation);
    }

    /**
     * Delegates to the wrapped store without instrumentation; all three arguments are passed
     * through unchanged and in the same order.
     */
    @Override // com.azure.messaging.eventhubs.CheckpointStore
    public Flux<PartitionOwnership> listOwnership(String str, String str2, String str3) {
        return this.checkpointStore.listOwnership(str, str2, str3);
    }

    /**
     * Delegates to the wrapped store without instrumentation.
     *
     * @param list the ownership entries, passed through unchanged
     */
    @Override // com.azure.messaging.eventhubs.CheckpointStore
    public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> list) {
        return this.checkpointStore.claimOwnership(list);
    }

    /**
     * Delegates to the wrapped store without instrumentation; all three arguments are passed
     * through unchanged and in the same order.
     */
    @Override // com.azure.messaging.eventhubs.CheckpointStore
    public Flux<Checkpoint> listCheckpoints(String str, String str2, String str3) {
        return this.checkpointStore.listCheckpoints(str, str2, str3);
    }

    /**
     * Updates the checkpoint through the wrapped store while an instrumentation scope is open.
     *
     * <p>The scope is created as the resource of {@code Mono.using}; it is given a callback that
     * reports the checkpoint to the metrics provider and a span started for the checkpoint's
     * partition id. Errors and cancellation of the delegate call are recorded on the scope, and
     * the scope is closed by the {@code Mono.using} cleanup callback.
     *
     * @param checkpoint the checkpoint to persist
     * @return the delegate's update result, decorated with the instrumentation described above
     */
    @Override // com.azure.messaging.eventhubs.CheckpointStore
    public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
        return Mono.using(
            () -> this.instrumentation
                .createScope((metricsProvider, scope) -> metricsProvider.reportCheckpoint(checkpoint, scope))
                .setSpan(startSpan(checkpoint.getPartitionId())),
            // Keep the chain fully typed (no raw Mono) so contextWrite receives a typed
            // Reactor context and the result is a checked Mono<Void>.
            scope -> this.checkpointStore.updateCheckpoint(checkpoint)
                .doOnError(scope::setError)
                .doOnCancel(scope::setCancelled)
                // Expose the span's context values to the delegate call via the Reactor context.
                .contextWrite(context -> context.putAllMap(scope.getSpan().getValues())),
            scope -> scope.close());
    }

    /**
     * Starts a checkpoint span when tracing is enabled.
     *
     * @param str the partition id handed to the tracer's start options
     * @return the context returned by the tracer, or {@code Context.NONE} if the tracer is disabled
     */
    private Context startSpan(String str) {
        if (!this.tracer.isEnabled()) {
            return Context.NONE;
        }
        return this.tracer.startSpan(
            OperationName.CHECKPOINT,
            this.tracer.createStartOptions(SpanKind.INTERNAL, OperationName.CHECKPOINT, str),
            Context.NONE);
    }
}
