package com.azure.cosmos.util;

import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.IterableStream;
import com.azure.core.util.paging.ContinuablePagedFlux;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
import com.azure.cosmos.implementation.DiagnosticsProvider;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.models.FeedResponse;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

/* loaded from: input_file:com/azure/cosmos/util/CosmosPagedFlux.class */
/**
 * A {@link ContinuablePagedFlux} whose pages are {@link FeedResponse} instances and whose
 * continuation tokens are strings.
 *
 * <p>The pages are produced lazily by a function that receives a {@link CosmosPagedFluxOptions}
 * (continuation token, max item count, diagnostics settings) and returns the page flux. Every page
 * can optionally be handed to a user supplied {@link Consumer} (see {@link #handle(Consumer)}), and
 * when the options carry a {@link DiagnosticsProvider} the page stream is run inside a span that is
 * started when the page flux is assembled and ended on completion, error or cancellation.
 *
 * @param <T> the type of the items contained in each page
 */
public final class CosmosPagedFlux<T> extends ContinuablePagedFlux<String, T, FeedResponse<T>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CosmosPagedFlux.class);
    private static final ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor cosmosDiagnosticsAccessor =
        ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor();
    private static final ImplementationBridgeHelpers.CosmosDiagnosticsContextHelper.CosmosDiagnosticsContextAccessor ctxAccessor =
        ImplementationBridgeHelpers.CosmosDiagnosticsContextHelper.getCosmosDiagnosticsContextAccessor();

    /** Produces the flux of pages for a given set of paging options. */
    private final Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction;
    /** Optional per-page callback; {@code null} when no handler has been registered. */
    private final Consumer<FeedResponse<T>> feedResponseConsumer;
    /** Page size applied to freshly created options; ignored unless it is greater than zero. */
    private final int defaultPageSize;

    /**
     * Creates a paged flux without a page handler and without a default page size.
     *
     * @param optionsFluxFunction function producing the page flux for the supplied options
     */
    public CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction) {
        this(optionsFluxFunction, null, -1);
    }

    /**
     * Creates a paged flux with a page handler and without a default page size.
     *
     * @param optionsFluxFunction function producing the page flux for the supplied options
     * @param feedResponseConsumer callback invoked for each page, may be {@code null}
     */
    CosmosPagedFlux(
        Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction,
        Consumer<FeedResponse<T>> feedResponseConsumer) {

        this(optionsFluxFunction, feedResponseConsumer, -1);
    }

    /**
     * Creates a paged flux with a page handler and a default page size.
     *
     * @param optionsFluxFunction function producing the page flux for the supplied options
     * @param feedResponseConsumer callback invoked for each page, may be {@code null}
     * @param defaultPageSize page size applied to new options when greater than zero
     */
    CosmosPagedFlux(
        Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction,
        Consumer<FeedResponse<T>> feedResponseConsumer,
        int defaultPageSize) {

        this.optionsFluxFunction = optionsFluxFunction;
        this.feedResponseConsumer = feedResponseConsumer;
        this.defaultPageSize = defaultPageSize;
    }

    /**
     * Returns a new paged flux that additionally passes every page to {@code consumer}.
     *
     * <p>If a handler is already registered, the new one runs after it. This instance is left
     * untouched. The default page size of this instance is carried over to the returned one.
     *
     * @param consumer callback to invoke for each page
     * @return a new {@link CosmosPagedFlux} with the combined handler
     */
    public CosmosPagedFlux<T> handle(Consumer<FeedResponse<T>> consumer) {
        final Consumer<FeedResponse<T>> combinedConsumer = this.feedResponseConsumer != null
            ? this.feedResponseConsumer.andThen(consumer)
            : consumer;

        // Use the three-argument constructor so a page size configured through
        // withDefaultPageSize(...) is not silently reset to -1.
        return new CosmosPagedFlux<>(this.optionsFluxFunction, combinedConsumer, this.defaultPageSize);
    }

    /**
     * Returns the pages using the default options of this instance.
     *
     * @return a flux emitting one {@link FeedResponse} per page
     */
    @Override
    public Flux<FeedResponse<T>> byPage() {
        return byPageWithContext(createCosmosPagedFluxOptions());
    }

    /**
     * Returns the pages starting from the given continuation token.
     *
     * @param continuationToken the request continuation to set on the paging options
     * @return a flux emitting one {@link FeedResponse} per page
     */
    @Override
    public Flux<FeedResponse<T>> byPage(String continuationToken) {
        final CosmosPagedFluxOptions pagedFluxOptions = createCosmosPagedFluxOptions();
        pagedFluxOptions.setRequestContinuation(continuationToken);
        return byPageWithContext(pagedFluxOptions);
    }

    /**
     * Returns the pages using the given max item count, overriding any default page size.
     *
     * @param preferredPageSize the max item count to set on the paging options
     * @return a flux emitting one {@link FeedResponse} per page
     */
    @Override
    public Flux<FeedResponse<T>> byPage(int preferredPageSize) {
        final CosmosPagedFluxOptions pagedFluxOptions = createCosmosPagedFluxOptions();
        pagedFluxOptions.setMaxItemCount(Integer.valueOf(preferredPageSize));
        return byPageWithContext(pagedFluxOptions);
    }

    /**
     * Returns the pages starting from the given continuation token, using the given max item count.
     *
     * @param continuationToken the request continuation to set on the paging options
     * @param preferredPageSize the max item count to set on the paging options
     * @return a flux emitting one {@link FeedResponse} per page
     */
    @Override
    public Flux<FeedResponse<T>> byPage(String continuationToken, int preferredPageSize) {
        final CosmosPagedFluxOptions pagedFluxOptions = createCosmosPagedFluxOptions();
        pagedFluxOptions.setRequestContinuation(continuationToken);
        pagedFluxOptions.setMaxItemCount(Integer.valueOf(preferredPageSize));
        return byPageWithContext(pagedFluxOptions);
    }

    /**
     * Subscribes to the individual items by flattening the elements of every page.
     * Pages whose element stream is {@code null} contribute no items.
     *
     * @param coreSubscriber the subscriber receiving the items
     */
    @Override
    public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
        byPage()
            .flatMap(page -> {
                final IterableStream<T> elements = page.getElements();
                if (elements == null) {
                    return Flux.<T>empty();
                }
                return Flux.fromIterable(elements);
            })
            .subscribe(coreSubscriber);
    }

    /**
     * Returns a copy of this paged flux that uses {@code pageSize} as its default page size.
     *
     * @param pageSize the default page size; only applied to new options when greater than zero
     * @return a new {@link CosmosPagedFlux} sharing this instance's page function and handler
     */
    public CosmosPagedFlux<T> withDefaultPageSize(int pageSize) {
        return new CosmosPagedFlux<>(this.optionsFluxFunction, this.feedResponseConsumer, pageSize);
    }

    /** Creates fresh paging options, pre-populated with the default page size when one is set. */
    private CosmosPagedFluxOptions createCosmosPagedFluxOptions() {
        final CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
        if (this.defaultPageSize > 0) {
            pagedFluxOptions.setMaxItemCount(Integer.valueOf(this.defaultPageSize));
        }
        return pagedFluxOptions;
    }

    /** Defers page creation until subscription so the azure-core {@link Context} can be passed along. */
    private Flux<FeedResponse<T>> byPageWithContext(CosmosPagedFluxOptions pagedFluxOptions) {
        return FluxUtil.fluxContext(context -> byPage(pagedFluxOptions, context));
    }

    /**
     * Passes {@code flux} through {@link DiagnosticsProvider#runUnderSpanInContext} when the options
     * carry an enabled diagnostics provider; otherwise returns {@code flux} unchanged.
     */
    private <TOutput> Flux<TOutput> wrapWithTracingIfEnabled(
        CosmosPagedFluxOptions pagedFluxOptions,
        Flux<TOutput> flux) {

        final DiagnosticsProvider tracerProvider = pagedFluxOptions.getDiagnosticsProvider();
        if (tracerProvider != null && tracerProvider.isEnabled()) {
            return tracerProvider.runUnderSpanInContext(flux);
        }
        return flux;
    }

    /**
     * Records a single page with the diagnostics provider and invokes the registered page handler.
     *
     * <p>Nothing happens when the page is {@code null}, has no diagnostics, or its diagnostics have
     * already been flagged as captured by a paged flux.
     *
     * @param traceContext the trace context to record the page against, may be {@code null}
     * @param tracerProvider the diagnostics provider, may be {@code null}
     * @param feedResponse the page to record, may be {@code null} (e.g. for a completion signal)
     * @param feedResponseConsumerLatencyInNanos accumulator for the time spent in the page handler
     */
    private void recordFeedResponse(
        Context traceContext,
        DiagnosticsProvider tracerProvider,
        FeedResponse<T> feedResponse,
        AtomicLong feedResponseConsumerLatencyInNanos) {

        if (feedResponse == null) {
            return;
        }

        final CosmosDiagnostics diagnostics = feedResponse.getCosmosDiagnostics();
        if (diagnostics == null) {
            return;
        }

        // The compare-and-set flips the "captured" flag exactly once, so a page whose diagnostics
        // were already processed is skipped.
        if (!cosmosDiagnosticsAccessor.isDiagnosticsCapturedInPagedFlux(diagnostics).compareAndSet(false, true)) {
            return;
        }

        if (isTracerEnabled(tracerProvider)) {
            final Integer actualItemCount = feedResponse.getResults() != null
                ? Integer.valueOf(feedResponse.getResults().size())
                : null;
            tracerProvider.recordPage(
                traceContext,
                diagnostics,
                actualItemCount,
                Double.valueOf(feedResponse.getRequestCharge()));
        }

        if (this.feedResponseConsumer != null) {
            final Instant consumerStart = Instant.now();
            this.feedResponseConsumer.accept(feedResponse);
            // NOTE(review): the arguments are in (end, start) order, so the accumulated value is
            // negative. The sign is kept as-is because the way DiagnosticsProvider consumes this
            // value is not visible here - confirm before changing it.
            feedResponseConsumerLatencyInNanos.addAndGet(
                Duration.between(Instant.now(), consumerStart).toNanos());
        }
    }

    /**
     * Builds the page flux for the given options and attaches diagnostics handling to it.
     *
     * @param pagedFluxOptions the paging options handed to the page function
     * @param context the azure-core context used as parent when a span is started
     * @return the page flux, wrapped in a span when a diagnostics provider is present
     */
    private Flux<FeedResponse<T>> byPage(CosmosPagedFluxOptions pagedFluxOptions, Context context) {
        final AtomicLong feedResponseConsumerLatencyInNanos = new AtomicLong(0L);

        final Flux<FeedResponse<T>> instrumentedPages =
            wrapWithTracingIfEnabled(pagedFluxOptions, this.optionsFluxFunction.apply(pagedFluxOptions))
                // Every subscription starts with a clean handler-latency accumulator.
                .doOnSubscribe(ignoredSubscription -> feedResponseConsumerLatencyInNanos.set(0L))
                .doOnEach(signal -> {
                    final FeedResponse<T> feedResponse = signal.get();
                    final Context traceContext =
                        DiagnosticsProvider.getContextFromReactorOrNull(signal.getContextView());
                    final DiagnosticsProvider signalTracerProvider = pagedFluxOptions.getDiagnosticsProvider();

                    switch (signal.getType()) {
                        case ON_COMPLETE:
                            recordFeedResponse(
                                traceContext, signalTracerProvider, feedResponse, feedResponseConsumerLatencyInNanos);
                            if (isTracerEnabled(signalTracerProvider)) {
                                signalTracerProvider.recordFeedResponseConsumerLatency(
                                    signal, Duration.ofNanos(feedResponseConsumerLatencyInNanos.get()));
                                signalTracerProvider.endSpan(traceContext);
                            }
                            break;
                        case ON_NEXT:
                            recordFeedResponse(
                                traceContext, signalTracerProvider, feedResponse, feedResponseConsumerLatencyInNanos);
                            break;
                        case ON_ERROR:
                            if (isTracerEnabled(signalTracerProvider)) {
                                signalTracerProvider.recordFeedResponseConsumerLatency(
                                    signal, Duration.ofNanos(feedResponseConsumerLatencyInNanos.get()));
                                signalTracerProvider.endSpan(traceContext, signal.getThrowable());
                            }
                            break;
                        default:
                            // Other signal types need no diagnostics handling.
                            break;
                    }
                });

        final DiagnosticsProvider tracerProvider = pagedFluxOptions.getDiagnosticsProvider();
        if (!isTracerEnabled(tracerProvider)) {
            return instrumentedPages;
        }

        // Start the span now, publish its context to the reactor context, and make sure the span
        // is also ended when the subscriber cancels or the stream completes.
        return Flux
            .deferContextual(contextView -> instrumentedPages
                .doOnCancel(() ->
                    tracerProvider.endSpan(DiagnosticsProvider.getContextFromReactorOrNull(contextView)))
                .doOnComplete(() ->
                    tracerProvider.endSpan(DiagnosticsProvider.getContextFromReactorOrNull(contextView))))
            .contextWrite(DiagnosticsProvider.setContextInReactor(
                tracerProvider.startSpan(
                    pagedFluxOptions.getSpanName(),
                    ctxAccessor.create(
                        pagedFluxOptions.getSpanName(),
                        pagedFluxOptions.getAccountTag(),
                        BridgeInternal.getServiceEndpoint(pagedFluxOptions.getCosmosAsyncClient()),
                        pagedFluxOptions.getDatabaseId(),
                        pagedFluxOptions.getContainerId(),
                        pagedFluxOptions.getResourceType(),
                        pagedFluxOptions.getOperationType(),
                        pagedFluxOptions.getOperationId(),
                        pagedFluxOptions.getEffectiveConsistencyLevel(),
                        pagedFluxOptions.getMaxItemCount(),
                        pagedFluxOptions.getDiagnosticsThresholds(),
                        null),
                    context)));
    }

    /** Tracing is treated as enabled whenever a diagnostics provider is present. */
    private boolean isTracerEnabled(DiagnosticsProvider tracerProvider) {
        return tracerProvider != null;
    }

    /**
     * Registers the single-argument constructor as the page flux accessor with
     * {@link ImplementationBridgeHelpers.CosmosPageFluxHelper}.
     */
    public static void initialize() {
        ImplementationBridgeHelpers.CosmosPageFluxHelper.setCosmosPageFluxAccessor(CosmosPagedFlux::new);
    }

    static {
        initialize();
    }
}
