/*
 * Decompiled with CFR 0.152.
 */
package ai.vespa.feed.client.impl;

import ai.vespa.feed.client.DocumentId;
import ai.vespa.feed.client.FeedClient;
import ai.vespa.feed.client.FeedException;
import ai.vespa.feed.client.HttpResponse;
import ai.vespa.feed.client.OperationStats;
import ai.vespa.feed.client.impl.BenchmarkingCluster;
import ai.vespa.feed.client.impl.Cluster;
import ai.vespa.feed.client.impl.DynamicThrottler;
import ai.vespa.feed.client.impl.FeedClientBuilderImpl;
import ai.vespa.feed.client.impl.HttpFeedClient;
import ai.vespa.feed.client.impl.HttpRequest;
import ai.vespa.feed.client.impl.RequestStrategy;
import ai.vespa.feed.client.impl.Throttler;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

class HttpRequestStrategy
implements RequestStrategy {
    private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());

    /** Cluster used for dispatch, stats and close; the resettable cluster itself, or a benchmarking wrapper around it. */
    private final Cluster cluster;

    /** Most recently enqueued, not yet finished, operation for each document id. */
    private final Map<DocumentId, RetriableFuture<HttpResponse>> inflightById = new ConcurrentHashMap<>();

    /** Retry policy taken from the builder. */
    private final FeedClient.RetryStrategy strategy;

    /** Circuit breaker taken from the builder; informed of successes and failures. */
    private final FeedClient.CircuitBreaker breaker;

    /** Supplies the target number of inflight operations. */
    private final Throttler throttler;

    /** Dispatch tasks waiting to be run by the dispatcher thread. */
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();

    /** Number of slots currently held, see acquireSlot() and releaseSlot(). */
    private final AtomicLong inflight = new AtomicLong(0L);

    /** Set once by destroy(); never cleared. */
    private final AtomicBoolean destroyed = new AtomicBoolean(false);

    /** Number of tasks offered to the queue that have not yet been polled. */
    private final AtomicLong delayedCount = new AtomicLong(0L);

    /** Single daemon thread on which attempt results are handled. */
    private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(task -> {
        Thread worker = new Thread(task, "feed-client-result-executor");
        worker.setDaemon(true);
        return worker;
    });

    /** The cluster whose underlying client can be replaced while the breaker is half-open. */
    private final ResettableCluster resettableCluster;

    /** True after the cluster has been reset in a HALF_OPEN period; cleared when the breaker is seen CLOSED. */
    private final AtomicBoolean reset = new AtomicBoolean(false);

    /**
     * Creates the strategy and starts its daemon dispatcher thread.
     *
     * @param builder        source of the retry strategy, circuit breaker, benchmark flag and throttler settings
     * @param clusterFactory creates the underlying cluster, both now and on later resets
     * @throws IOException if the resettable cluster cannot be created
     */
    HttpRequestStrategy(FeedClientBuilderImpl builder, HttpFeedClient.ClusterFactory clusterFactory) throws IOException {
        throttler = new DynamicThrottler(builder);
        resettableCluster = new ResettableCluster(clusterFactory);
        if (builder.benchmark) {
            // Benchmarking wraps the resettable cluster.
            cluster = new BenchmarkingCluster(resettableCluster, throttler, System::nanoTime);
        } else {
            cluster = resettableCluster;
        }
        strategy = builder.retryStrategy;
        breaker = builder.circuitBreaker;

        Thread dispatchThread = new Thread(this::dispatch, "feed-client-dispatcher");
        dispatchThread.setDaemon(true);
        dispatchThread.start();
    }

    /** Returns the statistics reported by the cluster. */
    @Override
    public OperationStats stats() {
        return cluster.stats();
    }

    /** Asks the cluster to reset its statistics. */
    @Override
    public void resetStats() {
        cluster.resetStats();
    }

    /** Returns the current state of the circuit breaker. */
    @Override
    public FeedClient.CircuitBreaker.State circuitBreakerState() {
        return breaker.state();
    }

    /**
     * Body of the dispatcher thread: repeatedly drains the task queue until the circuit breaker
     * reports OPEN, this strategy is destroyed, or something throws. On exit, for any of those
     * reasons, {@link #destroy()} is called.
     */
    private void dispatch() {
        try {
            while (this.breaker.state() != FeedClient.CircuitBreaker.State.OPEN && !this.destroyed.get()) {
                // Run queued tasks while not in excess of the throttler target. The breaker state is
                // checked after each task, so when it is not CLOSED only one task is run per outer iteration.
                while (!this.isInExcess() && this.poll() && this.breaker.state() == FeedClient.CircuitBreaker.State.CLOSED) {
                }
                // Replace the underlying cluster once per HALF_OPEN period; re-arm when CLOSED is observed.
                if (this.breaker.state() == FeedClient.CircuitBreaker.State.HALF_OPEN && this.reset.compareAndSet(false, true)) {
                    this.resettableCluster.reset();
                } else if (this.breaker.state() == FeedClient.CircuitBreaker.State.CLOSED) {
                    this.reset.set(false);
                }
                // Back off for longer while half-open.
                Thread.sleep(this.breaker.state() == FeedClient.CircuitBreaker.State.HALF_OPEN ? 100L : 1L);
            }
        }
        catch (Throwable t) {
            // Anything thrown here, including interruption of the sleep, ends the dispatcher.
            log.log(Level.SEVERE, "Dispatch thread threw; shutting down", t);
        }
        this.destroy();
    }

    /**
     * Queues a dispatch of {@code request} for the dispatcher thread, counting it as delayed
     * until it is polled.
     *
     * @param request the request to dispatch
     * @param vessel  the future handed to the cluster together with the request
     */
    private void offer(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
        // Count first, so the counter never lags behind the queue contents.
        delayedCount.incrementAndGet();
        Runnable dispatchTask = () -> cluster.dispatch(request, vessel);
        queue.offer(dispatchTask);
    }

    /**
     * Takes the next queued task, if any, and runs it on the calling thread.
     *
     * @return true if a task was taken and run, false if the queue was empty
     */
    private boolean poll() {
        Runnable next = queue.poll();
        boolean hadTask = next != null;
        if (hadTask) {
            delayedCount.decrementAndGet();
            next.run();
        }
        return hadTask;
    }

    /**
     * Returns whether the inflight count, less the tasks still waiting in the queue,
     * exceeds the throttler's target.
     */
    private boolean isInExcess() {
        long notDelayed = inflight.get() - delayedCount.get();
        return notDelayed > throttler.targetInflight();
    }

    /**
     * Decides whether another attempt may be made, based on the attempt count, the time left for
     * the request, and the retry strategy's verdict for the operation type. The HTTP method maps
     * to an operation type as POST to PUT, PUT to UPDATE and DELETE to REMOVE.
     *
     * @param request the request that was attempted
     * @param attempt the number of the attempt that just finished
     * @return true if the request should be retried
     * @throws IllegalStateException if the HTTP method is none of POST, PUT or DELETE
     */
    private boolean retry(HttpRequest request, int attempt) {
        if (attempt > strategy.retries()) {
            return false;
        }
        if (request.timeLeft().toMillis() <= 0L) {
            return false;
        }

        String method = request.method().toUpperCase();
        FeedClient.OperationType type;
        if ("POST".equals(method)) {
            type = FeedClient.OperationType.PUT;
        } else if ("PUT".equals(method)) {
            type = FeedClient.OperationType.UPDATE;
        } else if ("DELETE".equals(method)) {
            type = FeedClient.OperationType.REMOVE;
        } else {
            throw new IllegalStateException("Unexpected HTTP method: " + request.method());
        }
        return strategy.retry(type);
    }

    /**
     * Handles an attempt that ended with an exception. The failure is always reported to the
     * circuit breaker. Only an {@link IOException}, or an {@link IllegalStateException} whose
     * message is exactly "session closed", is considered for retry.
     *
     * @param request the request that was attempted
     * @param thrown  what the attempt failed with
     * @param attempt the number of the attempt that just finished
     * @return true if the request should be retried
     */
    private boolean retry(HttpRequest request, Throwable thrown, int attempt) {
        breaker.failure(thrown);

        boolean sessionClosed = thrown instanceof IllegalStateException && "session closed".equals(thrown.getMessage());
        boolean retriable = thrown instanceof IOException || sessionClosed;

        // Retriable failures are logged at the quieter level.
        Level level = retriable ? Level.FINER : Level.FINE;
        log.log(level, thrown, () -> "Failed attempt " + attempt + " at " + request);

        return retriable && retry(request, attempt);
    }

    /**
     * Returns whether a status code counts as a successful outcome: any 2xx code, and also
     * 404 and 412.
     *
     * @param statusCode the HTTP status code of a response
     * @return true for 200 through 299, 404 and 412
     */
    static boolean isSuccess(int statusCode) {
        if (statusCode == 404 || statusCode == 412) {
            return true;
        }
        return statusCode >= 200 && statusCode < 300;
    }

    /**
     * Handles an attempt that produced an HTTP response.
     * <ul>
     *   <li>Success codes notify the breaker and throttler, and are not retried.</li>
     *   <li>429 notifies the throttler and is always retried.</li>
     *   <li>503 is reported to the breaker and retried if attempts and time allow.</li>
     *   <li>Other codes of 500 or above are reported to the breaker and not retried.</li>
     *   <li>Everything else is not retried.</li>
     * </ul>
     *
     * @param request  the request that was attempted
     * @param response the response received
     * @param attempt  the number of the attempt that just finished
     * @return true if the request should be retried
     */
    private boolean retry(HttpRequest request, HttpResponse response, int attempt) {
        final int status = response.code();

        if (isSuccess(status)) {
            logResponse(Level.FINEST, response, request, attempt);
            breaker.success();
            throttler.success();
            return false;
        }

        if (status == 429) {
            logResponse(Level.FINER, response, request, attempt);
            throttler.throttled(inflight.get() - delayedCount.get());
            return true;
        }

        logResponse(Level.FINE, response, request, attempt);
        if (status < 500) {
            return false;
        }
        // Every 5xx counts against the breaker; only 503 is eligible for retry.
        breaker.failure(response);
        return status == 503 && retry(request, attempt);
    }

    /**
     * Logs the status code and body of a response at the given level, skipping all message
     * construction when that level is not loggable. The body is decoded as UTF-8.
     *
     * @param level    the level to log at
     * @param response the response to describe
     * @param request  the request the response belongs to
     * @param attempt  the number of the attempt that produced the response
     */
    static void logResponse(Level level, HttpResponse response, HttpRequest request, int attempt) {
        if (!log.isLoggable(level)) {
            return;
        }
        StringBuilder message = new StringBuilder("Status code ").append(response.code()).append(" (");
        byte[] body = response.body();
        message.append(body == null ? "no body" : new String(body, StandardCharsets.UTF_8));
        message.append(") on attempt ").append(attempt).append(" at ").append(request);
        log.log(level, message.toString());
    }

    /**
     * Blocks the calling thread until the inflight count is below the throttler's target, then
     * increments the inflight count.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *         interrupted while waiting; the thread's interrupt flag is restored first
     */
    private void acquireSlot() {
        try {
            // Poll, since the throttler target may change between checks.
            // NOTE(review): check-then-increment is not atomic, so concurrent callers may
            // briefly exceed the target; this looks tolerated by design, confirm.
            while (this.inflight.get() >= this.throttler.targetInflight()) {
                Thread.sleep(1L);
            }
            this.inflight.incrementAndGet();
        }
        catch (InterruptedException e) {
            // Thread.sleep cleared the interrupt flag; restore it so callers further up the
            // stack can still observe the interruption after the unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Gives back one slot by decrementing the inflight count. */
    private void releaseSlot() {
        inflight.decrementAndGet();
    }

    /**
     * Blocks until the inflight count reaches zero, then waits for the resettable cluster's
     * queued work to finish. If interrupted, returns early with the interrupt flag set.
     */
    @Override
    public void await() {
        try {
            while (inflight.get() > 0L) {
                Thread.sleep(10L);
            }
            resettableCluster.sync();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Enqueues a request for the given document and returns a future for its final outcome.
     * <p>
     * Operations on the same document id are chained: a new operation is only queued for dispatch
     * once the previously registered operation for that id has completed. Only the first operation
     * in such a chain acquires a slot, which may block the caller.
     *
     * @param documentId the id of the document the request concerns
     * @param request    the request to dispatch
     * @return a future holding the response, or failing with a {@link FeedException}
     */
    @Override
    public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) {
        RetriableFuture<HttpResponse> result = new RetriableFuture<HttpResponse>();
        if (this.destroyed.get()) {
            // No attempt was recorded, so this completes with the future's initial "Operation aborted" failure.
            result.complete();
            return result;
        }
        CompletableFuture<HttpResponse> vessel = new CompletableFuture<HttpResponse>();
        // Register as the latest operation for this id; a non-null previous value is an earlier, unfinished one.
        RetriableFuture<HttpResponse> previous = this.inflightById.put(documentId, result);
        if (previous == null) {
            // NOTE(review): if acquireSlot() throws, result appears to stay registered in inflightById
            // without ever being completed here — verify whether later operations on this id can stall.
            this.acquireSlot();
            this.offer(request, vessel);
            this.throttler.sent(this.inflight.get(), result);
        } else {
            // No slot is acquired on this path; dispatch is deferred until the previous operation is done.
            result.dependOn(previous);
            previous.whenComplete((__, ___) -> this.offer(request, vessel));
        }
        this.handleAttempt(vessel, request, result, 1);
        return result.handle((response, error) -> {
            // Remove the mapping only if it still points at this operation; when no mapping remains
            // for the id afterwards, give the slot back.
            if (this.inflightById.compute(documentId, (__, current) -> current == result ? null : current) == null) {
                this.releaseSlot();
            }
            if (error != null) {
                // Surface every failure as a FeedException, wrapping anything else with the document id.
                if (error instanceof FeedException) {
                    throw (FeedException)error;
                }
                throw new FeedException(documentId, error);
            }
            return response;
        });
    }

    /**
     * Registers handling of one attempt's outcome, run on the result executor. The outcome is
     * stored on {@code result}; then either a new attempt is queued and handled the same way,
     * or {@code result} is completed with the stored outcome.
     *
     * @param vessel  the future completed with the outcome of this attempt
     * @param request the request being attempted
     * @param result  the future exposed to the caller, completed when no more attempts are made
     * @param attempt the number of this attempt, starting at 1
     */
    private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, RetriableFuture<HttpResponse> result, int attempt) {
        vessel.whenCompleteAsync((response, thrown) -> {
            // Record the latest outcome first, so it is what result completes with if no retry follows.
            result.set((HttpResponse)response, (Throwable)thrown);
            if (thrown != null ? this.retry(request, (Throwable)thrown, attempt) : this.retry(request, (HttpResponse)response, attempt)) {
                CompletableFuture<HttpResponse> retry = new CompletableFuture<HttpResponse>();
                this.offer(request, retry);
                // The attempt number is not advanced while the breaker is half-open.
                this.handleAttempt(retry, request, result, attempt + (this.breaker.state() == FeedClient.CircuitBreaker.State.HALF_OPEN ? 0 : 1));
            } else {
                result.complete();
            }
        }, (Executor)this.resultExecutor);
    }

    /**
     * Shuts this strategy down; only the first call has any effect. Completes every registered
     * operation, closes the cluster, and then waits up to one minute for the result executor to
     * finish. If interrupted while waiting, the interrupt flag is set again.
     */
    @Override
    public void destroy() {
        if (!destroyed.compareAndSet(false, true)) {
            return;
        }

        inflightById.values().forEach(pending -> pending.complete());
        cluster.close();
        resultExecutor.shutdown();

        boolean drained;
        try {
            drained = resultExecutor.awaitTermination(1L, TimeUnit.MINUTES);
        } catch (InterruptedException interrupted) {
            log.log(Level.WARNING, "Interrupted waiting for results to be processed");
            Thread.currentThread().interrupt();
            return;
        }
        if (!drained) {
            log.log(Level.WARNING, "Failed processing results within 1 minute");
        }
    }

    /**
     * A {@link Cluster} whose underlying delegate can be replaced through {@link #reset()}.
     * <p>
     * For each delegate, a counter tracks dispatched requests whose completion callback has not
     * yet run. A replaced delegate is closed when its counter reaches zero. The {@code delegate}
     * and {@code inflight} fields are only written while holding {@code monitor}.
     */
    private static class ResettableCluster
    implements Cluster {
        private final Object monitor = new Object();
        private final HttpFeedClient.ClusterFactory clusterFactory;
        /** Runs the completion callbacks registered in dispatch. */
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        /** Outstanding-request counter of the current delegate; replaced together with it. */
        private AtomicLong inflight = new AtomicLong(0L);
        private Cluster delegate;

        /**
         * @param clusterFactory creates the initial delegate and every replacement
         * @throws IOException if the initial delegate cannot be created
         */
        ResettableCluster(HttpFeedClient.ClusterFactory clusterFactory) throws IOException {
            this.clusterFactory = clusterFactory;
            this.delegate = clusterFactory.create();
        }

        /**
         * Dispatches through the current delegate, and registers a callback which closes that
         * delegate if it has been replaced by the time its last outstanding request completes.
         */
        @Override
        public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
            synchronized (monitor) {
                // Capture the counter and delegate in use now; reset() may swap both before completion.
                AtomicLong usedCounter = inflight;
                usedCounter.incrementAndGet();
                Cluster usedCluster = delegate;
                usedCluster.dispatch(request, vessel);
                vessel.whenCompleteAsync((__, ___) -> {
                    synchronized (monitor) {
                        if (usedCounter.decrementAndGet() == 0L && usedCluster != delegate) {
                            log.log(Level.INFO, "Closing old HTTP client");
                            usedCluster.close();
                        }
                    }
                }, executor);
            }
        }

        /**
         * Closes the current delegate and shuts the executor down, then waits up to one minute
         * for already queued completion callbacks to run.
         */
        @Override
        public void close() {
            synchronized (monitor) {
                delegate.close();
                executor.shutdown();
            }
            // Wait outside the monitor: the queued callbacks need it, so waiting while holding
            // it would always run into the timeout when any callback is pending.
            try {
                if (!executor.awaitTermination(1L, TimeUnit.MINUTES)) {
                    log.log(Level.WARNING, "Failed shutting down HTTP client within 1 minute");
                }
            }
            catch (InterruptedException e) {
                log.log(Level.WARNING, "Interrupted waiting for HTTP client to shut down");
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Blocks until everything queued on the executor before this call has run, by submitting
         * an empty task and waiting for it. Returns at once if the executor is shut down.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        private void sync() throws InterruptedException {
            Future<?> sync;
            synchronized (monitor) {
                if (executor.isShutdown()) {
                    return;
                }
                sync = executor.submit(() -> {});
            }
            try {
                sync.get();
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public OperationStats stats() {
            return this.delegate.stats();
        }

        @Override
        public void resetStats() {
            this.delegate.resetStats();
        }

        /**
         * Replaces the delegate with a freshly created one, and starts a new counter for it.
         * A replaced delegate with outstanding requests is closed later, by the completion
         * callback of its last request; one with none is closed here.
         *
         * @throws IOException if the new delegate cannot be created; the old one is then kept
         */
        void reset() throws IOException {
            synchronized (monitor) {
                log.log(Level.INFO, "Replacing underlying HTTP client to attempt recovery");
                Cluster oldCluster = delegate;
                AtomicLong oldCounter = inflight;
                delegate = clusterFactory.create();
                inflight = new AtomicLong(0L);
                // With no outstanding requests, no completion callback remains to close the old
                // delegate, so it must be closed here to avoid leaking it.
                if (oldCounter.get() == 0L) {
                    log.log(Level.INFO, "Closing old HTTP client");
                    oldCluster.close();
                }
            }
        }
    }

    /**
     * A future which stores the outcome of the latest attempt without completing, and is only
     * completed, with that stored outcome, when {@link #complete()} is called. Until an outcome
     * has been stored, completing it fails it with a {@link FeedException} "Operation aborted".
     * <p>
     * A future may depend on one other future, which is then completed right after this one.
     */
    private static class RetriableFuture<T>
    extends CompletableFuture<T> {
        /** The action which completes this future with the most recently stored outcome. */
        private final AtomicReference<Runnable> completion = new AtomicReference<>();
        /** Another future to complete once this one is completed, or null. */
        private final AtomicReference<RetriableFuture<T>> dependency = new AtomicReference<>();

        private RetriableFuture() {
            completion.set(() -> completeExceptionally(new FeedException("Operation aborted")));
        }

        /** Completes this future with the stored outcome, then completes its dependency, if any. */
        void complete() {
            completion.get().run();
            // Clear the reference, so the dependency is completed at most once through this future.
            RetriableFuture<T> toComplete = dependency.getAndSet(null);
            if (toComplete != null) {
                toComplete.complete();
            }
        }

        /**
         * Registers a future to complete when this one is completed; if this one is already
         * done, the given future is completed right away.
         */
        void dependOn(RetriableFuture<T> dependency) {
            this.dependency.set(dependency);
            if (isDone()) {
                dependency.complete();
            }
        }

        /**
         * Stores the outcome to complete with later, replacing any earlier one.
         *
         * @param result the value to complete with, used when {@code thrown} is null
         * @param thrown the failure to complete with, or null
         */
        void set(T result, Throwable thrown) {
            Runnable action;
            if (thrown != null) {
                action = () -> completeExceptionally(thrown);
            } else {
                action = () -> complete(result);
            }
            completion.set(action);
        }
    }
}

