package org.apache.kafka.server.util;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.timeline.SnapshotRegistry;

/* loaded from: input_file:org/apache/kafka/server/util/InterBrokerSendThread.class */
public abstract class InterBrokerSendThread extends ShutdownableThread {
    public final UnsentRequests unsentRequests;
    protected volatile KafkaClient networkClient;
    private final int requestTimeoutMs;
    private final Time time;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/server/util/InterBrokerSendThread$UnsentRequests.class */
    public static final class UnsentRequests {
        private final Map<Node, ArrayDeque<ClientRequest>> unsent = new HashMap();

        protected UnsentRequests() {
        }

        public void put(Node node, ClientRequest clientRequest) {
            this.unsent.computeIfAbsent(node, node2 -> {
                return new ArrayDeque();
            }).add(clientRequest);
        }

        Collection<ClientRequest> removeAllTimedOut(long j) {
            ArrayList arrayList = new ArrayList();
            Iterator<ArrayDeque<ClientRequest>> it = this.unsent.values().iterator();
            while (it.hasNext()) {
                Iterator<ClientRequest> it2 = it.next().iterator();
                boolean z = false;
                while (it2.hasNext() && !z) {
                    ClientRequest next = it2.next();
                    if (Math.max(0L, j - next.createdTimeMs()) > next.requestTimeoutMs()) {
                        arrayList.add(next);
                        it2.remove();
                        z = true;
                    }
                }
            }
            return arrayList;
        }

        void clean() {
            this.unsent.values().removeIf((v0) -> {
                return v0.isEmpty();
            });
        }

        Iterator<Map.Entry<Node, ArrayDeque<ClientRequest>>> iterator() {
            return this.unsent.entrySet().iterator();
        }

        Iterator<ClientRequest> requestIterator(Node node) {
            ArrayDeque<ClientRequest> arrayDeque = this.unsent.get(node);
            return arrayDeque == null ? Collections.emptyIterator() : arrayDeque.iterator();
        }

        boolean hasUnsentRequests(Node node) {
            return requestIterator(node).hasNext();
        }

        public void clearUnsentRequests(Node node) {
            ArrayDeque<ClientRequest> arrayDeque = this.unsent.get(node);
            if (arrayDeque != null) {
                arrayDeque.clear();
            }
        }

        Set<Node> nodes() {
            return this.unsent.keySet();
        }
    }

    public InterBrokerSendThread(String str, KafkaClient kafkaClient, int i, Time time) {
        this(str, kafkaClient, i, time, true);
    }

    public InterBrokerSendThread(String str, KafkaClient kafkaClient, int i, Time time, boolean z) {
        super(str, z);
        this.networkClient = kafkaClient;
        this.requestTimeoutMs = i;
        this.time = time;
        this.unsentRequests = new UnsentRequests();
    }

    public abstract Collection<RequestAndCompletionHandler> generateRequests();

    public boolean hasUnsentRequests() {
        return this.unsentRequests.iterator().hasNext();
    }

    @Override // org.apache.kafka.server.util.ShutdownableThread
    public void shutdown() throws InterruptedException {
        initiateShutdown();
        this.networkClient.initiateClose();
        awaitShutdown();
        Utils.closeQuietly(this.networkClient, "InterBrokerSendThread network client");
    }

    private void drainGeneratedRequests() {
        generateRequests().forEach(requestAndCompletionHandler -> {
            this.unsentRequests.put(requestAndCompletionHandler.destination, this.networkClient.newClientRequest(requestAndCompletionHandler.destination.idString(), requestAndCompletionHandler.request, requestAndCompletionHandler.creationTimeMs, true, this.requestTimeoutMs, requestAndCompletionHandler.handler));
        });
    }

    protected void pollOnce(long j) {
        try {
            drainGeneratedRequests();
            long milliseconds = this.time.milliseconds();
            this.networkClient.poll(sendRequests(milliseconds, j), milliseconds);
            long milliseconds2 = this.time.milliseconds();
            checkDisconnects(milliseconds2);
            failExpiredRequests(milliseconds2);
            this.unsentRequests.clean();
        } catch (FatalExitError e) {
            throw e;
        } catch (Throwable th) {
            if (!(th instanceof DisconnectException) || this.networkClient.active()) {
                if ((th instanceof InterruptedException) && !isRunning()) {
                    throw th;
                }
                this.log.error("unhandled exception caught in InterBrokerSendThread", th);
                throw new FatalExitError();
            }
        }
    }

    @Override // org.apache.kafka.server.util.ShutdownableThread
    public void doWork() {
        pollOnce(SnapshotRegistry.LATEST_EPOCH);
    }

    protected long sendRequests(long j, long j2) {
        long j3 = j2;
        for (Node node : this.unsentRequests.nodes()) {
            Iterator<ClientRequest> requestIterator = this.unsentRequests.requestIterator(node);
            while (requestIterator.hasNext()) {
                ClientRequest next = requestIterator.next();
                if (this.networkClient.ready(node, j)) {
                    this.networkClient.send(next, j);
                    requestIterator.remove();
                } else {
                    j3 = Math.min(j3, this.networkClient.connectionDelay(node, j));
                }
            }
        }
        return j3;
    }

    private void checkDisconnects(long j) {
        Iterator<Map.Entry<Node, ArrayDeque<ClientRequest>>> it = this.unsentRequests.iterator();
        while (it.hasNext()) {
            Map.Entry<Node, ArrayDeque<ClientRequest>> next = it.next();
            Node key = next.getKey();
            ArrayDeque<ClientRequest> value = next.getValue();
            if (!value.isEmpty() && this.networkClient.connectionFailed(key)) {
                it.remove();
                Iterator<ClientRequest> it2 = value.iterator();
                while (it2.hasNext()) {
                    ClientRequest next2 = it2.next();
                    AuthenticationException authenticationException = this.networkClient.authenticationException(key);
                    if (authenticationException != null) {
                        this.log.error("Failed to send the following request due to authentication error: {}", next2);
                    }
                    completeWithDisconnect(next2, j, authenticationException);
                }
            }
        }
    }

    private void failExpiredRequests(long j) {
        for (ClientRequest clientRequest : this.unsentRequests.removeAllTimedOut(j)) {
            this.log.debug("Failed to send the following request after {} ms: {}", Integer.valueOf(clientRequest.requestTimeoutMs()), clientRequest);
            completeWithDisconnect(clientRequest, j, null);
        }
    }

    private static void completeWithDisconnect(ClientRequest clientRequest, long j, AuthenticationException authenticationException) {
        RequestCompletionHandler callback = clientRequest.callback();
        callback.onComplete(new ClientResponse(clientRequest.makeHeader(clientRequest.requestBuilder().latestAllowedVersion()), callback, clientRequest.destination(), j, j, true, (UnsupportedVersionException) null, authenticationException, (AbstractResponse) null));
    }

    protected boolean hasInFlightRequests(Node node) {
        return this.unsentRequests.hasUnsentRequests(node) || this.networkClient.hasInFlightRequests(node.idString());
    }

    public void wakeup() {
        this.networkClient.wakeup();
    }
}
