package io.confluent.ksql.rest.client;

import com.google.common.base.Functions;
import io.confluent.ksql.properties.LocalProperties;
import io.confluent.ksql.rest.client.exception.KsqlRestClientException;
import io.confluent.ksql.rest.entity.ClusterStatusResponse;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatuses;
import io.confluent.ksql.rest.entity.HealthCheckResponse;
import io.confluent.ksql.rest.entity.HeartbeatMessage;
import io.confluent.ksql.rest.entity.HeartbeatResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlHostInfoEntity;
import io.confluent.ksql.rest.entity.KsqlMediaType;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.entity.LagReportingResponse;
import io.confluent.ksql.rest.entity.QueryStreamArgs;
import io.confluent.ksql.rest.entity.ServerClusterId;
import io.confluent.ksql.rest.entity.ServerInfo;
import io.confluent.ksql.rest.entity.ServerMetadata;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.VertxCompletableFuture;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.streams.WriteStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/client/KsqlTarget.class */
/**
 * Issues HTTP requests against a single server address through a Vert.x {@link HttpClient}.
 *
 * <p>All fields are final. {@link #authorizationHeader(String)} and {@link #properties(Map)}
 * return a new instance rather than modifying this one.
 *
 * <p>Methods returning a {@link RestResponse} directly block the calling thread until the HTTP
 * exchange has completed; methods returning a {@link CompletableFuture} do not block.
 */
public final class KsqlTarget {
    private static final Logger log = LoggerFactory.getLogger(KsqlTarget.class);
    private static final String INFO_PATH = "/info";
    private static final String HEALTHCHECK_PATH = "/healthcheck";
    private static final String STATUS_PATH = "/status";
    private static final String KSQL_PATH = "/ksql";
    private static final String QUERY_PATH = "/query";
    private static final String QUERY_STREAM_PATH = "/query-stream";
    private static final String HEARTBEAT_PATH = "/heartbeat";
    private static final String CLUSTERSTATUS_PATH = "/clusterStatus";
    private static final String LAG_REPORT_PATH = "/lag";
    private static final String SERVER_METADATA_PATH = "/v1/metadata";
    private static final String SERVER_METADATA_ID_PATH = "/v1/metadata/id";
    private static final String IS_VALID_PATH = "/is_valid_property/";
    private static final String DEFAULT_ACCEPT = "application/json";
    private static final String DELIMITED_MEDIA_TYPE = "application/vnd.ksqlapi.delimited.v1";
    private static final String ROW_DELIMITER = "\n";

    private final HttpClient httpClient;
    private final SocketAddress socketAddress;
    private final LocalProperties localProperties;
    private final Optional<String> authHeader;
    private final String host;
    private final Map<String, String> additionalHeaders;

    /**
     * Creates a target bound to one server address.
     *
     * <p>NOTE(review): a decompiler annotation on this constructor indicated it was
     * package-private upstream; it is kept public here so existing callers are unaffected.
     *
     * @param httpClient      client used to issue every request; must not be null
     * @param socketAddress   server address (and port) to connect to; must not be null
     * @param localProperties properties sent along with KSQL and query-stream requests;
     *                        must not be null
     * @param optional        value of the {@code Authorization} header, if any; must not be null
     * @param str             value passed to {@code RequestOptions.setHost}; not null-checked
     * @param map             extra headers added to every request; must not be null
     * @throws NullPointerException if any of the null-checked arguments is null
     */
    public KsqlTarget(HttpClient httpClient, SocketAddress socketAddress, LocalProperties localProperties, Optional<String> optional, String str, Map<String, String> map) {
        this.httpClient = Objects.requireNonNull(httpClient, "httpClient");
        this.socketAddress = Objects.requireNonNull(socketAddress, "socketAddress");
        this.localProperties = Objects.requireNonNull(localProperties, "localProperties");
        this.authHeader = Objects.requireNonNull(optional, "authHeader");
        this.host = str;
        this.additionalHeaders = Objects.requireNonNull(map, "additionalHeaders");
    }

    /**
     * Returns a copy of this target that sends the given {@code Authorization} header.
     *
     * @param str the header value; must not be null
     * @return a new target; this instance is unchanged
     */
    public KsqlTarget authorizationHeader(String str) {
        return new KsqlTarget(this.httpClient, this.socketAddress, this.localProperties, Optional.of(str), this.host, this.additionalHeaders);
    }

