/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.util;

import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.cosmos.CosmosDiagnosticsContext;
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
import com.azure.cosmos.implementation.DiagnosticsProvider;
import com.azure.cosmos.implementation.FeedOperationState;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.CosmosPagedFluxStaticListImpl;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.util.context.ContextView;

final class CosmosPagedFluxDefaultImpl<T>
extends CosmosPagedFlux<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CosmosPagedFluxStaticListImpl.class);
    private static final ImplementationBridgeHelpers.CosmosDiagnosticsContextHelper.CosmosDiagnosticsContextAccessor ctxAccessor = ImplementationBridgeHelpers.CosmosDiagnosticsContextHelper.getCosmosDiagnosticsContextAccessor();
    private final Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction;
    private final AtomicReference<Consumer<FeedResponse<T>>> feedResponseConsumer;
    private final AtomicInteger defaultPageSize;

    CosmosPagedFluxDefaultImpl(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction) {
        this(optionsFluxFunction, null, -1);
    }

    CosmosPagedFluxDefaultImpl(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction, Consumer<FeedResponse<T>> feedResponseConsumer) {
        this(optionsFluxFunction, feedResponseConsumer, -1);
    }

    CosmosPagedFluxDefaultImpl(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction, Consumer<FeedResponse<T>> feedResponseConsumer, int defaultPageSize) {
        this.optionsFluxFunction = optionsFluxFunction;
        this.feedResponseConsumer = new AtomicReference<Consumer<FeedResponse<T>>>(feedResponseConsumer);
        this.defaultPageSize = new AtomicInteger(defaultPageSize);
    }

    @Override
    public CosmosPagedFlux<T> handle(Consumer<FeedResponse<T>> newFeedResponseConsumer) {
        int i = 0;
        while (true) {
            Consumer<FeedResponse<FeedResponse<T>>> feedResponseConsumerSnapshot = this.feedResponseConsumer.get();
            ++i;
            if (feedResponseConsumerSnapshot == null ? this.feedResponseConsumer.compareAndSet(null, newFeedResponseConsumer) : this.feedResponseConsumer.compareAndSet(feedResponseConsumerSnapshot, feedResponseConsumerSnapshot.andThen(newFeedResponseConsumer))) break;
            if (i <= 10) continue;
            LOGGER.warn("Highly concurrent calls to CosmosPagedFlux.handle are not expected and can result in perf regressions. Avoid this by reducing concurrency.");
        }
        return this;
    }

    @Override
    CosmosPagedFlux<T> withDefaultPageSize(int pageSize) {
        this.defaultPageSize.set(pageSize);
        return this;
    }

    @Override
    public Flux<FeedResponse<T>> byPage() {
        CosmosPagedFluxOptions cosmosPagedFluxOptions = this.createCosmosPagedFluxOptions();
        return FluxUtil.fluxContext(context -> this.byPage(cosmosPagedFluxOptions, (Context)context));
    }

    @Override
    public Flux<FeedResponse<T>> byPage(String continuationToken) {
        CosmosPagedFluxOptions cosmosPagedFluxOptions = this.createCosmosPagedFluxOptions();
        cosmosPagedFluxOptions.setRequestContinuation(continuationToken);
        return FluxUtil.fluxContext(context -> this.byPage(cosmosPagedFluxOptions, (Context)context));
    }

    @Override
    public Flux<FeedResponse<T>> byPage(int preferredPageSize) {
        CosmosPagedFluxOptions cosmosPagedFluxOptions = this.createCosmosPagedFluxOptions();
        cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize);
        return FluxUtil.fluxContext(context -> this.byPage(cosmosPagedFluxOptions, (Context)context));
    }

    @Override
    public Flux<FeedResponse<T>> byPage(String continuationToken, int preferredPageSize) {
        CosmosPagedFluxOptions cosmosPagedFluxOptions = this.createCosmosPagedFluxOptions();
        cosmosPagedFluxOptions.setRequestContinuation(continuationToken);
        cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize);
        return FluxUtil.fluxContext(context -> this.byPage(cosmosPagedFluxOptions, (Context)context));
    }

    private CosmosPagedFluxOptions createCosmosPagedFluxOptions() {
        CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
        int defaultPageSizeSnapshot = this.defaultPageSize.get();
        if (defaultPageSizeSnapshot > 0) {
            cosmosPagedFluxOptions.setMaxItemCount(defaultPageSizeSnapshot);
        }
        return cosmosPagedFluxOptions;
    }

    private Flux<FeedResponse<T>> wrapWithTracingIfEnabled(CosmosPagedFluxOptions pagedFluxOptions, Flux<FeedResponse<T>> publisher, AtomicLong feedResponseConsumerLatencyInNanos, Context context) {
        FeedOperationState state = pagedFluxOptions.getFeedOperationState();
        DiagnosticsProvider tracerProvider = state != null ? state.getDiagnosticsProvider() : null;
        Object lockHolder = new Object();
        if (tracerProvider == null) {
            return publisher.doOnEach(signal -> {
                FeedResponse response = (FeedResponse)signal.get();
                Object object = lockHolder;
                synchronized (object) {
                    switch (signal.getType()) {
                        case ON_COMPLETE: 
                        case ON_NEXT: {
                            DiagnosticsProvider.recordFeedResponse(this.feedResponseConsumer.get(), pagedFluxOptions.getFeedOperationState(), () -> pagedFluxOptions.getSamplingRateSnapshot(), tracerProvider, response, feedResponseConsumerLatencyInNanos);
                            break;
                        }
                    }
                }
            });
        }
        if (!tracerProvider.isEnabled()) {
            pagedFluxOptions.setSamplingRateSnapshot(0.0, true);
        }
        boolean isSampledOut = tracerProvider.shouldSampleOutOperation(pagedFluxOptions);
        double samplingRateSnapshot = pagedFluxOptions.getSamplingRateSnapshot();
        Flux result = tracerProvider.runUnderSpanInContext(publisher).doOnEach(signal -> {
            FeedResponse response = (FeedResponse)signal.get();
            Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(signal.getContextView());
            Object object = lockHolder;
            synchronized (object) {
                switch (signal.getType()) {
                    case ON_COMPLETE: {
                        if (response != null) {
                            DiagnosticsProvider.recordFeedResponse(this.feedResponseConsumer.get(), pagedFluxOptions.getFeedOperationState(), () -> pagedFluxOptions.getSamplingRateSnapshot(), tracerProvider, response, feedResponseConsumerLatencyInNanos);
                        }
                        state.mergeDiagnosticsContext();
                        CosmosDiagnosticsContext ctxSnapshot = state.getDiagnosticsContextSnapshot();
                        ctxAccessor.setSamplingRateSnapshot(ctxSnapshot, samplingRateSnapshot, isSampledOut);
                        tracerProvider.recordFeedResponseConsumerLatency(signal, ctxSnapshot, Duration.ofNanos(feedResponseConsumerLatencyInNanos.get()));
                        tracerProvider.endSpan(ctxSnapshot, traceCtx, ctxAccessor.isEmptyCompletion(ctxSnapshot), isSampledOut);
                        break;
                    }
                    case ON_NEXT: {
                        DiagnosticsProvider.recordFeedResponse(this.feedResponseConsumer.get(), pagedFluxOptions.getFeedOperationState(), () -> pagedFluxOptions.getSamplingRateSnapshot(), tracerProvider, response, feedResponseConsumerLatencyInNanos);
                        state.mergeDiagnosticsContext();
                        CosmosDiagnosticsContext ctxSnapshotOnNext = state.getDiagnosticsContextSnapshot();
                        ctxAccessor.setSamplingRateSnapshot(ctxSnapshotOnNext, samplingRateSnapshot, isSampledOut);
                        tracerProvider.endSpan(ctxSnapshotOnNext, traceCtx, false, isSampledOut);
                        state.resetDiagnosticsContext();
                        DiagnosticsProvider.setContextInReactor(tracerProvider.startSpan(state.getSpanName(), state.getDiagnosticsContextSnapshot(), traceCtx, isSampledOut));
                        break;
                    }
                    case ON_ERROR: {
                        state.mergeDiagnosticsContext();
                        CosmosDiagnosticsContext ctxSnapshotOnError = state.getDiagnosticsContextSnapshot();
                        ctxAccessor.setSamplingRateSnapshot(ctxSnapshotOnError, samplingRateSnapshot, isSampledOut);
                        tracerProvider.recordFeedResponseConsumerLatency(signal, ctxSnapshotOnError, Duration.ofNanos(feedResponseConsumerLatencyInNanos.get()));
                        tracerProvider.endSpan(state.getDiagnosticsContextSnapshot(), traceCtx, signal.getThrowable(), isSampledOut);
                        break;
                    }
                }
            }
        });
        return Flux.deferContextual(reactorCtx -> result.doOnCancel(() -> {
            Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(reactorCtx);
            Object object = lockHolder;
            synchronized (object) {
                state.mergeDiagnosticsContext();
                CosmosDiagnosticsContext ctxSnapshot = state.getDiagnosticsContextSnapshot();
                ctxAccessor.setSamplingRateSnapshot(ctxSnapshot, samplingRateSnapshot, isSampledOut);
                tracerProvider.endSpan(ctxSnapshot, traceCtx, false, isSampledOut);
            }
        }).doOnComplete(() -> {
            Context traceCtx = DiagnosticsProvider.getContextFromReactorOrNull(reactorCtx);
            Object object = lockHolder;
            synchronized (object) {
                state.mergeDiagnosticsContext();
                CosmosDiagnosticsContext ctxSnapshot = state.getDiagnosticsContextSnapshot();
                ctxAccessor.setSamplingRateSnapshot(ctxSnapshot, samplingRateSnapshot, isSampledOut);
                tracerProvider.endSpan(ctxSnapshot, traceCtx, ctxAccessor.isEmptyCompletion(ctxSnapshot), isSampledOut);
            }
        })).contextWrite((ContextView)DiagnosticsProvider.setContextInReactor(tracerProvider.startSpan(state.getSpanName(), state.getDiagnosticsContextSnapshot(), context, isSampledOut)));
    }

    private Flux<FeedResponse<T>> byPage(CosmosPagedFluxOptions pagedFluxOptions, Context context) {
        AtomicReference startTime = new AtomicReference();
        AtomicLong feedResponseConsumerLatencyInNanos = new AtomicLong(0L);
        Flux result = this.wrapWithTracingIfEnabled(pagedFluxOptions, this.optionsFluxFunction.apply(pagedFluxOptions), feedResponseConsumerLatencyInNanos, context).doOnSubscribe(ignoredValue -> {
            startTime.set(Instant.now());
            feedResponseConsumerLatencyInNanos.set(0L);
        });
        return result;
    }

    static void initialize() {
        ImplementationBridgeHelpers.CosmosPageFluxHelper.setCosmosPageFluxAccessor(CosmosPagedFluxDefaultImpl::new);
    }

    static {
        CosmosPagedFluxDefaultImpl.initialize();
    }
}

