/*
 * Decompiled with CFR 0.152.
 */
package ai.vespa.feed.client.impl;

import ai.vespa.feed.client.FeedClientBuilder;
import ai.vespa.feed.client.HttpResponse;
import ai.vespa.feed.client.impl.Cluster;
import ai.vespa.feed.client.impl.FeedClientBuilderImpl;
import ai.vespa.feed.client.impl.HttpRequest;
import ai.vespa.feed.client.impl.RetryableException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
import org.eclipse.jetty.client.Authentication;
import org.eclipse.jetty.client.BufferingResponseListener;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.Destination;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.ProxyConfiguration;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.Response;
import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.client.RetryableRequestException;
import org.eclipse.jetty.client.transport.HttpClientConnectionFactory;
import org.eclipse.jetty.client.transport.HttpClientTransportDynamic;
import org.eclipse.jetty.http.HttpCookieStore;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.transport.ClientConnectionFactoryOverHTTP2;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.util.ConcurrentPool;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;

class JettyCluster
implements Cluster {
    private static final Logger log = Logger.getLogger(JettyCluster.class.getName());
    private static final Duration IDLE_TIMEOUT = Duration.ofMinutes(15L);
    private final HttpClient client;
    private final List<Endpoint> endpoints;
    private final FeedClientBuilder.Compression compression;

    JettyCluster(FeedClientBuilderImpl b) throws IOException {
        this.client = JettyCluster.createHttpClient(b);
        this.endpoints = b.endpoints.stream().map(Endpoint::new).collect(Collectors.toList());
        this.compression = b.compression;
    }

    @Override
    public void dispatch(final HttpRequest req, final CompletableFuture<HttpResponse> vessel) {
        this.client.getExecutor().execute(() -> {
            final Endpoint endpoint = JettyCluster.findLeastBusyEndpoint(this.endpoints);
            try {
                endpoint.inflight.incrementAndGet();
                long reqTimeoutMillis = req.timeLeft().toMillis();
                if (reqTimeoutMillis <= 0L) {
                    log.log(Level.FINE, () -> String.format("Request %s (%s) timed out after '%s' ms", req, System.identityHashCode(vessel), req.timeout()));
                    vessel.completeExceptionally(new TimeoutException("operation timed out after '" + String.valueOf(req.timeout()) + "'"));
                    return;
                }
                Request jettyReq = this.client.newRequest(URI.create(endpoint.uri + req.pathAndQuery())).version(HttpVersion.HTTP_2).method(HttpMethod.fromString((String)req.method())).headers(hs -> req.headers().forEach((k, v) -> hs.add(k, (String)v.get()))).idleTimeout(reqTimeoutMillis, TimeUnit.MILLISECONDS).timeout(reqTimeoutMillis, TimeUnit.MILLISECONDS);
                if (req.body() != null) {
                    byte[] bytes;
                    boolean shouldCompress;
                    boolean bl = shouldCompress = this.compression == FeedClientBuilder.Compression.gzip || this.compression == FeedClientBuilder.Compression.auto && req.body().length > 512;
                    if (shouldCompress) {
                        ByteArrayOutputStream buffer = new ByteArrayOutputStream(1024);
                        try (GZIPOutputStream zip = new GZIPOutputStream(buffer);){
                            zip.write(req.body());
                        }
                        catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                        bytes = buffer.toByteArray();
                        jettyReq.headers(hs -> hs.add(HttpHeader.CONTENT_ENCODING, "gzip"));
                    } else {
                        bytes = req.body();
                    }
                    jettyReq.body((Request.Content)new BytesRequestContent(MimeTypes.Type.APPLICATION_JSON.asString(), (byte[][])new byte[][]{bytes}));
                }
                log.log(Level.FINE, () -> String.format("Dispatching request %s (%s) with timeout %d ms", req, System.identityHashCode(vessel), reqTimeoutMillis));
                jettyReq.send((Response.CompleteListener)new BufferingResponseListener(){

                    public void onComplete(Result result) {
                        log.log(Level.FINER, () -> String.format("Completed request %s (%s): %s", req, System.identityHashCode(vessel), result.isFailed() ? result.getFailure().toString() : Integer.valueOf(result.getResponse().getStatus())));
                        endpoint.inflight.decrementAndGet();
                        if (result.isFailed()) {
                            if (result.getFailure() instanceof RetryableRequestException) {
                                vessel.completeExceptionally(new RetryableException(result.getFailure()));
                            } else {
                                vessel.completeExceptionally(result.getFailure());
                            }
                        } else {
                            vessel.complete(new JettyResponse(result.getResponse(), this.getContent()));
                        }
                    }
                });
            }
            catch (Throwable t) {
                log.log(t instanceof Exception ? Level.FINE : Level.WARNING, "Failed to dispatch request: " + String.valueOf(req), t.getMessage());
                endpoint.inflight.decrementAndGet();
                vessel.completeExceptionally(t);
            }
        });
    }

    @Override
    public void close() {
        try {
            this.client.stop();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static HttpClient createHttpClient(FeedClientBuilderImpl b) throws IOException {
        SslContextFactory.Client clientSslCtxFactory = new SslContextFactory.Client();
        clientSslCtxFactory.setSslContext(b.constructSslContext());
        if (b.hostnameVerifier != null) {
            clientSslCtxFactory.setHostnameVerifier(b.hostnameVerifier);
            clientSslCtxFactory.setEndpointIdentificationAlgorithm(null);
        }
        ClientConnector connector = new ClientConnector();
        int threads = Math.max(Math.min(Runtime.getRuntime().availableProcessors(), 32), 8);
        connector.setExecutor((Executor)new QueuedThreadPool(threads));
        connector.setSslContextFactory(clientSslCtxFactory);
        connector.setIdleTimeout(IDLE_TIMEOUT);
        boolean secureProxy = b.proxy != null && b.proxy.getScheme().equals("https");
        connector.setConnectTimeout(Duration.ofSeconds(secureProxy ? 120L : 30L));
        HTTP2Client h2Client = new HTTP2Client(connector);
        h2Client.setMaxConcurrentPushedStreams(b.maxStreamsPerConnection);
        int initialWindow = Integer.MAX_VALUE;
        h2Client.setInitialSessionRecvWindow(initialWindow);
        h2Client.setInitialStreamRecvWindow(initialWindow);
        ClientConnectionFactory.Info h1 = HttpClientConnectionFactory.HTTP11;
        ClientConnectionFactoryOverHTTP2.HTTP2 http2 = new ClientConnectionFactoryOverHTTP2.HTTP2(h2Client);
        HttpClientTransportDynamic transport = new HttpClientTransportDynamic(connector, new ClientConnectionFactory.Info[]{http2, h1});
        int connectionsPerEndpoint = b.connectionsPerEndpoint;
        transport.setConnectionPoolFactory(dest -> new MaxMultiplexConnectionPool(dest, connectionsPerEndpoint, secureProxy, b.connectionTtl));
        HttpClient httpClient = new HttpClient((HttpClientTransport)transport);
        httpClient.setMaxRequestsQueuedPerDestination(Integer.MAX_VALUE);
        httpClient.setFollowRedirects(false);
        httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, String.format("vespa-feed-client/%s (Jetty:%s)", "8.582.13", Jetty.VERSION)));
        httpClient.setSocketAddressResolver((SocketAddressResolver)new Ipv4PreferringResolver(httpClient, Duration.ofSeconds(10L)));
        httpClient.setHttpCookieStore((HttpCookieStore)new HttpCookieStore.Empty());
        if (b.proxy != null) {
            JettyCluster.addProxyConfiguration(b, httpClient);
        }
        try {
            httpClient.start();
        }
        catch (Exception e) {
            throw new IOException(e);
        }
        httpClient.getProtocolHandlers().remove("www-authenticate");
        return httpClient;
    }

    private static void addProxyConfiguration(FeedClientBuilderImpl b, HttpClient httpClient) throws IOException {
        Origin.Address address = new Origin.Address(b.proxy.getHost(), b.proxy.getPort());
        final TreeMap<String, Supplier<String>> proxyHeadersCopy = new TreeMap<String, Supplier<String>>(b.proxyRequestHeaders);
        if (b.proxy.getScheme().equals("https")) {
            SslContextFactory.Client proxySslCtxFactory = new SslContextFactory.Client();
            if (b.proxyHostnameVerifier != null) {
                proxySslCtxFactory.setHostnameVerifier(b.proxyHostnameVerifier);
                proxySslCtxFactory.setEndpointIdentificationAlgorithm(null);
            }
            proxySslCtxFactory.setSslContext(b.constructProxySslContext());
            try {
                proxySslCtxFactory.start();
            }
            catch (Exception e) {
                throw new IOException(e);
            }
            httpClient.getProxyConfiguration().addProxy((ProxyConfiguration.Proxy)new HttpProxy(address, proxySslCtxFactory, new Origin.Protocol(List.of("h2"), false)));
            final URI proxyUri = URI.create(JettyCluster.endpointUri(b.proxy));
            httpClient.getAuthenticationStore().addAuthenticationResult(new Authentication.Result(){

                public URI getURI() {
                    return proxyUri;
                }

                public void apply(Request r) {
                    r.headers(hs -> proxyHeadersCopy.forEach((k, v) -> hs.add(k, (String)v.get())));
                }
            });
        } else {
            httpClient.getProxyConfiguration().addProxy((ProxyConfiguration.Proxy)new HttpProxy(address, false, new Origin.Protocol(List.of("http/1.1"), false)));
            httpClient.getRequestListeners().addHeadersListener((Request.HeadersListener)new Request.Listener(){

                public void onHeaders(Request r) {
                    if (HttpMethod.CONNECT.is(r.getMethod())) {
                        r.headers(hs -> proxyHeadersCopy.forEach((k, v) -> hs.add(k, (String)v.get())));
                    }
                }
            });
        }
    }

    private static Endpoint findLeastBusyEndpoint(List<Endpoint> endpoints) {
        Endpoint leastBusy = endpoints.get(0);
        int minInflight = leastBusy.inflight.get();
        for (int i = 1; i < endpoints.size(); ++i) {
            Endpoint endpoint = endpoints.get(i);
            int inflight = endpoint.inflight.get();
            if (inflight >= minInflight) continue;
            leastBusy = endpoint;
            minInflight = inflight;
        }
        return leastBusy;
    }

    private static int portOf(URI u) {
        return u.getPort() == -1 ? (u.getScheme().equals("http") ? 80 : 443) : u.getPort();
    }

    private static String endpointUri(URI uri) {
        return String.format("%s://%s:%s", uri.getScheme(), uri.getHost(), JettyCluster.portOf(uri));
    }

    private static class Ipv4PreferringResolver
    extends AbstractLifeCycle
    implements SocketAddressResolver {
        final HttpClient client;
        final Duration timeout;
        SocketAddressResolver.Async instance;

        Ipv4PreferringResolver(HttpClient client, Duration timeout) {
            this.client = client;
            this.timeout = timeout;
        }

        protected void doStart() {
            this.instance = new SocketAddressResolver.Async(this.client.getExecutor(), this.client.getScheduler(), this.timeout.toMillis());
        }

        public void resolve(String host, int port, Promise<List<InetSocketAddress>> promise) {
            this.instance.resolve(host, port, (Promise)new Promise.Wrapper<List<InetSocketAddress>>(promise){

                public void succeeded(List<InetSocketAddress> result) {
                    if (result.size() <= 1) {
                        this.getPromise().succeeded(result);
                        return;
                    }
                    List ipv4Addresses = result.stream().filter(addr -> addr.getAddress() instanceof Inet4Address).collect(Collectors.toList());
                    if (ipv4Addresses.isEmpty()) {
                        this.getPromise().succeeded(result);
                        return;
                    }
                    this.getPromise().succeeded(ipv4Addresses);
                }
            });
        }
    }

    private static class Endpoint {
        final AtomicInteger inflight = new AtomicInteger();
        final String uri;

        Endpoint(URI uri) {
            this.uri = JettyCluster.endpointUri(uri);
        }
    }

    private static class MaxMultiplexConnectionPool
    extends MultiplexConnectionPool {
        static final int MAX_MULTIPLEX = 512;
        final int maxConnections;

        MaxMultiplexConnectionPool(Destination dest, int maxConnections, boolean secureProxy, Duration ttl) {
            super(dest, () -> new ConcurrentPool(ConcurrentPool.StrategyType.RANDOM, maxConnections, MaxMultiplexConnectionPool.newMaxMultiplexer((int)512)), 512);
            this.maxConnections = maxConnections;
            if (secureProxy) {
                this.setMaxDuration(Duration.ofMinutes(1L).toMillis());
            } else {
                this.setMaximizeConnections(true);
                this.setMaxDuration(ttl.toMillis());
            }
        }

        protected void doStart() throws Exception {
            super.doStart();
            this.preCreateConnections(this.maxConnections);
        }
    }

    private static class JettyResponse
    implements HttpResponse {
        final Response response;
        final byte[] content;

        JettyResponse(Response response, byte[] content) {
            this.response = response;
            this.content = content;
        }

        public int code() {
            return this.response.getStatus();
        }

        public byte[] body() {
            return this.content;
        }

        public String contentType() {
            return this.response.getHeaders().get(HttpHeader.CONTENT_TYPE);
        }
    }
}

