/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.AcksPublisher;
import io.confluent.ksql.api.client.BatchedQueryResult;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.ConnectorDescription;
import io.confluent.ksql.api.client.ConnectorInfo;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.ServerInfo;
import io.confluent.ksql.api.client.SourceDescription;
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.TableInfo;
import io.confluent.ksql.api.client.TopicInfo;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.impl.AdminResponseHandlers;
import io.confluent.ksql.api.client.impl.BatchedQueryResultImpl;
import io.confluent.ksql.api.client.impl.ConnectorCommandResponseHandler;
import io.confluent.ksql.api.client.impl.DdlDmlRequestValidators;
import io.confluent.ksql.api.client.impl.DdlDmlResponseHandlers;
import io.confluent.ksql.api.client.impl.ExecuteQueryResponseHandler;
import io.confluent.ksql.api.client.impl.InsertIntoResponseHandler;
import io.confluent.ksql.api.client.impl.ResponseHandler;
import io.confluent.ksql.api.client.impl.StreamInsertsResponseHandler;
import io.confluent.ksql.api.client.impl.StreamQueryResponseHandler;
import io.confluent.ksql.util.VertxSslOptionsFactory;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.streams.ReadStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;

/**
 * Vert.x based implementation of {@link Client}.
 *
 * <p>Every operation is sent as an HTTP/2 request to the host and port given in the
 * {@link ClientOptions}. Results are delivered asynchronously through the
 * {@link CompletableFuture} (or {@link BatchedQueryResult}) returned to the caller; transport
 * failures and non-200 responses complete that future exceptionally.
 *
 * <p>Session variables registered through {@link #define(String, Object)} are kept in a plain
 * {@link HashMap} and attached to the requests that build a {@code sessionVariables} field.
 */
public class ClientImpl implements Client {
    private static final String QUERY_STREAM_ENDPOINT = "/query-stream";
    private static final String INSERTS_ENDPOINT = "/inserts-stream";
    private static final String CLOSE_QUERY_ENDPOINT = "/close-query";
    private static final String KSQL_ENDPOINT = "/ksql";
    private static final String INFO_ENDPOINT = "/info";

    private final ClientOptions clientOptions;
    private final Vertx vertx;
    private final HttpClient httpClient;
    private final SocketAddress serverSocketAddress;
    private final String basicAuthHeader;
    /** {@code true} when this client created {@link #vertx} itself and must close it. */
    private final boolean ownedVertx;
    private final Map<String, Object> sessionVariables;

    /**
     * Creates a client that runs on its own, internally created {@link Vertx} instance.
     * That instance is closed by {@link #close()}.
     *
     * @param clientOptions connection settings; a copy is taken
     */
    public ClientImpl(ClientOptions clientOptions) {
        this(clientOptions, Vertx.vertx(), true);
    }

    /**
     * Creates a client that runs on a caller supplied {@link Vertx} instance.
     * The instance is not closed by {@link #close()}.
     *
     * @param clientOptions connection settings; a copy is taken
     * @param vertx the Vert.x instance used to create the HTTP client
     */
    public ClientImpl(ClientOptions clientOptions, Vertx vertx) {
        this(clientOptions, vertx, false);
    }

    private ClientImpl(ClientOptions clientOptions, Vertx vertx, boolean ownedVertx) {
        Objects.requireNonNull(clientOptions, "clientOptions");
        this.clientOptions = clientOptions.copy();
        this.vertx = Objects.requireNonNull(vertx, "vertx");
        this.ownedVertx = ownedVertx;
        // Derive everything from the private copy so later changes to the caller's options
        // object cannot leave the HTTP client, auth header and address out of sync.
        this.httpClient = createHttpClient(this.vertx, this.clientOptions);
        this.basicAuthHeader = createBasicAuthHeader(this.clientOptions);
        this.serverSocketAddress =
            SocketAddress.inetSocketAddress(this.clientOptions.getPort(), this.clientOptions.getHost());
        this.sessionVariables = new HashMap<>();
    }