    /**
     * Returns a copy of this target whose local properties are built from the given map.
     *
     * @param map source for the new {@link LocalProperties}
     * @return a new target; this instance is unchanged
     */
    public KsqlTarget properties(Map<String, ?> map) {
        return new KsqlTarget(this.httpClient, this.socketAddress, new LocalProperties(map), this.authHeader, this.host, this.additionalHeaders);
    }

    /** Blocking GET of {@code /info}, deserialized as {@link ServerInfo}. */
    public RestResponse<ServerInfo> getServerInfo() {
        return get(INFO_PATH, ServerInfo.class);
    }

    /** Blocking GET of {@code /healthcheck}, deserialized as {@link HealthCheckResponse}. */
    public RestResponse<HealthCheckResponse> getServerHealth() {
        return get(HEALTHCHECK_PATH, HealthCheckResponse.class);
    }

    /**
     * Non-blocking POST of a {@link HeartbeatMessage} to {@code /heartbeat}.
     *
     * @param ksqlHostInfoEntity first argument of the {@link HeartbeatMessage} that is sent
     * @param j                  second argument of the {@link HeartbeatMessage} that is sent
     * @return a future holding the deserialized {@link HeartbeatResponse}
     */
    public CompletableFuture<RestResponse<HeartbeatResponse>> postAsyncHeartbeatRequest(KsqlHostInfoEntity ksqlHostInfoEntity, long j) {
        final HeartbeatMessage message = new HeartbeatMessage(ksqlHostInfoEntity, j);
        return executeRequestAsync(
            HttpMethod.POST,
            HEARTBEAT_PATH,
            message,
            response -> KsqlClientUtil.deserialize(response.getBody(), HeartbeatResponse.class));
    }

    /** Blocking GET of {@code /clusterStatus}, deserialized as {@link ClusterStatusResponse}. */
    public RestResponse<ClusterStatusResponse> getClusterStatus() {
        return get(CLUSTERSTATUS_PATH, ClusterStatusResponse.class);
    }

    /**
     * Non-blocking POST of the given message to {@code /lag}.
     *
     * @param lagReportingMessage the request body
     * @return a future holding the deserialized {@link LagReportingResponse}
     */
    public CompletableFuture<RestResponse<LagReportingResponse>> postAsyncLagReportingRequest(LagReportingMessage lagReportingMessage) {
        return executeRequestAsync(
            HttpMethod.POST,
            LAG_REPORT_PATH,
            lagReportingMessage,
            response -> KsqlClientUtil.deserialize(response.getBody(), LagReportingResponse.class));
    }

    /** Blocking GET of {@code /status}, deserialized as {@link CommandStatuses}. */
    public RestResponse<CommandStatuses> getStatuses() {
        return get(STATUS_PATH, CommandStatuses.class);
    }

    /**
     * Blocking GET of {@code /status/<str>}, deserialized as {@link CommandStatus}.
     *
     * @param str appended verbatim (no URL encoding is applied here) to the status path
     */
    public RestResponse<CommandStatus> getStatus(String str) {
        return get(STATUS_PATH + "/" + str, CommandStatus.class);
    }

    /** Blocking GET of {@code /v1/metadata}, deserialized as {@link ServerMetadata}. */
    public RestResponse<ServerMetadata> getServerMetadata() {
        return get(SERVER_METADATA_PATH, ServerMetadata.class);
    }

    /** Blocking GET of {@code /v1/metadata/id}, deserialized as {@link ServerClusterId}. */
    public RestResponse<ServerClusterId> getServerMetadataId() {
        return get(SERVER_METADATA_ID_PATH, ServerClusterId.class);
    }

    /**
     * Blocking GET of {@code /is_valid_property/<str>}, deserialized as {@link Boolean}.
     *
     * @param str appended verbatim (no URL encoding is applied here) to the path
     */
    public RestResponse<Boolean> getIsValidRequest(String str) {
        return get(IS_VALID_PATH + str, Boolean.class);
    }

