package io.confluent.ksql.rest.server.resources.streaming;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.api.server.MetricsCallbackHolder;
import io.confluent.ksql.api.server.NextHandlerOutput;
import io.confluent.ksql.execution.pull.PullQueryResult;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.properties.DenyListPropertyValidator;
import io.confluent.ksql.rest.ApiJsonMapper;
import io.confluent.ksql.rest.EndpointResponse;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.StatementParser;
import io.confluent.ksql.rest.server.computation.CommandQueue;
import io.confluent.ksql.rest.server.query.QueryExecutor;
import io.confluent.ksql.rest.server.query.QueryMetadataHolder;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.rest.util.CommandStoreUtil;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import io.vertx.core.Context;
import java.time.Clock;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.class */
/**
 * REST resource that serves streamed statements.
 *
 * <p>Two statement kinds are handled: {@link Query} statements (answered with a pull-query or
 * push-query stream writer, or handed over to the next handler when migration to the
 * query-stream endpoint is enabled) and {@link PrintTopic} statements (answered with a
 * {@link TopicStreamWriter}). Any other statement type produces a bad-request response.
 */
public class StreamedQueryResource {
    private static final Logger log = LoggerFactory.getLogger(StreamedQueryResource.class);
    private static final ObjectMapper OBJECT_MAPPER = ApiJsonMapper.INSTANCE.get();

    /** Streams property that must be present before this resource accepts requests. */
    private static final String APPLICATION_SERVER_CONFIG = "application.server";
    /** Property (request override or server config) that routes queries to the next handler. */
    private static final String MIGRATE_QUERY_CONFIG = "ksql.endpoint.migrate.query";
    private static final String UNSUPPORTED_STATEMENT_FORMAT =
            "Statement type `%s' not supported for this resource";

    private final KsqlExecutionContext ksqlEngine;
    private final StatementParser statementParser;
    private final CommandQueue commandQueue;
    private final Duration disconnectCheckInterval;
    private final Duration commandQueueCatchupTimeout;
    private final ActivenessRegistrar activenessRegistrar;
    private final Optional<KsqlAuthorizationValidator> authorizationValidator;
    private final Errors errorHandler;
    private final DenyListPropertyValidator denyListPropertyValidator;
    private final QueryExecutor queryExecutor;
    private final KsqlRestConfig ksqlRestConfig;

    /**
     * Creates the resource with a {@link StatementParser} built from the given execution context.
     *
     * @param ksqlExecutionContext engine used for parsing, metastore lookups and configuration
     * @param ksqlRestConfig REST server configuration
     * @param commandQueue command queue consulted before a statement is handled
     * @param duration disconnect check interval handed to the stream writers
     * @param duration2 command queue catch-up timeout
     * @param activenessRegistrar notified on every incoming request
     * @param optional optional authorization validator; when empty no authorization check runs
     * @param errors handler that turns exceptions into responses
     * @param denyListPropertyValidator validator applied to the request's config overrides
     * @param queryExecutor executor that produces the metadata for query statements
     */
    public StreamedQueryResource(KsqlExecutionContext ksqlExecutionContext, KsqlRestConfig ksqlRestConfig, CommandQueue commandQueue, Duration duration, Duration duration2, ActivenessRegistrar activenessRegistrar, Optional<KsqlAuthorizationValidator> optional, Errors errors, DenyListPropertyValidator denyListPropertyValidator, QueryExecutor queryExecutor) {
        this(ksqlExecutionContext, ksqlRestConfig, new StatementParser(ksqlExecutionContext), commandQueue, duration, duration2, activenessRegistrar, optional, errors, denyListPropertyValidator, queryExecutor);
    }

