/*
 * Decompiled with CFR 0.152.
 */
package com.palantir.dialogue.core;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.dialogue.core.Config;
import com.palantir.dialogue.core.DialogueClientMetrics;
import com.palantir.dialogue.core.ImmutableDeferredCall;
import com.palantir.dialogue.core.LimitedChannel;
import com.palantir.dialogue.core.NeverThrowLimitedChannel;
import com.palantir.dialogue.futures.DialogueFutures;
import com.palantir.logsafe.Arg;
import com.palantir.logsafe.Safe;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.tracing.CloseableSpan;
import com.palantir.tracing.DetachedSpan;
import com.palantir.tracing.TagTranslator;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.immutables.value.Value;
import org.jspecify.annotations.Nullable;

final class QueuedChannel
implements Channel {
    private static final SafeLogger log = SafeLoggerFactory.get(QueuedChannel.class);
    private final Deque<DeferredCall> queuedCalls;
    private final NeverThrowLimitedChannel delegate;
    @Safe
    private final String channelName;
    @Safe
    private final String queueType;
    private final AtomicInteger queueSizeEstimate = new AtomicInteger(0);
    private final int maxQueueSize;
    private final Supplier<Counter> queueSizeCounter;
    private final Timer queuedTime;
    private final Supplier<ListenableFuture<Response>> limitedResultSupplier;
    private volatile boolean shouldRecordQueueMetrics;
    private final AtomicInteger inFlight = new AtomicInteger();

    QueuedChannel(LimitedChannel delegate, @Safe String channelName, @Safe String queueType, QueuedChannelInstrumentation metrics, int maxQueueSize) {
        this.delegate = new NeverThrowLimitedChannel(delegate);
        this.channelName = channelName;
        this.queueType = queueType;
        this.queuedCalls = new ProtectedConcurrentLinkedDeque<DeferredCall>();
        this.maxQueueSize = maxQueueSize;
        this.queueSizeCounter = Suppliers.memoize(metrics::requestsQueued);
        this.queuedTime = metrics.requestQueuedTime();
        this.limitedResultSupplier = () -> Futures.immediateFailedFuture((Throwable)new SafeRuntimeException("Unable to make a request (queue is full)", new Arg[]{SafeArg.of((String)"maxQueueSize", (Object)maxQueueSize)}));
    }

    static QueuedChannel createForSticky(String channelName, int maxQueueSize, QueuedChannelInstrumentation queuedChannelInstrumentation, LimitedChannel delegate) {
        return new QueuedChannel(delegate, channelName, "sticky", queuedChannelInstrumentation, maxQueueSize);
    }

    static QueuedChannel create(Config cf, LimitedChannel delegate) {
        return new QueuedChannel(delegate, cf.channelName(), "channel", QueuedChannel.channelInstrumentation(DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry()), cf.channelName()), cf.maxQueueSize());
    }

    static QueuedChannel create(Config cf, Endpoint endpoint, LimitedChannel delegate) {
        return new QueuedChannel(delegate, cf.channelName(), "endpoint", QueuedChannel.endpointInstrumentation(DialogueClientMetrics.of(cf.clientConf().taggedMetricRegistry()), cf.channelName(), endpoint.serviceName(), endpoint.endpointName()), cf.maxQueueSize());
    }

    public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
        return this.maybeExecute(endpoint, request).orElseGet(this.limitedResultSupplier);
    }

    @VisibleForTesting
    Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request) {
        if (this.queueSizeEstimate.get() <= 0) {
            LimitedChannel.LimitEnforcement limitEnforcement = this.limitEnforcement();
            Optional<ListenableFuture<Response>> maybeResult = this.delegate.maybeExecute(endpoint, request, limitEnforcement);
            if (maybeResult.isPresent()) {
                this.inFlight.incrementAndGet();
                ListenableFuture<Response> result = maybeResult.get();
                DialogueFutures.addDirectListener(result, this::onCompletion);
                if (this.shouldRecordQueueMetrics) {
                    this.queuedTime.update(0L, TimeUnit.NANOSECONDS);
                }
                return maybeResult;
            }
            if (!limitEnforcement.enforceLimits()) {
                return Optional.of(Futures.immediateFailedFuture((Throwable)this.limitEnforcementExpectationFailure(endpoint)));
            }
        }
        if (this.queueSizeEstimate.get() >= this.maxQueueSize) {
            return Optional.empty();
        }
        this.shouldRecordQueueMetrics = true;
        ImmutableDeferredCall components = DeferredCall.builder().endpoint(endpoint).request(request).response((SettableFuture<Response>)SettableFuture.create()).span(DetachedSpan.start((String)"Dialogue-request-enqueued")).timer(this.queuedTime.time()).build();
        if (!this.queuedCalls.offer(components)) {
            return Optional.empty();
        }
        int newSize = this.incrementQueueSize();
        if (log.isDebugEnabled()) {
            log.debug("Request queued {} on channel {}", (Arg)SafeArg.of((String)"queueSize", (Object)newSize), (Arg)SafeArg.of((String)"channelName", (Object)this.channelName));
        }
        this.schedule();
        return Optional.of(components.response());
    }

    private void onCompletion() {
        this.inFlight.decrementAndGet();
        this.schedule();
    }

    @VisibleForTesting
    void schedule() {
        int numScheduled = 0;
        while (this.scheduleNextTask()) {
            ++numScheduled;
        }
        if (log.isDebugEnabled()) {
            log.debug("Scheduled {} requests on channel {}", (Arg)SafeArg.of((String)"numScheduled", (Object)numScheduled), (Arg)SafeArg.of((String)"channelName", (Object)this.channelName));
        }
    }

    private int incrementQueueSize() {
        this.queueSizeCounter.get().inc();
        return this.queueSizeEstimate.incrementAndGet();
    }

    private void decrementQueueSize() {
        this.queueSizeEstimate.decrementAndGet();
        this.queueSizeCounter.get().dec();
    }

    private boolean scheduleNextTask() {
        DeferredCall queueHead = this.queuedCalls.poll();
        if (queueHead == null) {
            return false;
        }
        if (queueHead.response().isDone()) {
            this.decrementQueueSize();
            queueHead.span().complete((TagTranslator)QueuedChannelTagTranslator.INSTANCE, (Object)this);
            queueHead.timer().stop();
            return true;
        }
        return this.scheduleTaskFromQueue(queueHead);
    }

    private boolean scheduleTaskFromQueue(DeferredCall queueHead) {
        SettableFuture<Response> queuedResponse = queueHead.response();
        try (CloseableSpan ignored = queueHead.span().attach();){
            Endpoint endpoint = queueHead.endpoint();
            LimitedChannel.LimitEnforcement limitEnforcement = this.limitEnforcement();
            Optional<ListenableFuture<Response>> maybeResponse = this.delegate.maybeExecute(endpoint, queueHead.request(), limitEnforcement);
            if (maybeResponse.isPresent()) {
                this.inFlight.incrementAndGet();
                this.decrementQueueSize();
                ListenableFuture<Response> response = maybeResponse.get();
                queueHead.span().complete((TagTranslator)QueuedChannelTagTranslator.INSTANCE, (Object)this);
                queueHead.timer().stop();
                DialogueFutures.addDirectCallback(response, (FutureCallback)new ForwardAndSchedule(queuedResponse));
                DialogueFutures.addDirectListener(queuedResponse, () -> {
                    if (queuedResponse.isCancelled() && !response.cancel(true) && log.isDebugEnabled()) {
                        log.debug("Failed to cancel delegate response, it should be reported by ForwardAndSchedule logging", (Arg)SafeArg.of((String)"channel", (Object)this.channelName), (Arg)SafeArg.of((String)"service", (Object)endpoint.serviceName()), (Arg)SafeArg.of((String)"endpoint", (Object)endpoint.endpointName()));
                    }
                });
                boolean bl = true;
                return bl;
            }
            if (!limitEnforcement.enforceLimits()) {
                this.decrementQueueSize();
                queueHead.span().complete((TagTranslator)QueuedChannelTagTranslator.INSTANCE, (Object)this);
                queueHead.timer().stop();
                queuedResponse.setException((Throwable)this.limitEnforcementExpectationFailure(queueHead.endpoint()));
                log.warn("Failed to make a request bypassing concurrency limits, which should not be possible", (Arg)SafeArg.of((String)"channel", (Object)this.channelName), (Arg)SafeArg.of((String)"service", (Object)endpoint.serviceName()), (Arg)SafeArg.of((String)"endpoint", (Object)endpoint.endpointName()));
                boolean bl = true;
                return bl;
            }
            if (!this.queuedCalls.offerFirst(queueHead)) {
                log.error("Failed to add an attempted call back to the deque", (Arg)SafeArg.of((String)"channel", (Object)this.channelName), (Arg)SafeArg.of((String)"service", (Object)endpoint.serviceName()), (Arg)SafeArg.of((String)"endpoint", (Object)endpoint.endpointName()));
                this.decrementQueueSize();
                queueHead.timer().stop();
                if (!queuedResponse.setException((Throwable)new SafeRuntimeException("Failed to req-queue request", new Arg[]{SafeArg.of((String)"channel", (Object)this.channelName), SafeArg.of((String)"service", (Object)endpoint.serviceName()), SafeArg.of((String)"endpoint", (Object)endpoint.endpointName())})) && log.isDebugEnabled()) {
                    log.debug("Queued response has already been completed", (Arg)SafeArg.of((String)"channel", (Object)this.channelName), (Arg)SafeArg.of((String)"service", (Object)endpoint.serviceName()), (Arg)SafeArg.of((String)"endpoint", (Object)endpoint.endpointName()));
                }
            }
            boolean bl = false;
            return bl;
        }
    }

    private LimitedChannel.LimitEnforcement limitEnforcement() {
        return this.inFlight.get() <= 0 ? LimitedChannel.LimitEnforcement.DANGEROUS_BYPASS_LIMITS : LimitedChannel.LimitEnforcement.DEFAULT_ENABLED;
    }

    private SafeIllegalStateException limitEnforcementExpectationFailure(Endpoint endpoint) {
        return new SafeIllegalStateException("A request which explicitly bypassed rate limits failed to execute, which violates the requirements of the QueuedChannel. Please report this to the Dialogue maintainers!", new Arg[]{SafeArg.of((String)"channel", (Object)this.channelName), SafeArg.of((String)"service", (Object)endpoint.serviceName()), SafeArg.of((String)"endpoint", (Object)endpoint.endpointName())});
    }

    public String toString() {
        return "QueuedChannel{queueSizeEstimate=" + String.valueOf(this.queueSizeEstimate) + ", maxQueueSize=" + this.maxQueueSize + ", delegate=" + String.valueOf(this.delegate) + "}";
    }

    static QueuedChannelInstrumentation channelInstrumentation(final DialogueClientMetrics metrics, final String channelName) {
        return new QueuedChannelInstrumentation(){

            @Override
            public Counter requestsQueued() {
                return metrics.requestsQueued(channelName);
            }

            @Override
            public Timer requestQueuedTime() {
                return metrics.requestQueuedTime(channelName);
            }
        };
    }

    static QueuedChannelInstrumentation stickyInstrumentation(final DialogueClientMetrics metrics, final String channelName) {
        return new MemoizedQueuedChannelInstrumentation(new QueuedChannelInstrumentation(){

            @Override
            public Counter requestsQueued() {
                return metrics.requestsStickyQueued(channelName);
            }

            @Override
            public Timer requestQueuedTime() {
                return metrics.requestStickyQueuedTime(channelName);
            }
        });
    }

    static QueuedChannelInstrumentation endpointInstrumentation(final DialogueClientMetrics metrics, final String channelName, final String service, final String endpoint) {
        return new QueuedChannelInstrumentation(){

            @Override
            public Counter requestsQueued() {
                return metrics.requestsEndpointQueued().channelName(channelName).serviceName(service).endpoint(endpoint).build();
            }

            @Override
            public Timer requestQueuedTime() {
                return metrics.requestEndpointQueuedTime().channelName(channelName).serviceName(service).endpoint(endpoint).build();
            }
        };
    }

    private static final class ProtectedConcurrentLinkedDeque<T>
    extends ConcurrentLinkedDeque<T> {
        private ProtectedConcurrentLinkedDeque() {
        }

        @Override
        public int size() {
            throw new UnsupportedOperationException("size should never be called on a ConcurrentLinkedDeque");
        }
    }

    static interface QueuedChannelInstrumentation {
        public Counter requestsQueued();

        public Timer requestQueuedTime();
    }

    @Value.Immutable
    static interface DeferredCall {
        public Endpoint endpoint();

        public Request request();

        public SettableFuture<Response> response();

        public DetachedSpan span();

        public Timer.Context timer();

        public static Builder builder() {
            return new Builder();
        }

        public static class Builder
        extends ImmutableDeferredCall.Builder {
        }
    }

    private static enum QueuedChannelTagTranslator implements TagTranslator<QueuedChannel>
    {
        INSTANCE;


        public <T> void translate(TagTranslator.TagAdapter<T> adapter, T target, QueuedChannel data) {
            adapter.tag(target, "queue", data.queueType);
            adapter.tag(target, "channel", data.channelName);
        }
    }

    private class ForwardAndSchedule
    implements FutureCallback<Response> {
        private final SettableFuture<Response> response;

        ForwardAndSchedule(SettableFuture<Response> response) {
            this.response = response;
        }

        public void onSuccess(@Nullable Response result) {
            QueuedChannel.this.inFlight.decrementAndGet();
            if (result != null && !this.response.set((Object)result)) {
                result.close();
            }
            QueuedChannel.this.schedule();
        }

        public void onFailure(Throwable throwable) {
            QueuedChannel.this.inFlight.decrementAndGet();
            if (!this.response.setException(throwable)) {
                if (throwable instanceof CancellationException) {
                    log.debug("Call was canceled", throwable);
                } else {
                    log.info("Call failed after the future completed", throwable);
                }
            }
            QueuedChannel.this.schedule();
        }
    }

    private static final class MemoizedQueuedChannelInstrumentation
    implements QueuedChannelInstrumentation {
        private final Supplier<Counter> requestsQueuedSupplier = Suppliers.memoize(delegate::requestsQueued);
        private final Supplier<Timer> requestQueuedTimeSupplier = Suppliers.memoize(delegate::requestQueuedTime);

        MemoizedQueuedChannelInstrumentation(QueuedChannelInstrumentation delegate) {
        }

        @Override
        public Counter requestsQueued() {
            return this.requestsQueuedSupplier.get();
        }

        @Override
        public Timer requestQueuedTime() {
            return this.requestQueuedTimeSupplier.get();
        }
    }
}