    /**
     * Blocking POST of a {@link KsqlRequest} to {@code /ksql}.
     *
     * @param str      first argument of the {@link KsqlRequest}
     * @param map      third argument of the {@link KsqlRequest}
     * @param optional fourth argument of the {@link KsqlRequest}; {@code null} is sent when empty
     * @return the response body deserialized as {@link KsqlEntityList}
     */
    public RestResponse<KsqlEntityList> postKsqlRequest(String str, Map<String, ?> map, Optional<Long> optional) {
        return post(
            KSQL_PATH,
            createKsqlRequest(str, map, optional),
            response -> KsqlClientUtil.deserialize(response.getBody(), KsqlEntityList.class));
    }

    /**
     * Blocking POST to {@code /query} whose response is consumed incrementally.
     *
     * <p>The response is split on newlines; each chunk is converted with
     * {@code KsqlTargetUtil.toRows(chunk, function)} and the resulting rows are counted. The
     * conversion and {@code writeStream} are handed to a {@code BufferMapWriteStream} that the
     * response is piped into.
     *
     * @param str               first argument of the {@link KsqlRequest}
     * @param map               third argument of the {@link KsqlRequest}
     * @param optional          fourth argument of the {@link KsqlRequest}; {@code null} when empty
     * @param writeStream       destination handed to the {@code BufferMapWriteStream}
     * @param completableFuture when this completes (normally or exceptionally) before the
     *                          response has been fully piped, the connection is closed and the
     *                          call fails
     * @param function          per-row transformation passed to {@code KsqlTargetUtil.toRows}
     * @return the number of rows counted at the time the response mapper runs
     */
    public RestResponse<Integer> postQueryRequest(String str, Map<String, ?> map, Optional<Long> optional, WriteStream<List<StreamedRow>> writeStream, CompletableFuture<Void> completableFuture, Function<StreamedRow, StreamedRow> function) {
        final AtomicInteger rowCount = new AtomicInteger(0);
        final KsqlRequest request = createKsqlRequest(str, map, optional);
        final Function<Buffer, List<StreamedRow>> chunkToRows = chunk -> {
            final List<StreamedRow> rows = KsqlTargetUtil.toRows(chunk, function);
            rowCount.addAndGet(rows.size());
            return rows;
        };
        return post(QUERY_PATH, request, rowCount::get, chunkToRows, ROW_DELIMITER, writeStream, completableFuture);
    }

    /**
     * Blocking POST to {@code /query}; the whole body is read and converted to rows.
     *
     * @param str      first argument of the {@link KsqlRequest}
     * @param map      third argument of the {@link KsqlRequest}
     * @param optional fourth argument of the {@link KsqlRequest}; {@code null} when empty
     */
    public RestResponse<List<StreamedRow>> postQueryRequest(String str, Map<String, ?> map, Optional<Long> optional) {
        return post(QUERY_PATH, createKsqlRequest(str, map, optional), KsqlTarget::toRows);
    }

    /**
     * Blocking POST of {@link QueryStreamArgs} to {@code /query-stream}, sending the
     * {@code KSQL_V1_PROTOBUF} media type in the {@code Accept} header.
     *
     * @param str first argument of the {@link QueryStreamArgs}
     * @param map fourth argument of the {@link QueryStreamArgs}
     */
    public RestResponse<List<StreamedRow>> postQueryStreamRequestProto(String str, Map<String, Object> map) {
        final QueryStreamArgs args =
            new QueryStreamArgs(str, this.localProperties.toMap(), Collections.emptyMap(), map);
        return executeRequestSync(
            HttpMethod.POST,
            QUERY_STREAM_PATH,
            args,
            KsqlTarget::toRowsFromProto,
            Optional.of(KsqlMediaType.KSQL_V1_PROTOBUF.mediaType()));
    }

    /**
     * Blocking POST to {@code /query} that returns a publisher of rows on HTTP 200. Each buffer
     * is deserialized as a {@link StreamedRow}.
     *
     * @param str      first argument of the {@link KsqlRequest}
     * @param map      third argument of the {@link KsqlRequest}
     * @param optional fourth argument of the {@link KsqlRequest}; {@code null} when empty
     */
    public RestResponse<StreamPublisher<StreamedRow>> postQueryRequestStreamed(String str, Map<String, ?> map, Optional<Long> optional) {
        return executeQueryRequestWithStreamResponse(
            str, map, optional, chunk -> KsqlClientUtil.deserialize(chunk, StreamedRow.class));
    }