    /**
     * Same as the public constructor, but with an explicitly supplied {@link StatementParser}.
     *
     * <p>{@code optional} (the authorization validator) and {@code queryExecutor} are stored
     * without a null check, exactly as before; every other argument must be non-null.
     */
    @VisibleForTesting
    StreamedQueryResource(KsqlExecutionContext ksqlExecutionContext, KsqlRestConfig ksqlRestConfig, StatementParser statementParser, CommandQueue commandQueue, Duration duration, Duration duration2, ActivenessRegistrar activenessRegistrar, Optional<KsqlAuthorizationValidator> optional, Errors errors, DenyListPropertyValidator denyListPropertyValidator, QueryExecutor queryExecutor) {
        this.ksqlEngine = Objects.requireNonNull(ksqlExecutionContext, "ksqlEngine");
        this.ksqlRestConfig = Objects.requireNonNull(ksqlRestConfig, "ksqlRestConfig");
        this.statementParser = Objects.requireNonNull(statementParser, "statementParser");
        this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
        this.disconnectCheckInterval = Objects.requireNonNull(duration, "disconnectCheckInterval");
        this.commandQueueCatchupTimeout = Objects.requireNonNull(duration2, "commandQueueCatchupTimeout");
        this.activenessRegistrar = Objects.requireNonNull(activenessRegistrar, "activenessRegistrar");
        this.authorizationValidator = optional;
        this.errorHandler = Objects.requireNonNull(errors, "errorHandler");
        this.denyListPropertyValidator = Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");
        this.queryExecutor = queryExecutor;
    }

    /**
     * Entry point for a streamed statement request.
     *
     * <p>Steps, in order: reject the request if the engine is not configured yet, record the
     * request time, parse the single statement in the request, call
     * {@link CommandStoreUtil#httpWaitForCommandSequenceNumber} with the command queue and the
     * catch-up timeout, and finally dispatch on the statement type.
     *
     * @param ksqlSecurityContext security context of the caller
     * @param ksqlRequest the request carrying the statement text, overrides and properties
     * @param completableFuture future handed to the created stream writer
     * @param optional forwarded unchanged to the query executor
     * @param metricsCallbackHolder forwarded unchanged to the query executor
     * @param context forwarded unchanged to the query executor
     * @return the response for the statement
     * @throws KsqlRestException if the engine is not ready, the statement text is blank, or the
     *     statement cannot be parsed
     */
    public EndpointResponse streamQuery(KsqlSecurityContext ksqlSecurityContext, KsqlRequest ksqlRequest, CompletableFuture<Void> completableFuture, Optional<Boolean> optional, MetricsCallbackHolder metricsCallbackHolder, Context context) {
        throwIfNotConfigured();
        this.activenessRegistrar.updateLastRequestTime();
        final KsqlParser.PreparedStatement<?> prepared = parseStatement(ksqlRequest);
        CommandStoreUtil.httpWaitForCommandSequenceNumber(this.commandQueue, ksqlRequest, this.commandQueueCatchupTimeout);
        return handleStatement(ksqlSecurityContext, ksqlRequest, prepared, completableFuture, optional, metricsCallbackHolder, context);
    }

    /**
     * Parses the request's statement text into a single prepared statement.
     *
     * @throws KsqlRestException with a bad-request response when the text is blank, or a
     *     bad-statement response when parsing fails
     */
    private KsqlParser.PreparedStatement<?> parseStatement(KsqlRequest ksqlRequest) {
        final String sql = ksqlRequest.getUnmaskedKsql();
        if (sql.trim().isEmpty()) {
            throw new KsqlRestException(Errors.badRequest("\"ksql\" field must be populated"));
        }
        try {
            return this.statementParser.parseSingleStatement(sql);
        } catch (IllegalArgumentException | KsqlException e) {
            throw new KsqlRestException(Errors.badStatement(e, sql));
        }
    }

    /** Throws a not-ready {@link KsqlRestException} until the application server property is set. */
    private void throwIfNotConfigured() {
        final boolean configured = this.ksqlEngine.getKsqlConfig()
                .getKsqlStreamConfigProps()
                .containsKey(APPLICATION_SERVER_CONFIG);
        if (!configured) {
            throw new KsqlRestException(Errors.notReady());
        }
    }

