package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/common/network/NioEchoServer.class */
public class NioEchoServer extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(NioEchoServer.class);
    private static final double EPS = 1.0E-4d;
    private final int port;
    private final ServerSocketChannel serverSocketChannel;
    private final List<SocketChannel> newChannels;
    private final List<SocketChannel> socketChannels;
    private final AcceptorThread acceptorThread;
    private final Selector selector;
    private volatile TransferableChannel outputChannel;
    private final CredentialCache credentialCache;
    private final Metrics metrics;
    private volatile int numSent;
    private volatile boolean closeKafkaChannels;
    private final DelegationTokenCache tokenCache;
    private final Time time;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/common/network/NioEchoServer$AcceptorThread.class */
    public class AcceptorThread extends Thread {
        public AcceptorThread() {
            setName("acceptor");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Selector selector = null;
            try {
                try {
                    selector = Selector.open();
                    NioEchoServer.this.serverSocketChannel.register(selector, 16);
                    while (NioEchoServer.this.serverSocketChannel.isOpen()) {
                        if (selector.select(1000L) > 0) {
                            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                            while (it.hasNext()) {
                                SelectionKey next = it.next();
                                if (next.isAcceptable()) {
                                    SocketChannel accept = ((ServerSocketChannel) next.channel()).accept();
                                    accept.configureBlocking(false);
                                    NioEchoServer.this.newChannels.add(accept);
                                    NioEchoServer.this.selector.wakeup();
                                }
                                it.remove();
                            }
                        }
                    }
                    Utils.closeQuietly(selector, "acceptSelector");
                } catch (IOException e) {
                    NioEchoServer.LOG.warn(e.getMessage(), e);
                    Utils.closeQuietly(selector, "acceptSelector");
                }
            } catch (Throwable th) {
                Utils.closeQuietly(selector, "acceptSelector");
                throw th;
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/common/network/NioEchoServer$MetricType.class */
    public enum MetricType {
        TOTAL,
        RATE,
        AVG,
        MAX;

        private final String metricNameSuffix = "-" + name().toLowerCase(Locale.ROOT);

        MetricType() {
        }

        public String metricNameSuffix() {
            return this.metricNameSuffix;
        }
    }

    public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig abstractConfig, String str, ChannelBuilder channelBuilder, CredentialCache credentialCache, Time time) throws Exception {
        this(listenerName, securityProtocol, abstractConfig, str, channelBuilder, credentialCache, 100, time);
    }

    public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig abstractConfig, String str, ChannelBuilder channelBuilder, CredentialCache credentialCache, int i, Time time) throws Exception {
        this(listenerName, securityProtocol, abstractConfig, str, channelBuilder, credentialCache, i, time, new DelegationTokenCache(ScramMechanism.mechanismNames()));
    }

    public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig abstractConfig, String str, ChannelBuilder channelBuilder, CredentialCache credentialCache, int i, Time time, DelegationTokenCache delegationTokenCache) throws Exception {
        super("echoserver");
        this.numSent = 0;
        setDaemon(true);
        this.serverSocketChannel = ServerSocketChannel.open();
        this.serverSocketChannel.configureBlocking(false);
        this.serverSocketChannel.socket().bind(new InetSocketAddress(str, 0));
        this.port = this.serverSocketChannel.socket().getLocalPort();
        this.socketChannels = Collections.synchronizedList(new ArrayList());
        this.newChannels = Collections.synchronizedList(new ArrayList());
        this.credentialCache = credentialCache;
        this.tokenCache = delegationTokenCache;
        if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
            for (String str2 : ScramMechanism.mechanismNames()) {
                if (credentialCache.cache(str2, ScramCredential.class) == null) {
                    credentialCache.createCache(str2, ScramCredential.class);
                }
            }
        }
        LogContext logContext = new LogContext();
        channelBuilder = channelBuilder == null ? ChannelBuilders.serverChannelBuilder(listenerName, false, securityProtocol, abstractConfig, credentialCache, delegationTokenCache, time, logContext, () -> {
            return ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
        }) : channelBuilder;
        this.metrics = new Metrics();
        this.selector = new Selector(10000L, i, this.metrics, time, "MetricGroup", channelBuilder, logContext);
        this.acceptorThread = new AcceptorThread();
        this.time = time;
    }

    public int port() {
        return this.port;
    }

    public CredentialCache credentialCache() {
        return this.credentialCache;
    }

    public DelegationTokenCache tokenCache() {
        return this.tokenCache;
    }

    public double metricValue(String str) {
        for (Map.Entry entry : this.metrics.metrics().entrySet()) {
            if (((MetricName) entry.getKey()).name().equals(str)) {
                return ((Double) ((KafkaMetric) entry.getValue()).metricValue()).doubleValue();
            }
        }
        throw new IllegalStateException("Metric not found, " + str + ", found=" + this.metrics.metrics().keySet());
    }

    public void verifyAuthenticationMetrics(int i, int i2) throws InterruptedException {
        waitForMetrics("successful-authentication", i, EnumSet.of(MetricType.TOTAL, MetricType.RATE));
        waitForMetrics("failed-authentication", i2, EnumSet.of(MetricType.TOTAL, MetricType.RATE));
    }

    public void verifyReauthenticationMetrics(int i, int i2) throws InterruptedException {
        waitForMetrics("successful-reauthentication", i, EnumSet.of(MetricType.TOTAL, MetricType.RATE));
        waitForMetrics("failed-reauthentication", i2, EnumSet.of(MetricType.TOTAL, MetricType.RATE));
        waitForMetrics("successful-authentication-no-reauth", 0.0d, EnumSet.of(MetricType.TOTAL));
        if (this.time instanceof MockTime) {
            return;
        }
        waitForMetrics("reauthentication-latency", Math.signum(i), EnumSet.of(MetricType.MAX, MetricType.AVG));
    }

    public void verifyAuthenticationNoReauthMetric(int i) throws InterruptedException {
        waitForMetrics("successful-authentication-no-reauth", i, EnumSet.of(MetricType.TOTAL));
    }

    public void waitForMetric(String str, double d) throws InterruptedException {
        waitForMetrics(str, d, EnumSet.of(MetricType.TOTAL, MetricType.RATE));
    }

    public void waitForMetrics(String str, double d, Set<MetricType> set) throws InterruptedException {
        long milliseconds = this.time.milliseconds();
        for (MetricType metricType : set) {
            long milliseconds2 = TestUtils.DEFAULT_MAX_WAIT_MS - (this.time.milliseconds() - milliseconds);
            String str2 = str + metricType.metricNameSuffix();
            if (d == 0.0d) {
                Double valueOf = Double.valueOf(d);
                if (metricType == MetricType.MAX || metricType == MetricType.AVG) {
                    valueOf = Double.valueOf(Double.NaN);
                }
                Assertions.assertEquals(valueOf.doubleValue(), metricValue(str2), EPS, "Metric not updated " + str2 + " expected:<" + d + "> but was:<" + metricValue(str2) + ">");
            } else if (metricType == MetricType.TOTAL) {
                TestUtils.waitForCondition(() -> {
                    return Math.abs(metricValue(str2) - d) <= EPS;
                }, milliseconds2, (Supplier<String>) () -> {
                    return "Metric not updated " + str2 + " expected:<" + d + "> but was:<" + metricValue(str2) + ">";
                });
            } else {
                TestUtils.waitForCondition(() -> {
                    return metricValue(str2) > 0.0d;
                }, milliseconds2, (Supplier<String>) () -> {
                    return "Metric not updated " + str2 + " expected:<a positive number> but was:<" + metricValue(str2) + ">";
                });
            }
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            this.acceptorThread.start();
            while (this.serverSocketChannel.isOpen()) {
                this.selector.poll(100L);
                synchronized (this.newChannels) {
                    for (SocketChannel socketChannel : this.newChannels) {
                        this.selector.register(id(socketChannel), socketChannel);
                        this.socketChannels.add(socketChannel);
                    }
                    this.newChannels.clear();
                }
                if (this.closeKafkaChannels) {
                    Iterator it = this.selector.channels().iterator();
                    while (it.hasNext()) {
                        this.selector.close(((KafkaChannel) it.next()).id());
                    }
                }
                for (NetworkReceive networkReceive : this.selector.completedReceives()) {
                    KafkaChannel channel = channel(networkReceive.source());
                    if (!maybeBeginServerReauthentication(channel, networkReceive, this.time)) {
                        String id = channel.id();
                        this.selector.mute(id);
                        NetworkSend networkSend = new NetworkSend(networkReceive.source(), ByteBufferSend.sizePrefixed(networkReceive.payload()));
                        if (this.outputChannel == null) {
                            this.selector.send(networkSend);
                        } else {
                            networkSend.writeTo(this.outputChannel);
                            this.selector.unmute(id);
                        }
                    }
                }
                Iterator it2 = this.selector.completedSends().iterator();
                while (it2.hasNext()) {
                    this.selector.unmute(((NetworkSend) it2.next()).destinationId());
                    this.numSent++;
                }
            }
        } catch (IOException e) {
            LOG.warn(e.getMessage(), e);
        }
    }

    public int numSent() {
        return this.numSent;
    }

    private static boolean maybeBeginServerReauthentication(KafkaChannel kafkaChannel, NetworkReceive networkReceive, Time time) {
        try {
            if (TestUtils.apiKeyFrom(networkReceive) != ApiKeys.SASL_HANDSHAKE) {
                return false;
            }
            time.getClass();
            return kafkaChannel.maybeBeginServerReauthentication(networkReceive, time::nanoseconds);
        } catch (Exception e) {
            return false;
        }
    }

    private String id(SocketChannel socketChannel) {
        return socketChannel.socket().getLocalAddress().getHostAddress() + ":" + socketChannel.socket().getLocalPort() + "-" + socketChannel.socket().getInetAddress().getHostAddress() + ":" + socketChannel.socket().getPort();
    }

    private KafkaChannel channel(String str) {
        KafkaChannel channel = this.selector.channel(str);
        return channel == null ? this.selector.closingChannel(str) : channel;
    }

    public void outputChannel(final WritableByteChannel writableByteChannel) {
        this.outputChannel = new TransferableChannel() { // from class: org.apache.kafka.common.network.NioEchoServer.1
            public boolean hasPendingWrites() {
                return false;
            }

            public long transferFrom(FileChannel fileChannel, long j, long j2) throws IOException {
                return fileChannel.transferTo(j, j2, writableByteChannel);
            }

            public boolean isOpen() {
                return writableByteChannel.isOpen();
            }

            public void close() throws IOException {
                writableByteChannel.close();
            }

            public int write(ByteBuffer byteBuffer) throws IOException {
                return writableByteChannel.write(byteBuffer);
            }

            public long write(ByteBuffer[] byteBufferArr, int i, int i2) throws IOException {
                long j = 0;
                for (int i3 = i; i3 < i + i2; i3++) {
                    j += write(byteBufferArr[i3]);
                }
                return j;
            }

            public long write(ByteBuffer[] byteBufferArr) throws IOException {
                return write(byteBufferArr, 0, byteBufferArr.length);
            }
        };
    }

    public Selector selector() {
        return this.selector;
    }

    public void closeKafkaChannels() {
        this.closeKafkaChannels = true;
        this.selector.wakeup();
        try {
            try {
                TestUtils.waitForCondition(() -> {
                    return this.selector.channels().isEmpty();
                }, "Channels not closed");
                this.closeKafkaChannels = false;
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            this.closeKafkaChannels = false;
            throw th;
        }
    }

    public void closeSocketChannels() throws IOException {
        Iterator<SocketChannel> it = this.socketChannels.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.socketChannels.clear();
    }

    public void close() throws IOException, InterruptedException {
        this.serverSocketChannel.close();
        closeSocketChannels();
        Utils.closeQuietly(this.selector, "selector");
        this.acceptorThread.interrupt();
        this.acceptorThread.join();
        interrupt();
        join();
    }
}