    /**
     * Non-blocking POST to {@code /query-stream} requesting the delimited media type; on HTTP 200
     * the future yields a publisher whose buffers are converted with
     * {@code KsqlTargetUtil.toRowFromDelimited}.
     *
     * @param str first argument of the {@link QueryStreamArgs}
     * @param map copied into the fourth argument of the {@link QueryStreamArgs}
     */
    public CompletableFuture<RestResponse<StreamPublisher<StreamedRow>>> postQueryRequestStreamedAsync(String str, Map<String, ?> map) {
        return executeQueryStreamRequest(str, map, KsqlTargetUtil::toRowFromDelimited);
    }

    /**
     * Blocking POST to {@code /query} (with empty request properties) that returns a publisher of
     * strings on HTTP 200. Each buffer is converted with {@code toString()}.
     *
     * @param str      first argument of the {@link KsqlRequest}
     * @param optional fourth argument of the {@link KsqlRequest}; {@code null} when empty
     */
    public RestResponse<StreamPublisher<String>> postPrintTopicRequest(String str, Optional<Long> optional) {
        return executeQueryRequestWithStreamResponse(str, Collections.emptyMap(), optional, Object::toString);
    }

    /** Builds the request body shared by the {@code /ksql} and {@code /query} endpoints. */
    private KsqlRequest createKsqlRequest(String str, Map<String, ?> map, Optional<Long> optional) {
        return new KsqlRequest(str, this.localProperties.toMap(), map, optional.orElse(null));
    }

    /** Blocking GET with no request body; the full response body is deserialized as {@code type}. */
    private <T> RestResponse<T> get(String path, Class<T> type) {
        return executeRequestSync(
            HttpMethod.GET,
            path,
            null,
            response -> KsqlClientUtil.deserialize(response.getBody(), type),
            Optional.empty());
    }

    /** Blocking POST using the default {@code Accept} header; the full body is given to {@code mapper}. */
    private <T> RestResponse<T> post(String path, Object requestBody, Function<ResponseWithBody, T> mapper) {
        return executeRequestSync(HttpMethod.POST, path, requestBody, mapper, Optional.empty());
    }

    /** Blocking POST whose response is piped chunk by chunk; see the streaming {@code executeRequestSync}. */
    private <R, T> RestResponse<R> post(String path, Object requestBody, Supplier<R> resultSupplier, Function<Buffer, T> chunkMapper, String delimiter, WriteStream<T> chunkSink, CompletableFuture<Void> shouldClose) {
        return executeRequestSync(HttpMethod.POST, path, requestBody, resultSupplier, chunkMapper, delimiter, chunkSink, shouldClose);
    }

    /** Non-blocking request that reads the complete response body before mapping it. */
    private <T> CompletableFuture<RestResponse<T>> executeRequestAsync(HttpMethod httpMethod, String path, Object requestBody, Function<ResponseWithBody, T> mapper) {
        return executeAsync(httpMethod, path, Optional.empty(), requestBody, mapper, KsqlTarget::readFullBody);
    }

    /** Blocking request that reads the complete response body before mapping it. */
    private <T> RestResponse<T> executeRequestSync(HttpMethod httpMethod, String path, Object requestBody, Function<ResponseWithBody, T> mapper, Optional<String> acceptMediaType) {
        return executeSync(httpMethod, path, acceptMediaType, requestBody, mapper, KsqlTarget::readFullBody);
    }