    /**
     * Authorizes the statement, validates the config overrides and dispatches on statement type.
     *
     * <p>Exceptions are mapped to responses. The handlers are ordered most-specific first: a
     * handler for {@link KsqlStatementException} placed after the one for {@link KsqlException}
     * can never run if the former is a subclass of the latter (NOTE(review): the hierarchy is
     * not visible in this file; this ordering is correct either way).
     */
    @SuppressWarnings("unchecked") // each cast is guarded by an instanceof check on the wrapped statement
    private EndpointResponse handleStatement(KsqlSecurityContext ksqlSecurityContext, KsqlRequest ksqlRequest, KsqlParser.PreparedStatement<?> preparedStatement, CompletableFuture<Void> completableFuture, Optional<Boolean> optional, MetricsCallbackHolder metricsCallbackHolder, Context context) {
        try {
            this.authorizationValidator.ifPresent(validator ->
                    validator.checkAuthorization(ksqlSecurityContext, this.ksqlEngine.getMetaStore(), preparedStatement.getStatement()));

            final Map<String, Object> configOverrides = ksqlRequest.getConfigOverrides();
            this.denyListPropertyValidator.validateAll(configOverrides);

            if (preparedStatement.getStatement() instanceof Query) {
                if (shouldMigrateToQueryStream(configOverrides)) {
                    // Hand the request over to the next handler instead of executing it here.
                    return EndpointResponse.ok(new NextHandlerOutput());
                }
                final KsqlParser.PreparedStatement<Query> queryStatement =
                        (KsqlParser.PreparedStatement<Query>) preparedStatement;
                final QueryMetadataHolder metadataHolder = this.queryExecutor.handleStatement(
                        ksqlSecurityContext.getServiceContext(),
                        configOverrides,
                        ksqlRequest.getRequestProperties(),
                        queryStatement,
                        optional,
                        metricsCallbackHolder,
                        context,
                        false);
                return handleQuery(queryStatement, completableFuture, metadataHolder);
            }

            if (preparedStatement.getStatement() instanceof PrintTopic) {
                return handlePrintTopic(
                        ksqlSecurityContext.getServiceContext(),
                        configOverrides,
                        (KsqlParser.PreparedStatement<PrintTopic>) preparedStatement,
                        completableFuture);
            }

            return unsupportedStatement(preparedStatement);
        } catch (TopicAuthorizationException e) {
            return this.errorHandler.accessDeniedFromKafkaResponse(e);
        } catch (KsqlStatementException e) {
            return Errors.badStatement(e.getRawMessage(), e.getSqlStatement());
        } catch (KsqlException e) {
            return this.errorHandler.generateResponse(e, Errors.badRequest(e));
        }
    }

    /**
     * Builds the response for an executed query.
     *
     * <p>A pull-query result takes precedence; otherwise push-query metadata is used; if the
     * holder carries neither, a bad-request response is returned.
     */
    private EndpointResponse handleQuery(KsqlParser.PreparedStatement<Query> preparedStatement, CompletableFuture<Void> completableFuture, QueryMetadataHolder queryMetadataHolder) {
        final long disconnectCheckMs = this.disconnectCheckInterval.toMillis();

        if (queryMetadataHolder.getPullQueryResult().isPresent()) {
            final PullQueryResult pullResult = queryMetadataHolder.getPullQueryResult().get();
            return EndpointResponse.ok(new PullQueryStreamWriter(
                    pullResult,
                    disconnectCheckMs,
                    OBJECT_MAPPER,
                    pullResult.getPullQueryQueue(),
                    Clock.systemUTC(),
                    completableFuture,
                    preparedStatement));
        }

        if (queryMetadataHolder.getPushQueryMetadata().isPresent()) {
            // True only when stream-pull metadata is present and reports no end offsets.
            final boolean noEndOffsets = queryMetadataHolder.getStreamPullQueryMetadata()
                    .map(metadata -> metadata.getEndOffsets().isEmpty())
                    .orElse(false);
            return EndpointResponse.ok(new QueryStreamWriter(
                    queryMetadataHolder.getPushQueryMetadata().get(),
                    disconnectCheckMs,
                    OBJECT_MAPPER,
                    completableFuture,
                    noEndOffsets));
        }

        return unsupportedStatement(preparedStatement);
    }

