/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import kafka.controller.KafkaController;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkAdminClient;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConnectionManager;
import kafka.server.link.ClusterLinkDestConnectionManager$;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkNetworkClient;
import kafka.server.link.ConnectionMode;
import kafka.server.link.ConnectionMode$Inbound$;
import kafka.server.link.ReverseClient;
import kafka.server.link.ReverseClient$;
import kafka.utils.CoreUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.InitiateReverseConnectionsRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.requests.InitiateReverseConnectionsRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\t\rv!\u0002\u0016,\u0011\u0003\u0011d!\u0002\u001b,\u0011\u0003)\u0004\"\u0002\u001f\u0002\t\u0003i\u0004b\u0002 \u0002\u0005\u0004%\ta\u0010\u0005\u0007\u0019\u0006\u0001\u000b\u0011\u0002!\u0007\tQZ\u0003!\u0014\u0005\n1\u0016\u0011\t\u0011)A\u00053~C\u0001\u0002Y\u0003\u0003\u0002\u0003\u0006I!\u0019\u0005\nI\u0016\u0011\t\u0011)A\u0005KBD\u0001\"]\u0003\u0003\u0002\u0003\u0006IA\u001d\u0005\u000b\u0003\u0003)!\u0011!Q\u0001\n\u0005\r\u0001BCA\u0005\u000b\t\u0005\t\u0015!\u0003\u0002\f!Q\u0011\u0011D\u0003\u0003\u0002\u0003\u0006I!a\u0007\t\u0015\u0005\u001dRA!b\u0001\n\u0003\tI\u0003\u0003\u0006\u00026\u0015\u0011\t\u0011)A\u0005\u0003WA!\"a\u000e\u0006\u0005\u0003\u0005\u000b\u0011BA\u001d\u0011)\t\t%\u0002B\u0001B\u0003%\u00111\t\u0005\u0007y\u0015!\t!a\u0015\t\u0013\u0005%TA1A\u0005\n\u0005-\u0004\u0002CAA\u000b\u0001\u0006I!!\u001c\t\u0011\u0005\rUA1A\u0005\n}Bq!!\"\u0006A\u0003%\u0001\t\u0003\u0005\u0002\b\u0016\u0011\r\u0011\"\u0003@\u0011\u001d\tI)\u0002Q\u0001\n\u0001C\u0001\"a#\u0006\u0005\u0004%Ia\u0010\u0005\b\u0003\u001b+\u0001\u0015!\u0003A\u0011%\ty)\u0002a\u0001\n\u0013\t\t\nC\u0005\u0002\u0016\u0016\u0001\r\u0011\"\u0003\u0002\u0018\"A\u00111U\u0003!B\u0013\t\u0019\nC\u0004\u0002.\u0016!\t%a,\t\u0011\u00055W\u0001\"\u0001,\u0003\u001fDq!!>\u0006\t\u0003\n9\u0010C\u0004\u0003\u000e\u0015!\tEa\u0004\t\u000f\t=S\u0001\"\u0003\u0003R!9!qM\u0003\u0005\n\t%\u0004b\u0002B@\u000b\u0011\u0005#\u0011\u0011\u0005\b\u0005\u001b+A\u0011\u000bBH\u0011\u001d\u0011\t*\u0002C)\u0005\u001fCqAa%\u0006\t\u0003\u0011)\nC\u0004\u0003\u001a\u0016!\tEa'\t\u000f\tuU\u0001\"\u0011\u0003\u001c\"i!qT\u0003\u0011\u0002\u0007\u0005\t\u0011\"\u0003\u0003\"B\f\u0001e\u00117vgR,'\u000fT5oW\u0012+7\u000f^\"p]:,7\r^5p]6\u000bg.Y4fe*\u0011A&L\u0001\u0005Y&t7N\u0003\u0002/_\u000511/\u001a:wKJT\u0011\u0001M\u0001\u0006W\u000647.Y\u0002\u0001!\t\u0019\u0014!D\u0001,\u0005\u0001\u001aE.^:uKJd\u0015N\\6EKN$8i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u0014\u0005\u00051\u0004CA\u001c;\u001b\u0005A$\"A\u001d\u0002\u000bM\u001c\u0017\r\\1\n\u0005mB$AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002e\u0005!b*\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012,\u0012\u0001\u0011\t\u0003\u0003*k\u0011A\u0011\u0006\u0003\u0007\u0012\u000ba!\u0019;p[&\u001c'BA#G\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003\u000f\"\u000bA!\u001e;jY*\t\u0011*\u0001\u0003kCZ\f\u0017BA&C\u00055\tEo\\7jG&sG/Z4fe\u0006)b*\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012\u00043cA\u0003O#B\u00111gT\u0005\u0003!.\u0012Ad\u00117vgR,'\u000fT5oW\u000e{gN\\3di&|g.T1oC\u001e,'\u000f\u0005\u0002S+:\u00111gU\u0005\u0003).\n!c\u00117vgR,'\u000fT5oW\u001a\u000b7\r^8ss&\u0011ak\u0016\u0002\u0016\t\u0016\u001cHoQ8o]\u0016\u001cG/[8o\u001b\u0006t\u0017mZ3s\u0015\t!6&\u0001\u0005mS:\\G)\u0019;b!\tQV,D\u0001\\\u0015\tav&\u0001\u0002{W&\u0011al\u0017\u0002\u0010\u00072,8\u000f^3s\u0019&t7\u000eR1uC&\u0011\u0001lT\u0001\u000eS:LG/[1m\u0007>tg-[4\u0011\u0005M\u0012\u0017BA2,\u0005E\u0019E.^:uKJd\u0015N\\6D_:4\u0017nZ\u0001\u0014Y>\u001c\u0017\r\u001c'pO&\u001c\u0017\r\\\"mkN$XM\u001d\t\u0003M6t!aZ6\u0011\u0005!DT\"A5\u000b\u0005)\f\u0014A\u0002\u001fs_>$h(\u0003\u0002mq\u00051\u0001K]3eK\u001aL!A\\8\u0003\rM#(/\u001b8h\u0015\ta\u0007(\u0003\u0002e\u001f\u0006\t2\r\\5f]RLe\u000e^3sG\u0016\u0004Ho\u001c:\u0011\u0007]\u001aX/\u0003\u0002uq\t1q\n\u001d;j_:\u0004\"A\u001e@\u000e\u0003]T!\u0001_=\u0002\u000f\rd\u0017.\u001a8ug*\u0011\u0001G\u001f\u0006\u0003wr\fa!\u00199bG\",'\"A?\u0002\u0007=\u0014x-\u0003\u0002\u0000o\n\t2\t\\5f]RLe\u000e^3sG\u0016\u0004Ho\u001c:\u0002\u000f5,GO]5dgB\u00191'!\u0002\n\u0007\u0005\u001d1F\u0001\nDYV\u001cH/\u001a:MS:\\W*\u001a;sS\u000e\u001c\u0018A\u0005:f[>$X-\u00113nS:4\u0015m\u0019;pef\u0004\u0002bNA\u0007C\u0006E\u00111C\u0005\u0004\u0003\u001fA$!\u0003$v]\u000e$\u0018n\u001c83!\t\u0019T\u0001E\u00024\u0003+I1!a\u0006,\u0005Y\u0019E.^:uKJd\u0015N\\6BI6Lgn\u00117jK:$\u0018A\u00037pG\u0006d\u0017\tZ7j]B!\u0011QDA\u0012\u001b\t\tyBC\u0002\u0002\"]\fQ!\u00193nS:LA!!\n\u0002 \tq1i\u001c8gYV,g\u000e^!e[&t\u0017AC2p]R\u0014x\u000e\u001c7feV\u0011\u00111\u0006\t\u0005\u0003[\t\t$\u0004\u0002\u00020)\u0019\u0011qE\u0018\n\t\u0005M\u0012q\u0006\u0002\u0010\u0017\u000647.Y\"p]R\u0014x\u000e\u001c7fe\u0006Y1m\u001c8ue>dG.\u001a:!\u00031\u0011'o\\6fe\u000e{gNZ5h!\u0011\tY$!\u0010\u000e\u00035J1!a\u0010.\u0005-Y\u0015MZ6b\u0007>tg-[4\u0002\tQLW.\u001a\t\u0005\u0003\u000b\ny%\u0004\u0002\u0002H)!\u0011\u0011JA&\u0003\u0015)H/\u001b7t\u0015\r\ti%_\u0001\u0007G>lWn\u001c8\n\t\u0005E\u0013q\t\u0002\u0005)&lW\r\u0006\f\u0002\u0012\u0005U\u0013qKA-\u00037\ni&a\u0018\u0002b\u0005\r\u0014QMA4\u0011\u0015A\u0016\u00031\u0001Z\u0011\u0015\u0001\u0017\u00031\u0001b\u0011\u0015!\u0017\u00031\u0001f\u0011\u0015\t\u0018\u00031\u0001s\u0011\u001d\t\t!\u0005a\u0001\u0003\u0007Aq!!\u0003\u0012\u0001\u0004\tY\u0001C\u0004\u0002\u001aE\u0001\r!a\u0007\t\u000f\u0005\u001d\u0012\u00031\u0001\u0002,!9\u0011qG\tA\u0002\u0005e\u0002bBA!#\u0001\u0007\u00111I\u0001\u0013G>tg.Z2uS>t'+Z9vKN$8/\u0006\u0002\u0002nAA\u0011qNA9\u0003k\nY(D\u0001E\u0013\r\t\u0019\b\u0012\u0002\u0012\u0007>t7-\u001e:sK:$\b*Y:i\u001b\u0006\u0004\bcA\u001c\u0002x%\u0019\u0011\u0011\u0010\u001d\u0003\u0007%sG\u000fE\u00024\u0003{J1!a ,\u00055\u0011VM^3sg\u0016\u001cE.[3oi\u0006\u00192m\u001c8oK\u000e$\u0018n\u001c8SKF,Xm\u001d;tA\u0005!b.\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012\fQC\\3yiJ+g/\u001a:tKJ+\u0017/^3ti&#\u0007%A\u000bqKJ\u001c\u0018n\u001d;f]R\u001cuN\u001c8fGRLwN\\:\u0002-A,'o]5ti\u0016tGoQ8o]\u0016\u001cG/[8og\u0002\n\u0001$Y2uSZ,'+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t\u0003e\t7\r^5wKJ+g/\u001a:tK\u000e{gN\\3di&|gn\u001d\u0011\u0002-I,g/\u001a:tK\u000e{gN\\3di&|g.\u00113nS:,\"!a%\u0011\t]\u001a\u00181P\u0001\u001be\u00164XM]:f\u0007>tg.Z2uS>t\u0017\tZ7j]~#S-\u001d\u000b\u0005\u00033\u000by\nE\u00028\u00037K1!!(9\u0005\u0011)f.\u001b;\t\u0013\u0005\u00056$!AA\u0002\u0005M\u0015a\u0001=%c\u00059\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8BI6Lg\u000e\t\u0015\u00049\u0005\u001d\u0006cA\u001c\u0002*&\u0019\u00111\u0016\u001d\u0003\u0011Y|G.\u0019;jY\u0016\f\u0011#\u001a8bE2,7\t\\;ti\u0016\u0014H*\u001b8l)\u0019\tI*!-\u0002<\"9\u00111W\u000fA\u0002\u0005U\u0016!\u00048fi^|'o[\"mS\u0016tG\u000fE\u00024\u0003oK1!!/,\u0005a\u0019E.^:uKJd\u0015N\\6OKR<xN]6DY&,g\u000e\u001e\u0005\b\u0003{k\u0002\u0019AA`\u0003=iW\r^1eCR\fW*\u00198bO\u0016\u0014\b\u0003B\u001ct\u0003\u0003\u0004B!a1\u0002J6\u0011\u0011Q\u0019\u0006\u0005\u0003\u000f\fy\"A\u0005j]R,'O\\1mg&!\u00111ZAc\u0005Q\tE-\\5o\u001b\u0016$\u0018\rZ1uC6\u000bg.Y4fe\u0006I\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8Qe>4\u0018\u000eZ3s)!\t\t.a:\u0002p\u0006E\b\u0003B\u001ct\u0003'\u0004B!!6\u0002b:!\u0011q[Ao\u001b\t\tIN\u0003\u0003\u0002\\\u0006-\u0013a\u00028fi^|'o[\u0005\u0005\u0003?\fI.A\u0006SKZ,'o]3O_\u0012,\u0017\u0002BAr\u0003K\u0014!cQ8o]\u0016\u001cG/[8o!J|g/\u001b3fe*!\u0011q\\Am\u0011\u001d\t\u0019L\ba\u0001\u0003S\u00042A^Av\u0013\r\tio\u001e\u0002\u000e\u001d\u0016$xo\u001c:l\u00072LWM\u001c;\t\u000f\u0005uf\u00041\u0001\u0002@\"1\u00111\u001f\u0010A\u0002\u0015\f\u0001b\u00197jK:$\u0018\nZ\u0001\u0019aJ|7-Z:t%\u00164XM]:f\u0007>tg.Z2uS>tGCBAM\u0003s\u0014\u0019\u0001C\u0004\u0002|~\u0001\r!!@\u0002\u000f\rD\u0017M\u001c8fYB!\u0011q[A\u0000\u0013\u0011\u0011\t!!7\u0003\u0019-\u000bgm[1DQ\u0006tg.\u001a7\t\u000f\t\u0015q\u00041\u0001\u0003\b\u0005Y!/\u001a<feN,gj\u001c3f!\u0011\t9N!\u0003\n\t\t-\u0011\u0011\u001c\u0002\f%\u00164XM]:f\u001d>$W-\u0001\u000ej]&$\u0018.\u0019;f%\u00164XM]:f\u0007>tg.Z2uS>t7\u000f\u0006\u0004\u0003\u0012\tU\"Q\t\t\u0007\u0005'\u0011iBa\t\u000f\t\tU!\u0011\u0004\b\u0004Q\n]\u0011\"A\u001d\n\u0007\tm\u0001(A\u0004qC\u000e\\\u0017mZ3\n\t\t}!\u0011\u0005\u0002\u0004'\u0016\f(b\u0001B\u000eqA1\u0011q\u000eB\u0013\u0005SI1Aa\nE\u0005E\u0019u.\u001c9mKR\f'\r\\3GkR,(/\u001a\t\u0005\u0005W\u0011\t$\u0004\u0002\u0003.)\u0019!q\u0006%\u0002\t1\fgnZ\u0005\u0005\u0005g\u0011iC\u0001\u0003W_&$\u0007b\u0002B\u001cA\u0001\u0007!\u0011H\u0001\u001aS:LG/[1uK\u000e{gN\\3di&|gNU3rk\u0016\u001cH\u000f\u0005\u0003\u0003<\t\u0005SB\u0001B\u001f\u0015\u0011\u0011y$a\u0013\u0002\u0011I,\u0017/^3tiNLAAa\u0011\u0003>\t\t\u0013J\\5uS\u0006$XMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8ogJ+\u0017/^3ti\"9!q\t\u0011A\u0002\t%\u0013A\u0004:fcV,7\u000f^\"p]R,\u0007\u0010\u001e\t\u0005\u0005w\u0011Y%\u0003\u0003\u0003N\tu\"A\u0004*fcV,7\u000f^\"p]R,\u0007\u0010^\u0001\u0019e\u0016\fX/Z:u%\u00164XM]:f\u0007>tg.Z2uS>tG\u0003CAM\u0005'\u00129Fa\u0017\t\u000f\tU\u0013\u00051\u0001\u0002v\u0005I!/Z9vKN$\u0018\n\u001a\u0005\b\u00053\n\u0003\u0019AA>\u0003\u0019\u0019G.[3oi\"9!QL\u0011A\u0002\t}\u0013\u0001\u00028pI\u0016\u0004BA!\u0019\u0003d5\u0011\u00111J\u0005\u0005\u0005K\nYE\u0001\u0003O_\u0012,\u0017!\u00074pe^\f'\u000f\u001a+p%\u0016lw\u000e^3D_:$(o\u001c7mKJ$b!!'\u0003l\tm\u0004b\u0002B7E\u0001\u0007!qN\u0001\fe\u0016\fX/Z:u\t\u0006$\u0018\r\u0005\u0003\u0003r\t]TB\u0001B:\u0015\u0011\u0011)(a\u0013\u0002\u000f5,7o]1hK&!!\u0011\u0010B:\u0005\u0015Je.\u001b;jCR,'+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t%\u0016\fX/Z:u\t\u0006$\u0018\rC\u0004\u0003~\t\u0002\rA!\u0005\u0002\u000f\u0019,H/\u001e:fg\u0006\u0011rN\\\"p]R\u0014x\u000e\u001c7fe\u000eC\u0017M\\4f)\u0011\tIJa!\t\u000f\t\u00155\u00051\u0001\u0003\b\u0006\u0011\u0012n]!di&4XmQ8oiJ|G\u000e\\3s!\r9$\u0011R\u0005\u0004\u0005\u0017C$a\u0002\"p_2,\u0017M\\\u0001\u001cG2|7/\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]\u0006#W.\u001b8\u0015\u0005\u0005e\u0015\u0001H2sK\u0006$XMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u0003\u0012l\u0017N\\\u0001\u0018e\u00164XM]:f\u0007>tg.Z2uS>t7\t\\5f]R,\"Aa&\u0011\t]\u001a\u0018\u0011^\u0001\u001aa\u0016\u00148/[:uK:$8i\u001c8oK\u000e$\u0018n\u001c8D_VtG/\u0006\u0002\u0002v\u00051\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8D_VtG/A\rtkB,'\u000f\n7pG\u0006dGj\\4jG\u0006d7\t\\;ti\u0016\u0014X#A3")
public class ClusterLinkDestConnectionManager
extends ClusterLinkConnectionManager
implements ClusterLinkFactory.DestConnectionManager {
    private final Option<ClientInterceptor> clientInterceptor;
    private final ClusterLinkMetrics metrics;
    private final Function2<ClusterLinkConfig, ClusterLinkDestConnectionManager, ClusterLinkAdminClient> remoteAdminFactory;
    private final ConfluentAdmin localAdmin;
    private final KafkaController controller;
    private final KafkaConfig brokerConfig;
    private final Time time;
    private final ConcurrentHashMap<Object, ReverseClient> connectionRequests;
    private final AtomicInteger nextReverseRequestId;
    private final AtomicInteger persistentConnections;
    private final AtomicInteger activeReverseConnections;
    private volatile Option<ReverseClient> reverseConnectionAdmin;

    public static AtomicInteger NextReverseRequestId() {
        return ClusterLinkDestConnectionManager$.MODULE$.NextReverseRequestId();
    }

    private /* synthetic */ String super$localLogicalCluster() {
        return super.localLogicalCluster();
    }

    public KafkaController controller() {
        return this.controller;
    }

    private ConcurrentHashMap<Object, ReverseClient> connectionRequests() {
        return this.connectionRequests;
    }

    private AtomicInteger nextReverseRequestId() {
        return this.nextReverseRequestId;
    }

    private AtomicInteger persistentConnections() {
        return this.persistentConnections;
    }

    private AtomicInteger activeReverseConnections() {
        return this.activeReverseConnections;
    }

    private Option<ReverseClient> reverseConnectionAdmin() {
        return this.reverseConnectionAdmin;
    }

    private void reverseConnectionAdmin_$eq(Option<ReverseClient> x$1) {
        this.reverseConnectionAdmin = x$1;
    }

    @Override
    public void enableClusterLink(ClusterLinkNetworkClient networkClient, Option<AdminMetadataManager> metadataManager) {
        block8: {
            block7: {
                ConnectionMode$Inbound$ connectionMode$Inbound$;
                ConnectionMode connectionMode;
                block6: {
                    KafkaClient kafkaClient = networkClient.networkClient();
                    if (kafkaClient instanceof NetworkClient) {
                        NetworkClient networkClient2 = (NetworkClient)kafkaClient;
                        networkClient2.enableDestinationClusterLink(super.linkData().linkId(), (ClientInterceptor)this.clientInterceptor.orNull(Predef$.MODULE$.$conforms()), (ReverseNode.ConnectionProvider)this.reverseConnectionProvider(networkClient2, metadataManager, networkClient.clientId()).orNull(Predef$.MODULE$.$conforms()));
                        return;
                    }
                    connectionMode = this.currentConfig().connectionMode();
                    connectionMode$Inbound$ = ConnectionMode$Inbound$.MODULE$;
                    if (connectionMode != null) break block6;
                    if (connectionMode$Inbound$ != null) {
                        return;
                    }
                    break block7;
                }
                if (!connectionMode.equals(connectionMode$Inbound$)) break block8;
            }
            throw new IllegalStateException("Reverse connections are supported only with NetworkClient");
        }
    }

    public Option<ReverseNode.ConnectionProvider> reverseConnectionProvider(NetworkClient networkClient, Option<AdminMetadataManager> metadataManager, String clientId) {
        ConnectionMode connectionMode = this.currentConfig().connectionMode();
        ConnectionMode$Inbound$ connectionMode$Inbound$ = ConnectionMode$Inbound$.MODULE$;
        if (!(connectionMode != null ? !connectionMode.equals(connectionMode$Inbound$) : connectionMode$Inbound$ != null)) {
            Option<ClusterLinkAdminClient> x$4 = ReverseClient$.MODULE$.$lessinit$greater$default$3();
            ReverseClient reverseClient = new ReverseClient(networkClient, metadataManager, x$4, clientId);
            ReverseNode.ConnectionProvider provider = node -> this.requestReverseConnection(this.nextReverseRequestId().incrementAndGet(), reverseClient, node);
            return new Some((Object)provider);
        }
        return None$.MODULE$;
    }

    /*
     * WARNING - void declaration
     */
    @Override
    public void processReverseConnection(KafkaChannel channel2, ReverseNode reverseNode) {
        void var4_4;
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(53).append("Process reverse connection in destination cluster : ").append(channel2).append(" ").append(reverseNode).toString());
        this.ensureReverseConnectionsEnabled();
        Tuple2 tuple2 = !reverseNode.requestId().isPresent() ? new Tuple2(this.reverseConnectionAdmin(), (Object)BoxesRunTime.boxToBoolean((boolean)true)) : new Tuple2((Object)Option$.MODULE$.apply((Object)this.connectionRequests().remove(reverseNode.requestId().get())), (Object)BoxesRunTime.boxToBoolean((boolean)false));
        Option remoteClient = (Option)tuple2._1();
        boolean bl = tuple2._2$mcZ$sp();
        if (var4_4 instanceof Some) {
            ReverseClient client = (ReverseClient)((Some)var4_4).value();
            Consumer<KafkaChannel> closeCallback = channel -> {
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(38).append("Reverse channel ").append(channel).append(" has been disconnected").toString());
                $this.metrics.reverseConnectionClosedSensor().record();
                this.activeReverseConnections().decrementAndGet();
                if (bl && this.persistentConnections().decrementAndGet() <= 0 && this.controller().isActive()) {
                    this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Persistent connection to source controller was disconnected, awaiting new connection.");
                    return;
                }
            };
            this.activeReverseConnections().incrementAndGet();
            if (bl) {
                this.persistentConnections().incrementAndGet();
            }
            this.metrics.reverseConnectionCreatedSensor().record();
            ReverseChannel reverseChannel = new ReverseChannel(channel2, reverseNode, closeCallback);
            client.networkClient().reverseAndAdd(reverseChannel);
            client.bootstrapWithReverseChannel(reverseChannel, this.time.milliseconds());
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(64).append("Added reverse channel ").append(reverseChannel).append(" from source to network client, requestId=").append(reverseNode.requestId()).toString());
            return;
        }
        if (None$.MODULE$.equals(var4_4)) {
            throw new NetworkException("Reverse connection is no longer required");
        }
        throw new MatchError((Object)var4_4);
    }

    @Override
    public Seq<CompletableFuture<Void>> initiateReverseConnections(InitiateReverseConnectionsRequest initiateConnectionRequest, RequestContext requestContext) {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(70).append("Initiate or forward reverse connection request: localCluster=").append(this.super$localLogicalCluster()).append(" request=").append(initiateConnectionRequest).toString());
        this.ensureReverseConnectionsEnabled();
        InitiateReverseConnectionsRequestData connData = initiateConnectionRequest.data();
        List futures = (List)List$.MODULE$.fill(connData.entries().size(), (Function0 & Serializable & scala.Serializable)() -> new CompletableFuture());
        try {
            String string = super.localLogicalCluster();
            String string2 = connData.sourceClusterId();
            if (!(string != null ? !string.equals(string2) : string2 != null)) {
                throw new InvalidRequestException(new StringBuilder(70).append("Cannot initiate reverse connection from destination cluster ").append(super.localLogicalCluster()).append(" to itself").toString());
            }
            this.forwardToRemoteController(connData, (Seq<CompletableFuture<Void>>)futures);
        }
        catch (Throwable e) {
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Failing reverse connection request", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
            futures.foreach((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)x$2.completeExceptionally(e)));
        }
        return futures;
    }

    private void requestReverseConnection(int requestId, ReverseClient client, Node node) {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(66).append("Requesting reverse connection with requestId ").append(requestId).append(" to node ").append(node).append(" for client ").append(client.clientId()).toString());
        this.ensureReverseConnectionsEnabled();
        InitiateReverseConnectionsRequestData.EntryData entry = new InitiateReverseConnectionsRequestData.EntryData().setInitiateRequestId(requestId).setSourceBrokerId(node.id()).setTargetBrokerId(this.brokerConfig.brokerId());
        InitiateReverseConnectionsRequestData requestData = new InitiateReverseConnectionsRequestData().setClusterLinkId(this.linkId()).setForwardToBroker(true).setTimeoutMs(Predef$.MODULE$.Integer2int(this.currentConfig().reverseConnectionSetupTimeoutMs())).setSourceClusterId((String)super.linkData().clusterId().orNull(Predef$.MODULE$.$conforms())).setTargetClusterId(super.localLogicalCluster()).setEntries(Collections.singletonList(entry));
        if (this.controller().isActive()) {
            CompletableFuture future = new CompletableFuture();
            this.forwardToRemoteController(requestData, (Seq<CompletableFuture<Void>>)new .colon.colon(future, (List)Nil$.MODULE$));
            future.whenComplete((x$3, e) -> this.onCompletion$1((Throwable)e, requestId, client, node));
        } else {
            ((KafkaFutureImpl)((KafkaAdminClient)this.localAdmin).initiateReverseConnections(requestData, null).get(BoxesRunTime.boxToInteger((int)requestId))).whenComplete((x$4, e) -> this.onCompletion$1((Throwable)e, requestId, client, node));
        }
        this.connectionRequests().put(BoxesRunTime.boxToInteger((int)requestId), client);
    }

    private void forwardToRemoteController(InitiateReverseConnectionsRequestData requestData, Seq<CompletableFuture<Void>> futures) {
        KafkaAdminClient admin = (KafkaAdminClient)this.reverseConnectionAdmin().flatMap((Function1 & Serializable & scala.Serializable)x$5 -> x$5.adminClient().map((Function1 & Serializable & scala.Serializable)x$6 -> x$6.admin())).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            if (this.controller().isActive()) {
                throw new NetworkException("Request cannot be forwarded to remote controller at this time.");
            }
            throw new NotControllerException("Request cannot be forwarded to remote controller since this broker is not the controller.");
        });
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(66).append("Forward initiate reverse connection request to remote controller: ").append(requestData).toString());
        Map requestFutures = admin.initiateReverseConnections(requestData, null);
        ((IterableLike)((IterableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(requestData.entries()).asScala()).zip(futures, Buffer$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            if (x0$1 == null) {
                throw new MatchError(null);
            }
            InitiateReverseConnectionsRequestData.EntryData entry = (InitiateReverseConnectionsRequestData.EntryData)x0$1._1();
            CompletableFuture future = (CompletableFuture)x0$1._2();
            KafkaFuture kafkaFuture = ((KafkaFutureImpl)requestFutures.get(BoxesRunTime.boxToInteger((int)entry.initiateRequestId()))).whenComplete((x0$2, x1$1) -> {
                Void v = x0$2;
                Throwable e = x1$1;
                if (e != null) {
                    this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(57).append("Initiate reverse connection request failed for requestId=").append(entry.initiateRequestId()).toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
                    future.completeExceptionally(e);
                    return;
                }
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(58).append("Completed InitiateReverseConnectionsRequest for requestId=").append(entry.initiateRequestId()).toString());
                future.complete(v);
            });
            return kafkaFuture;
        });
    }

    @Override
    public void onControllerChange(boolean isActiveController) {
        Object object = this.stateChangeLock();
        synchronized (object) {
            if (this.reverseConnectionAdmin().isEmpty()) {
                if (isActiveController) {
                    this.resetReverseConnectionAdmin();
                }
            } else if (!isActiveController) {
                this.closeReverseConnectionAdmin();
            }
            return;
        }
    }

    @Override
    public void closeReverseConnectionAdmin() {
        this.reverseConnectionAdmin().flatMap((Function1 & Serializable & scala.Serializable)x$7 -> x$7.adminClient()).foreach((Function1 & Serializable & scala.Serializable)admin -> {
            CoreUtils$.MODULE$.swallow((Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> admin.close(), this, CoreUtils$.MODULE$.swallow$default$3());
            return BoxedUnit.UNIT;
        });
        this.reverseConnectionAdmin_$eq((Option<ReverseClient>)None$.MODULE$);
    }

    @Override
    public void createReverseConnectionAdmin() {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Recreate admin client used to initiate connection reversal requests");
        if (this.controller().isActive()) {
            ClusterLinkAdminClient admin = (ClusterLinkAdminClient)this.remoteAdminFactory.apply((Object)this.currentConfig(), (Object)this);
            this.reverseConnectionAdmin_$eq((Option<ReverseClient>)new Some((Object)new ReverseClient(admin.networkClient(), (Option<AdminMetadataManager>)new Some((Object)admin.metadataManager()), (Option<ClusterLinkAdminClient>)new Some((Object)admin), admin.clientId())));
        }
    }

    public Option<NetworkClient> reverseConnectionClient() {
        return this.reverseConnectionAdmin().map((Function1 & Serializable & scala.Serializable)x$8 -> x$8.networkClient());
    }

    @Override
    public int persistentConnectionCount() {
        if (this.controller().isActive()) {
            return this.persistentConnections().get();
        }
        return 0;
    }

    @Override
    public int reverseConnectionCount() {
        return this.activeReverseConnections().get();
    }

    private final void onCompletion$1(Throwable e, int requestId$1, ReverseClient client$1, Node node$2) {
        if (e != null) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(50).append("Failed to create reverse connection for requestId=").append(requestId$1).toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
            this.connectionRequests().remove(BoxesRunTime.boxToInteger((int)requestId$1));
            client$1.networkClient().processReverseConnectionFailure(node$2);
            return;
        }
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(50).append("Reverse connection has been created for requestId=").append(requestId$1).toString());
    }

    public ClusterLinkDestConnectionManager(ClusterLinkData linkData, ClusterLinkConfig initialConfig, String localLogicalCluster, Option<ClientInterceptor> clientInterceptor, ClusterLinkMetrics metrics, Function2<ClusterLinkConfig, ClusterLinkDestConnectionManager, ClusterLinkAdminClient> remoteAdminFactory, ConfluentAdmin localAdmin, KafkaController controller, KafkaConfig brokerConfig, Time time) {
        this.clientInterceptor = clientInterceptor;
        this.metrics = metrics;
        this.remoteAdminFactory = remoteAdminFactory;
        this.localAdmin = localAdmin;
        this.controller = controller;
        this.brokerConfig = brokerConfig;
        this.time = time;
        super(linkData, initialConfig, localLogicalCluster, metrics);
        this.connectionRequests = new ConcurrentHashMap();
        this.nextReverseRequestId = ClusterLinkDestConnectionManager$.MODULE$.NextReverseRequestId();
        this.persistentConnections = new AtomicInteger();
        this.activeReverseConnections = new AtomicInteger();
        this.reverseConnectionAdmin = None$.MODULE$;
        this.logIdent_$eq(new StringBuilder(44).append("[ClusterLinkDestConnectionManager-").append(super.linkData().linkName()).append("-broker-").append(brokerConfig.brokerId()).append("] ").toString());
    }
}