    /**
     * Blocking request whose response is split on {@code delimiter} and piped into a
     * {@code BufferMapWriteStream} built from {@code chunkMapper} and {@code chunkSink}.
     *
     * <p>The call returns once the pipe has finished (the result then comes from
     * {@code resultSupplier}), or fails if the pipe fails or if {@code shouldClose} completes
     * before the pipe has finished, in which case the underlying connection is closed.
     */
    private <R, T> RestResponse<R> executeRequestSync(HttpMethod httpMethod, String path, Object requestBody, Supplier<R> resultSupplier, Function<Buffer, T> chunkMapper, String delimiter, WriteStream<T> chunkSink, CompletableFuture<Void> shouldClose) {
        return executeSync(httpMethod, path, Optional.empty(), requestBody, ignoredResponse -> resultSupplier.get(), (response, bodyFuture) -> {
            final RecordParser parser = RecordParser.newDelimited(delimiter, response);
            final AtomicBoolean pipeFinished = new AtomicBoolean(false);
            final WriteStream<Buffer> sink = new BufferMapWriteStream<>(chunkMapper, chunkSink);

            parser.exceptionHandler(bodyFuture::completeExceptionally);
            parser.pipe().endOnSuccess(false).to(sink, pipeResult -> {
                pipeFinished.set(true);
                if (pipeResult.succeeded()) {
                    // Chunks were already delivered to the sink, so the body is empty.
                    bodyFuture.complete(new ResponseWithBody(response, Buffer.buffer()));
                } else {
                    log.error("Error while handling response.", pipeResult.cause());
                    bodyFuture.completeExceptionally(pipeResult.cause());
                }
            });

            // Captured here because the close callback below may fire on another thread.
            final Context context = Vertx.currentContext();
            shouldClose.handle((ignoredValue, ignoredError) -> {
                context.runOnContext(unused -> {
                    if (pipeFinished.get()) {
                        return;
                    }
                    try {
                        response.request().connection().close();
                        bodyFuture.completeExceptionally(new KsqlRestClientException("Closing connection"));
                    } catch (Throwable closeFailure) {
                        log.error("Error while handling close", closeFailure);
                        bodyFuture.completeExceptionally(closeFailure);
                    }
                });
                return null;
            });
        });
    }

    /**
     * Blocking POST of a {@link KsqlRequest} to {@code /query}. On HTTP 200 the result is a
     * {@link StreamPublisher} over the response; otherwise the full body is read and no
     * publisher is created.
     */
    private <T> RestResponse<StreamPublisher<T>> executeQueryRequestWithStreamResponse(String str, Map<String, ?> map, Optional<Long> optional, Function<Buffer, T> mapper) {
        final KsqlRequest request = createKsqlRequest(str, map, optional);
        final AtomicReference<StreamPublisher<T>> publisherRef = new AtomicReference<>();
        return executeSync(
            HttpMethod.POST,
            QUERY_PATH,
            Optional.empty(),
            request,
            ignoredResponse -> publisherRef.get(),
            publishOnOk(publisherRef, mapper, true));
    }

    /**
     * Non-blocking POST of {@link QueryStreamArgs} to {@code /query-stream} requesting the
     * delimited media type. On HTTP 200 the result is a {@link StreamPublisher} over the
     * response; otherwise the full body is read and no publisher is created.
     */
    private <T> CompletableFuture<RestResponse<StreamPublisher<T>>> executeQueryStreamRequest(String str, Map<String, ?> map, Function<Buffer, T> mapper) {
        final Map<String, Object> requestProperties = map.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        final QueryStreamArgs args =
            new QueryStreamArgs(str, this.localProperties.toMap(), Collections.emptyMap(), requestProperties);
        final AtomicReference<StreamPublisher<T>> publisherRef = new AtomicReference<>();
        return executeAsync(
            HttpMethod.POST,
            QUERY_STREAM_PATH,
            Optional.of(DELIMITED_MEDIA_TYPE),
            args,
            ignoredResponse -> publisherRef.get(),
            publishOnOk(publisherRef, mapper, false));
    }

    /**
     * Builds a response handler that, on HTTP 200, stores a new {@link StreamPublisher} in
     * {@code publisherRef} and completes the future with a body-less response; on any other
     * status it reads the full body instead.
     *
     * @param publisherFlag forwarded unchanged as the last {@link StreamPublisher} constructor
     *                      argument; its meaning is defined by that class
     */
    private static <T> BiConsumer<HttpClientResponse, CompletableFuture<ResponseWithBody>> publishOnOk(AtomicReference<StreamPublisher<T>> publisherRef, Function<Buffer, T> mapper, boolean publisherFlag) {
        return (response, bodyFuture) -> {
            if (response.statusCode() != 200) {
                readFullBody(response, bodyFuture);
                return;
            }
            publisherRef.set(new StreamPublisher<>(Vertx.currentContext(), response, mapper, bodyFuture, publisherFlag));
            bodyFuture.complete(new ResponseWithBody(response));
        };
    }

    /** Completes {@code bodyFuture} once the entire response body has been received. */
    private static void readFullBody(HttpClientResponse response, CompletableFuture<ResponseWithBody> bodyFuture) {
        response.bodyHandler(body -> bodyFuture.complete(new ResponseWithBody(response, body)));
    }

