package io.confluent.ksql.services;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.services.ConnectClient;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.QueryMask;
import io.vertx.core.http.HttpHeaders;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.hc.client5.http.fluent.Request;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.io.HttpClientResponseHandler;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.PluginInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/services/DefaultConnectClient.class */
public class DefaultConnectClient implements ConnectClient {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultConnectClient.class);
    private static final ObjectMapper MAPPER = ConnectJsonMapper.INSTANCE.get();
    private static final String CONNECTOR_PLUGINS = "/connector-plugins";
    private static final String CONNECTORS = "/connectors";
    private static final String STATUS = "/status";
    private static final String TOPICS = "/topics";
    private static final String VALIDATE_CONNECTOR = "/connector-plugins/%s/config/validate";
    private static final int MAX_ATTEMPTS = 3;
    private final URI connectUri;
    private final Header[] requestHeaders;
    private final CloseableHttpClient httpClient;
    private final long requestTimeoutMs;

    public DefaultConnectClient(String str, Optional<String> optional, Map<String, String> map, Optional<SSLContext> optional2, boolean z, long j) {
        Objects.requireNonNull(str, "connectUri");
        Objects.requireNonNull(optional, "authHeader");
        Objects.requireNonNull(map, "additionalRequestHeaders");
        Objects.requireNonNull(optional2, "sslContext");
        try {
            this.connectUri = new URI(str);
            this.requestHeaders = buildHeaders(optional, map);
            this.httpClient = buildHttpClient(optional2, z);
            this.requestTimeoutMs = j;
        } catch (URISyntaxException e) {
            throw new KsqlException("Could not initialize connect client due to invalid URI: " + str, e);
        }
    }

    public ConnectClient.ConnectResponse<ConnectorInfo> create(String str, Map<String, String> map) {
        try {
            LOG.debug("Issuing create request to Kafka Connect at URI {} with name {} and config {}", new Object[]{this.connectUri, str, QueryMask.getMaskedConnectConfig(map)});
            ConnectClient.ConnectResponse<ConnectorInfo> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.post(resolveUri(CONNECTORS)).setHeaders(this.requestHeaders).responseTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).connectTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).bodyString(MAPPER.writeValueAsString(ImmutableMap.of("name", str, "config", map)), ContentType.APPLICATION_JSON).execute(this.httpClient).handleResponse(createHandler(201, new TypeReference<ConnectorInfo>() { // from class: io.confluent.ksql.services.DefaultConnectClient.1
                }, Function.identity()));
            });
            withRetries.error().ifPresent(str2 -> {
                LOG.warn("Did not CREATE connector {}: {}", str, str2);
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    public ConnectClient.ConnectResponse<ConfigInfos> validate(String str, Map<String, String> map) {
        try {
            Map maskedConnectConfig = QueryMask.getMaskedConnectConfig(map);
            LOG.debug("Issuing validate request to Kafka Connect at URI {} for plugin {} and config {}", new Object[]{this.connectUri, str, maskedConnectConfig});
            ConnectClient.ConnectResponse<ConfigInfos> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.put(resolveUri(String.format(VALIDATE_CONNECTOR, str))).setHeaders(this.requestHeaders).responseTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).connectTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).bodyString(MAPPER.writeValueAsString(map), ContentType.APPLICATION_JSON).execute(this.httpClient).handleResponse(createHandler(200, new TypeReference<ConfigInfos>() { // from class: io.confluent.ksql.services.DefaultConnectClient.2
                }, Function.identity()));
            });
            withRetries.error().ifPresent(str2 -> {
                LOG.warn("Did not VALIDATE connector configuration for plugin {} and config {}: {}", new Object[]{str, maskedConnectConfig, str2});
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    public ConnectClient.ConnectResponse<List<String>> connectors() {
        try {
            LOG.debug("Issuing request to Kafka Connect at URI {} to list connectors", this.connectUri);
            ConnectClient.ConnectResponse<List<String>> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.get(resolveUri(CONNECTORS)).setHeaders(this.requestHeaders).responseTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).connectTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).execute(this.httpClient).handleResponse(createHandler(200, new TypeReference<List<String>>() { // from class: io.confluent.ksql.services.DefaultConnectClient.3
                }, Function.identity()));
            });
            withRetries.error().ifPresent(str -> {
                LOG.warn("Could not list connectors: {}.", str);
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    public ConnectClient.ConnectResponse<List<PluginInfo>> connectorPlugins() {
        try {
            LOG.debug("Issuing request to Kafka Connect at URI {} to list connector plugins", this.connectUri);
            ConnectClient.ConnectResponse<List<PluginInfo>> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.get(resolveUri(CONNECTOR_PLUGINS)).setHeaders(this.requestHeaders).responseTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).connectTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).execute(this.httpClient).handleResponse(createHandler(200, new TypeReference<List<PluginInfo>>() { // from class: io.confluent.ksql.services.DefaultConnectClient.4
                }, Function.identity()));
            });
            withRetries.error().ifPresent(str -> {
                LOG.warn("Could not list connector plugins: {}.", str);
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    public ConnectClient.ConnectResponse<ConnectorStateInfo> status(String str) {
        try {
            LOG.debug("Issuing status request to Kafka Connect at URI {} with name {}", this.connectUri, str);
            ConnectClient.ConnectResponse<ConnectorStateInfo> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.get(resolveUri("/connectors/" + str + STATUS)).setHeaders(this.requestHeaders).responseTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).connectTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).execute(this.httpClient).handleResponse(createHandler(200, new TypeReference<ConnectorStateInfo>() { // from class: io.confluent.ksql.services.DefaultConnectClient.5
                }, Function.identity()));
            });
            withRetries.error().ifPresent(str2 -> {
                LOG.warn("Could not query status of connector {}: {}", str, str2);
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    public ConnectClient.ConnectResponse<ConnectorInfo> describe(String str) {
        try {
            LOG.debug("Issuing request to Kafka Connect at URI {} to get config for {}", this.connectUri, str);
            ConnectClient.ConnectResponse<ConnectorInfo> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.get(resolveUri(String.format("%s/%s", CONNECTORS, str))).setHeaders(this.requestHeaders).responseTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).connectTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).execute(this.httpClient).handleResponse(createHandler(200, new TypeReference<ConnectorInfo>() { // from class: io.confluent.ksql.services.DefaultConnectClient.6
                }, Function.identity()));
            });
            withRetries.error().ifPresent(str2 -> {
                LOG.warn("Could not list connectors: {}.", str2);
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    public ConnectClient.ConnectResponse<String> delete(String str) {
        try {
            LOG.debug("Issuing request to Kafka Connect at URI {} to delete {}", this.connectUri, str);
            ConnectClient.ConnectResponse<String> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.delete(resolveUri(String.format("%s/%s", CONNECTORS, str))).setHeaders(this.requestHeaders).responseTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).connectTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).execute(this.httpClient).handleResponse(createHandler((List<Integer>) ImmutableList.of(204, 200), new TypeReference<Object>() { // from class: io.confluent.ksql.services.DefaultConnectClient.7
                }, obj -> {
                    return str;
                }));
            });
            withRetries.error().ifPresent(str2 -> {
                LOG.warn("Could not delete connector: {}.", str2);
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    public ConnectClient.ConnectResponse<Map<String, Map<String, List<String>>>> topics(String str) {
        try {
            LOG.debug("Issuing request to Kafka Connect at URI {} to get active topics for {}", this.connectUri, str);
            ConnectClient.ConnectResponse<Map<String, Map<String, List<String>>>> withRetries = withRetries(() -> {
                return (ConnectClient.ConnectResponse) Request.get(resolveUri("/connectors/" + str + TOPICS)).setHeaders(this.requestHeaders).responseTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).connectTimeout(Timeout.ofMilliseconds(this.requestTimeoutMs)).execute(this.httpClient).handleResponse(createHandler(200, new TypeReference<Map<String, Map<String, List<String>>>>() { // from class: io.confluent.ksql.services.DefaultConnectClient.8
                }, Function.identity()));
            });
            withRetries.error().ifPresent(str2 -> {
                LOG.warn("Could not query topics of connector {}: {}", str, str2);
            });
            return withRetries;
        } catch (Exception e) {
            throw new KsqlServerException(e);
        }
    }

    @VisibleForTesting
    public Header[] getRequestHeaders() {
        return (Header[]) this.requestHeaders.clone();
    }

    private String resolveUri(String str) {
        try {
            return new URI(this.connectUri.getScheme(), this.connectUri.getUserInfo(), this.connectUri.getHost(), this.connectUri.getPort(), Paths.get(this.connectUri.getPath(), str).toString(), this.connectUri.getQuery(), this.connectUri.getFragment()).toString();
        } catch (URISyntaxException e) {
            throw new KsqlServerException("Failed to resolve URI", e);
        }
    }

    private static Header[] buildHeaders(Optional<String> optional, Map<String, String> map) {
        ArrayList arrayList = new ArrayList();
        optional.ifPresent(str -> {
            arrayList.add(new BasicHeader(HttpHeaders.AUTHORIZATION.toString(), optional.get()));
        });
        if (!map.isEmpty()) {
            arrayList.addAll((List) map.entrySet().stream().map(entry -> {
                return new BasicHeader((String) entry.getKey(), entry.getValue());
            }).collect(Collectors.toList()));
        }
        return (Header[]) arrayList.toArray(new Header[0]);
    }

    private static CloseableHttpClient buildHttpClient(Optional<SSLContext> optional, boolean z) {
        PoolingHttpClientConnectionManagerBuilder create = PoolingHttpClientConnectionManagerBuilder.create();
        optional.ifPresent(sSLContext -> {
            create.setSSLSocketFactory(z ? new SSLConnectionSocketFactory(sSLContext) : new SSLConnectionSocketFactory(sSLContext, (str, sSLSession) -> {
                return true;
            }));
        });
        return HttpClientBuilder.create().setConnectionManager(create.setMaxConnPerRoute(100).setMaxConnTotal(200).setValidateAfterInactivity(TimeValue.ofSeconds(10L)).build()).useSystemProperties().evictExpiredConnections().evictIdleConnections(TimeValue.ofMinutes(1L)).build();
    }

    private static <T> ConnectClient.ConnectResponse<T> withRetries(Callable<ConnectClient.ConnectResponse<T>> callable) {
        try {
            return (ConnectClient.ConnectResponse) RetryerBuilder.newBuilder().withStopStrategy(StopStrategies.stopAfterAttempt(MAX_ATTEMPTS)).withWaitStrategy(WaitStrategies.exponentialWait()).retryIfResult(connectResponse -> {
                return connectResponse == null || connectResponse.httpCode() >= 500 || connectResponse.httpCode() == 409;
            }).retryIfException().build().call(callable);
        } catch (RetryException e) {
            LOG.warn("Failed to query connect cluster after {} attempts.", Integer.valueOf(e.getNumberOfFailedAttempts()));
            if (e.getLastFailedAttempt().hasResult()) {
                return (ConnectClient.ConnectResponse) e.getLastFailedAttempt().getResult();
            }
            throw new KsqlServerException(e.getCause());
        } catch (ExecutionException e2) {
            throw new KsqlServerException("Unexpected exception!", e2);
        }
    }

    private static <T, C> HttpClientResponseHandler<ConnectClient.ConnectResponse<T>> createHandler(int i, TypeReference<C> typeReference, Function<C, T> function) {
        return createHandler((List<Integer>) Collections.singletonList(Integer.valueOf(i)), typeReference, function);
    }

    private static <T, C> HttpClientResponseHandler<ConnectClient.ConnectResponse<T>> createHandler(List<Integer> list, TypeReference<C> typeReference, Function<C, T> function) {
        return classicHttpResponse -> {
            int code = classicHttpResponse.getCode();
            if (!list.contains(Integer.valueOf(classicHttpResponse.getCode()))) {
                return ConnectClient.ConnectResponse.failure(EntityUtils.toString(classicHttpResponse.getEntity()), code);
            }
            HttpEntity entity = classicHttpResponse.getEntity();
            return ConnectClient.ConnectResponse.success(function.apply(entity == null ? null : MAPPER.readValue(entity.getContent(), typeReference)), code);
        };
    }
}
