/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.lang.invoke.LambdaMetafactory;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ConnectionMode$Inbound$;
import kafka.server.link.ConnectionMode$Outbound$;
import kafka.server.link.LinkMode$Destination$;
import kafka.server.link.LinkMode$Source$;
import kafka.utils.Logging;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ClusterLinkPausedException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.slf4j.event.Level;
import scala.Function0;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.Set;
import scala.collection.immutable.$colon$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0015g!\u0002\u0014(\u0003\u0003q\u0003\u0002C\"\u0001\u0005\u000b\u0007I\u0011\u0001#\t\u0011-\u0003!\u0011!Q\u0001\n\u0015C\u0001\u0002\u0014\u0001\u0003\u0002\u0003\u0006I!\u0014\u0005\t!\u0002\u0011)\u0019!C\u0001#\"AQ\f\u0001B\u0001B\u0003%!\u000b\u0003\u0005_\u0001\t\u0005\t\u0015!\u0003`\u0011!\u0011\u0007A!A!\u0002\u0013\u0019\u0007\"\u00024\u0001\t\u00039\u0007b\u00028\u0001\u0005\u0004%\tb\u001c\u0005\u0007w\u0002\u0001\u000b\u0011\u00029\t\u000fq\u0004!\u0019!C\t{\"9\u0011Q\u0002\u0001!\u0002\u0013q\b\"CA\b\u0001\u0001\u0007I\u0011BA\t\u0011%\t\u0019\u0002\u0001a\u0001\n\u0013\t)\u0002C\u0004\u0002\"\u0001\u0001\u000b\u0015B'\t\u0013\u0005-\u0002\u00011A\u0005\u0012\u00055\u0002\"CA\u001b\u0001\u0001\u0007I\u0011CA\u001c\u0011!\tY\u0004\u0001Q!\n\u0005=\u0002\"CA \u0001\u0001\u0007I\u0011CA\u0017\u0011%\t\t\u0005\u0001a\u0001\n#\t\u0019\u0005\u0003\u0005\u0002H\u0001\u0001\u000b\u0015BA\u0018\u0011%\tY\u0005\u0001b\u0001\n\u0003\ti\u0005\u0003\u0005\u0002t\u0001\u0001\u000b\u0011BA(\u0011\u001d\t)\b\u0001C!\u0003oB\u0001\"!\u001f\u0001\t\u0003:\u00131\u0010\u0005\b\u0003#\u0003A\u0011IAJ\u0011\u001d\tI\n\u0001C!\u0003oBq!a'\u0001\t\u0003\ni\u0003C\u0004\u0002\u001e\u0002!\t%!\u0005\t\u000f\u0005}\u0005\u0001\"\u0001\u0002\"\"9\u0011\u0011\u0016\u0001\u0005\u0012\u0005]\u0004bBAV\u0001\u0011E\u0011q\u000f\u0005\b\u0003[\u0003a\u0011CA<\u0011\u001d\ty\u000b\u0001D\t\u0003oBq!!-\u0001\t#\t9\bC\u0004\u00024\u0002!\t%!\f\t\u000f\u0005U\u0006\u0001\"\u0011\u00028\na2\t\\;ti\u0016\u0014H*\u001b8l\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014(B\u0001\u0015*\u0003\u0011a\u0017N\\6\u000b\u0005)Z\u0013AB:feZ,'OC\u0001-\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019B\u0001A\u00186{A\u0011\u0001gM\u0007\u0002c)\t!'A\u0003tG\u0006d\u0017-\u0003\u00025c\t1\u0011I\\=SK\u001a\u0004\"A\u000e\u001e\u000f\u0005]BT\"A\u0014\n\u0005e:\u0013AE\"mkN$XM\u001d'j].4\u0015m\u0019;pefL!a\u000f\u001f\u0003#\r{gN\\3di&|g.T1oC\u001e,'O\u0003\u0002:OA\u0011a(
Q\u0007\u0002\u007f)\u0011\u0001iK\u0001\u0006kRLGn]\u0005\u0003\u0005~\u0012q\u0001T8hO&tw-\u0001\u0005mS:\\G)\u0019;b+\u0005)\u0005C\u0001$J\u001b\u00059%B\u0001%,\u0003\tQ8.\u0003\u0002K\u000f\ny1\t\\;ti\u0016\u0014H*\u001b8l\t\u0006$\u0018-A\u0005mS:\\G)\u0019;bA\u0005i\u0011N\\5uS\u0006d7i\u001c8gS\u001e\u0004\"a\u000e(\n\u0005=;#!E\"mkN$XM\u001d'j].\u001cuN\u001c4jO\u0006\u0019Bn\\2bY2{w-[2bY\u000ecWo\u001d;feV\t!\u000b\u0005\u0002T5:\u0011A\u000b\u0017\t\u0003+Fj\u0011A\u0016\u0006\u0003/6\na\u0001\u0010:p_Rt\u0014BA-2\u0003\u0019\u0001&/\u001a3fM&\u00111\f\u0018\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005e\u000b\u0014\u0001\u00067pG\u0006dGj\\4jG\u0006d7\t\\;ti\u0016\u0014\b%A\bnKR\fG-\u0019;b\u001b\u0006t\u0017mZ3s!\t9\u0004-\u0003\u0002bO\tQ2\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$\u0018\rZ1uC6\u000bg.Y4fe\u00069Q.\u001a;sS\u000e\u001c\bCA\u001ce\u0013\t)wE\u0001\nDYV\u001cH/\u001a:MS:\\W*\u001a;sS\u000e\u001c\u0018A\u0002\u001fj]&$h\b\u0006\u0004iS*\\G.\u001c\t\u0003o\u0001AQa\u0011\u0005A\u0002\u0015CQ\u0001\u0014\u0005A\u00025CQ\u0001\u0015\u0005A\u0002ICQA\u0018\u0005A\u0002}CQA\u0019\u0005A\u0002\r\fa\u0001\\5oW&#W#\u00019\u0011\u0005ELX\"\u0001:\u000b\u0005M$\u0018AB2p[6|gN\u0003\u0002-k*\u0011ao^\u0001\u0007CB\f7\r[3\u000b\u0003a\f1a\u001c:h\u0013\tQ(O\u0001\u0003Vk&$\u0017a\u00027j].LE\rI\u0001\u0010gR\fG/Z\"iC:<W\rT8dWV\ta\u0010E\u0002\u0000\u0003\u0013i!!!\u0001\u000b\t\u0005\r\u0011QA\u0001\u0005Y\u0006twM\u0003\u0002\u0002\b\u0005!!.\u0019<b\u0013\u0011\tY!!\u0001\u0003\r=\u0013'.Z2u\u0003A\u0019H/\u0019;f\u0007\"\fgnZ3M_\u000e\\\u0007%A\tdYV\u001cH/\u001a:MS:\\7i\u001c8gS\u001e,\u0012!T\u0001\u0016G2,8\u000f^3s\u0019&t7nQ8oM&<w\fJ3r)\u0011\t9\"!\b\u0011\u0007A\nI\"C\u0002\u0002\u001cE\u0012A!\u00168ji\"A\u0011q\u0004\b\u0002\u0002\u0003\u0007Q*A\u0002yIE\n!c\u00197vgR,'\u000fT5oW\u000e{gNZ5hA!\u001aq\"!\n\u0011\u0007A\n9#C\u0002\u0002*E\u0012\u0001B^8mCRLG.Z\u0001\u0018g>,(oY3D_:tWm\u0019;j_:,e.\u00192mK\u0012,\"!a\f\u0011\u0007A\n\t$C\u0002\u
00024E\u0012qAQ8pY\u0016\fg.A\u000et_V\u00148-Z\"p]:,7\r^5p]\u0016s\u0017M\u00197fI~#S-\u001d\u000b\u0005\u0003/\tI\u0004C\u0005\u0002 E\t\t\u00111\u0001\u00020\u0005A2o\\;sG\u0016\u001cuN\u001c8fGRLwN\\#oC\ndW\r\u001a\u0011)\u0007I\t)#\u0001\u0005jg\u0006\u001bG/\u001b<f\u00031I7/Q2uSZ,w\fJ3r)\u0011\t9\"!\u0012\t\u0013\u0005}A#!AA\u0002\u0005=\u0012!C5t\u0003\u000e$\u0018N^3!Q\r)\u0012QE\u0001\f[\u0006DHj\\4MKZ,G.\u0006\u0002\u0002PA1\u0011\u0011KA0\u0003Gj!!a\u0015\u000b\t\u0005U\u0013qK\u0001\u0007CR|W.[2\u000b\t\u0005e\u00131L\u0001\u000bG>t7-\u001e:sK:$(\u0002BA/\u0003\u000b\tA!\u001e;jY&!\u0011\u0011MA*\u0005=\tEo\\7jGJ+g-\u001a:f]\u000e,\u0007\u0003BA3\u0003_j!!a\u001a\u000b\t\u0005%\u00141N\u0001\u0006KZ,g\u000e\u001e\u0006\u0004\u0003[:\u0018!B:mMRR\u0017\u0002BA9\u0003O\u0012Q\u0001T3wK2\fA\"\\1y\u0019><G*\u001a<fY\u0002\nqa\u001d;beR,\b\u000f\u0006\u0002\u0002\u0018\u0005Y!/Z2p]\u001aLw-\u001e:f)\u0019\t9\"! \u0002\u0002\"1\u0011qP\rA\u00025\u000b\u0011B\\3x\u0007>tg-[4\t\u000f\u0005\r\u0015\u00041\u0001\u0002\u0006\u0006YQ\u000f\u001d3bi\u0016$7*Z=t!\u0015\t9)!$S\u001b\t\tIIC\u0002\u0002\fF\n!bY8mY\u0016\u001cG/[8o\u0013\u0011\ty)!#\u0003\u0007M+G/\u0001\u000bp]\u00063\u0018-\u001b7bE&d\u0017\u000e^=DQ\u0006tw-\u001a\u000b\u0005\u0003/\t)\nC\u0004\u0002\u0018j\u0001\r!a\f\u0002\u0017%\u001c\u0018I^1jY\u0006\u0014G.Z\u0001\tg\",H\u000fZ8x]\u00061\u0011m\u0019;jm\u0016\fQbY;se\u0016tGoQ8oM&<\u0017\u0001\u00047j].\u001cE.^:uKJ\u001cXCAAR!\u0015\t9)!*S\u0013\u0011\t9+!#\u0003\u0007M+\u0017/A\u0010f]N,(/\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]N,e.\u00192mK\u0012\f1D]3tKR\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\!e[&t\u0017\u0001H2sK\u0006$XMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u0003\u0012l\u0017N\\\u0001\u001cG2|7/\u001a*fm\u0016\u00148/Z\"p]:,7\r^5p]\u0006#W.\u001b8\u0002+U\u0004H-\u0019;f\u0003\u000e$\u0018N^3MS:\\7i\\;oi\u0006\t\u0012n\u001d'j].\u001cun\u001c:eS:\fGo\u001c:\u0002\u001f1Lgn[\"p_J$\u0017N\\1u_J,\"!!/\u0011\u000bA\nY,a0\n\u0007\u0005u\u0016G\u0001\
u0004PaRLwN\u001c\t\u0004c\u0006\u0005\u0017bAAbe\n!aj\u001c3f\u0001")
public abstract class ClusterLinkConnectionManager
implements ClusterLinkFactory.ConnectionManager,
Logging {
    private final ClusterLinkData linkData;
    private final String localLogicalCluster;
    private final ClusterLinkMetadataManager metadataManager;
    private final ClusterLinkMetrics metrics;
    private final Uuid linkId;
    private final Object stateChangeLock;
    private volatile ClusterLinkConfig clusterLinkConfig;
    private volatile boolean sourceConnectionEnabled;
    private volatile boolean isActive;
    private final AtomicReference<Level> maxLogLevel;
    private Logger logger;
    private String logIdent;
    private volatile boolean bitmap$0;

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    @Override
    public void onControllerChange(boolean isActive) {
        ClusterLinkFactory.ConnectionManager.onControllerChange$(this, isActive);
    }

    @Override
    public void onLinkMetadataPartitionLeaderChange() {
        ClusterLinkFactory.ConnectionManager.onLinkMetadataPartitionLeaderChange$(this);
    }

    @Override
    public int persistentConnectionCount() {
        return ClusterLinkFactory.ConnectionManager.persistentConnectionCount$(this);
    }

    @Override
    public int reverseConnectionCount() {
        return ClusterLinkFactory.ConnectionManager.reverseConnectionCount$(this);
    }

    private Logger logger$lzycompute() {
        synchronized (this) {
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        if (!this.bitmap$0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    @Override
    public ClusterLinkData linkData() {
        return this.linkData;
    }

    @Override
    public String localLogicalCluster() {
        return this.localLogicalCluster;
    }

    public Uuid linkId() {
        return this.linkId;
    }

    public Object stateChangeLock() {
        return this.stateChangeLock;
    }

    private ClusterLinkConfig clusterLinkConfig() {
        return this.clusterLinkConfig;
    }

    private void clusterLinkConfig_$eq(ClusterLinkConfig x$1) {
        this.clusterLinkConfig = x$1;
    }

    public boolean sourceConnectionEnabled() {
        return this.sourceConnectionEnabled;
    }

    public void sourceConnectionEnabled_$eq(boolean x$1) {
        this.sourceConnectionEnabled = x$1;
    }

    public boolean isActive() {
        return this.isActive;
    }

    public void isActive_$eq(boolean x$1) {
        this.isActive = x$1;
    }

    public AtomicReference<Level> maxLogLevel() {
        return this.maxLogLevel;
    }

    @Override
    public void startup() {
        Object object = this.stateChangeLock();
        synchronized (object) {
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Cluster link connection manager has started up.");
            if (!Predef$.MODULE$.Boolean2boolean(this.clusterLinkConfig().clusterLinkPaused())) {
                this.isActive_$eq(true);
                this.updateActiveLinkCount();
            } else {
                this.isActive_$eq(false);
            }
            this.resetReverseConnectionAdmin();
            return;
        }
    }

    @Override
    public void reconfigure(ClusterLinkConfig newConfig, Set<String> updatedKeys) {
        Object object = this.stateChangeLock();
        synchronized (object) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(74).append("Reconfiguring link connection manager with new configs updated=").append(updatedKeys).append(" newConfig=").append(newConfig.values()).toString());
            this.clusterLinkConfig_$eq(newConfig);
            if (Predef$.MODULE$.Boolean2boolean(newConfig.clusterLinkPaused())) {
                if (this.isActive()) {
                    this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Shutting down cluster link connection manager because link has been paused");
                }
                this.shutdown();
            } else {
                this.isActive_$eq(true);
                this.resetReverseConnectionAdmin();
            }
            this.updateActiveLinkCount();
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Completed reconfiguration of cluster link");
            return;
        }
    }

    @Override
    public void onAvailabilityChange(boolean isAvailable) {
        if (isAvailable) {
            this.maxLogLevel().set(null);
            return;
        }
        this.maxLogLevel().set(Level.DEBUG);
    }

    @Override
    public void shutdown() {
        Object object = this.stateChangeLock();
        synchronized (object) {
            this.isActive_$eq(false);
            this.closeReverseConnectionAdmin();
            this.updateActiveLinkCount();
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(64).append("Shutdown of ClusterLinkConnectionManager with cluster link data ").append(this.linkData()).toString());
            return;
        }
    }

    @Override
    public boolean active() {
        return this.isActive();
    }

    @Override
    public ClusterLinkConfig currentConfig() {
        return this.clusterLinkConfig();
    }

    @Override
    public Seq<String> linkClusters() {
        return (Seq)new .colon.colon((Object)this.localLogicalCluster(), (List)Nil$.MODULE$).$plus$plus((GenTraversableOnce)Option$.MODULE$.option2Iterable(this.linkData().clusterId()).toSeq(), Seq$.MODULE$.canBuildFrom());
    }

    public void ensureReverseConnectionsEnabled() {
        if (!this.isActive() || Predef$.MODULE$.Boolean2boolean(this.clusterLinkConfig().clusterLinkPaused())) {
            throw new ClusterLinkPausedException(new StringBuilder(36).append("Cluster link ").append(this.linkData().linkName()).append(" is not active,").append(" paused=").append(this.clusterLinkConfig().clusterLinkPaused()).toString());
        }
        if (!this.sourceConnectionEnabled()) {
            throw new InvalidRequestException(new StringBuilder(46).append("Cluster link '").append(this.linkData().linkName()).append("' is not a source initiated link").toString());
        }
    }

    /*
     * Unable to fully structure code
     */
    public void resetReverseConnectionAdmin() {
        var1_1 = this.stateChangeLock();
        synchronized (var1_1) {
            block7: {
                config = this.currentConfig();
                if (!this.isActive() || Predef$.MODULE$.Boolean2boolean(config.clusterLinkPaused())) ** GOTO lbl-1000
                v0 = config.linkMode();
                var3_3 = LinkMode$Destination$.MODULE$;
                if (v0 != null ? v0.equals(var3_3) == false : var3_3 != null) break block7;
                v1 = config.connectionMode();
                var4_4 = ConnectionMode$Inbound$.MODULE$;
                if (!(v1 == null ? var4_4 != null : v1.equals(var4_4) == false)) ** GOTO lbl-1000
            }
            v2 = config.linkMode();
            var5_5 = LinkMode$Source$.MODULE$;
            if (!(v2 != null ? v2.equals(var5_5) == false : var5_5 != null)) {
                v3 = config.connectionMode();
                var6_6 = ConnectionMode$Outbound$.MODULE$;
                ** if (v3 != null ? v3.equals((Object)var6_6) == false : var6_6 != null) goto lbl-1000
            }
            ** GOTO lbl-1000
lbl-1000:
            // 2 sources

            {
                v4 = true;
                ** GOTO lbl23
            }
lbl-1000:
            // 3 sources

            {
                v4 = false;
            }
lbl23:
            // 2 sources

            this.sourceConnectionEnabled_$eq(v4);
            this.closeReverseConnectionAdmin();
            if (this.sourceConnectionEnabled()) {
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)LambdaMetafactory.altMetafactory(null, null, null, ()Ljava/lang/Object;, $anonfun$resetReverseConnectionAdmin$1(kafka.server.link.ClusterLinkConnectionManager ), ()Ljava/lang/String;)((ClusterLinkConnectionManager)this));
                this.createReverseConnectionAdmin();
            }
            return;
        }
    }

    public abstract void createReverseConnectionAdmin();

    public abstract void closeReverseConnectionAdmin();

    public void updateActiveLinkCount() {
        boolean count = this.isActive();
        this.metrics.activeLinkCountSensor().record((double)count);
    }

    @Override
    public boolean isLinkCoordinator() {
        return this.metadataManager.isLinkCoordinator(this.linkData().linkName(), this.metadataManager.isLinkCoordinator$default$2());
    }

    @Override
    public Option<Node> linkCoordinator() {
        return this.metadataManager.linkCoordinator(this.linkData().linkName());
    }

    public static final /* synthetic */ String $anonfun$resetReverseConnectionAdmin$1(ClusterLinkConnectionManager $this) {
        return new StringBuilder(62).append("Recreating reverse connection clients sourceConnectionEnabled=").append($this.sourceConnectionEnabled()).toString();
    }

    public ClusterLinkConnectionManager(ClusterLinkData linkData, ClusterLinkConfig initialConfig, String localLogicalCluster, ClusterLinkMetadataManager metadataManager, ClusterLinkMetrics metrics) {
        this.linkData = linkData;
        this.localLogicalCluster = localLogicalCluster;
        this.metadataManager = metadataManager;
        this.metrics = metrics;
        ClusterLinkFactory.ConnectionManager.$init$(this);
        Logging.$init$(this);
        this.linkId = new Uuid(linkData.linkId().getMostSignificantBits(), linkData.linkId().getLeastSignificantBits());
        this.stateChangeLock = new Object();
        this.clusterLinkConfig = initialConfig;
        this.sourceConnectionEnabled = false;
        this.isActive = true;
        this.maxLogLevel = new AtomicReference();
    }
}