    /**
     * Runs the request and blocks until it finishes.
     *
     * @throws KsqlRestClientException wrapping any failure; if the waiting thread was
     *                                 interrupted, its interrupt status is restored first
     */
    private <T> RestResponse<T> executeSync(HttpMethod httpMethod, String path, Optional<String> acceptMediaType, Object requestBody, Function<ResponseWithBody, T> mapper, BiConsumer<HttpClientResponse, CompletableFuture<ResponseWithBody>> responseHandler) {
        try {
            final ResponseWithBody response =
                execute(httpMethod, path, acceptMediaType, requestBody, responseHandler).get();
            return KsqlClientUtil.toRestResponse(response, path, mapper);
        } catch (InterruptedException e) {
            // Preserve the interrupt so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new KsqlRestClientException("Error issuing " + httpMethod + " to KSQL server. path:" + path, e);
        } catch (Exception e) {
            throw new KsqlRestClientException("Error issuing " + httpMethod + " to KSQL server. path:" + path, e);
        }
    }

    /** Runs the request without blocking and maps the eventual response to a {@link RestResponse}. */
    private <T> CompletableFuture<RestResponse<T>> executeAsync(HttpMethod httpMethod, String path, Optional<String> acceptMediaType, Object requestBody, Function<ResponseWithBody, T> mapper, BiConsumer<HttpClientResponse, CompletableFuture<ResponseWithBody>> responseHandler) {
        return execute(httpMethod, path, acceptMediaType, requestBody, responseHandler)
            .thenApply(response -> KsqlClientUtil.toRestResponse(response, path, mapper));
    }

    /**
     * Sends one HTTP request and returns a future that {@code responseHandler} is expected to
     * complete.
     *
     * <p>The future is completed exceptionally if the request cannot be created, if the request
     * reports an error, or if the response fails; in the last case {@code responseHandler} is not
     * invoked.
     *
     * @param acceptMediaType value of the {@code Accept} header; {@code application/json} when empty
     * @param requestBody     serialized with {@code KsqlClientUtil.serialize} when non-null;
     *                        otherwise the request is sent without a body
     */
    private CompletableFuture<ResponseWithBody> execute(HttpMethod httpMethod, String path, Optional<String> acceptMediaType, Object requestBody, BiConsumer<HttpClientResponse, CompletableFuture<ResponseWithBody>> responseHandler) {
        final VertxCompletableFuture<ResponseWithBody> bodyFuture = new VertxCompletableFuture<>();

        final RequestOptions options = new RequestOptions();
        options.setMethod(httpMethod);
        options.setServer(this.socketAddress);
        options.setPort(this.socketAddress.port());
        options.setHost(this.host);
        options.setURI(path);

        this.httpClient.request(options, requestResult -> {
            if (requestResult.failed()) {
                bodyFuture.completeExceptionally(requestResult.cause());
                return;
            }
            final HttpClientRequest request = requestResult.result();
            request.response(responseResult -> {
                if (responseResult.failed()) {
                    // A failed result carries no response object, so the handler must not run.
                    bodyFuture.completeExceptionally(responseResult.cause());
                    return;
                }
                responseHandler.accept(responseResult.result(), bodyFuture);
            });
            request.exceptionHandler(bodyFuture::completeExceptionally);

            request.putHeader("Accept", acceptMediaType.orElse(DEFAULT_ACCEPT));
            this.authHeader.ifPresent(value -> request.putHeader("Authorization", value));
            this.additionalHeaders.forEach(request::putHeader);

            if (requestBody != null) {
                request.end(KsqlClientUtil.serialize(requestBody));
            } else {
                request.end();
            }
        });
        return bodyFuture;
    }

    /** Converts a fully-read response body to rows, leaving each row unchanged. */
    private static List<StreamedRow> toRows(ResponseWithBody responseWithBody) {
        return KsqlTargetUtil.toRows(responseWithBody.getBody(), Functions.identity());
    }

    /**
     * Converts the fully-read body of a protobuf-media-type response to rows.
     *
     * <p>NOTE(review): this currently performs exactly the same conversion as {@link #toRows};
     * confirm that no proto-specific parsing is required.
     */
    private static List<StreamedRow> toRowsFromProto(ResponseWithBody responseWithBody) {
        return KsqlTargetUtil.toRows(responseWithBody.getBody(), Functions.identity());
    }
}
