package io.confluent.ksql.security;

import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.topic.SourceTopicsExtractor;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.acl.AclOperation;

/* loaded from: input_file:io/confluent/ksql/security/KsqlAuthorizationValidatorImpl.class */
public class KsqlAuthorizationValidatorImpl implements KsqlAuthorizationValidator {
    private final KsqlAccessValidator accessValidator;

    public KsqlAuthorizationValidatorImpl(KsqlAccessValidator ksqlAccessValidator) {
        this.accessValidator = ksqlAccessValidator;
    }

    KsqlAccessValidator getAccessValidator() {
        return this.accessValidator;
    }

    @Override // io.confluent.ksql.security.KsqlAuthorizationValidator
    public void checkAuthorization(KsqlSecurityContext ksqlSecurityContext, MetaStore metaStore, Statement statement) {
        if (statement instanceof Query) {
            validateQuery(ksqlSecurityContext, metaStore, (Query) statement);
            return;
        }
        if (statement instanceof InsertInto) {
            validateInsertInto(ksqlSecurityContext, metaStore, (InsertInto) statement);
            return;
        }
        if (statement instanceof CreateAsSelect) {
            validateCreateAsSelect(ksqlSecurityContext, metaStore, (CreateAsSelect) statement);
        } else if (statement instanceof PrintTopic) {
            validatePrintTopic(ksqlSecurityContext, (PrintTopic) statement);
        } else if (statement instanceof CreateSource) {
            validateCreateSource(ksqlSecurityContext, (CreateSource) statement);
        }
    }

    private void validateQuery(KsqlSecurityContext ksqlSecurityContext, MetaStore metaStore, Query query) {
        for (KsqlTopic ksqlTopic : extractQueryTopics(query, metaStore)) {
            checkTopicAccess(ksqlSecurityContext, ksqlTopic.getKafkaTopicName(), AclOperation.READ);
            checkSchemaAccess(ksqlSecurityContext, ksqlTopic, AclOperation.READ);
        }
    }

    private void validateCreateAsSelect(KsqlSecurityContext ksqlSecurityContext, MetaStore metaStore, CreateAsSelect createAsSelect) {
        validateQuery(ksqlSecurityContext, metaStore, createAsSelect.getQuery());
        KsqlTopic createAsSelectSinkTopic = getCreateAsSelectSinkTopic(metaStore, createAsSelect);
        checkTopicAccess(ksqlSecurityContext, createAsSelectSinkTopic.getKafkaTopicName(), AclOperation.WRITE);
        checkSchemaAccess(ksqlSecurityContext, createAsSelectSinkTopic, AclOperation.WRITE);
    }

    private void validateInsertInto(KsqlSecurityContext ksqlSecurityContext, MetaStore metaStore, InsertInto insertInto) {
        validateQuery(ksqlSecurityContext, metaStore, insertInto.getQuery());
        checkTopicAccess(ksqlSecurityContext, getSourceTopicName(metaStore, insertInto.getTarget()), AclOperation.WRITE);
    }

    private void validatePrintTopic(KsqlSecurityContext ksqlSecurityContext, PrintTopic printTopic) {
        checkTopicAccess(ksqlSecurityContext, printTopic.getTopic(), AclOperation.READ);
    }

    private void validateCreateSource(KsqlSecurityContext ksqlSecurityContext, CreateSource createSource) {
        checkTopicAccess(ksqlSecurityContext, createSource.getProperties().getKafkaTopic(), AclOperation.READ);
    }

    private String getSourceTopicName(MetaStore metaStore, SourceName sourceName) {
        DataSource source = metaStore.getSource(sourceName);
        if (source == null) {
            throw new KsqlException("Cannot validate for topic access from an unknown stream/table: " + sourceName);
        }
        return source.getKafkaTopicName();
    }

    private KsqlTopic getCreateAsSelectSinkTopic(MetaStore metaStore, CreateAsSelect createAsSelect) {
        String str;
        KeyFormat keyFormat;
        ValueFormat valueFormat;
        CreateSourceAsProperties properties = createAsSelect.getProperties();
        if (properties.getKafkaTopic().isPresent()) {
            str = (String) properties.getKafkaTopic().get();
            SourceTopicsExtractor sourceTopicsExtractor = new SourceTopicsExtractor(metaStore);
            sourceTopicsExtractor.process(createAsSelect.getQuery(), null);
            KsqlTopic primarySourceTopic = sourceTopicsExtractor.getPrimarySourceTopic();
            Optional map = properties.getKeyFormat().map(str2 -> {
                return FormatFactory.fromName(str2);
            });
            Optional map2 = properties.getValueFormat().map(str3 -> {
                return FormatFactory.fromName(str3);
            });
            keyFormat = (KeyFormat) map.map(format -> {
                return KeyFormat.of(FormatInfo.of(format.name()), format.supportsFeature(SerdeFeature.SCHEMA_INFERENCE) ? SerdeFeatures.of(new SerdeFeature[]{SerdeFeature.SCHEMA_INFERENCE}) : SerdeFeatures.of(new SerdeFeature[0]), Optional.empty());
            }).orElse(primarySourceTopic.getKeyFormat());
            valueFormat = (ValueFormat) map2.map(format2 -> {
                return ValueFormat.of(FormatInfo.of(format2.name()), format2.supportsFeature(SerdeFeature.SCHEMA_INFERENCE) ? SerdeFeatures.of(new SerdeFeature[]{SerdeFeature.SCHEMA_INFERENCE}) : SerdeFeatures.of(new SerdeFeature[0]));
            }).orElse(primarySourceTopic.getValueFormat());
        } else {
            DataSource source = metaStore.getSource(createAsSelect.getName());
            if (source == null) {
                throw new KsqlException("Cannot validate for topic access from an unknown stream/table: " + createAsSelect.getName());
            }
            str = source.getKafkaTopicName();
            keyFormat = source.getKsqlTopic().getKeyFormat();
            valueFormat = source.getKsqlTopic().getValueFormat();
        }
        return new KsqlTopic(str, keyFormat, valueFormat);
    }

    private Set<KsqlTopic> extractQueryTopics(Query query, MetaStore metaStore) {
        SourceTopicsExtractor sourceTopicsExtractor = new SourceTopicsExtractor(metaStore);
        sourceTopicsExtractor.process(query, null);
        return sourceTopicsExtractor.getSourceTopics();
    }

    private void checkTopicAccess(KsqlSecurityContext ksqlSecurityContext, String str, AclOperation aclOperation) {
        this.accessValidator.checkTopicAccess(ksqlSecurityContext, str, aclOperation);
    }

    private void checkSchemaAccess(KsqlSecurityContext ksqlSecurityContext, KsqlTopic ksqlTopic, AclOperation aclOperation) {
        if (formatSupportsSchemaInference(ksqlTopic.getKeyFormat().getFormatInfo())) {
            this.accessValidator.checkSubjectAccess(ksqlSecurityContext, KsqlConstants.getSRSubject(ksqlTopic.getKafkaTopicName(), true), aclOperation);
        }
        if (formatSupportsSchemaInference(ksqlTopic.getValueFormat().getFormatInfo())) {
            this.accessValidator.checkSubjectAccess(ksqlSecurityContext, KsqlConstants.getSRSubject(ksqlTopic.getKafkaTopicName(), false), aclOperation);
        }
    }

    private static boolean formatSupportsSchemaInference(FormatInfo formatInfo) {
        return FormatFactory.of(formatInfo).supportsFeature(SerdeFeature.SCHEMA_INFERENCE);
    }
}