    /** Same as {@link #streamQuery(String, Map)} with no query properties. */
    @Override
    public CompletableFuture<StreamedQueryResult> streamQuery(String sql) {
        return this.streamQuery(sql, Collections.emptyMap());
    }

    /**
     * Posts the query to the query-stream endpoint and hands the streamed response to a
     * {@link StreamQueryResponseHandler}.
     *
     * @param sql the query text
     * @param properties query properties sent in the request's {@code properties} field
     * @return future handed to the response handler for completion
     */
    @Override
    public CompletableFuture<StreamedQueryResult> streamQuery(String sql, Map<String, Object> properties) {
        CompletableFuture<StreamedQueryResult> cf = new CompletableFuture<>();
        this.makeQueryRequest(
            sql,
            properties,
            cf,
            (ctx, rp, fut, req) -> new StreamQueryResponseHandler(ctx, rp, fut));
        return cf;
    }

    /** Same as {@link #executeQuery(String, Map)} with no query properties. */
    @Override
    public BatchedQueryResult executeQuery(String sql) {
        return this.executeQuery(sql, Collections.emptyMap());
    }

    /**
     * Posts the query to the query-stream endpoint and hands the response to an
     * {@link ExecuteQueryResponseHandler}, configured with the
     * {@code executeQueryMaxResultRows} client option.
     *
     * @param sql the query text
     * @param properties query properties sent in the request's {@code properties} field
     * @return the batched result handed to the response handler for completion
     */
    @Override
    public BatchedQueryResult executeQuery(String sql, Map<String, Object> properties) {
        // Declared as the abstract type so the generic T of makeQueryRequest lines up with
        // the handler's type parameter without a cast.
        BatchedQueryResult result = new BatchedQueryResultImpl();
        this.makeQueryRequest(
            sql,
            properties,
            result,
            (context, recordParser, cf, request) -> new ExecuteQueryResponseHandler(
                context, recordParser, cf, this.clientOptions.getExecuteQueryMaxResultRows()));
        return result;
    }

