package org.apache.spark.kafka010;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.apache.kafka.common.security.token.delegation.TokenInformation;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.SparkHadoopUtil$;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.config.package$;
import org.apache.spark.kafka010.KafkaTokenUtil;
import org.apache.spark.util.SecurityUtils$;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.JavaConverters$;
import scala.collection.StringOps$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.control.NonFatal$;

/* compiled from: KafkaTokenUtil.scala */
/* loaded from: input_file:org/apache/spark/kafka010/KafkaTokenUtil$.class */
/**
 * Singleton helper for obtaining Kafka delegation tokens and turning them into
 * client configuration (admin-client properties and JAAS parameter strings).
 *
 * <p>This is the Java rendering of a Scala {@code object}: the single instance lives in
 * {@link #MODULE$} and the logging methods simply forward to the {@code Logging} trait.
 */
public final class KafkaTokenUtil$ implements Logging {
    /** The one and only instance of this class. */
    public static final KafkaTokenUtil$ MODULE$ = new KafkaTokenUtil$();
    private static final Text TOKEN_KIND;
    private static final String TOKEN_SERVICE_PREFIX;
    private static transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        // MODULE$ has already been assigned by its field initializer above.
        Logging.$init$(MODULE$);
        TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN");
        TOKEN_SERVICE_PREFIX = "kafka.server.delegation.token";
    }

    // ---------------------------------------------------------------------------------------
    // Logging trait forwarders. Each method delegates unchanged to the static helper on Logging.
    // ---------------------------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    /** Getter for the logger slot that the {@code Logging} trait stores on this class. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the logger slot that the {@code Logging} trait stores on this class. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        org$apache$spark$internal$Logging$$log_ = logger;
    }

    // ---------------------------------------------------------------------------------------
    // Token naming
    // ---------------------------------------------------------------------------------------

    /** @return the token kind, {@code KAFKA_DELEGATION_TOKEN} */
    public Text TOKEN_KIND() {
        return TOKEN_KIND;
    }

    /** @return the prefix shared by the service name of every token created here */
    private String TOKEN_SERVICE_PREFIX() {
        return TOKEN_SERVICE_PREFIX;
    }

    /**
     * Builds the token service name for a cluster.
     *
     * @param str cluster identifier
     * @return {@code <prefix>.<identifier>} wrapped in a {@link Text}
     */
    public Text getTokenService(String str) {
        return new Text(TOKEN_SERVICE_PREFIX() + "." + str);
    }

    /**
     * Inverse of {@link #getTokenService(String)}: recovers the cluster identifier from a
     * token service name.
     *
     * <p>Only the leading {@code <prefix>.} is removed. Removing every occurrence would
     * corrupt an identifier that happens to contain the prefix text itself.
     *
     * @param text token service name
     * @return the service name without its leading prefix, or the name unchanged when it
     *         does not start with the prefix
     */
    private String getClusterIdentifier(Text text) {
        String service = text.toString();
        String prefixWithDot = TOKEN_SERVICE_PREFIX() + ".";
        return service.startsWith(prefixWithDot) ? service.substring(prefixWithDot.length()) : service;
    }

    // ---------------------------------------------------------------------------------------
    // Token acquisition
    // ---------------------------------------------------------------------------------------

    /**
     * Requests a new delegation token from the Kafka cluster described by
     * {@code kafkaTokenClusterConf}.
     *
     * <p>The admin client used for the request is closed before this method returns,
     * whether the request succeeds or fails.
     *
     * @param sparkConf             Spark configuration used to build the admin-client properties
     * @param kafkaTokenClusterConf cluster whose token is requested
     * @return a pair of the Hadoop token and the token's expiry timestamp (boxed {@code Long})
     * @throws IllegalArgumentException if the current user is a proxy user
     */
    public Tuple2<Token<KafkaTokenUtil.KafkaDelegationTokenIdentifier>, Object> obtainToken(SparkConf sparkConf, KafkaTokenClusterConf kafkaTokenClusterConf) {
        checkProxyUser();
        try (AdminClient adminClient = AdminClient.create(createAdminClientProperties(sparkConf, kafkaTokenClusterConf))) {
            DelegationToken delegationToken = (DelegationToken) adminClient
                    .createDelegationToken(new CreateDelegationTokenOptions())
                    .delegationToken()
                    .get();
            printToken(delegationToken);
            // Encode explicitly so the bytes do not depend on the JVM's default charset;
            // getTokenJaasParams decodes them with the same charset.
            Token<KafkaTokenUtil.KafkaDelegationTokenIdentifier> hadoopToken = new Token<>(
                    delegationToken.tokenInfo().tokenId().getBytes(StandardCharsets.UTF_8),
                    delegationToken.hmacAsBase64String().getBytes(StandardCharsets.UTF_8),
                    TOKEN_KIND(),
                    getTokenService(kafkaTokenClusterConf.identifier()));
            return new Tuple2<>(hadoopToken, BoxesRunTime.boxToLong(delegationToken.tokenInfo().expiryTimestamp()));
        }
    }

    /**
     * Rejects proxy users, for which obtaining a delegation token is not supported.
     *
     * @throws IllegalArgumentException if the current Hadoop user is a proxy user
     */
    public void checkProxyUser() {
        boolean notProxyUser = !SparkHadoopUtil$.MODULE$.get().isProxyUser(UserGroupInformation.getCurrentUser());
        Predef$.MODULE$.require(notProxyUser, () -> "Obtaining delegation token for proxy user is not yet supported.");
    }

    /**
     * Assembles the properties for the admin client that requests the delegation token.
     *
     * <p>Properties are applied in this order: bootstrap servers and security protocol,
     * protocol-specific SSL stores, login configuration, and finally the user-specified
     * Kafka parameters, which therefore override anything set earlier.
     *
     * @param sparkConf             Spark configuration (consulted for keytab and principal)
     * @param kafkaTokenClusterConf cluster configuration
     * @return the populated properties
     * @throws MatchError if the security protocol is not SASL_SSL, SSL, or SASL_PLAINTEXT
     */
    public Properties createAdminClientProperties(SparkConf sparkConf, KafkaTokenClusterConf kafkaTokenClusterConf) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaTokenClusterConf.authBootstrapServers());
        properties.put("security.protocol", kafkaTokenClusterConf.securityProtocol());

        applyTransportSecurity(kafkaTokenClusterConf, properties);
        applyLoginConfiguration(sparkConf, kafkaTokenClusterConf, properties);

        logDebug(() -> "AdminClient params before specified params: " + describeRedacted(properties));
        kafkaTokenClusterConf.specifiedKafkaParams().foreach(entry ->
                properties.setProperty((String) entry._1(), (String) entry._2()));
        logDebug(() -> "AdminClient params after specified params: " + describeRedacted(properties));
        return properties;
    }

    /** Adds the SSL store settings (and warnings) that match the cluster's security protocol. */
    private void applyTransportSecurity(KafkaTokenClusterConf kafkaTokenClusterConf, Properties properties) {
        String protocol = kafkaTokenClusterConf.securityProtocol();
        if (SecurityProtocol.SASL_SSL.name.equals(protocol)) {
            setTrustStoreProperties(kafkaTokenClusterConf, properties);
        } else if (SecurityProtocol.SSL.name.equals(protocol)) {
            setTrustStoreProperties(kafkaTokenClusterConf, properties);
            setKeyStoreProperties(kafkaTokenClusterConf, properties);
            logWarning(() -> "Obtaining kafka delegation token with SSL protocol. Please configure 2-way authentication on the broker side.");
        } else if (SecurityProtocol.SASL_PLAINTEXT.name.equals(protocol)) {
            logWarning(() -> "Obtaining kafka delegation token through plain communication channel. Please consider the security impact.");
        } else {
            // Same failure the Scala pattern match produces for an unhandled protocol.
            throw new MatchError(protocol);
        }
    }

    /**
     * Chooses the login source: a JVM-global JAAS configuration if one is present,
     * otherwise GSSAPI with a keytab (when configured) or the ticket cache.
     */
    private void applyLoginConfiguration(SparkConf sparkConf, KafkaTokenClusterConf kafkaTokenClusterConf, Properties properties) {
        if (isGlobalJaasConfigurationProvided()) {
            logDebug(() -> "JVM global security configuration detected, using it for login.");
            return;
        }
        properties.put("sasl.mechanism", "GSSAPI");
        if (sparkConf.contains(package$.MODULE$.KEYTAB())) {
            logDebug(() -> "Keytab detected, using it for login.");
            String keytab = (String) ((Option) sparkConf.get(package$.MODULE$.KEYTAB())).get();
            String principal = (String) ((Option) sparkConf.get(package$.MODULE$.PRINCIPAL())).get();
            properties.put("sasl.jaas.config", getKeytabJaasParams(keytab, principal, kafkaTokenClusterConf.kerberosServiceName()));
        } else {
            logDebug(() -> "Using ticket cache for login.");
            properties.put("sasl.jaas.config", getTicketCacheJaasParams(kafkaTokenClusterConf));
        }
    }

    /** Renders the properties for logging after passing them through the redaction utility. */
    private static String describeRedacted(Properties properties) {
        return String.valueOf(KafkaRedactionUtil$.MODULE$.redactParams(
                ((IterableOnceOps) JavaConverters$.MODULE$.propertiesAsScalaMapConverter(properties).asScala()).toSeq()));
    }

    /**
     * Probes whether a client JAAS context can be loaded without any per-client settings.
     *
     * @return {@code true} if loading succeeds, {@code false} if it fails with a non-fatal
     *         throwable; fatal throwables are rethrown
     */
    public boolean isGlobalJaasConfigurationProvided() {
        try {
            JaasContext.loadClientContext(Collections.emptyMap());
            return true;
        } catch (Throwable th) {
            // A non-fatal failure is the expected "not configured" signal, so it is not logged.
            if (NonFatal$.MODULE$.unapply(th).isEmpty()) {
                throw th;
            }
            return false;
        }
    }

    /** Copies the trust store type, location, and password into {@code properties} when set. */
    private void setTrustStoreProperties(KafkaTokenClusterConf kafkaTokenClusterConf, Properties properties) {
        kafkaTokenClusterConf.trustStoreType().foreach(value -> properties.put("ssl.truststore.type", value));
        kafkaTokenClusterConf.trustStoreLocation().foreach(value -> properties.put("ssl.truststore.location", value));
        kafkaTokenClusterConf.trustStorePassword().foreach(value -> properties.put("ssl.truststore.password", value));
    }

    /** Copies the key store type, location, password, and key password into {@code properties} when set. */
    private void setKeyStoreProperties(KafkaTokenClusterConf kafkaTokenClusterConf, Properties properties) {
        kafkaTokenClusterConf.keyStoreType().foreach(value -> properties.put("ssl.keystore.type", value));
        kafkaTokenClusterConf.keyStoreLocation().foreach(value -> properties.put("ssl.keystore.location", value));
        kafkaTokenClusterConf.keyStorePassword().foreach(value -> properties.put("ssl.keystore.password", value));
        kafkaTokenClusterConf.keyPassword().foreach(value -> properties.put("ssl.key.password", value));
    }

    // ---------------------------------------------------------------------------------------
    // JAAS parameter strings
    // ---------------------------------------------------------------------------------------

    /**
     * Turns a margin-formatted, multi-line JAAS template into a single line: margins are
     * stripped, newlines removed, and surrounding whitespace trimmed.
     */
    private static String collapseJaasTemplate(String template) {
        return StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString(template))
                .replace("\n", "")
                .trim();
    }

    /**
     * Builds the JAAS parameters for a Kerberos keytab login.
     *
     * <p>The result is trimmed, as it is for the ticket-cache and token variants, so it
     * carries no trailing whitespace from the template.
     *
     * @param str  keytab location, emitted as {@code keyTab}
     * @param str2 principal, emitted as {@code principal}
     * @param str3 service name, emitted as {@code serviceName}
     * @return single-line JAAS configuration
     */
    public String getKeytabJaasParams(String str, String str2, String str3) {
        String params = collapseJaasTemplate(
                "\n      |" + SecurityUtils$.MODULE$.getKrb5LoginModuleName() + " required"
                + "\n      | debug=" + SecurityUtils$.MODULE$.isGlobalKrbDebugEnabled()
                + "\n      | useKeyTab=true"
                + "\n      | serviceName=\"" + str3 + "\""
                + "\n      | keyTab=\"" + str + "\""
                + "\n      | principal=\"" + str2 + "\";"
                + "\n      ");
        logDebug(() -> "Krb keytab JAAS params: " + params);
        return params;
    }

    /**
     * Builds the JAAS parameters for a Kerberos ticket-cache login.
     *
     * @param kafkaTokenClusterConf supplies the Kerberos service name
     * @return single-line JAAS configuration
     */
    private String getTicketCacheJaasParams(KafkaTokenClusterConf kafkaTokenClusterConf) {
        String params = collapseJaasTemplate(
                "\n      |" + SecurityUtils$.MODULE$.getKrb5LoginModuleName() + " required"
                + "\n      | debug=" + SecurityUtils$.MODULE$.isGlobalKrbDebugEnabled()
                + "\n      | useTicketCache=true"
                + "\n      | serviceName=\"" + kafkaTokenClusterConf.kerberosServiceName() + "\";"
                + "\n      ");
        logDebug(() -> "Krb ticket cache JAAS params: " + params);
        return params;
    }

    /**
     * Logs the token's metadata as a two-row table at debug level. The HMAC is never
     * printed; the redaction placeholder is shown instead.
     *
     * @param delegationToken token to describe
     */
    private void printToken(DelegationToken delegationToken) {
        if (!log().isDebugEnabled()) {
            return;
        }
        // One format for both rows keeps the header aligned with the values.
        final String rowFormat = "%-15s %-30s %-15s %-25s %-15s %-15s %-15s";
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
        logDebug(() -> String.format(rowFormat,
                "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"));
        TokenInformation tokenInformation = delegationToken.tokenInfo();
        logDebug(() -> String.format(rowFormat,
                tokenInformation.tokenId(),
                Utils$.MODULE$.REDACTION_REPLACEMENT_TEXT(),
                tokenInformation.owner(),
                tokenInformation.renewersAsString(),
                dateFormat.format(Long.valueOf(tokenInformation.issueTimestamp())),
                dateFormat.format(Long.valueOf(tokenInformation.expiryTimestamp())),
                dateFormat.format(Long.valueOf(tokenInformation.maxTimestamp()))));
    }

    // ---------------------------------------------------------------------------------------
    // Token lookup
    // ---------------------------------------------------------------------------------------

    /**
     * Finds the cluster configuration whose delegation token applies to the given bootstrap
     * servers.
     *
     * <p>The current user's tokens are narrowed to those whose service starts with the Kafka
     * prefix, each is mapped to its cluster configuration, and a configuration is kept when
     * its target-servers regex matches at least one of the servers in {@code str}.
     *
     * @param sparkConf Spark configuration used to resolve cluster configurations
     * @param str       bootstrap servers string
     * @return the single matching configuration, or empty when none matches
     * @throws IllegalArgumentException if more than one configuration matches
     */
    public Option<KafkaTokenClusterConf> findMatchingTokenClusterConfig(SparkConf sparkConf, String str) {
        Iterable allTokens = (Iterable) JavaConverters$.MODULE$
                .collectionAsScalaIterableConverter(UserGroupInformation.getCurrentUser().getCredentials().getAllTokens())
                .asScala();
        IterableOps kafkaTokens = (IterableOps) allTokens.filter(candidate ->
                BoxesRunTime.boxToBoolean($anonfun$findMatchingTokenClusterConfig$1((Token) candidate)));
        IterableOps clusterConfs = (IterableOps) kafkaTokens.map(kafkaToken ->
                KafkaTokenSparkConf$.MODULE$.getClusterConfig(sparkConf, MODULE$.getClusterIdentifier(((Token) kafkaToken).getService())));
        Iterable matches = (Iterable) clusterConfs.filter(clusterConf ->
                BoxesRunTime.boxToBoolean($anonfun$findMatchingTokenClusterConfig$3(str, (KafkaTokenClusterConf) clusterConf)));
        Predef$.MODULE$.require(matches.size() <= 1, () ->
                "More than one delegation token matches the following " + "bootstrap servers: " + str + ".");
        return matches.headOption();
    }

    /**
     * Builds the SCRAM JAAS parameters that authenticate with the stored delegation token.
     *
     * <p>The returned string contains the token password; only a redacted form is logged.
     *
     * @param kafkaTokenClusterConf cluster whose token is used
     * @return single-line JAAS configuration
     * @throws IllegalArgumentException if the current user holds no token for the cluster
     */
    public String getTokenJaasParams(KafkaTokenClusterConf kafkaTokenClusterConf) {
        Token token = UserGroupInformation.getCurrentUser().getCredentials().getToken(getTokenService(kafkaTokenClusterConf.identifier()));
        Predef$.MODULE$.require(token != null, () ->
                "Token for identifier " + kafkaTokenClusterConf.identifier() + " must exist");
        // Decode with the same charset obtainToken uses to encode the token bytes.
        String params = collapseJaasTemplate(
                "\n      |" + ScramLoginModule.class.getName() + " required"
                + "\n      | tokenauth=true"
                + "\n      | serviceName=\"" + kafkaTokenClusterConf.kerberosServiceName() + "\""
                + "\n      | username=\"" + new String(token.getIdentifier(), StandardCharsets.UTF_8) + "\""
                + "\n      | password=\"" + new String(token.getPassword(), StandardCharsets.UTF_8) + "\";"
                + "\n      ");
        logDebug(() -> "Scram JAAS params: " + KafkaRedactionUtil$.MODULE$.redactJaasParam(params));
        return params;
    }

    /**
     * Tells whether a connector's JAAS configuration differs from the one the latest token
     * would produce.
     *
     * @param map    Kafka parameters currently used by the connector
     * @param option cluster configuration, if one matched
     * @return {@code false} when no cluster configuration is given or the parameters carry no
     *         {@code sasl.jaas.config}; otherwise whether the configured value differs from
     *         the freshly built token parameters
     */
    public boolean needTokenUpdate(Map<String, Object> map, Option<KafkaTokenClusterConf> option) {
        if (option.isEmpty() || !map.containsKey("sasl.jaas.config")) {
            return false;
        }
        logDebug(() -> "Delegation token used by connector, checking if uses the latest token.");
        String configuredJaas = (String) map.get("sasl.jaas.config");
        String latestJaas = getTokenJaasParams((KafkaTokenClusterConf) option.get());
        if (latestJaas == null) {
            return configuredJaas != null;
        }
        return !latestJaas.equals(configuredJaas);
    }

    // ---------------------------------------------------------------------------------------
    // Compiler-generated lambda bodies, kept public and unchanged in signature.
    // ---------------------------------------------------------------------------------------

    /** True when the token's service name starts with the Kafka token service prefix. */
    public static final /* synthetic */ boolean $anonfun$findMatchingTokenClusterConfig$1(Token token) {
        return token.getService().toString().startsWith(MODULE$.TOKEN_SERVICE_PREFIX());
    }

    /** True when the whole server string matches the pattern. */
    public static final /* synthetic */ boolean $anonfun$findMatchingTokenClusterConfig$4(Pattern pattern, String str) {
        return pattern.matcher(str).matches();
    }

    /** True when any server in {@code str} matches the configuration's target-servers regex. */
    public static final /* synthetic */ boolean $anonfun$findMatchingTokenClusterConfig$3(String str, KafkaTokenClusterConf kafkaTokenClusterConf) {
        Pattern targetServers = Pattern.compile(kafkaTokenClusterConf.targetServersRegex());
        return Utils$.MODULE$.stringToSeq(str).exists(server ->
                BoxesRunTime.boxToBoolean($anonfun$findMatchingTokenClusterConfig$4(targetServers, server)));
    }

    private KafkaTokenUtil$() {
    }
}