    /**
     * Builds the response for a PRINT TOPIC statement.
     *
     * <p>The writer's properties are the engine's streams properties with the request's
     * overrides applied on top.
     *
     * @throws KsqlRestException with a bad-request response when the topic client reports that
     *     the topic does not exist; the message lists topics whose names differ only in case
     */
    private EndpointResponse handlePrintTopic(ServiceContext serviceContext, Map<String, Object> map, KsqlParser.PreparedStatement<PrintTopic> preparedStatement, CompletableFuture<Void> completableFuture) {
        final PrintTopic printTopic = preparedStatement.getStatement();
        final String topicName = printTopic.getTopic();

        if (!serviceContext.getTopicClient().isTopicExists(topicName)) {
            final Collection<String> candidates = findPossibleTopicMatches(topicName, serviceContext);
            final String suggestions = candidates.isEmpty()
                    ? ""
                    : candidates.stream()
                            .map(candidate -> "\tprint " + candidate + ";")
                            .collect(Collectors.joining(
                                    System.lineSeparator(),
                                    System.lineSeparator() + "Did you mean:" + System.lineSeparator(),
                                    ""));
            throw new KsqlRestException(Errors.badRequest("Could not find topic '" + topicName + "', or the KSQL user does not have permissions to list the topic. Topic names are case-sensitive." + suggestions));
        }

        final Map<String, Object> writerProperties =
                new HashMap<>(this.ksqlEngine.getKsqlConfig().getKsqlStreamConfigProps());
        writerProperties.putAll(map);
        final TopicStreamWriter writer = TopicStreamWriter.create(serviceContext, writerProperties, printTopic, this.disconnectCheckInterval, completableFuture);
        log.info("Printing topic '{}'", topicName);
        return EndpointResponse.ok(writer);
    }

    /** Returns the listed topic names that equal {@code str} when case is ignored. */
    private static Collection<String> findPossibleTopicMatches(String str, ServiceContext serviceContext) {
        return serviceContext.getTopicClient().listTopicNames().stream()
                .filter(candidate -> candidate.equalsIgnoreCase(str))
                .collect(Collectors.toSet());
    }

    /**
     * Decides whether a query should be handed to the next handler instead of being run here.
     *
     * <p>A request-level override wins over the server configuration. A {@link Boolean} override
     * is used as-is; any other non-null value is parsed from its string form; a missing or null
     * override falls back to the server configuration.
     */
    private boolean shouldMigrateToQueryStream(Map<String, Object> map) {
        final Object override = map.get(MIGRATE_QUERY_CONFIG);
        if (override instanceof Boolean) {
            return (Boolean) override;
        }
        if (override != null) {
            return Boolean.parseBoolean(override.toString());
        }
        return this.ksqlEngine.getKsqlConfig().getBoolean(MIGRATE_QUERY_CONFIG);
    }

    /**
     * Builds the bad-request response for a statement this resource cannot serve, naming the
     * class of the wrapped statement rather than the class of the prepared-statement wrapper.
     */
    private static EndpointResponse unsupportedStatement(KsqlParser.PreparedStatement<?> preparedStatement) {
        final Object statement = preparedStatement.getStatement();
        final String typeName = statement == null ? "null" : statement.getClass().getName();
        return Errors.badRequest(String.format(UNSUPPORTED_STATEMENT_FORMAT, typeName));
    }
}
