package io.confluent.ksql.engine;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.analyzer.ImmutableAnalysis;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.execution.ExecutionPlan;
import io.confluent.ksql.execution.ExecutionPlanner;
import io.confluent.ksql.execution.ddl.commands.CreateTableCommand;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.plan.PlanInfoExtractor;
import io.confluent.ksql.execution.pull.HARouting;
import io.confluent.ksql.execution.pull.PullPhysicalPlan;
import io.confluent.ksql.execution.pull.PullPhysicalPlanBuilder;
import io.confluent.ksql.execution.pull.PullQueryQueuePopulator;
import io.confluent.ksql.execution.pull.PullQueryResult;
import io.confluent.ksql.execution.pull.StreamedRowTranslator;
import io.confluent.ksql.execution.scalablepush.PushPhysicalPlan;
import io.confluent.ksql.execution.scalablepush.PushPhysicalPlanBuilder;
import io.confluent.ksql.execution.scalablepush.PushPhysicalPlanCreator;
import io.confluent.ksql.execution.scalablepush.PushPhysicalPlanManager;
import io.confluent.ksql.execution.scalablepush.PushQueryPreparer;
import io.confluent.ksql.execution.scalablepush.PushQueryQueuePopulator;
import io.confluent.ksql.execution.scalablepush.PushRouting;
import io.confluent.ksql.execution.scalablepush.PushRoutingOptions;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.logging.query.QueryLogger;
import io.confluent.ksql.logicalplanner.LogicalPlan;
import io.confluent.ksql.logicalplanner.LogicalPlanner;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.OutputRefinement;
import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties;
import io.confluent.ksql.parser.tree.AliasedRelation;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.ExecutableDdlStatement;
import io.confluent.ksql.parser.tree.Join;
import io.confluent.ksql.parser.tree.JoinedSource;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QueryContainer;
import io.confluent.ksql.parser.tree.Relation;
import io.confluent.ksql.parser.tree.Select;
import io.confluent.ksql.parser.tree.SingleColumn;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.parser.tree.Table;
import io.confluent.ksql.physicalplanner.PhysicalPlan;
import io.confluent.ksql.physicalplanner.PhysicalPlanner;
import io.confluent.ksql.physicalplanner.nodes.Node;
import io.confluent.ksql.planner.LogicalPlanNode;
import io.confluent.ksql.planner.QueryPlannerOptions;
import io.confluent.ksql.planner.plan.DataSourceNode;
import io.confluent.ksql.planner.plan.KsqlBareOutputNode;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.PlanNodeId;
import io.confluent.ksql.planner.plan.VerifiableNode;
import io.confluent.ksql.query.PullQueryWriteStream;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.utils.FormatOptions;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.RefinementInfo;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.ConsistencyOffsetVector;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.PlanSummary;
import io.confluent.ksql.util.PushOffsetRange;
import io.confluent.ksql.util.PushQueryMetadata;
import io.confluent.ksql.util.QueryMask;
import io.confluent.ksql.util.ScalablePushQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.vertx.core.Context;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/ksql/engine/EngineExecutor.class */
public final class EngineExecutor {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(EngineExecutor.class);
    // Empty-string prefix constant. NOTE(review): no use is visible in this part of the file; confirm where it is consumed.
    private static final String NO_OUTPUT_TOPIC_PREFIX = "";
    // Gateway to engine state used throughout this class: meta store, query registry, query/DDL factories.
    private final EngineContext engineContext;
    // Handed to query-engine creation, query registration and routing calls.
    private final ServiceContext serviceContext;
    // Session configuration; the constructor passes its overrides to KsqlEngineProps.throwOnImmutableOverride.
    private final SessionConfig config;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/engine/EngineExecutor$ExecutorPlans.class */
    /**
     * Immutable holder pairing the output node of a planned query with the
     * execution plan that was built for it.
     */
    public static final class ExecutorPlans {
        private final OutputNode outputNode;
        private final ExecutionPlan executionPlan;

