/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.net.ssl.SSLEngine;
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.network.AsyncAuthExecutor;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelMetadataRegistry;
import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ListenerReconfigurable;
import org.apache.kafka.common.network.PlaintextTransportLayer;
import org.apache.kafka.common.network.ProxyProtocolEngine;
import org.apache.kafka.common.network.ProxyProtocolEngineFactory;
import org.apache.kafka.common.network.RequestCallback;
import org.apache.kafka.common.network.SslTransportLayer;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.Login;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.authenticator.DefaultLogin;
import org.apache.kafka.common.security.authenticator.LoginManager;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler;
import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
import org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler;
import org.apache.kafka.common.security.kerberos.KerberosClientCallbackHandler;
import org.apache.kafka.common.security.kerberos.KerberosLogin;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler;
import org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.security.scram.internals.ScramServerCallbackHandler;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.UniqueIdGenerator;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.interceptor.BrokerInterceptor;
import org.ietf.jgss.GSSCredential;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
import org.ietf.jgss.GSSName;
import org.ietf.jgss.Oid;
import org.slf4j.Logger;

public class SaslChannelBuilder
implements ChannelBuilder,
ListenerReconfigurable {
    // Name of the JVM system property read with Boolean.getBoolean in maybeAddNativeGssapiCredentials.
    static final String GSS_NATIVE_PROP = "sun.security.jgss.native";
    // --- Collaborators and settings assigned once in the constructor ---
    private final SecurityProtocol securityProtocol;
    private final ListenerName listenerName;
    private final boolean isInterBrokerListener;
    private final String clientSaslMechanism;
    private final ConnectionMode connectionMode;
    // JAAS contexts keyed by SASL mechanism name; drives which mechanisms get login managers and handlers.
    private final Map<String, JaasContext> jaasContexts;
    private final CredentialCache credentialCache;
    private final DelegationTokenCache tokenCache;
    // Login managers and their subjects keyed by mechanism; filled in configure(), released in close().
    private final Map<String, LoginManager> loginManagers;
    private final Map<String, Subject> subjects;
    private final Function<Short, ApiVersionsResponse> apiVersionSupplier;
    private final RequestCallback requestCallback;
    // Created in configure() only when the security protocol is SASL_SSL; null otherwise.
    private SslFactory sslFactory;
    // The config map last passed to configure(); reused when building authenticators and interceptors.
    private Map<String, ?> configs;
    private final String sslClientAuthOverride;
    // Set in configure() only in SERVER mode when a GSSAPI JAAS context is present.
    private KerberosShortNamer kerberosShortNamer;
    // Callback handlers keyed by mechanism (server) or by the single client mechanism (client).
    private final Map<String, AuthenticateCallbackHandler> saslCallbackHandlers;
    // Per-mechanism server settings resolved in configure() (SERVER mode only).
    private final Map<String, Long> connectionsMaxReauthMsByMechanism;
    private final Map<String, Boolean> asyncAuthEnabledByMechanism;
    private final Map<String, Long> asyncAuthTimeoutByMechanism;
    private final Time time;
    private final LogContext logContext;
    private final Logger log;
    // Confluent-specific flags read from "confluent.*" config keys in configure().
    private boolean shouldParseSni;
    private boolean shouldParseLkcId;
    private String subdomainPrefix;
    private final ProxyProtocolEngineFactory proxyProtocolEngineFactory;
    private final Map<String, Integer> saslServerMaxReceiveSizeByMechanism;
    // Source of the session ids handed to server authenticators in buildChannel().
    private final UniqueIdGenerator uniqueIdGenerator;
    private boolean proxyModeLocalDefault;
    // True when mTLS is enabled in the configs and its listener name matches this listener (case-insensitive).
    private boolean mTlsEnabled = false;
    // SNI hostname validation settings; the list stays empty unless "confluent.valid.sni.hostnames" is non-empty.
    private List<String> validSniHostnames;
    private String validSniHostNamesExcludeSuffix;
    private boolean rejectInvalidSniHostnames;

    public SaslChannelBuilder(ConnectionMode connectionMode, Map<String, JaasContext> jaasContexts, SecurityProtocol securityProtocol, ListenerName listenerName, boolean isInterBrokerListener, String clientSaslMechanism, CredentialCache credentialCache, DelegationTokenCache tokenCache, String sslClientAuthOverride, Time time, LogContext logContext, Function<Short, ApiVersionsResponse> apiVersionSupplier, RequestCallback requestCallback, ProxyProtocolEngineFactory proxyProtocolEngineFactory) {
        this.connectionMode = connectionMode;
        this.jaasContexts = jaasContexts;
        this.loginManagers = new HashMap<String, LoginManager>(jaasContexts.size());
        this.subjects = new HashMap<String, Subject>(jaasContexts.size());
        this.securityProtocol = securityProtocol;
        this.listenerName = listenerName;
        this.isInterBrokerListener = isInterBrokerListener;
        this.clientSaslMechanism = clientSaslMechanism;
        this.credentialCache = credentialCache;
        this.tokenCache = tokenCache;
        this.sslClientAuthOverride = sslClientAuthOverride;
        this.saslCallbackHandlers = new HashMap<String, AuthenticateCallbackHandler>();
        this.connectionsMaxReauthMsByMechanism = new HashMap<String, Long>();
        this.asyncAuthEnabledByMechanism = new HashMap<String, Boolean>();
        this.asyncAuthTimeoutByMechanism = new HashMap<String, Long>();
        this.time = time;
        this.logContext = logContext;
        this.log = logContext.logger(this.getClass());
        this.apiVersionSupplier = apiVersionSupplier;
        this.requestCallback = requestCallback;
        this.proxyProtocolEngineFactory = proxyProtocolEngineFactory;
        if (connectionMode == ConnectionMode.SERVER && apiVersionSupplier == null) {
            throw new IllegalArgumentException("Server channel builder must provide an ApiVersionResponse supplier");
        }
        this.saslServerMaxReceiveSizeByMechanism = new HashMap<String, Integer>();
        this.uniqueIdGenerator = new UniqueIdGenerator();
        this.validSniHostnames = Collections.emptyList();
        this.rejectInvalidSniHostnames = false;
    }

    /**
     * Configures this builder from {@code configs}.
     *
     * <p>In order: creates the callback handlers (plus the per-mechanism server settings when
     * in {@code SERVER} mode), configures each handler with its JAAS entries, acquires a
     * {@link LoginManager} and {@link Subject} per JAAS context, creates the {@link SslFactory}
     * when the protocol is {@code SASL_SSL}, and finally reads the {@code confluent.*} settings.
     *
     * @param configs configuration values; also retained for later use by {@code buildChannel}
     * @throws KafkaException wrapping any failure; {@link #close()} is called first so that
     *         login managers, handlers and the SSL factory acquired so far are released
     */
    @Override
    public void configure(Map<String, ?> configs) throws KafkaException {
        try {
            this.configs = configs;
            if (this.connectionMode == ConnectionMode.SERVER) {
                this.createServerCallbackHandlers(configs);
                this.createConnectionsMaxReauthMsMap(configs);
                this.createAsyncAuthEnabledMap(configs);
                this.createAsyncAuthTimeoutMap(configs);
                this.createSaslMaxReceiveSize(configs);
            } else {
                this.createClientCallbackHandler(configs);
            }
            for (Map.Entry<String, AuthenticateCallbackHandler> entry : this.saslCallbackHandlers.entrySet()) {
                String mechanism = entry.getKey();
                entry.getValue().configure(configs, mechanism, this.jaasContexts.get(mechanism).configurationEntries());
            }
            Class<? extends Login> defaultLoginClass = this.defaultLoginClass();
            if (this.connectionMode == ConnectionMode.SERVER && this.jaasContexts.containsKey("GSSAPI")) {
                this.kerberosShortNamer = SaslChannelBuilder.createKerberosShortNamerFromConfigs(configs);
            }
            for (Map.Entry<String, JaasContext> entry : this.jaasContexts.entrySet()) {
                String mechanism = entry.getKey();
                LoginManager loginManager = LoginManager.acquireLoginManager(entry.getValue(), mechanism, defaultLoginClass, configs);
                this.loginManagers.put(mechanism, loginManager);
                Subject subject = loginManager.subject();
                this.subjects.put(mechanism, subject);
                // Native GSSAPI credentials are only relevant for a server accepting GSSAPI.
                if (this.connectionMode == ConnectionMode.SERVER && mechanism.equals("GSSAPI")) {
                    this.maybeAddNativeGssapiCredentials(subject);
                }
            }
            if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
                this.sslFactory = new SslFactory(this.connectionMode, this.sslClientAuthOverride, this.isInterBrokerListener);
                this.sslFactory.configure(configs);
            }

            // getOrDefault cannot be called with a concrete default on a Map<String, ?>, so view
            // the map as Map<String, Object> (read-only use) and cast each value explicitly.
            @SuppressWarnings("unchecked")
            Map<String, Object> settings = (Map<String, Object>) configs;
            this.shouldParseSni = (Boolean) settings.getOrDefault("confluent.multitenant.parse.sni.host.name.enable", false);
            this.shouldParseLkcId = (Boolean) settings.getOrDefault("confluent.multitenant.parse.lkc.id.enable", false);
            this.subdomainPrefix = (String) settings.getOrDefault("confluent.subdomain.prefix", ConfluentConfigs.SUBDOMAIN_PREFIX_DEFAULT);
            this.proxyModeLocalDefault = (Boolean) settings.getOrDefault("confluent.proxy.mode.local.default", false);
            this.mTlsEnabled = ConfluentConfigs.getMTlsEnable(configs) && ConfluentConfigs.getMTlsListenerName(configs).equalsIgnoreCase(this.listenerName.value());

            String validSniHostnamesCsv = (String) settings.getOrDefault("confluent.valid.sni.hostnames", "");
            if (!validSniHostnamesCsv.isEmpty()) {
                this.validSniHostnames = Arrays.asList(validSniHostnamesCsv.split(","));
            }
            this.validSniHostNamesExcludeSuffix = (String) settings.getOrDefault("confluent.valid.sni.hostnames.exclude.suffix", "");
            // Rejection is only considered when a list of valid hostnames was configured.
            if (!validSniHostnamesCsv.isEmpty()) {
                this.rejectInvalidSniHostnames = (Boolean) settings.getOrDefault("confluent.reject.invalid.sni.hostnames", false);
            }
        }
        catch (Throwable e) {
            // Release anything acquired before the failure, then surface it as a KafkaException.
            this.close();
            throw new KafkaException(e);
        }
    }

    /**
     * Builds a {@link KerberosShortNamer} from the {@code sasl.kerberos.principal.to.local.rules}
     * entry of {@code configs}.
     *
     * @param configs configuration map to read the rules from
     * @return the short namer, or {@code null} when the rules entry is absent
     */
    public static KerberosShortNamer createKerberosShortNamerFromConfigs(Map<String, ?> configs) {
        String realm;
        try {
            realm = SaslChannelBuilder.defaultKerberosRealm();
        }
        catch (Exception ignored) {
            // Best effort: fall back to an empty realm when the default cannot be determined.
            realm = "";
        }

        Object configuredRules = configs.get("sasl.kerberos.principal.to.local.rules");
        if (configuredRules == null) {
            return null;
        }
        @SuppressWarnings("unchecked")
        List<String> rules = (List<String>) configuredRules;
        return KerberosShortNamer.fromUnparsedRules(realm, rules);
    }

    /**
     * @return {@link SslConfigs#RECONFIGURABLE_CONFIGS} when the protocol is {@code SASL_SSL},
     *         otherwise an empty set
     */
    @Override
    public Set<String> reconfigurableConfigs() {
        if (securityProtocol != SecurityProtocol.SASL_SSL) {
            return Collections.emptySet();
        }
        return SslConfigs.RECONFIGURABLE_CONFIGS;
    }

    /**
     * Delegates validation to the SSL factory when the protocol is {@code SASL_SSL}; does
     * nothing for other protocols.
     *
     * @throws ConfigException if the SSL factory rejects the new configs with an
     *         {@link IllegalStateException}
     */
    @Override
    public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
        if (securityProtocol != SecurityProtocol.SASL_SSL) {
            return;
        }
        try {
            sslFactory.validateReconfiguration(configs);
        }
        catch (IllegalStateException failure) {
            String reason = "SASL reconfiguration failed due to " + failure;
            throw new ConfigException(reason);
        }
    }

    /**
     * Applies {@code configs} to the SSL factory when the protocol is {@code SASL_SSL};
     * a no-op for other protocols.
     */
    @Override
    public void reconfigure(Map<String, ?> configs) {
        if (securityProtocol != SecurityProtocol.SASL_SSL) {
            return;
        }
        sslFactory.reconfigure(configs);
    }

    /** @return the listener name supplied to the constructor */
    @Override
    public ListenerName listenerName() {
        return listenerName;
    }

    /**
     * Builds a {@link KafkaChannel} for the connection behind {@code key}.
     *
     * <p>The transport layer is created eagerly; the authenticator is created lazily through a
     * {@link Supplier} handed to the channel. In {@code SERVER} mode the supplier builds a server
     * authenticator over unmodifiable views of this builder's maps; otherwise it builds a client
     * authenticator for the configured client SASL mechanism.
     *
     * @throws KafkaException wrapping any exception raised while building; the transport layer,
     *         if already created, is closed quietly first
     */
    @Override
    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, AsyncAuthExecutor asyncAuthExecutor, MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry, Time time, LogContext logContext) throws KafkaException {
        TransportLayer transportLayer = null;
        try {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            Socket socket = socketChannel.socket();
            ProxyProtocolEngine proxyEngine = this.proxyProtocolEngineFactory.createProxyProtocolEngine(metadataRegistry);
            transportLayer = this.buildTransportLayer(id, key, socketChannel, metadataRegistry, proxyEngine);
            // Lambdas need an effectively final reference to the transport layer.
            final TransportLayer authTransport = transportLayer;

            final Supplier<Authenticator> authenticatorCreator;
            if (this.connectionMode != ConnectionMode.SERVER) {
                LoginManager clientLoginManager = this.loginManagers.get(this.clientSaslMechanism);
                authenticatorCreator = () -> this.buildClientAuthenticator(
                        this.configs,
                        this.saslCallbackHandlers.get(this.clientSaslMechanism),
                        id,
                        socket.getInetAddress().getHostName(),
                        clientLoginManager.serviceName(),
                        authTransport,
                        this.subjects.get(this.clientSaslMechanism));
            } else {
                authenticatorCreator = () -> this.buildServerAuthenticator(
                        this.configs,
                        Collections.unmodifiableMap(this.saslCallbackHandlers),
                        id,
                        this.uniqueIdGenerator.generate(Time.SYSTEM.milliseconds()),
                        authTransport,
                        Collections.unmodifiableMap(this.subjects),
                        Collections.unmodifiableMap(this.connectionsMaxReauthMsByMechanism),
                        Collections.unmodifiableMap(this.asyncAuthEnabledByMechanism),
                        Collections.unmodifiableMap(this.asyncAuthTimeoutByMechanism),
                        Collections.unmodifiableMap(this.saslServerMaxReceiveSizeByMechanism),
                        asyncAuthExecutor,
                        metadataRegistry);
            }

            BrokerInterceptor interceptor = ConfluentConfigs.buildBrokerInterceptor(this.connectionMode, this.configs);
            // A null pool means "no pooling".
            MemoryPool effectivePool = memoryPool == null ? MemoryPool.NONE : memoryPool;
            return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize, asyncAuthExecutor, effectivePool, metadataRegistry, interceptor, time, this.shouldParseSni, this.shouldParseLkcId, logContext, this.subdomainPrefix, this.validSniHostnames, this.validSniHostNamesExcludeSuffix, this.rejectInvalidSniHostnames);
        }
        catch (Exception e) {
            Utils.closeQuietly(transportLayer, "transport layer for channel Id: " + id);
            throw new KafkaException(e);
        }
    }

    /**
     * @return the mechanism names that have a subject; when mTLS is enabled for this listener,
     *         a new set containing those names plus {@code "MTLS"}
     */
    @Override
    public Set<String> securityMechanisms() {
        if (!mTlsEnabled) {
            return subjects.keySet();
        }
        Set<String> withMtls = new HashSet<>(subjects.keySet());
        withMtls.add("MTLS");
        return withMtls;
    }

    /**
     * Releases every login manager (and forgets them), closes every SASL callback handler,
     * and closes the SSL factory if one was created.
     */
    @Override
    public void close() {
        loginManagers.values().forEach(LoginManager::release);
        loginManagers.clear();
        saslCallbackHandlers.values().forEach(AuthenticateCallbackHandler::close);
        if (sslFactory != null) {
            sslFactory.close();
        }
    }

    /**
     * Creates the transport layer for a new connection: an {@link SslTransportLayer} when the
     * protocol is {@code SASL_SSL}, otherwise a {@link PlaintextTransportLayer}.
     *
     * <p>For SSL, SNI parsing is requested when either SNI parsing is enabled or a list of valid
     * SNI hostnames has been configured.
     */
    protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel, ChannelMetadataRegistry metadataRegistry, ProxyProtocolEngine proxyProtocolEngine) throws IOException {
        if (this.securityProtocol != SecurityProtocol.SASL_SSL) {
            return new PlaintextTransportLayer(key, proxyProtocolEngine, this.proxyModeLocalDefault);
        }
        SSLEngine engine = this.sslFactory.createSslEngine(socketChannel.socket());
        return SslTransportLayer.create(
                id,
                key,
                engine,
                metadataRegistry,
                this.sslFactory.createCloseableSslEngine(engine),
                this.connectionMode,
                this.shouldParseSni || !this.validSniHostnames.isEmpty(),
                this.shouldParseLkcId,
                proxyProtocolEngine,
                this.proxyModeLocalDefault);
    }

    /**
     * Creates a {@link SaslServerAuthenticator} from the supplied per-connection arguments
     * combined with this builder's listener, protocol, short namer, clock, API-versions supplier
     * and request callback. Protected so that subclasses can substitute another authenticator.
     */
    protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, Map<String, AuthenticateCallbackHandler> callbackHandlers, String id, long sessionId, TransportLayer transportLayer, Map<String, Subject> subjects, Map<String, Long> connectionsMaxReauthMsByMechanism, Map<String, Boolean> asyncAuthEnabledByMechanism, Map<String, Long> asyncAuthTimeoutByMechanism, Map<String, Integer> saslServerMaxReceiveSizeByMechanism, AsyncAuthExecutor asyncAuthExecutor, ChannelMetadataRegistry metadataRegistry) {
        return new SaslServerAuthenticator(
                configs,
                callbackHandlers,
                id,
                sessionId,
                subjects,
                this.kerberosShortNamer,
                this.listenerName,
                this.isInterBrokerListener,
                this.securityProtocol,
                transportLayer,
                connectionsMaxReauthMsByMechanism,
                asyncAuthEnabledByMechanism,
                asyncAuthTimeoutByMechanism,
                saslServerMaxReceiveSizeByMechanism,
                asyncAuthExecutor,
                metadataRegistry,
                this.time,
                this.apiVersionSupplier,
                this.requestCallback);
    }

    /**
     * Creates a {@link SaslClientAuthenticator} for this builder's client SASL mechanism from
     * the supplied per-connection arguments plus this builder's clock, log context and request
     * callback. Protected so that subclasses can substitute another authenticator.
     */
    protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> configs, AuthenticateCallbackHandler callbackHandler, String id, String serverHost, String servicePrincipal, TransportLayer transportLayer, Subject subject) {
        return new SaslClientAuthenticator(
                configs,
                callbackHandler,
                id,
                subject,
                servicePrincipal,
                serverHost,
                this.clientSaslMechanism,
                transportLayer,
                this.time,
                this.logContext,
                this.requestCallback);
    }

    /** @return the live (not copied) map of login managers keyed by mechanism; package-private */
    Map<String, LoginManager> loginManagers() {
        return loginManagers;
    }

    /**
     * Returns the realm of a throwaway principal named {@code "tmp"} created without an explicit
     * realm, i.e. whatever realm {@link KerberosPrincipal} assigns by default.
     */
    private static String defaultKerberosRealm() {
        KerberosPrincipal probe = new KerberosPrincipal("tmp", KerberosPrincipal.KRB_NT_PRINCIPAL);
        return probe.getRealm();
    }

    /**
     * Instantiates the client callback handler and registers it under the client SASL mechanism.
     * The class comes from {@code sasl.client.callback.handler.class} when set, otherwise from
     * {@link #clientCallbackHandlerClass()}.
     */
    private void createClientCallbackHandler(Map<String, ?> configs) {
        @SuppressWarnings("unchecked")
        Class<? extends AuthenticateCallbackHandler> configuredClass = (Class<? extends AuthenticateCallbackHandler>) configs.get("sasl.client.callback.handler.class");
        Class<? extends AuthenticateCallbackHandler> handlerClass = configuredClass != null ? configuredClass : this.clientCallbackHandlerClass();
        AuthenticateCallbackHandler handler = (AuthenticateCallbackHandler) Utils.newInstance(handlerClass);
        this.saslCallbackHandlers.put(this.clientSaslMechanism, handler);
    }

    /**
     * Registers one server callback handler per JAAS mechanism. A class configured under
     * {@code <mechanism prefix>sasl.server.callback.handler.class} takes precedence; otherwise a
     * built-in handler is chosen for PLAIN, SCRAM and OAUTHBEARER, with
     * {@link SaslServerCallbackHandler} as the fallback.
     */
    private void createServerCallbackHandlers(Map<String, ?> configs) {
        for (String mechanism : this.jaasContexts.keySet()) {
            String configKey = ListenerName.saslMechanismPrefix(mechanism) + "sasl.server.callback.handler.class";
            Class<?> configuredClass = (Class<?>) configs.get(configKey);

            AuthenticateCallbackHandler handler;
            if (configuredClass != null) {
                handler = (AuthenticateCallbackHandler) Utils.newInstance(configuredClass);
            } else if (mechanism.equals("PLAIN")) {
                handler = new PlainServerCallbackHandler();
            } else if (ScramMechanism.isScram(mechanism)) {
                handler = new ScramServerCallbackHandler(this.credentialCache.cache(mechanism, ScramCredential.class), this.tokenCache);
            } else if (mechanism.equals("OAUTHBEARER")) {
                handler = new OAuthBearerUnsecuredValidatorCallbackHandler();
            } else {
                handler = new SaslServerCallbackHandler();
            }
            this.saslCallbackHandlers.put(mechanism, handler);
        }
    }

    /**
     * Resolves {@code connections.max.reauth.ms} for every JAAS mechanism: the mechanism-prefixed
     * key wins, then the unprefixed key. Mechanisms with neither value get no entry.
     */
    private void createConnectionsMaxReauthMsMap(Map<String, ?> configs) {
        final String key = "connections.max.reauth.ms";
        for (String mechanism : jaasContexts.keySet()) {
            Long reauthMs = (Long) configs.get(ListenerName.saslMechanismPrefix(mechanism) + key);
            if (reauthMs == null) {
                reauthMs = (Long) configs.get(key);
            }
            if (reauthMs != null) {
                connectionsMaxReauthMsByMechanism.put(mechanism, reauthMs);
            }
        }
    }

    /**
     * Resolves {@code sasl.server.authn.async.enable} for every JAAS mechanism: the
     * mechanism-prefixed key wins, then the unprefixed key. Mechanisms with neither value get
     * no entry.
     */
    private void createAsyncAuthEnabledMap(Map<String, ?> configs) {
        final String key = "sasl.server.authn.async.enable";
        for (String mechanism : jaasContexts.keySet()) {
            Boolean enabled = (Boolean) configs.get(ListenerName.saslMechanismPrefix(mechanism) + key);
            if (enabled == null) {
                enabled = (Boolean) configs.get(key);
            }
            if (enabled != null) {
                asyncAuthEnabledByMechanism.put(mechanism, enabled);
            }
        }
    }

    /**
     * Resolves {@code sasl.server.authn.async.timeout.ms} for every JAAS mechanism: the
     * mechanism-prefixed key wins, then the unprefixed key. Mechanisms with neither value get
     * no entry.
     */
    private void createAsyncAuthTimeoutMap(Map<String, ?> configs) {
        final String key = "sasl.server.authn.async.timeout.ms";
        for (String mechanism : jaasContexts.keySet()) {
            Long timeoutMs = (Long) configs.get(ListenerName.saslMechanismPrefix(mechanism) + key);
            if (timeoutMs == null) {
                timeoutMs = (Long) configs.get(key);
            }
            if (timeoutMs != null) {
                asyncAuthTimeoutByMechanism.put(mechanism, timeoutMs);
            }
        }
    }

    /**
     * Resolves {@code sasl.server.max.receive.size} for every JAAS mechanism: the
     * mechanism-prefixed key wins, then the unprefixed key, then 524288. Every mechanism
     * therefore always gets an entry.
     */
    private void createSaslMaxReceiveSize(Map<String, ?> configs) {
        final String key = "sasl.server.max.receive.size";
        for (String mechanism : jaasContexts.keySet()) {
            Integer maxReceiveSize = (Integer) configs.get(ListenerName.saslMechanismPrefix(mechanism) + key);
            if (maxReceiveSize == null) {
                maxReceiveSize = (Integer) configs.get(key);
                if (maxReceiveSize == null) {
                    maxReceiveSize = 524288;
                }
            }
            saslServerMaxReceiveSizeByMechanism.put(mechanism, maxReceiveSize);
        }
    }

    /**
     * Picks the default {@link Login} implementation: {@link KerberosLogin} when a GSSAPI JAAS
     * context exists, else {@link OAuthBearerRefreshingLogin} when the client mechanism is
     * OAUTHBEARER, else {@link DefaultLogin}.
     */
    protected Class<? extends Login> defaultLoginClass() {
        final Class<? extends Login> loginClass;
        if (jaasContexts.containsKey("GSSAPI")) {
            loginClass = KerberosLogin.class;
        } else if ("OAUTHBEARER".equals(clientSaslMechanism)) {
            loginClass = OAuthBearerRefreshingLogin.class;
        } else {
            loginClass = DefaultLogin.class;
        }
        return loginClass;
    }

    /**
     * Maps the client SASL mechanism to its built-in callback handler class: GSSAPI and
     * OAUTHBEARER have dedicated handlers, everything else uses {@link SaslClientCallbackHandler}.
     */
    private Class<? extends AuthenticateCallbackHandler> clientCallbackHandlerClass() {
        final String mechanism = this.clientSaslMechanism;
        // Receiver-first equals keeps the NullPointerException a string switch raises on null.
        if (mechanism.equals("GSSAPI")) {
            return KerberosClientCallbackHandler.class;
        }
        if (mechanism.equals("OAUTHBEARER")) {
            return OAuthBearerSaslClientCallbackHandler.class;
        }
        return SaslClientCallbackHandler.class;
    }

    /**
     * Adds an accept-only Kerberos {@link GSSCredential} to {@code subject}'s private credentials
     * when the {@code sun.security.jgss.native} system property is {@code true} and the subject
     * does not already hold a {@link GSSCredential}.
     *
     * <p>The credential is created for {@code <service>@<host>} taken from the subject's first
     * principal. A {@link GSSException} while creating it is logged as a warning and otherwise
     * ignored.
     *
     * @param subject the subject whose private credentials may be extended
     * @throws KafkaException if the first principal's name cannot be parsed as a Kerberos name
     */
    private void maybeAddNativeGssapiCredentials(Subject subject) {
        boolean usingNativeJgss = Boolean.getBoolean(GSS_NATIVE_PROP);
        if (!usingNativeJgss || !subject.getPrivateCredentials(GSSCredential.class).isEmpty()) {
            return;
        }

        String servicePrincipal = SaslClientAuthenticator.firstPrincipal(subject);
        KerberosName kerberosName;
        try {
            kerberosName = KerberosName.parse(servicePrincipal);
        }
        catch (IllegalArgumentException e) {
            // Keep the parse failure as the cause so the root problem is not lost.
            throw new KafkaException("Principal has name with unexpected format " + servicePrincipal, e);
        }
        String servicePrincipalName = kerberosName.serviceName();
        String serviceHostname = kerberosName.hostName();

        try {
            GSSManager manager = this.gssManager();
            // Object identifier of the Kerberos V5 GSS-API mechanism.
            Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
            GSSName gssName = manager.createName(servicePrincipalName + "@" + serviceHostname, GSSName.NT_HOSTBASED_SERVICE);
            GSSCredential cred = manager.createCredential(gssName, GSSCredential.INDEFINITE_LIFETIME, krb5Mechanism, GSSCredential.ACCEPT_ONLY);
            subject.getPrivateCredentials().add(cred);
            // Log service@host; the hostname was previously passed for both placeholders.
            this.log.info("Configured native GSSAPI private credentials for {}@{}", servicePrincipalName, serviceHostname);
        }
        catch (GSSException ex) {
            // Best effort: authentication may still work without the native credential.
            this.log.warn("Cannot add private credential to subject; clients authentication may fail", ex);
        }
    }

    /** @return the result of {@link GSSManager#getInstance()}; protected so subclasses can override it */
    protected GSSManager gssManager() {
        GSSManager manager = GSSManager.getInstance();
        return manager;
    }

    /**
     * @param saslMechanism SASL mechanism name
     * @return the subject stored for that mechanism, or {@code null} if there is none
     */
    protected Subject subject(String saslMechanism) {
        return subjects.get(saslMechanism);
    }
}