    /**
     * Inserts a single row. The request body is two newline-terminated JSON documents:
     * the target descriptor followed by the row.
     *
     * @param streamName the insert target
     * @param row the row to insert
     * @return future handed to an {@link InsertIntoResponseHandler} for completion
     */
    @Override
    public CompletableFuture<Void> insertInto(String streamName, KsqlObject row) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        Buffer requestBody = Buffer.buffer();
        JsonObject params = new JsonObject().put("target", streamName);
        requestBody.appendBuffer(params.toBuffer()).appendString("\n");
        requestBody.appendString(row.toJsonString()).appendString("\n");
        this.makePostRequest(
            INSERTS_ENDPOINT,
            requestBody,
            cf,
            response -> handleStreamedResponse(
                response, cf, (ctx, rp, fut, req) -> new InsertIntoResponseHandler(ctx, rp, fut)));
        return cf;
    }

    /**
     * Opens an insert stream. Only the target descriptor is sent up front and the request
     * is left un-ended; the request object is passed to the
     * {@link StreamInsertsResponseHandler} together with the publisher.
     *
     * @param streamName the insert target
     * @param insertsPublisher source of the rows to insert
     * @return future handed to the response handler for completion
     */
    @Override
    public CompletableFuture<AcksPublisher> streamInserts(String streamName, Publisher<KsqlObject> insertsPublisher) {
        CompletableFuture<AcksPublisher> cf = new CompletableFuture<>();
        Buffer requestBody = Buffer.buffer();
        JsonObject params = new JsonObject().put("target", streamName);
        requestBody.appendBuffer(params.toBuffer()).appendString("\n");
        this.makePostRequest(
            INSERTS_ENDPOINT,
            requestBody,
            cf,
            response -> handleStreamedResponse(
                response,
                cf,
                (ctx, rp, fut, req) -> new StreamInsertsResponseHandler(ctx, rp, fut, req, insertsPublisher)),
            false);
        return cf;
    }

    /**
     * Asks the server to close the query with the given id.
     *
     * @param queryId id of the query to close
     * @return future completed with {@code null} on a 200 response
     */
    @Override
    public CompletableFuture<Void> terminatePushQuery(String queryId) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        this.makePostRequest(
            CLOSE_QUERY_ENDPOINT,
            new JsonObject().put("queryId", queryId),
            cf,
            response -> handleCloseQueryResponse(response, cf));
        return cf;
    }

    /** Same as {@link #executeStatement(String, Map)} with no properties. */
    @Override
    public CompletableFuture<ExecuteStatementResult> executeStatement(String sql) {
        return this.executeStatement(sql, Collections.emptyMap());
    }

    /**
     * Validates the statement with {@link DdlDmlRequestValidators} and, if accepted, posts
     * it to the ksql endpoint together with the properties and session variables.
     *
     * @param sql the statement text
     * @param properties sent in the request's {@code streamsProperties} field
     * @return the result future; when validation rejects the request the future is returned
     *     as the validator left it and no request is sent
     */
    @Override
    public CompletableFuture<ExecuteStatementResult> executeStatement(String sql, Map<String, Object> properties) {
        CompletableFuture<ExecuteStatementResult> cf = new CompletableFuture<>();
        if (!DdlDmlRequestValidators.validateExecuteStatementRequest(sql, cf)) {
            return cf;
        }
        JsonObject requestBody = new JsonObject()
            .put("ksql", sql)
            .put("streamsProperties", properties)
            .put("sessionVariables", this.sessionVariables);
        this.makePostRequest(
            KSQL_ENDPOINT,
            requestBody,
            cf,
            response -> handleSingleEntityResponse(
                response,
                cf,
                DdlDmlResponseHandlers::handleExecuteStatementResponse,
                DdlDmlResponseHandlers::handleUnexpectedNumResponseEntities));
        return cf;
    }

    /** Sends {@code list streams;} to the ksql endpoint. */
    @Override
    public CompletableFuture<List<StreamInfo>> listStreams() {
        CompletableFuture<List<StreamInfo>> cf = new CompletableFuture<>();
        this.makePostRequest(
            KSQL_ENDPOINT,
            new JsonObject().put("ksql", "list streams;"),
            cf,
            response -> handleSingleEntityResponse(
                response, cf, AdminResponseHandlers::handleListStreamsResponse));
        return cf;
    }

    /** Sends {@code list tables;} to the ksql endpoint. */
    @Override
    public CompletableFuture<List<TableInfo>> listTables() {
        CompletableFuture<List<TableInfo>> cf = new CompletableFuture<>();
        this.makePostRequest(
            KSQL_ENDPOINT,
            new JsonObject().put("ksql", "list tables;"),
            cf,
            response -> handleSingleEntityResponse(
                response, cf, AdminResponseHandlers::handleListTablesResponse));
        return cf;
    }

    /** Sends {@code list topics;} to the ksql endpoint. */
    @Override
    public CompletableFuture<List<TopicInfo>> listTopics() {
        CompletableFuture<List<TopicInfo>> cf = new CompletableFuture<>();
        this.makePostRequest(
            KSQL_ENDPOINT,
            new JsonObject().put("ksql", "list topics;"),
            cf,
            response -> handleSingleEntityResponse(
                response, cf, AdminResponseHandlers::handleListTopicsResponse));
        return cf;
    }

    /** Sends {@code list queries;} to the ksql endpoint. */
    @Override
    public CompletableFuture<List<QueryInfo>> listQueries() {
        CompletableFuture<List<QueryInfo>> cf = new CompletableFuture<>();
        this.makePostRequest(
            KSQL_ENDPOINT,
            new JsonObject().put("ksql", "list queries;"),
            cf,
            response -> handleSingleEntityResponse(
                response, cf, AdminResponseHandlers::handleListQueriesResponse));
        return cf;
    }

    /**
     * Sends {@code describe <sourceName>;} to the ksql endpoint.
     *
     * @param sourceName inserted verbatim into the statement text
     */
    @Override
    public CompletableFuture<SourceDescription> describeSource(String sourceName) {
        CompletableFuture<SourceDescription> cf = new CompletableFuture<>();
        JsonObject requestBody = new JsonObject()
            .put("ksql", "describe " + sourceName + ";")
            .put("sessionVariables", this.sessionVariables);
        this.makePostRequest(
            KSQL_ENDPOINT,
            requestBody,
            cf,
            response -> handleSingleEntityResponse(
                response, cf, AdminResponseHandlers::handleDescribeSourceResponse));
        return cf;
    }

    /** Issues a GET against the info endpoint; the response body is parsed as one JSON object. */
    @Override
    public CompletableFuture<ServerInfo> serverInfo() {
        CompletableFuture<ServerInfo> cf = new CompletableFuture<>();
        this.makeGetRequest(
            INFO_ENDPOINT,
            new JsonObject(),
            cf,
            response -> handleObjectResponse(response, cf, AdminResponseHandlers::handleServerInfoResponse));
        return cf;
    }

    /**
     * Builds and sends a {@code CREATE SOURCE|SINK CONNECTOR <name> WITH (...)} statement.
     *
     * @param name connector name, inserted verbatim into the statement text
     * @param isSource {@code true} for a SOURCE connector, {@code false} for a SINK connector
     * @param properties each entry is rendered as {@code 'key'='value'}; values are not escaped
     */
    @Override
    public CompletableFuture<Void> createConnector(String name, boolean isSource, Map<String, Object> properties) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        String connectorConfigs = properties.entrySet().stream()
            .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
            .collect(Collectors.joining(","));
        String type = isSource ? "SOURCE" : "SINK";
        JsonObject requestBody = new JsonObject()
            .put("ksql", String.format("CREATE %s CONNECTOR %s WITH (%s);", type, name, connectorConfigs))
            .put("sessionVariables", this.sessionVariables);
        this.makePostRequest(
            KSQL_ENDPOINT,
            requestBody,
            cf,
            response -> handleSingleEntityResponse(
                response, cf, ConnectorCommandResponseHandler::handleCreateConnectorResponse));
        return cf;
    }

    /**
     * Sends {@code drop connector <name>;} to the ksql endpoint.
     *
     * @param name connector name, inserted verbatim into the statement text
     */
    @Override
    public CompletableFuture<Void> dropConnector(String name) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        JsonObject requestBody = new JsonObject()
            .put("ksql", "drop connector " + name + ";")
            .put("sessionVariables", this.sessionVariables);
        this.makePostRequest(
            KSQL_ENDPOINT,
            requestBody,
            cf,
            response -> handleSingleEntityResponse(
                response, cf, ConnectorCommandResponseHandler::handleDropConnectorResponse));
        return cf;
    }

    /** Sends {@code list connectors;} to the ksql endpoint. */
    @Override
    public CompletableFuture<List<ConnectorInfo>> listConnectors() {
        CompletableFuture<List<ConnectorInfo>> cf = new CompletableFuture<>();
        this.makePostRequest(
            KSQL_ENDPOINT,
            new JsonObject().put("ksql", "list connectors;"),
            cf,
            response -> handleSingleEntityResponse(
                response, cf, ConnectorCommandResponseHandler::handleListConnectorsResponse));
        return cf;
    }

    /**
     * Sends {@code describe connector <name>;} to the ksql endpoint.
     *
     * @param name connector name, inserted verbatim into the statement text
     */
    @Override
    public CompletableFuture<ConnectorDescription> describeConnector(String name) {
        CompletableFuture<ConnectorDescription> cf = new CompletableFuture<>();
        JsonObject requestBody = new JsonObject()
            .put("ksql", "describe connector " + name + ";")
            .put("sessionVariables", this.sessionVariables);
        this.makePostRequest(
            KSQL_ENDPOINT,
            requestBody,
            cf,
            response -> handleSingleEntityResponse(
                response, cf, ConnectorCommandResponseHandler::handleDescribeConnectorsResponse));
        return cf;
    }

    /** Registers (or overwrites) a session variable. */
    @Override
    public void define(String variable, Object value) {
        this.sessionVariables.put(variable, value);
    }

    /** Removes a session variable; a no-op when it is not defined. */
    @Override
    public void undefine(String variable) {
        this.sessionVariables.remove(variable);
    }

    /** Returns a copy of the session variables; changes to it do not affect this client. */
    @Override
    public Map<String, Object> getVariables() {
        return new HashMap<>(this.sessionVariables);
    }

    /** Closes the HTTP client and, when this client created it, the Vert.x instance too. */
    @Override
    public void close() {
        this.httpClient.close();
        if (this.ownedVertx) {
            this.vertx.close();
        }
    }

    /** Posts a query (sql, properties, session variables) and wires up a streamed handler. */
    private <T extends CompletableFuture<?>> void makeQueryRequest(String sql, Map<String, Object> properties, T cf, StreamedResponseHandlerSupplier<T> responseHandlerSupplier) {
        JsonObject requestBody = new JsonObject()
            .put("sql", sql)
            .put("properties", properties)
            .put("sessionVariables", this.sessionVariables);
        this.makePostRequest(
            QUERY_STREAM_ENDPOINT,
            requestBody,
            cf,
            response -> handleStreamedResponse(response, cf, responseHandlerSupplier));
    }

    private <T extends CompletableFuture<?>> void makeGetRequest(String path, JsonObject requestBody, T cf, Handler<HttpClientResponse> responseHandler) {
        this.makeRequest(path, requestBody.toBuffer(), cf, responseHandler, true, HttpMethod.GET);
    }

    private <T extends CompletableFuture<?>> void makePostRequest(String path, JsonObject requestBody, T cf, Handler<HttpClientResponse> responseHandler) {
        this.makePostRequest(path, requestBody.toBuffer(), cf, responseHandler);
    }

    private <T extends CompletableFuture<?>> void makePostRequest(String path, Buffer requestBody, T cf, Handler<HttpClientResponse> responseHandler) {
        this.makePostRequest(path, requestBody, cf, responseHandler, true);
    }

    private <T extends CompletableFuture<?>> void makePostRequest(String path, Buffer requestBody, T cf, Handler<HttpClientResponse> responseHandler, boolean endRequest) {
        this.makeRequest(path, requestBody, cf, responseHandler, endRequest, HttpMethod.POST);
    }

    /**
     * Builds and sends a request.
     *
     * @param path request path
     * @param requestBody bytes to send
     * @param cf completed exceptionally if the request itself fails
     * @param responseHandler invoked with the response
     * @param endRequest when {@code true} the body is written and the request ended; when
     *     {@code false} only the head and the body are sent and the request stays open
     * @param method HTTP method
     */
    private <T extends CompletableFuture<?>> void makeRequest(String path, Buffer requestBody, T cf, Handler<HttpClientResponse> responseHandler, boolean endRequest, HttpMethod method) {
        HttpClientRequest request = this.httpClient
            .request(
                method,
                this.serverSocketAddress,
                this.clientOptions.getPort(),
                this.clientOptions.getHost(),
                path,
                responseHandler)
            .exceptionHandler(cf::completeExceptionally);
        if (this.clientOptions.isUseBasicAuth()) {
            request = this.configureBasicAuth(request);
        }
        if (endRequest) {
            request.end(requestBody);
        } else {
            // The lambda needs an effectively final reference to the request.
            HttpClientRequest openRequest = request;
            // NOTE(review): the body is written as a custom frame after the head is sent,
            // presumably to get it onto the wire without ending the stream - confirm.
            openRequest.sendHead(version -> openRequest.writeCustomFrame(0, 0, requestBody));
        }
    }

    private HttpClientRequest configureBasicAuth(HttpClientRequest request) {
        return request.putHeader(HttpHeaderNames.AUTHORIZATION.toString(), this.basicAuthHeader);
    }

    /**
     * On a 200 response, splits the body on newlines and feeds each record, the end of the
     * body and any stream error to the handler produced by {@code responseHandlerSupplier}.
     * Any other status is reported through {@link #handleErrorResponse}.
     */
    private static <T extends CompletableFuture<?>> void handleStreamedResponse(HttpClientResponse response, T cf, StreamedResponseHandlerSupplier<T> responseHandlerSupplier) {
        if (response.statusCode() != HttpResponseStatus.OK.code()) {
            handleErrorResponse(response, cf);
            return;
        }
        RecordParser recordParser = RecordParser.newDelimited("\n", response);
        ResponseHandler<T> responseHandler =
            responseHandlerSupplier.get(Vertx.currentContext(), recordParser, cf, response.request());
        recordParser.handler(responseHandler::handleBodyBuffer);
        recordParser.endHandler(responseHandler::handleBodyEnd);
        recordParser.exceptionHandler(responseHandler::handleException);
    }

    private static void handleCloseQueryResponse(HttpClientResponse response, CompletableFuture<Void> cf) {
        if (response.statusCode() == HttpResponseStatus.OK.code()) {
            cf.complete(null);
        } else {
            handleErrorResponse(response, cf);
        }
    }

    /** Variant that reports a wrong entity count as an {@link IllegalStateException}. */
    private static <T> void handleSingleEntityResponse(HttpClientResponse response, CompletableFuture<T> cf, SingleEntityResponseHandler<T> responseHandler) {
        handleSingleEntityResponse(
            response,
            cf,
            responseHandler,
            numEntities -> new IllegalStateException(
                "Unexpected number of entities in server response: " + numEntities));
    }

    /**
     * Handles a response whose body is expected to be a JSON array holding exactly one JSON
     * object, and passes that object to {@code responseHandler}.
     *
     * @param multipleEntityErrorSupplier builds the failure used when the array size is not 1
     */
    private static <T> void handleSingleEntityResponse(HttpClientResponse response, CompletableFuture<T> cf, SingleEntityResponseHandler<T> responseHandler, Function<Integer, RuntimeException> multipleEntityErrorSupplier) {
        if (response.statusCode() != HttpResponseStatus.OK.code()) {
            handleErrorResponse(response, cf);
            return;
        }
        response.bodyHandler(buffer -> {
            final JsonArray entities;
            try {
                entities = buffer.toJsonArray();
            } catch (RuntimeException e) {
                // An unparseable body must fail the future, otherwise the caller waits forever.
                cf.completeExceptionally(new IllegalStateException(
                    "Unexpected server response format. Response: " + buffer, e));
                return;
            }
            if (entities.size() != 1) {
                cf.completeExceptionally(multipleEntityErrorSupplier.apply(entities.size()));
                return;
            }
            final JsonObject entity;
            try {
                entity = entities.getJsonObject(0);
            } catch (RuntimeException e) {
                // getValue, not getJsonObject: repeating the failed typed access here would
                // throw again and leave the future uncompleted.
                cf.completeExceptionally(new IllegalStateException(
                    "Unexpected server response format. Response: " + entities.getValue(0), e));
                return;
            }
            responseHandler.accept(entity, cf);
        });
    }

    /** Handles a response whose body is expected to be a single JSON object. */
    private static <T> void handleObjectResponse(HttpClientResponse response, CompletableFuture<T> cf, SingleEntityResponseHandler<T> responseHandler) {
        if (response.statusCode() != HttpResponseStatus.OK.code()) {
            handleErrorResponse(response, cf);
            return;
        }
        response.bodyHandler(buffer -> {
            final JsonObject entity;
            try {
                entity = buffer.toJsonObject();
            } catch (RuntimeException e) {
                // An unparseable body must fail the future, otherwise the caller waits forever.
                cf.completeExceptionally(new IllegalStateException(
                    "Unexpected server response format. Response: " + buffer, e));
                return;
            }
            responseHandler.accept(entity, cf);
        });
    }

    /**
     * Completes {@code cf} exceptionally with a {@link KsqlClientException} describing a
     * non-200 response. The body is expected to be a JSON object with {@code message} and
     * {@code error_code} fields; if it cannot be read that way the raw body is reported
     * instead, so the future is always completed.
     */
    private static <T extends CompletableFuture<?>> void handleErrorResponse(HttpClientResponse response, T cf) {
        response.bodyHandler(buffer -> {
            String message;
            try {
                JsonObject errorResponse = buffer.toJsonObject();
                message = String.format(
                    "Received %d response from server: %s. Error code: %d",
                    response.statusCode(),
                    errorResponse.getString("message"),
                    errorResponse.getInteger("error_code"));
            } catch (RuntimeException e) {
                // Broad on purpose: whatever is wrong with the body (not JSON, empty,
                // fields of the wrong type), the caller still gets status and raw body.
                message = String.format(
                    "Received %d response from server: %s",
                    response.statusCode(),
                    buffer.toString());
            }
            cf.completeExceptionally(new KsqlClientException(message));
        });
    }

    /**
     * Creates the HTTP/2 client from the given options. A trust store is configured only
     * when TLS is enabled and a trust store path is set; a key store is configured whenever
     * a key store path is set.
     */
    private static HttpClient createHttpClient(Vertx vertx, ClientOptions clientOptions) {
        HttpClientOptions options = new HttpClientOptions()
            .setSsl(clientOptions.isUseTls())
            .setUseAlpn(clientOptions.isUseAlpn())
            .setProtocolVersion(HttpVersion.HTTP_2)
            .setHttp2ClearTextUpgrade(false)
            .setVerifyHost(clientOptions.isVerifyHost())
            .setDefaultHost(clientOptions.getHost())
            .setDefaultPort(clientOptions.getPort())
            .setHttp2MultiplexingLimit(clientOptions.getHttp2MultiplexingLimit());
        if (clientOptions.isUseTls() && !clientOptions.getTrustStore().isEmpty()) {
            JksOptions trustStoreOptions = VertxSslOptionsFactory.getJksTrustStoreOptions(
                clientOptions.getTrustStore(), clientOptions.getTrustStorePassword());
            options = options.setTrustStoreOptions(trustStoreOptions);
        }
        if (!clientOptions.getKeyStore().isEmpty()) {
            JksOptions keyStoreOptions = VertxSslOptionsFactory.buildJksKeyStoreOptions(
                clientOptions.getKeyStore(),
                clientOptions.getKeyStorePassword(),
                Optional.of(clientOptions.getKeyPassword()),
                Optional.of(clientOptions.getKeyAlias()));
            options = options.setKeyStoreOptions(keyStoreOptions);
        }
        return vertx.createHttpClient(options);
    }

    /**
     * Builds the value of the {@code Authorization} header for HTTP basic auth.
     *
     * @return {@code "Basic <base64(user:password)>"}, or an empty string when basic auth
     *     is disabled
     */
    private static String createBasicAuthHeader(ClientOptions clientOptions) {
        if (!clientOptions.isUseBasicAuth()) {
            return "";
        }
        String creds = clientOptions.getBasicAuthUsername() + ":" + clientOptions.getBasicAuthPassword();
        // Fixed charset: the platform default would make the encoded credentials depend on
        // the JVM's locale settings for any non-ASCII user name or password.
        String base64creds = Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8));
        return "Basic " + base64creds;
    }

    /** Two clients are equal when their options and Vert.x instance are equal. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        ClientImpl client = (ClientImpl) o;
        return this.clientOptions.equals(client.clientOptions) && this.vertx.equals(client.vertx);
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.clientOptions, this.vertx);
    }

    @Override
    public String toString() {
        return "Client{clientOptions=" + this.clientOptions + ", vertx=" + this.vertx + '}';
    }

    /** Consumes the single JSON entity of a response and completes the future from it. */
    @FunctionalInterface
    private static interface SingleEntityResponseHandler<T> {
        public void accept(JsonObject entity, CompletableFuture<T> cf);
    }

    /** Creates the {@link ResponseHandler} that processes one streamed response. */
    @FunctionalInterface
    private static interface StreamedResponseHandlerSupplier<T extends CompletableFuture<?>> {
        public ResponseHandler<T> get(Context context, RecordParser recordParser, T cf, HttpClientRequest request);
    }
}