        /**
         * @param outputNode the output node of the planned query; must not be null
         * @param executionPlan the execution plan built for the query; must not be null
         * @throws NullPointerException if either argument is null
         */
        private ExecutorPlans(OutputNode outputNode, ExecutionPlan executionPlan) {
            this.outputNode = Objects.requireNonNull(outputNode, "outputNode");
            // The message names the actual parameter so a failure points at the right argument.
            this.executionPlan = Objects.requireNonNull(executionPlan, "executionPlan");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/engine/EngineExecutor$StubbedOutputNode.class */
    /**
     * Output node with fixed "stubbed" plan-node ids, built on top of a
     * {@link StubbedVerifiableDataSourceNode}. The sink name is derived from the
     * Kafka topic name of the supplied topic.
     */
    public static final class StubbedOutputNode extends KsqlStructuredDataOutputNode {
        private StubbedOutputNode(DataSource dataSource, KsqlTopic ksqlTopic, LogicalSchema logicalSchema) {
            super(
                new PlanNodeId("stubbedOutput"),
                stubbedSource(dataSource),
                logicalSchema,
                Optional.empty(),
                ksqlTopic,
                OptionalInt.empty(),
                true,
                SourceName.of(ksqlTopic.getKafkaTopicName()),
                false
            );
        }

        /** Builds the stubbed source node that feeds this output node. */
        private static StubbedVerifiableDataSourceNode stubbedSource(DataSource dataSource) {
            final PlanNodeId sourceId = new PlanNodeId("stubbedSource");
            return new StubbedVerifiableDataSourceNode(sourceId, dataSource, dataSource.getName(), false);
        }
    }

    /* loaded from: input_file:io/confluent/ksql/engine/EngineExecutor$StubbedVerifiableDataSourceNode.class */
    private static final class StubbedVerifiableDataSourceNode extends DataSourceNode implements VerifiableNode {
        private StubbedVerifiableDataSourceNode(PlanNodeId planNodeId, DataSource dataSource, SourceName sourceName, boolean z) {
            super(planNodeId, dataSource, sourceName, z);
        }

        @Override // io.confluent.ksql.planner.plan.VerifiableNode
        public void validateKeyPresent(SourceName sourceName) {
        }

        @Override // io.confluent.ksql.planner.plan.DataSourceNode, io.confluent.ksql.planner.plan.PlanNode
        public LogicalSchema getSchema() {
            return LogicalSchema.builder().build();
        }
    }

    /**
     * Stores the collaborators after null-checking them, then rejects session
     * overrides of immutable properties.
     *
     * @throws NullPointerException if any argument is null
     */
    private EngineExecutor(EngineContext engineContext, ServiceContext serviceContext, SessionConfig sessionConfig) {
        Objects.requireNonNull(engineContext, "engineContext");
        Objects.requireNonNull(serviceContext, "serviceContext");
        Objects.requireNonNull(sessionConfig, "config");

        this.engineContext = engineContext;
        this.serviceContext = serviceContext;
        this.config = sessionConfig;

        // Runs last, exactly as before: only reached when all three arguments are non-null.
        KsqlEngineProps.throwOnImmutableOverride(sessionConfig.getOverrides());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Static factory for {@link EngineExecutor}; delegates straight to the private constructor.
     *
     * @return a new executor bound to the given contexts and session configuration
     */
    public static EngineExecutor create(EngineContext engineContext, ServiceContext serviceContext, SessionConfig sessionConfig) {
        final EngineExecutor executor = new EngineExecutor(engineContext, serviceContext, sessionConfig);
        return executor;
    }

    /**
     * Executes the plan with the boolean flag of the two-argument overload set to {@code false}.
     */
    KsqlExecutionContext.ExecuteResult execute(KsqlPlan ksqlPlan) {
        final boolean flag = false;
        return execute(ksqlPlan, flag);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Executes a previously built plan.
     *
     * <p>A plan without a query plan is treated as pure DDL. Otherwise the sink is checked
     * for being read-only (unless the query type is {@code CREATE_SOURCE}), any DDL command
     * is executed, and finally the persistent query is started. For {@code CREATE_SOURCE}
     * plans the query is only started when source table materialization is enabled.
     *
     * @param ksqlPlan the plan to execute
     * @param z flag forwarded unchanged as the last argument of {@code executeDdl}
     * @return the DDL result text or the result of starting the persistent query
     * @throws IllegalStateException if a DDL result is required but the plan has no DDL command
     * @throws KsqlException if the query would write into a read-only source
     */
    public KsqlExecutionContext.ExecuteResult execute(KsqlPlan ksqlPlan, boolean z) {
        final String maskedStatement = QueryMask.getMaskedStatement(ksqlPlan.getStatementText());

        if (!ksqlPlan.getQueryPlan().isPresent()) {
            final String ddlOnlyResult = ksqlPlan.getDdlCommand()
                .map(ddl -> executeDdl(ddl, maskedStatement, false, Collections.emptySet(), z))
                .orElseThrow(() -> new IllegalStateException("DdlResult should be present if there is no physical plan."));
            return KsqlExecutionContext.ExecuteResult.of(ddlOnlyResult);
        }

        final QueryPlan queryPlan = ksqlPlan.getQueryPlan().get();
        final KsqlConstants.PersistentQueryType persistentQueryType = ksqlPlan.getPersistentQueryType().get();
        final boolean isCreateSource = persistentQueryType == KsqlConstants.PersistentQueryType.CREATE_SOURCE;

        if (!isCreateSource) {
            // Writing into a source that is flagged read-only is rejected up front.
            final DataSource sinkSource = this.engineContext.getMetaStore().getSource(queryPlan.getSink().get());
            if (sinkSource != null && sinkSource.isSource()) {
                throw new KsqlException(String.format("Cannot insert into read-only %s: %s", sinkSource.getDataSourceType().getKsqlType().toLowerCase(), sinkSource.getName().text()));
            }
        }

        final Optional<String> ddlResult = ksqlPlan.getDdlCommand()
            .map(ddl -> executeDdl(ddl, maskedStatement, true, queryPlan.getSources(), z));

        // Nothing more to do when the DDL reported that the source already exists.
        if (ddlResult.isPresent() && ddlResult.get().contains("already exists")) {
            return KsqlExecutionContext.ExecuteResult.of(ddlResult.get());
        }

        if (!isCreateSource || isSourceTableMaterializationEnabled()) {
            return KsqlExecutionContext.ExecuteResult.of(executePersistentQuery(queryPlan, maskedStatement, persistentQueryType));
        }

        // Log the masked text, matching what every other call in this method hands out.
        QueryLogger.info(String.format("Source table query won't be materialized because '%s' is disabled.", "ksql.source.table.materialization.enabled"), maskedStatement);
        return KsqlExecutionContext.ExecuteResult.of(
            ddlResult.orElseThrow(() -> new IllegalStateException("DdlResult should be present for a source table plan.")));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Plans a pull query against a table and wraps it in a {@link PullQueryResult}.
     *
     * <p>One {@link CompletableFuture} instance is created here and shared by the physical
     * plan, the routing call and the returned result.
     *
     * @param immutableAnalysis analysis of the query
     * @param configuredStatement the pull query statement and its session configuration
     * @param hARouting router that runs the query when the result's populator is invoked
     * @param routingOptions routing options; also decide the routing node type recorded in metrics
     * @param queryPlannerOptions options passed to logical and physical planning
     * @param optional pull query metrics, if metrics are being collected
     * @param z when {@code true}, the result is started before it is returned
     * @param optional2 consistency offset vector passed to planning, row translation and the result
     * @return the pull query result, started only if {@code z} is set
     * @throws IllegalArgumentException if the statement is not a pull query
     * @throws KsqlStatementException wrapping any failure raised while planning or starting
     */
    public PullQueryResult executeTablePullQuery(ImmutableAnalysis immutableAnalysis, ConfiguredStatement<Query> configuredStatement, HARouting hARouting, RoutingOptions routingOptions, QueryPlannerOptions queryPlannerOptions, Optional<PullQueryExecutorMetrics> optional, boolean z, Optional<ConsistencyOffsetVector> optional2) {
        if (!configuredStatement.getStatement().isPullQuery()) {
            throw new IllegalArgumentException("Executor can only handle pull queries");
        }
        final SessionConfig sessionConfig = configuredStatement.getSessionConfig();
        final KsqlConstants.RoutingNodeType routingNodeType = routingOptions.getIsSkipForwardRequest()
            ? KsqlConstants.RoutingNodeType.REMOTE_NODE
            : KsqlConstants.RoutingNodeType.SOURCE_NODE;

        // Kept outside the try so the catch block can tell whether planning got far enough to produce a plan.
        PullPhysicalPlan plan = null;
        try {
            final LogicalPlanNode logicalPlan = buildAndValidateLogicalPlan(configuredStatement, immutableAnalysis, sessionConfig.getConfig(false), queryPlannerOptions, false);
            final CompletableFuture<Void> sharedFuture = new CompletableFuture<>();
            plan = buildPullPhysicalPlan(logicalPlan, immutableAnalysis, queryPlannerOptions, sharedFuture, optional2);

            // Lambdas may only capture effectively final locals, so take a final alias of the plan.
            final PullPhysicalPlan physicalPlan = plan;
            final PullQueryWriteStream writeStream = new PullQueryWriteStream(
                immutableAnalysis.getLimitClause(),
                new StreamedRowTranslator(physicalPlan.getOutputSchema(), optional2));
            final PullQueryQueuePopulator populator = () ->
                hARouting.handlePullQuery(this.serviceContext, physicalPlan, configuredStatement, routingOptions, writeStream, sharedFuture);

            final PullQueryResult result = new PullQueryResult(
                physicalPlan.getOutputSchema(),
                populator,
                physicalPlan.getQueryId(),
                writeStream,
                optional,
                physicalPlan.getSourceType(),
                physicalPlan.getPlanType(),
                routingNodeType,
                physicalPlan::getRowsReadFromDataSource,
                sharedFuture,
                optional2);
            if (z) {
                result.start();
            }
            return result;
        } catch (Exception e) {
            final PullPhysicalPlan failedPlan = plan;
            if (failedPlan == null) {
                // No plan was built, so there is no source/plan type to attribute the error to.
                optional.ifPresent(metrics -> metrics.recordErrorRateForNoResult(1.0d));
            } else {
                optional.ifPresent(metrics ->
                    metrics.recordErrorRate(1.0d, failedPlan.getSourceType(), failedPlan.getPlanType(), routingNodeType));
            }
            final String maskedText = configuredStatement.getMaskedStatementText();
            QueryLogger.error("Failure to execute pull query", maskedText, e);

            final String message = e.getMessage() == null ? "Server Error" : e.getMessage();
            if (e instanceof KsqlStatementException) {
                // Carry over the unlogged message of the original statement exception.
                throw new KsqlStatementException(message, ((KsqlStatementException) e).getUnloggedMessage(), maskedText, e);
            }
            throw new KsqlStatementException(message, maskedText, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Plans a scalable push query and returns its metadata.
     *
     * <p>The physical plan is obtained through a {@link PushPhysicalPlanManager}, which is
     * given a creator callback delegating to {@code buildScalablePushPhysicalPlan}.
     *
     * @param immutableAnalysis analysis of the query
     * @param configuredStatement the push query statement and its session configuration
     * @param pushRouting router that prepares and runs the query
     * @param pushRoutingOptions routing options; also decide the routing node type recorded in metrics
     * @param queryPlannerOptions options passed to logical planning
     * @param context Vert.x context handed to physical planning
     * @param optional scalable push query metrics, if metrics are being collected
     * @return metadata describing the planned push query
     * @throws KsqlStatementException wrapping any failure raised while planning
     */
    public ScalablePushQueryMetadata executeScalablePushQuery(ImmutableAnalysis immutableAnalysis, ConfiguredStatement<Query> configuredStatement, PushRouting pushRouting, PushRoutingOptions pushRoutingOptions, QueryPlannerOptions queryPlannerOptions, Context context, Optional<ScalablePushQueryMetrics> optional) {
        final SessionConfig sessionConfig = configuredStatement.getSessionConfig();
        final KsqlConstants.RoutingNodeType routingNodeType = pushRoutingOptions.getHasBeenForwarded()
            ? KsqlConstants.RoutingNodeType.REMOTE_NODE
            : KsqlConstants.RoutingNodeType.SOURCE_NODE;

        // Kept outside the try so the catch block can tell whether a plan was produced.
        PushPhysicalPlan pushPhysicalPlan = null;
        try {
            final LogicalPlanNode logicalPlan = buildAndValidateLogicalPlan(configuredStatement, immutableAnalysis, sessionConfig.getConfig(false), queryPlannerOptions, true);
            final PushPhysicalPlanCreator planCreator = (range, group) ->
                buildScalablePushPhysicalPlan(logicalPlan, immutableAnalysis, context, range, group);
            final Optional<PushOffsetRange> continuationRange =
                pushRoutingOptions.getContinuationToken().map(PushOffsetRange::deserialize);
            final PushPhysicalPlanManager planManager =
                new PushPhysicalPlanManager(planCreator, pushRoutingOptions.getCatchupConsumerGroup(), continuationRange);

            final PushPhysicalPlan physicalPlan = planManager.getPhysicalPlan();
            pushPhysicalPlan = physicalPlan;

            final TransientQueryQueue queryQueue = new TransientQueryQueue(immutableAnalysis.getLimitClause());

            final PushQueryMetadata.ResultType resultType;
            if (!physicalPlan.getScalablePushRegistry().isTable()) {
                resultType = PushQueryMetadata.ResultType.STREAM;
            } else if (physicalPlan.getScalablePushRegistry().isWindowed()) {
                resultType = PushQueryMetadata.ResultType.WINDOWED_TABLE;
            } else {
                resultType = PushQueryMetadata.ResultType.TABLE;
            }

            final PushQueryQueuePopulator populator = () ->
                pushRouting.handlePushQuery(this.serviceContext, planManager, configuredStatement, pushRoutingOptions, physicalPlan.getOutputSchema(), queryQueue, optional, continuationRange);
            final PushQueryPreparer preparer = () -> {
                pushRouting.preparePushQuery(planManager, configuredStatement, pushRoutingOptions);
            };

            return new ScalablePushQueryMetadata(
                physicalPlan.getOutputSchema(),
                physicalPlan.getQueryId(),
                queryQueue,
                optional,
                resultType,
                populator,
                preparer,
                physicalPlan.getSourceType(),
                routingNodeType,
                physicalPlan::getRowsReadFromDataSource);
        } catch (Exception e) {
            final PushPhysicalPlan failedPlan = pushPhysicalPlan;
            if (failedPlan == null) {
                // No plan was built, so there is no source type to attribute the error to.
                optional.ifPresent(metrics -> metrics.recordErrorRateForNoResult(1.0d));
            } else {
                optional.ifPresent(metrics ->
                    metrics.recordErrorRate(1.0d, failedPlan.getSourceType(), routingNodeType));
            }
            final String maskedText = configuredStatement.getMaskedStatementText();
            QueryLogger.error("Failure to execute push query V2. " + pushRoutingOptions.debugString() + " " + queryPlannerOptions.debugString(), maskedText, e);

            final String message = e.getMessage() == null ? "Server Error" : e.getMessage();
            if (e instanceof KsqlStatementException) {
                // Carry over the unlogged message of the original statement exception.
                throw new KsqlStatementException(message, ((KsqlStatementException) e).getUnloggedMessage(), maskedText, e);
            }
            throw new KsqlStatementException(message, maskedText, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Plans the statement without a sink or explicit query id, validates it against all
     * live queries and registers it as a transient query.
     *
     * @param configuredStatement the query to run
     * @param z flag forwarded unchanged as the last argument of {@code createTransientQuery}
     * @return metadata of the newly created transient query
     */
    public TransientQueryMetadata executeTransientQuery(ConfiguredStatement<Query> configuredStatement, boolean z) {
        final ExecutorPlans plans = planQuery(
            configuredStatement,
            configuredStatement.getStatement(),
            Optional.empty(),
            Optional.empty(),
            this.engineContext.getMetaStore());
        final KsqlBareOutputNode outputNode = (KsqlBareOutputNode) plans.outputNode;
        final ExecutionPlan executionPlan = plans.executionPlan;

        this.engineContext.createQueryValidator().validateQuery(
            this.config,
            executionPlan,
            this.engineContext.getQueryRegistry().getAllLiveQueries());

        final QueryId queryId = executionPlan.getQueryId();
        return this.engineContext.getQueryRegistry().createTransientQuery(
            this.config,
            this.serviceContext,
            this.engineContext.getProcessingLogContext(),
            this.engineContext.getMetaStore(),
            configuredStatement.getMaskedStatementText(),
            queryId,
            getSourceNames(outputNode),
            executionPlan.getPhysicalPlan(),
            buildPlanSummary(queryId, executionPlan.getPhysicalPlan()),
            outputNode.getSchema(),
            outputNode.getLimit(),
            outputNode.getWindowInfo(),
            z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Plans the statement without a sink or explicit query id, validates it against all
     * live queries and registers it as a stream pull query.
     *
     * @param configuredStatement the query to run
     * @param z flag forwarded unchanged to {@code createStreamPullQuery}
     * @param immutableMap per-partition offsets forwarded as the last argument of {@code createStreamPullQuery}
     * @return metadata of the newly created query
     */
    public TransientQueryMetadata executeStreamPullQuery(ConfiguredStatement<Query> configuredStatement, boolean z, ImmutableMap<TopicPartition, Long> immutableMap) {
        final ExecutorPlans plans = planQuery(
            configuredStatement,
            configuredStatement.getStatement(),
            Optional.empty(),
            Optional.empty(),
            this.engineContext.getMetaStore());
        final KsqlBareOutputNode outputNode = (KsqlBareOutputNode) plans.outputNode;
        final ExecutionPlan executionPlan = plans.executionPlan;

        this.engineContext.createQueryValidator().validateQuery(
            this.config,
            executionPlan,
            this.engineContext.getQueryRegistry().getAllLiveQueries());

        final QueryId queryId = executionPlan.getQueryId();
        return this.engineContext.getQueryRegistry().createStreamPullQuery(
            this.config,
            this.serviceContext,
            this.engineContext.getProcessingLogContext(),
            this.engineContext.getMetaStore(),
            configuredStatement.getMaskedStatementText(),
            queryId,
            getSourceNames(outputNode),
            executionPlan.getPhysicalPlan(),
            buildPlanSummary(queryId, executionPlan.getPhysicalPlan()),
            outputNode.getSchema(),
            outputNode.getLimit(),
            outputNode.getWindowInfo(),
            z,
            immutableMap);
    }

    /**
     * Builds the plan for a source {@code CREATE TABLE} statement.
     *
     * <p>A query selecting every element that is neither a key nor a primary key is
     * synthesized over the table, and planned against a fresh meta store that contains
     * only that table. The resulting plan carries both the DDL command and the query plan.
     *
     * @param configuredStatement a statement whose body is a {@link CreateTable}
     * @return the combined DDL and query plan
     */
    @SuppressFBWarnings({"NP_NULL_PARAM_DEREF_NONVIRTUAL"})
    private KsqlPlan sourceTablePlan(ConfiguredStatement<?> configuredStatement) {
        final CreateTable createTable = (CreateTable) configuredStatement.getStatement();
        final CreateTableCommand ddlCommand = this.engineContext.createDdlCommand(
            configuredStatement.getMaskedStatementText(),
            (ExecutableDdlStatement) configuredStatement.getStatement(),
            this.config);

        final AliasedRelation from = new AliasedRelation(new Table(createTable.getName()), createTable.getName());

        // Project only the elements that are neither key nor primary key.
        final Select select = new Select((List) createTable.getElements().stream()
            .filter(element -> !element.getConstraints().isKey() && !element.getConstraints().isPrimaryKey())
            .map(element -> new SingleColumn(
                new UnqualifiedColumnReferenceExp(element.getName()),
                Optional.of(element.getName())))
            .collect(Collectors.toList()));

        final Query query = new Query(
            Optional.empty(),
            select,
            from,
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.of(RefinementInfo.of(OutputRefinement.CHANGES)),
            false,
            OptionalInt.empty());

        // Plan against a throwaway meta store holding just this table.
        final MetaStoreImpl scratchMetaStore = new MetaStoreImpl(new InternalFunctionRegistry());
        final Formats formats = ddlCommand.getFormats();
        final KsqlTable table = new KsqlTable(
            configuredStatement.getMaskedStatementText(),
            createTable.getName(),
            ddlCommand.getSchema(),
            Optional.empty(),
            false,
            new KsqlTopic(
                ddlCommand.getTopicName(),
                KeyFormat.of(formats.getKeyFormat(), formats.getKeyFeatures(), Optional.empty()),
                ValueFormat.of(formats.getValueFormat(), formats.getValueFeatures())),
            true);
        scratchMetaStore.putSource(table, false);

        final ExecutorPlans plans = planQuery(configuredStatement, query, Optional.empty(), Optional.empty(), scratchMetaStore);
        final KsqlBareOutputNode outputNode = (KsqlBareOutputNode) plans.outputNode;

        final QueryPlan queryPlan = new QueryPlan(
            getSourceNames(outputNode),
            Optional.empty(),
            plans.executionPlan.getPhysicalPlan(),
            plans.executionPlan.getQueryId(),
            getApplicationId(plans.executionPlan.getQueryId(), getSourceTopicNames(outputNode)));

        this.engineContext.createQueryValidator().validateQuery(
            this.config,
            plans.executionPlan,
            this.engineContext.getQueryRegistry().getAllLiveQueries());

        return KsqlPlan.queryPlanCurrent(configuredStatement.getMaskedStatementText(), Optional.of(ddlCommand), queryPlan);
    }

    /**
     * Reads {@code ksql.source.table.materialization.enabled} from the session configuration
     * obtained with {@code getConfig(false)}.
     */
    private boolean isSourceTableMaterializationEnabled() {
        final KsqlConfig ksqlConfig = this.config.getConfig(false);
        return ksqlConfig.getBoolean("ksql.source.table.materialization.enabled").booleanValue();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a {@link KsqlPlan} for an executable statement.
     *
     * <p>DDL statements yield a DDL-only plan, except source tables, which are planned via
     * {@code sourceTablePlan}. Source streams and source tables are rejected when source
     * table materialization is disabled. Every other statement is treated as a
     * {@link QueryContainer} and planned as a persistent query with an optional sink DDL.
     *
     * @param configuredStatement the statement to plan
     * @return the plan for the statement
     * @throws KsqlStatementException on any planning failure; other exceptions are wrapped
     */
    public KsqlPlan plan(ConfiguredStatement<?> configuredStatement) {
        try {
            throwOnNonExecutableStatement(configuredStatement);

            if (configuredStatement.getStatement() instanceof ExecutableDdlStatement) {
                final boolean sourceStream = (configuredStatement.getStatement() instanceof CreateStream)
                    && ((CreateStream) configuredStatement.getStatement()).isSource();
                final boolean sourceTable = (configuredStatement.getStatement() instanceof CreateTable)
                    && ((CreateTable) configuredStatement.getStatement()).isSource();

                if ((sourceStream || sourceTable) && !isSourceTableMaterializationEnabled()) {
                    throw new KsqlStatementException("Cannot execute command because source table materialization is disabled.", configuredStatement.getMaskedStatementText());
                }
                if (sourceTable) {
                    return sourceTablePlan(configuredStatement);
                }
                return KsqlPlan.ddlPlanCurrent(
                    configuredStatement.getMaskedStatementText(),
                    this.engineContext.createDdlCommand(
                        configuredStatement.getMaskedStatementText(),
                        (ExecutableDdlStatement) configuredStatement.getStatement(),
                        this.config));
            }

            // Anything that is not DDL must carry a query and a sink.
            final QueryContainer container = (QueryContainer) configuredStatement.getStatement();
            final ExecutorPlans plans = planQuery(
                configuredStatement,
                container.getQuery(),
                Optional.of(container.getSink()),
                container.getQueryId(),
                this.engineContext.getMetaStore());
            final KsqlStructuredDataOutputNode outputNode = (KsqlStructuredDataOutputNode) plans.outputNode;

            final Optional<DdlCommand> sinkDdl = maybeCreateSinkDdl(configuredStatement, outputNode);
            validateResultType(outputNode.getNodeOutputType(), configuredStatement);

            final QueryPlan queryPlan = new QueryPlan(
                getSourceNames(outputNode),
                outputNode.getSinkName(),
                plans.executionPlan.getPhysicalPlan(),
                plans.executionPlan.getQueryId(),
                getApplicationId(plans.executionPlan.getQueryId(), getSourceTopicNames(outputNode)));

            this.engineContext.createQueryValidator().validateQuery(
                this.config,
                plans.executionPlan,
                this.engineContext.getQueryRegistry().getAllLiveQueries());

            return KsqlPlan.queryPlanCurrent(configuredStatement.getMaskedStatementText(), sinkDdl, queryPlan);
        } catch (KsqlStatementException statementError) {
            // Already carries the statement text; pass through untouched.
            throw statementError;
        } catch (Exception other) {
            throw new KsqlStatementException(other.getMessage(), configuredStatement.getMaskedStatementText(), other);
        }
    }

    /**
     * Resolves the runtime application id for a query when
     * {@code ksql.runtime.feature.shared.enabled} is set.
     *
     * @param queryId id of the query being planned
     * @param collection source topic names handed to the runtime assignor
     * @return the id returned by the runtime assignor, or empty when the feature is disabled
     */
    private Optional<String> getApplicationId(QueryId queryId, Collection<String> collection) {
        final boolean sharedRuntimes = this.config.getConfig(true).getBoolean("ksql.runtime.feature.shared.enabled").booleanValue();
        if (!sharedRuntimes) {
            return Optional.empty();
        }
        final String runtimeId = this.engineContext.getRuntimeAssignor()
            .getRuntimeAndMaybeAddRuntime(queryId, collection, this.config.getConfig(true));
        return Optional.of(runtimeId);
    }

    /**
     * Rejects queries using features the new query planner cannot handle.
     *
     * @param query the query about to be planned with the new planner
     * @throws IllegalStateException if the query is a pull query or reads from a {@link JoinedSource}
     * @throws UnsupportedOperationException if the query uses WHERE, GROUP BY, HAVING, a window,
     *     PARTITION BY, LIMIT or a join
     */
    private void throwIfUnsupported(Query query) {
        if (query.isPullQuery()) {
            throw new IllegalStateException();
        }

        // The first matching clause, checked in a fixed order, decides the error message.
        final String unsupported;
        if (query.getWhere().isPresent()) {
            unsupported = "New query planner does not support WHERE. Set ksql.new.query.planner.enabled=false.";
        } else if (query.getGroupBy().isPresent()) {
            unsupported = "New query planner does not support GROUP BY. Set ksql.new.query.planner.enabled=false.";
        } else if (query.getHaving().isPresent()) {
            unsupported = "New query planner does not support HAVING. Set ksql.new.query.planner.enabled=false.";
        } else if (query.getWindow().isPresent()) {
            unsupported = "New query planner does not support WINDOWS. Set ksql.new.query.planner.enabled=false.";
        } else if (query.getPartitionBy().isPresent()) {
            unsupported = "New query planner does not support PARTITION BY. Set ksql.new.query.planner.enabled=false.";
        } else if (query.getLimit().isPresent()) {
            unsupported = "New query planner does not support LIMIT. Set ksql.new.query.planner.enabled=false.";
        } else {
            unsupported = null;
        }
        if (unsupported != null) {
            throw new UnsupportedOperationException(unsupported);
        }

        final Relation source = query.getFrom();
        if (source instanceof Join) {
            throw new UnsupportedOperationException("New query planner does not support joins. Set ksql.new.query.planner.enabled=false.");
        }
        if (source instanceof JoinedSource) {
            throw new IllegalStateException();
        }
    }

    /* JADX WARN: Type inference failed for: r0v13, types: [io.confluent.ksql.parser.tree.Statement] */
    /**
     * Builds the logical output node and the execution plan for a query.
     *
     * <p>When {@code ksql.new.query.planner.enabled} is set, the new logical/physical planners
     * are used; that path needs a sink and produces a {@link StubbedOutputNode}. Otherwise the
     * query engine builds both plans and a query id is derived through {@code QueryIdUtil}.
     *
     * @param configuredStatement the statement being planned
     * @param query the query to plan
     * @param optional the sink of the query, if it has one
     * @param optional2 an explicit query id passed to {@code QueryIdUtil.buildId}, if any
     * @param metaStore the meta store to plan against
     * @return the output node together with its execution plan
     * @throws UnsupportedOperationException if the new planner is enabled and the query has no
     *     sink or uses an unsupported feature
     * @throws KsqlException if an explicit query id is given and a persistent query with the
     *     resulting id already exists
     */
    private ExecutorPlans planQuery(ConfiguredStatement<?> configuredStatement, Query query, Optional<Sink> optional, Optional<String> optional2, MetaStore metaStore) {
        final QueryEngine queryEngine = this.engineContext.createQueryEngine(this.serviceContext);
        final KsqlConfig ksqlConfig = this.config.getConfig(true);

        if (ksqlConfig.getBoolean("ksql.new.query.planner.enabled").booleanValue()) {
            throwIfUnsupported(query);
            final LogicalPlan logicalPlan = LogicalPlanner.buildPlan(metaStore, query);
            final PhysicalPlan physicalPlan = PhysicalPlanner.buildPlan(metaStore, logicalPlan);
            final Node<?> root = physicalPlan.getRoot();

            // Split the output columns into key and value columns according to the physical root.
            final LogicalSchema.Builder schemaBuilder = LogicalSchema.builder();
            logicalPlan.getRoot().getOutputSchema().forEach(column -> {
                if (root.keyColumnNames().contains(column.name())) {
                    schemaBuilder.keyColumn(column.name(), column.type());
                } else {
                    schemaBuilder.valueColumn(column.name(), column.type());
                }
            });

            final DataSource source = metaStore.getSource((SourceName) logicalPlan.getSourceNames().stream().findFirst().get());
            // This path cannot plan without a sink; fail with an actionable message
            // instead of the bare "No value present" from Optional.get().
            final Sink sink = optional.orElseThrow(() -> new UnsupportedOperationException(
                "New query planner does not support queries without a sink. Set ksql.new.query.planner.enabled=false."));
            final StubbedOutputNode stubbedOutput =
                new StubbedOutputNode(source, getSinkTopic(root.getFormats(), sink), schemaBuilder.build());
            return new ExecutorPlans(stubbedOutput, ExecutionPlanner.buildPlan(metaStore, physicalPlan, sink));
        }

        final OutputNode outputNode = QueryEngine.buildQueryLogicalPlan(query, optional, metaStore, ksqlConfig, configuredStatement.getMaskedStatementText());
        final LogicalPlanNode logicalPlanNode = new LogicalPlanNode(Optional.of(outputNode));
        final QueryId queryId = QueryIdUtil.buildId(configuredStatement.getStatement(), this.engineContext, this.engineContext.idGenerator(), outputNode, ksqlConfig.getBoolean("ksql.create.or.replace.enabled").booleanValue(), optional2);

        // Single registry lookup, used both for the duplicate-id check and for the old plan info.
        final Optional<PersistentQueryMetadata> existingQuery = this.engineContext.getQueryRegistry().getPersistentQuery(queryId);
        if (optional2.isPresent() && existingQuery.isPresent()) {
            throw new KsqlException(String.format("Query ID '%s' already exists.", queryId));
        }

        return new ExecutorPlans(
            logicalPlanNode.getNode().get(),
            queryEngine.buildPhysicalPlan(
                logicalPlanNode,
                this.config,
                metaStore,
                queryId,
                existingQuery.isPresent()
                    ? Optional.of(existingQuery.get().getPhysicalPlan().extractPlanInfo(new PlanInfoExtractor()))
                    : Optional.empty()));
    }

    /**
     * Derives the sink topic for a query.
     *
     * <p>For key and value independently: a format named in the sink properties is used with
     * an empty set of serde features; otherwise the format and features of {@code formats}
     * are used. The key format of the resulting topic is non-windowed.
     *
     * @param formats formats to fall back on when the sink does not name one
     * @param sink the sink whose name and properties define the topic
     * @return the topic named after the sink with the resolved key and value formats
     */
    private static KsqlTopic getSinkTopic(Formats formats, Sink sink) {
        final CreateSourceAsProperties properties = sink.getProperties();

        final FormatInfo keyFormat;
        final SerdeFeatures keyFeatures;
        final Optional<String> keyFormatName = properties.getKeyFormat();
        if (keyFormatName.isPresent()) {
            keyFormat = FormatInfo.of(keyFormatName.get());
            keyFeatures = SerdeFeatures.of(new SerdeFeature[0]);
        } else {
            keyFormat = formats.getKeyFormat();
            keyFeatures = formats.getKeyFeatures();
        }

        final FormatInfo valueFormat;
        final SerdeFeatures valueFeatures;
        final Optional<String> valueFormatName = properties.getValueFormat();
        if (valueFormatName.isPresent()) {
            valueFormat = FormatInfo.of(valueFormatName.get());
            valueFeatures = SerdeFeatures.of(new SerdeFeature[0]);
        } else {
            valueFormat = formats.getValueFormat();
            valueFeatures = formats.getValueFeatures();
        }

        return new KsqlTopic(
            sink.getName().text(),
            KeyFormat.nonWindowed(keyFormat, keyFeatures),
            ValueFormat.of(valueFormat, valueFeatures));
    }

    /**
     * Builds the logical plan for an analysed query.
     *
     * <p>A {@code LogicalPlanner} is created from the supplied config, the analysis and the
     * engine's metastore, and its {@code buildQueryLogicalPlan} result is wrapped in a
     * {@link LogicalPlanNode}.
     *
     * @param configuredStatement not read by this method
     * @param immutableAnalysis the analysis handed to the planner
     * @param ksqlConfig the config handed to the planner
     * @param queryPlannerOptions forwarded to {@code buildQueryLogicalPlan}
     * @param z forwarded unchanged as the second argument of {@code buildQueryLogicalPlan}
     * @return the logical plan node wrapping the planner's output
     */
    private LogicalPlanNode buildAndValidateLogicalPlan(ConfiguredStatement<?> configuredStatement, ImmutableAnalysis immutableAnalysis, KsqlConfig ksqlConfig, QueryPlannerOptions queryPlannerOptions, boolean z) {
        final io.confluent.ksql.planner.LogicalPlanner planner =
                new io.confluent.ksql.planner.LogicalPlanner(
                        ksqlConfig,
                        immutableAnalysis,
                        this.engineContext.getMetaStore());
        return new LogicalPlanNode(
                Optional.of(planner.buildQueryLogicalPlan(queryPlannerOptions, z)));
    }

    /**
     * Builds the physical plan for a scalable push query.
     *
     * <p>The builder is created from the engine's processing log context and the query
     * returned by {@code ScalablePushQueryExecutionUtil.findQuery} for the analysis; the
     * remaining arguments are forwarded unchanged to {@code buildPushPhysicalPlan}.
     *
     * @param logicalPlanNode the logical plan to turn into a physical plan
     * @param immutableAnalysis the analysis used to look up the query
     * @param context forwarded to the builder
     * @param optional optional offset range, forwarded to the builder
     * @param optional2 optional string, forwarded to the builder
     * @return the push physical plan produced by the builder
     */
    private PushPhysicalPlan buildScalablePushPhysicalPlan(LogicalPlanNode logicalPlanNode, ImmutableAnalysis immutableAnalysis, Context context, Optional<PushOffsetRange> optional, Optional<String> optional2) {
        final PushPhysicalPlanBuilder planBuilder = new PushPhysicalPlanBuilder(
                this.engineContext.getProcessingLogContext(),
                ScalablePushQueryExecutionUtil.findQuery(this.engineContext, immutableAnalysis));
        return planBuilder.buildPushPhysicalPlan(logicalPlanNode, context, optional, optional2);
    }

    /**
     * Builds the physical plan for a pull query.
     *
     * <p>The builder is created from the engine's processing log context, the query returned
     * by {@code PullQueryExecutionUtil.findMaterializingQuery} for the analysis, and the
     * remaining arguments as given; the logical plan is then handed to
     * {@code buildPullPhysicalPlan}.
     *
     * @param logicalPlanNode the logical plan to turn into a physical plan
     * @param immutableAnalysis the analysis used to look up the materializing query
     * @param queryPlannerOptions forwarded to the builder
     * @param completableFuture forwarded to the builder
     * @param optional optional consistency offset vector, forwarded to the builder
     * @return the pull physical plan produced by the builder
     */
    private PullPhysicalPlan buildPullPhysicalPlan(LogicalPlanNode logicalPlanNode, ImmutableAnalysis immutableAnalysis, QueryPlannerOptions queryPlannerOptions, CompletableFuture<Void> completableFuture, Optional<ConsistencyOffsetVector> optional) {
        final PullPhysicalPlanBuilder planBuilder = new PullPhysicalPlanBuilder(
                this.engineContext.getProcessingLogContext(),
                PullQueryExecutionUtil.findMaterializingQuery(this.engineContext, immutableAnalysis),
                immutableAnalysis,
                queryPlannerOptions,
                completableFuture,
                optional);
        return planBuilder.buildPullPhysicalPlan(logicalPlanNode);
    }

    /**
     * Creates the DDL command for the sink of a query, when the query creates its sink.
     *
     * <p>If the output node does not create its sink, the existing sink is validated and no
     * command is returned. Otherwise a command is created unless a source with the sink's
     * name is already registered and the statement is neither an OR REPLACE nor an
     * IF NOT EXISTS {@code CreateAsSelect}.
     *
     * @param configuredStatement the statement being executed
     * @param ksqlStructuredDataOutputNode the output node describing the sink
     * @return the DDL command, or empty when the sink already exists and is only written into
     * @throws KsqlException if the sink name clashes with an existing source
     */
    private Optional<DdlCommand> maybeCreateSinkDdl(ConfiguredStatement<?> configuredStatement, KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode) {
        if (!ksqlStructuredDataOutputNode.createInto()) {
            validateExistingSink(ksqlStructuredDataOutputNode);
            return Optional.empty();
        }

        final Object statement = configuredStatement.getStatement();
        final SourceName sinkName = ksqlStructuredDataOutputNode.getSinkName().get();

        boolean orReplace = false;
        boolean ifNotExists = false;
        if (statement instanceof CreateAsSelect) {
            final CreateAsSelect createAsSelect = (CreateAsSelect) statement;
            orReplace = createAsSelect.isOrReplace();
            ifNotExists = createAsSelect.isNotExists();
        }

        final DataSource existing = this.engineContext.getMetaStore().getSource(sinkName);
        if (existing != null && !ifNotExists && !orReplace) {
            final String newType = ksqlStructuredDataOutputNode.getNodeOutputType().getKsqlType().toLowerCase();
            final String sinkText = sinkName.text();
            final String existingType = existing.getDataSourceType().getKsqlType().toLowerCase();
            throw new KsqlException(String.format("Cannot add %s '%s': A %s with the same name already exists", newType, sinkText, existingType));
        }

        return Optional.of(this.engineContext.createDdlCommand(
                ksqlStructuredDataOutputNode,
                ((QueryContainer) statement).getQuery().getRefinement()));
    }

    /**
     * Checks that the sink an output node writes into exists and matches the query result.
     *
     * <p>The sink is looked up in the engine's metastore by the node's sink name. It must be
     * present, have the same data source type as the node's output type, and have a schema
     * the node's schema reports as compatible.
     *
     * @param ksqlStructuredDataOutputNode the output node whose sink is validated
     * @throws KsqlException if the sink is missing, of a different type, or schema-incompatible
     */
    private void validateExistingSink(KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode) {
        final SourceName sinkName = ksqlStructuredDataOutputNode.getSinkName().get();
        final DataSource sink = this.engineContext.getMetaStore().getSource(sinkName);

        if (sink == null) {
            // NOTE(review): the message formats the whole output node, not the sink name - confirm intended.
            throw new KsqlException(String.format("%s does not exist.", ksqlStructuredDataOutputNode));
        }

        if (sink.getDataSourceType() != ksqlStructuredDataOutputNode.getNodeOutputType()) {
            throw new KsqlException(String.format(
                    "Incompatible data sink and query result. Data sink (%s) type is %s but select query result is %s.",
                    sinkName.text(),
                    sink.getDataSourceType(),
                    ksqlStructuredDataOutputNode.getNodeOutputType()));
        }

        final LogicalSchema resultSchema = ksqlStructuredDataOutputNode.getSchema();
        final LogicalSchema sinkSchema = sink.getSchema();
        if (resultSchema.compatibleSchema(sinkSchema)) {
            return;
        }
        throw new KsqlException(String.join(
                System.lineSeparator(),
                "Incompatible schema between results and sink.",
                "Result schema is " + resultSchema,
                "Sink schema is " + sinkSchema));
    }

    /**
     * Rejects statements whose declared sink kind does not match the query's result type.
     *
     * <p>A {@code CreateStreamAsSelect} must not produce a {@code KTABLE}, and a
     * {@code CreateTableAsSelect} must not produce a {@code KSTREAM}. Any other statement
     * passes unchecked.
     *
     * @param dataSourceType the result type produced by the query
     * @param configuredStatement the statement being validated
     * @throws KsqlStatementException if the statement kind and the result type disagree
     */
    private static void validateResultType(DataSource.DataSourceType dataSourceType, ConfiguredStatement<?> configuredStatement) {
        final Object statement = configuredStatement.getStatement();

        if (statement instanceof CreateStreamAsSelect) {
            if (dataSourceType == DataSource.DataSourceType.KTABLE) {
                throw new KsqlStatementException("Invalid result type. Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead.", configuredStatement.getMaskedStatementText());
            }
        }

        if (statement instanceof CreateTableAsSelect) {
            if (dataSourceType == DataSource.DataSourceType.KSTREAM) {
                throw new KsqlStatementException("Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.", configuredStatement.getMaskedStatementText());
            }
        }
    }

    /**
     * Fails fast when the statement is one {@code KsqlEngine.isExecutableStatement} rejects.
     *
     * @param configuredStatement the statement to check
     * @throws KsqlStatementException with the masked statement text if it is not executable
     */
    private static void throwOnNonExecutableStatement(ConfiguredStatement<?> configuredStatement) {
        if (KsqlEngine.isExecutableStatement(configuredStatement.getStatement())) {
            return;
        }
        throw new KsqlStatementException("Statement not executable", configuredStatement.getMaskedStatementText());
    }

    /**
     * Collects the names of the data sources behind a plan node's source nodes.
     *
     * <p>The stream is collected directly into a typed set; no raw {@code Set} cast is
     * needed, so the element type is checked by the compiler.
     *
     * @param planNode the plan node whose source nodes are inspected
     * @return the set of data source names, one per distinct source
     */
    private static Set<SourceName> getSourceNames(PlanNode planNode) {
        return planNode.getSourceNodes()
                .map(sourceNode -> sourceNode.getDataSource())
                .map(dataSource -> dataSource.getName())
                .collect(Collectors.toSet());
    }

    /**
     * Collects the Kafka topic names of the data sources behind a plan node's source nodes.
     *
     * <p>The stream is collected directly into a typed set; no raw {@code Set} cast is
     * needed, so the element type is checked by the compiler.
     *
     * @param planNode the plan node whose source nodes are inspected
     * @return the set of Kafka topic names, one per distinct topic
     */
    private static Set<String> getSourceTopicNames(PlanNode planNode) {
        return planNode.getSourceNodes()
                .map(sourceNode -> sourceNode.getDataSource())
                .map(dataSource -> dataSource.getKsqlTopic())
                .map(topic -> topic.getKafkaTopicName())
                .collect(Collectors.toSet());
    }

    /**
     * Executes a DDL command through the engine context, normalising failures.
     *
     * <p>A {@link KsqlStatementException} is rethrown untouched. Any other exception is
     * wrapped in a {@link KsqlStatementException} carrying the original message, the
     * statement text and the original exception as its cause.
     *
     * @param ddlCommand the command to execute
     * @param str the statement text, also used when wrapping failures
     * @param z forwarded unchanged to the engine context
     * @param set source names forwarded unchanged to the engine context
     * @param z2 forwarded unchanged to the engine context
     * @return the string returned by the engine context's {@code executeDdl}
     * @throws KsqlStatementException if execution fails for any reason
     */
    private String executeDdl(DdlCommand ddlCommand, String str, boolean z, Set<SourceName> set, boolean z2) {
        final String result;
        try {
            result = this.engineContext.executeDdl(str, ddlCommand, z, set, z2);
        } catch (KsqlStatementException alreadyStatementScoped) {
            // Already carries the statement text; do not wrap a second time.
            throw alreadyStatementScoped;
        } catch (Exception cause) {
            throw new KsqlStatementException(cause.getMessage(), str, cause);
        }
        return result;
    }

    /**
     * Resolves every source name of a query plan against the engine's metastore.
     *
     * <p>The builder is parameterised with {@link DataSource} rather than used raw, so
     * the elements added and the returned set are type-checked by the compiler.
     *
     * @param queryPlan the plan whose source names are resolved
     * @return an immutable set of the resolved data sources
     * @throws KsqlException if any source name is not registered in the metastore
     */
    private Set<DataSource> getSources(QueryPlan queryPlan) {
        final ImmutableSet.Builder<DataSource> sources = ImmutableSet.builder();
        for (final SourceName sourceName : queryPlan.getSources()) {
            final DataSource source = this.engineContext.getMetaStore().getSource(sourceName);
            if (source == null) {
                throw new KsqlException("Unknown source: " + sourceName.toString(FormatOptions.noEscape()));
            }
            sources.add(source);
        }
        return sources.build();
    }

    /**
     * Hands a query plan to the query registry's {@code createOrReplacePersistentQuery}.
     *
     * <p>The sink name (if any) and the source names of the plan are resolved against the
     * engine's metastore, and a plan summary is built for the physical plan, before the
     * registry is called.
     *
     * @param queryPlan the plan describing the query
     * @param str the statement text passed on to the registry
     * @param persistentQueryType the type passed on to the registry
     * @return the metadata returned by the registry
     */
    private PersistentQueryMetadata executePersistentQuery(QueryPlan queryPlan, String str, KsqlConstants.PersistentQueryType persistentQueryType) {
        final QueryId queryId = queryPlan.getQueryId();
        final Optional<DataSource> sinkSource = queryPlan.getSink()
                .map(sinkName -> this.engineContext.getMetaStore().getSource(sinkName));
        final Set<DataSource> sources = getSources(queryPlan);
        final ExecutionStep<?> physicalPlan = queryPlan.getPhysicalPlan();
        final String planSummary = buildPlanSummary(queryPlan.getQueryId(), queryPlan.getPhysicalPlan());

        return this.engineContext.getQueryRegistry().createOrReplacePersistentQuery(
                this.config,
                this.serviceContext,
                this.engineContext.getProcessingLogContext(),
                this.engineContext.getMetaStore(),
                str,
                queryId,
                sinkSource,
                sources,
                physicalPlan,
                planSummary,
                persistentQueryType,
                queryPlan.getRuntimeId());
    }

    /**
     * Produces the textual summary of a physical plan.
     *
     * <p>A {@code PlanSummary} is created from the query id, the result of
     * {@code config.getConfig(true)} and the engine's metastore, and asked to summarize
     * the given step.
     *
     * @param queryId the id of the query the plan belongs to
     * @param executionStep the physical plan step to summarize
     * @return the summary string
     */
    private String buildPlanSummary(QueryId queryId, ExecutionStep<?> executionStep) {
        final PlanSummary planSummary = new PlanSummary(
                queryId,
                this.config.getConfig(true),
                this.engineContext.getMetaStore());
        return planSummary.summarize(executionStep);
    }
}
