/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import kafka.server.MetadataCache;
import kafka.server.link.ClusterLinkClientManager;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkFailed;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkScheduler;
import kafka.server.link.ClusterLinkStopMirrorTopic;
import kafka.server.link.ClusterLinkStopMirrorTopic$;
import kafka.server.link.LocalClusterLinkAdminClient;
import kafka.server.metadata.KRaftMetadataCache;
import kafka.utils.CoreUtils$;
import kafka.utils.Logging;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicDelta;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.MirrorTopic;
import org.apache.kafka.raft.LeaderAndEpoch;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.IterableLike;
import scala.collection.MapLike;
import scala.collection.TraversableLike;
import scala.collection.immutable.Set;
import scala.compat.java8.OptionConverters;
import scala.compat.java8.OptionConverters$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\u0005mb\u0001\u0002\t\u0012\u0001aA\u0001b\f\u0001\u0003\u0006\u0004%\t\u0001\r\u0005\ti\u0001\u0011\t\u0011)A\u0005c!AQ\u0007\u0001BC\u0002\u0013\u0005a\u0007\u0003\u0005;\u0001\t\u0005\t\u0015!\u00038\u0011\u0015Y\u0004\u0001\"\u0001=\u0011\u001d\u0001\u0005A1A\u0005\n\u0005Ca!\u0012\u0001!\u0002\u0013\u0011\u0005b\u0002$\u0001\u0005\u0004%Ia\u0012\u0005\u0007)\u0002\u0001\u000b\u0011\u0002%\t\u000bU\u0003A\u0011\t,\t\u000b1\u0004A\u0011I7\t\u000f\u0005\r\u0001\u0001\"\u0011\u0002\u0006!9\u0011\u0011\u0002\u0001\u0005\n\u0005-\u0001bBA\f\u0001\u0011%\u0011\u0011\u0004\u0005\b\u0003W\u0001A\u0011BA\u0017\u0005\u0001\u001aE.^:uKJd\u0015N\\6NKR\fG-\u0019;b\u00136\fw-\u001a'jgR,g.\u001a:\u000b\u0005I\u0019\u0012\u0001\u00027j].T!\u0001F\u000b\u0002\rM,'O^3s\u0015\u00051\u0012!B6bM.\f7\u0001A\n\u0005\u0001e\t\u0013\u0006\u0005\u0002\u001b?5\t1D\u0003\u0002\u001d;\u0005!A.\u00198h\u0015\u0005q\u0012\u0001\u00026bm\u0006L!\u0001I\u000e\u0003\r=\u0013'.Z2u!\t\u0011cE\u0004\u0002$I5\t\u0011#\u0003\u0002&#\u0005\u00112\t\\;ti\u0016\u0014H*\u001b8l\r\u0006\u001cGo\u001c:z\u0013\t9\u0003FA\rMS:\\W*\u001a;bI\u0006$\u0018-S7bO\u0016d\u0015n\u001d;f]\u0016\u0014(BA\u0013\u0012!\tQS&D\u0001,\u0015\taS#A\u0003vi&d7/\u0003\u0002/W\t9Aj\\4hS:<\u0017a\u00037j].l\u0015M\\1hKJ,\u0012!\r\t\u0003GIJ!aM\t\u0003%\rcWo\u001d;fe2Kgn['b]\u0006<WM]\u0001\rY&t7.T1oC\u001e,'\u000fI\u0001\u0010[\u0016$\u0018\rZ1uC6\u000bg.Y4feV\tq\u0007\u0005\u0002$q%\u0011\u0011(\u0005\u0002\u001b\u00072,8\u000f^3s\u0019&t7.T3uC\u0012\fG/Y'b]\u0006<WM]\u0001\u0011[\u0016$\u0018\rZ1uC6\u000bg.Y4fe\u0002\na\u0001P5oSRtDcA\u001f?\u007fA\u00111\u0005\u0001\u0005\u0006_\u0015\u0001\r!\r\u0005\u0006k\u0015\u0001\raN\u0001\ng\u000eDW\rZ;mKJ,\u0012A\u0011\t\u0003G\rK!\u0001R\t\u0003)\rcWo\u001d;fe2Kgn[*dQ\u0016$W\u000f\\3s\u0003)\u00198\r[3ek2,'\u000fI\u0001\u0013WJ\fg\r^'fi\u0006$\u0017\r^1DC\u000eDW-F\u0001I!\rIEJT\u0007\u0002\u0015*\t1*A\u0003tG\u0006d\u0017-\u0003\u0002N\u0015\n1q
\n\u001d;j_:\u0004\"a\u0014*\u000e\u0003AS!!U\n\u0002\u00115,G/\u00193bi\u0006L!a\u0015)\u0003%-\u0013\u0016M\u001a;NKR\fG-\u0019;b\u0007\u0006\u001c\u0007.Z\u0001\u0014WJ\fg\r^'fi\u0006$\u0017\r^1DC\u000eDW\rI\u0001\u0016_:lU\r^1eCR\f\u0017*\\1hKV\u0003H-\u0019;f)\r9&l\u001a\t\u0003\u0013bK!!\u0017&\u0003\tUs\u0017\u000e\u001e\u0005\u00067*\u0001\r\u0001X\u0001\u000e[\u0016$\u0018\rZ1uC\u0012+G\u000e^1\u0011\u0005u+W\"\u00010\u000b\u0005}\u0003\u0017!B5nC\u001e,'B\u0001\fb\u0015\t\u00117-\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002I\u0006\u0019qN]4\n\u0005\u0019t&!D'fi\u0006$\u0017\r^1EK2$\u0018\rC\u0003i\u0015\u0001\u0007\u0011.\u0001\toK^lU\r^1eCR\f\u0017*\\1hKB\u0011QL[\u0005\u0003Wz\u0013Q\"T3uC\u0012\fG/Y%nC\u001e,\u0017\u0001I8o\u00072,8\u000f^3s\u0019&t7nQ8pe\u0012Lg.\u0019;pe\u0016cWm\u0019;j_:$\"a\u00168\t\u000b=\\\u0001\u0019\u00019\u0002\u00131Lgn[%e'\u0016$\bcA9yw:\u0011!O\u001e\t\u0003g*k\u0011\u0001\u001e\u0006\u0003k^\ta\u0001\u0010:p_Rt\u0014BA<K\u0003\u0019\u0001&/\u001a3fM&\u0011\u0011P\u001f\u0002\u0004'\u0016$(BA<K!\tax0D\u0001~\u0015\tq\b-\u0001\u0004d_6lwN\\\u0005\u0004\u0003\u0003i(\u0001B+vS\u0012\f1e\u001c8DYV\u001cH/\u001a:MS:\\7i\\8sI&t\u0017\r^8s%\u0016\u001c\u0018n\u001a8bi&|g\u000eF\u0002X\u0003\u000fAQa\u001c\u0007A\u0002A\fAf\u00195fG.4uN\u001d)f]\u0012LgnZ*u_B\u0004X\rZ'jeJ|'o]!oI\u0016CXmY;uKR\u000b7o[:\u0015\u000f]\u000bi!!\u0005\u0002\u0016!1\u0011qB\u0007A\u0002m\fa\u0001\\5oW&#\u0007BB.\u000e\u0001\u0004\t\u0019\u0002E\u0002J\u0019rCQaX\u0007A\u0002%\f\u0011f\u00195fG.Le-T5se>\u00148\u000b^1uK\u000eC\u0017M\\4fIR{\u0007+\u001a8eS:<7\u000b^8qa\u0016$G\u0003BA\u000e\u0003C\u00012!SA\u000f\u0013\r\tyB\u0013\u0002\b\u0005>|G.Z1o\u0011\u001d\t\u0019C\u0004a\u0001\u0003K\tQ\u0001Z3mi\u0006\u00042!XA\u0014\u0013\r\tIC\u0018\u0002\u000b)>\u0004\u0018n\u0019#fYR\f\u0017!\u0005:v]N#x\u000e]'jeJ|'\u000fV1tWR)q+a\f\u00022!1\u0011qB\bA\u0002mDq!a\r\u0010\u0001\u0004\t)$A\u0005u_BL7MT1nKB\u0019\u0011/a\u000e\n\u0007\u0005e\"P\u0001\u0
004TiJLgn\u001a")
public class ClusterLinkMetadataImageListener
implements ClusterLinkFactory.LinkMetadataImageListener,
Logging {
    // Link manager passed to the constructor; queried for client managers, connection managers and the scheduler.
    private final ClusterLinkManager linkManager;
    // Metadata manager passed to the constructor; supplies owned link IDs and the metadata cache.
    private final ClusterLinkMetadataManager metadataManager;
    // Scheduler obtained from linkManager.scheduler() in the constructor.
    private final ClusterLinkScheduler scheduler;
    // Some(cache) when metadataManager.metadataCache() is a KRaftMetadataCache, otherwise None.
    private final Option<KRaftMetadataCache> kraftMetadataCache;
    // Lazily assigned by logger$lzycompute(); read through logger().
    private Logger logger;
    // Log identifier exposed through logIdent() / logIdent_$eq().
    private String logIdent;
    // Set to true once the logger field has been assigned.
    private volatile boolean bitmap$0;

    /**
     * Delegates to {@code Logging.loggerName$} for this instance.
     *
     * @return whatever {@code Logging.loggerName$(this)} returns
     */
    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    /**
     * Delegates to {@code Logging.msgWithLogIdent$} for this instance.
     *
     * @param msg the message passed through to the delegate
     * @return whatever {@code Logging.msgWithLogIdent$(this, msg)} returns
     */
    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    /**
     * Forwards a trace-level message supplier to {@code Logging.trace$}.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    /**
     * Forwards a trace-level message supplier and throwable supplier to {@code Logging.trace$}.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the associated throwable
     */
    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    /**
     * Delegates to {@code Logging.isDebugEnabled$} for this instance.
     *
     * @return whatever {@code Logging.isDebugEnabled$(this)} returns
     */
    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    /**
     * Delegates to {@code Logging.isTraceEnabled$} for this instance.
     *
     * @return whatever {@code Logging.isTraceEnabled$(this)} returns
     */
    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /**
     * Forwards a debug-level message supplier to {@code Logging.debug$}.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    /**
     * Forwards a debug-level message supplier and throwable supplier to {@code Logging.debug$}.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the associated throwable
     */
    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    /**
     * Forwards an info-level message supplier to {@code Logging.info$}.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    /**
     * Forwards an info-level message supplier and throwable supplier to {@code Logging.info$}.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the associated throwable
     */
    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    /**
     * Forwards a warn-level message supplier to {@code Logging.warn$}.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    /**
     * Forwards a warn-level message supplier and throwable supplier to {@code Logging.warn$}.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the associated throwable
     */
    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    /**
     * Forwards an error-level message supplier to {@code Logging.error$}.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    /**
     * Forwards an error-level message supplier and throwable supplier to {@code Logging.error$}.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the associated throwable
     */
    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    /**
     * Forwards a fatal-level message supplier to {@code Logging.fatal$}.
     *
     * @param msg supplier of the message text
     */
    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    /**
     * Forwards a fatal-level message supplier and throwable supplier to {@code Logging.fatal$}.
     *
     * @param msg supplier of the message text
     * @param e   supplier of the associated throwable
     */
    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    /**
     * Delegates to {@code ClusterLinkFactory.LinkMetadataImageListener.onLeaderUpdate$};
     * this class adds no handling of its own for leader updates.
     *
     * @param leader the leader and epoch passed through to the delegate
     */
    @Override
    public void onLeaderUpdate(LeaderAndEpoch leader) {
        ClusterLinkFactory.LinkMetadataImageListener.onLeaderUpdate$(this, leader);
    }

    /**
     * Slow path for {@code logger()}: while holding this instance's monitor, assigns the
     * {@code logger} field from {@code Logging.logger$(this)} unless {@code bitmap$0} is
     * already set, then sets {@code bitmap$0}.
     *
     * @return the value of the {@code logger} field after the synchronized block
     */
    private Logger logger$lzycompute() {
        synchronized (this) {
            // Re-check under the lock so the logger is created at most once.
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                // The (volatile) flag is written only after the logger field has been assigned.
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    /**
     * Returns the logger, initialising it through {@code logger$lzycompute()} when the
     * {@code bitmap$0} flag has not been set yet.
     *
     * @return the cached logger, or the result of the lazy initialiser
     */
    @Override
    public Logger logger() {
        return this.bitmap$0 ? this.logger : this.logger$lzycompute();
    }

    /**
     * @return the current value of the {@code logIdent} field
     */
    @Override
    public String logIdent() {
        return logIdent;
    }

    /**
     * Setter for the {@code logIdent} field.
     *
     * @param x$1 the new log identifier
     */
    @Override
    public void logIdent_$eq(String x$1) {
        logIdent = x$1;
    }

    /**
     * @return the link manager supplied to the constructor
     */
    public ClusterLinkManager linkManager() {
        return linkManager;
    }

    /**
     * @return the metadata manager supplied to the constructor
     */
    public ClusterLinkMetadataManager metadataManager() {
        return metadataManager;
    }

    /**
     * @return the scheduler captured from the link manager at construction time
     */
    private ClusterLinkScheduler scheduler() {
        return scheduler;
    }

    /**
     * @return the optional KRaft metadata cache resolved in the constructor
     */
    private Option<KRaftMetadataCache> kraftMetadataCache() {
        return kraftMetadataCache;
    }

    /**
     * Hands a task named {@code scheduleChangesForDeltaByClusterLinkLeaders} to the scheduler's
     * {@code scheduleOnce}. When run, the task walks every link ID reported by
     * {@code getClusterLinkIdsWithOwnedLinkCoordinator()} and checks the supplied delta for
     * mirrors that need a stop-mirror task.
     *
     * @param metadataDelta    the delta that accompanied the new image
     * @param newMetadataImage the new metadata image
     */
    @Override
    public void onMetadataImageUpdate(MetadataDelta metadataDelta, MetadataImage newMetadataImage) {
        ClusterLinkScheduler linkScheduler = this.scheduler();
        // The owned link IDs are looked up when the task runs, not when it is submitted.
        Function0<BoxedUnit> processOwnedLinks = (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.metadataManager().getClusterLinkIdsWithOwnedLinkCoordinator().foreach((Function1 & Serializable & scala.Serializable)linkId -> {
            this.checkForPendingStoppedMirrorsAndExecuteTasks(linkId, (Option<MetadataDelta>)new Some((Object)metadataDelta), newMetadataImage);
            return BoxedUnit.UNIT;
        });
        linkScheduler.scheduleOnce("scheduleChangesForDeltaByClusterLinkLeaders", processOwnedLinks);
    }

    /**
     * Logs the elected link IDs at debug level, then runs the per-link election handler
     * ({@code $anonfun$onClusterLinkCoordinatorElection$2}) for each ID in the set.
     *
     * @param linkIdSet IDs of the links for which a coordinator was elected
     */
    @Override
    public void onClusterLinkCoordinatorElection(Set<Uuid> linkIdSet) {
        Function0<String> electedMessage = (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "link coordinator elected for links: " + linkIdSet;
        this.debug(electedMessage);
        linkIdSet.foreach((Function1 & Serializable & scala.Serializable)linkId -> {
            ClusterLinkMetadataImageListener.$anonfun$onClusterLinkCoordinatorElection$2(this, linkId);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Logs the resigned link IDs at debug level, then runs the per-link resignation handler
     * ({@code $anonfun$onClusterLinkCoordinatorResignation$2}) for each ID in the set.
     *
     * @param linkIdSet IDs of the links whose coordinator resigned
     */
    @Override
    public void onClusterLinkCoordinatorResignation(Set<Uuid> linkIdSet) {
        Function0<String> resignedMessage = (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "link coordinator resigned for links: " + linkIdSet;
        this.debug(resignedMessage);
        linkIdSet.foreach((Function1 & Serializable & scala.Serializable)linkId -> {
            ClusterLinkMetadataImageListener.$anonfun$onClusterLinkCoordinatorResignation$2(this, linkId);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Starts a stop-mirror task for every topic of the given link that is pending stop.
     *
     * <ul>
     *   <li>With a delta ({@code Some}): only changed topics are inspected, and only those whose
     *       mirror state change belongs to {@code linkId} and has just moved to pending-stopped.
     *       Nothing happens if the delta has no topics delta.</li>
     *   <li>Without a delta ({@code None}): all topics the image lists for {@code linkId} are
     *       scanned, and those whose mirror state is {@code PENDING_STOPPED} are selected.</li>
     * </ul>
     *
     * @param linkId        the cluster link to inspect
     * @param metadataDelta the optional delta to restrict the scan to
     * @param image         the metadata image used when no delta is supplied
     * @throws MatchError if {@code metadataDelta} is neither {@code Some} nor {@code None}
     */
    private void checkForPendingStoppedMirrorsAndExecuteTasks(Uuid linkId, Option<MetadataDelta> metadataDelta, MetadataImage image) {
        if (metadataDelta instanceof Some) {
            MetadataDelta delta = (MetadataDelta)((Some)metadataDelta).value();
            if (delta.topicsDelta() == null) {
                // No topic changes in this delta, so there is nothing to inspect.
                return;
            }
            MapLike changedTopics = (MapLike)CollectionConverters$.MODULE$.mapAsScalaMapConverter(delta.topicsDelta().changedTopics()).asScala();
            IterableLike newlyPendingStopped = (IterableLike)changedTopics.values().filter((Function1 & Serializable & scala.Serializable)topicDelta -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkMetadataImageListener.$anonfun$checkForPendingStoppedMirrorsAndExecuteTasks$1(this, linkId, topicDelta)));
            newlyPendingStopped.foreach((Function1 & Serializable & scala.Serializable)topicDelta -> {
                this.runStopMirrorTask(linkId, topicDelta.name());
                return BoxedUnit.UNIT;
            });
            return;
        }
        if (!None$.MODULE$.equals(metadataDelta)) {
            throw new MatchError(metadataDelta);
        }
        TraversableLike linkTopics = (TraversableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(image.topics().topicsByLinkId(linkId)).asScala();
        IterableLike pendingStopped = (IterableLike)linkTopics.filter((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkMetadataImageListener.$anonfun$checkForPendingStoppedMirrorsAndExecuteTasks$4(x$4)));
        pendingStopped.foreach((Function1 & Serializable & scala.Serializable)topic -> {
            this.runStopMirrorTask(linkId, topic.name());
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Tells whether a topic delta represents a fresh transition into the pending-stopped
     * mirror state.
     *
     * @param delta the topic delta to inspect
     * @return {@code true} when the delta carries a new mirror state that is a
     *         {@code MirrorTopic.PendingStoppedMirrorTopic} and the previous mirror state is
     *         either absent or not {@code PENDING_STOPPED}; {@code false} otherwise
     */
    private boolean checkIfMirrorStateChangedToPendingStopped(TopicDelta delta) {
        Option newMirrorState = OptionConverters.RichOptionalGeneric$.MODULE$.asScala$extension(OptionConverters$.MODULE$.RichOptionalGeneric(delta.mirrorTopicStateChange()));
        Option prevMirrorState = OptionConverters.RichOptionalGeneric$.MODULE$.asScala$extension(OptionConverters$.MODULE$.RichOptionalGeneric(delta.previousMirrorTopicState()));
        // forall on an empty Option is true, so a topic with no previous mirror state also counts as a transition.
        return newMirrorState.exists((Function1 & Serializable & scala.Serializable)x$6 -> BoxesRunTime.boxToBoolean(x$6 instanceof MirrorTopic.PendingStoppedMirrorTopic && prevMirrorState.forall((Function1 & Serializable & scala.Serializable)x$7 -> BoxesRunTime.boxToBoolean(ClusterLinkMetadataImageListener.$anonfun$checkIfMirrorStateChangedToPendingStopped$2((MirrorTopic)x$7)))));
    }

    /**
     * Starts a {@code ClusterLinkStopMirrorTopic} task for one topic of a link.
     *
     * <p>The link's client manager is looked up through the link manager. If none is found a
     * warning is logged; if the link is in the failed state an error is logged; in both cases
     * no task is started.
     *
     * @param linkId    the cluster link the topic belongs to
     * @param topicName the mirror topic to stop
     * @throws IllegalStateException if the client manager is of an unrecognised type
     * @throws MatchError            if the lookup result is neither {@code Some} nor {@code None}
     */
    private void runStopMirrorTask(Uuid linkId, String topicName) {
        Option<ClusterLinkFactory.ClientManager> lookup = this.linkManager().clientManager(CoreUtils$.MODULE$.toJavaUUID(linkId));
        if (!(lookup instanceof Some)) {
            if (None$.MODULE$.equals(lookup)) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unable to resolve client manager for link ID '" + linkId + "'. Cluster link may have " + "been deleted or the managers are not initialized.");
                return;
            }
            throw new MatchError(lookup);
        }
        ClusterLinkFactory.ClientManager manager = (ClusterLinkFactory.ClientManager)((Some)lookup).value();
        if (manager instanceof ClusterLinkClientManager) {
            ClusterLinkClientManager activeManager = (ClusterLinkClientManager)manager;
            // One admin client is created here and shared by the supplier and the local admin wrapper.
            ConfluentAdmin destAdmin = (ConfluentAdmin)activeManager.destAdminFactory().apply();
            Function0<ConfluentAdmin> destAdminSupplier = (Function0<ConfluentAdmin>)(Function0 & Serializable & scala.Serializable)() -> destAdmin;
            ClusterLinkStopMirrorTopic stopTask = new ClusterLinkStopMirrorTopic(topicName, activeManager, destAdminSupplier, new LocalClusterLinkAdminClient(destAdmin), ClusterLinkStopMirrorTopic$.MODULE$.$lessinit$greater$default$5());
            stopTask.startup();
            return;
        }
        if (manager instanceof ClusterLinkFailed.ClientManager) {
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Cannot start 'stop topic mirror' task, cluster link '" + linkId + "' is in a failed state");
            return;
        }
        throw new IllegalStateException("Unhandled link manager type " + manager);
    }

    /**
     * Synthetic lambda body: scans the cache's current image (no delta) for pending-stopped
     * mirrors of the given link.
     */
    public static final /* synthetic */ void $anonfun$onClusterLinkCoordinatorElection$3(ClusterLinkMetadataImageListener $this, Uuid linkId$1, KRaftMetadataCache cache) {
        $this.checkForPendingStoppedMirrorsAndExecuteTasks(linkId$1, (Option<MetadataDelta>)None$.MODULE$, cache.currentImage());
    }

    /**
     * Synthetic lambda body run for each newly elected link: if a KRaft metadata cache is
     * available, scans its current image for pending-stopped mirrors of the link, then calls
     * {@code onLinkMetadataPartitionLeaderChange()} on the link's connection manager, if any.
     * Any {@code Throwable} is logged at error level and not rethrown.
     */
    public static final /* synthetic */ void $anonfun$onClusterLinkCoordinatorElection$2(ClusterLinkMetadataImageListener $this, Uuid linkId) {
        try {
            $this.kraftMetadataCache().foreach((Function1 & Serializable & scala.Serializable)cache -> {
                ClusterLinkMetadataImageListener.$anonfun$onClusterLinkCoordinatorElection$3($this, linkId, cache);
                return BoxedUnit.UNIT;
            });
            $this.linkManager().connectionManager(CoreUtils$.MODULE$.toJavaUUID(linkId)).foreach((Function1 & Serializable & scala.Serializable)x$1 -> {
                x$1.onLinkMetadataPartitionLeaderChange();
                return BoxedUnit.UNIT;
            });
        }
        catch (Throwable failure) {
            // A failure for one link is logged and swallowed here rather than propagated to the caller.
            $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unable to process link coordinator change for link " + linkId, (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> failure);
        }
    }

    /**
     * Synthetic lambda body run for each resigned link: calls
     * {@code onLinkMetadataPartitionLeaderChange()} on the link's connection manager, if any.
     * Any {@code Throwable} is logged at error level and not rethrown.
     */
    public static final /* synthetic */ void $anonfun$onClusterLinkCoordinatorResignation$2(ClusterLinkMetadataImageListener $this, Uuid linkId) {
        try {
            $this.linkManager().connectionManager(CoreUtils$.MODULE$.toJavaUUID(linkId)).foreach((Function1 & Serializable & scala.Serializable)x$2 -> {
                x$2.onLinkMetadataPartitionLeaderChange();
                return BoxedUnit.UNIT;
            });
        }
        catch (Throwable failure) {
            // A failure for one link is logged and swallowed here rather than propagated to the caller.
            $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unable to process link coordinator change for link " + linkId, (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> failure);
        }
    }

    /**
     * Synthetic predicate: null-safe equality between the mirror topic's link ID and the
     * given link ID.
     */
    public static final /* synthetic */ boolean $anonfun$checkForPendingStoppedMirrorsAndExecuteTasks$2(Uuid linkId$3, MirrorTopic x$3) {
        Uuid mirrorLinkId = x$3.linkId();
        if (mirrorLinkId == null) {
            return linkId$3 == null;
        }
        return mirrorLinkId.equals(linkId$3);
    }

    /**
     * Synthetic predicate: true when the topic delta's mirror state change belongs to the given
     * link and the delta is a fresh transition to pending-stopped.
     */
    public static final /* synthetic */ boolean $anonfun$checkForPendingStoppedMirrorsAndExecuteTasks$1(ClusterLinkMetadataImageListener $this, Uuid linkId$3, TopicDelta topicDelta) {
        boolean belongsToLink = OptionConverters.RichOptionalGeneric$.MODULE$.asScala$extension(OptionConverters$.MODULE$.RichOptionalGeneric(topicDelta.mirrorTopicStateChange())).exists((Function1 & Serializable & scala.Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkMetadataImageListener.$anonfun$checkForPendingStoppedMirrorsAndExecuteTasks$2(linkId$3, x$3)));
        if (!belongsToLink) {
            // Skip the transition check for topics that belong to other links.
            return false;
        }
        return $this.checkIfMirrorStateChangedToPendingStopped(topicDelta);
    }

    /**
     * Synthetic predicate: true when the mirror topic's state equals {@code PENDING_STOPPED}
     * (null-safe comparison).
     */
    public static final /* synthetic */ boolean $anonfun$checkForPendingStoppedMirrorsAndExecuteTasks$5(MirrorTopic x$5) {
        MirrorTopic.State current = x$5.mirrorState();
        MirrorTopic.State target = MirrorTopic.State.PENDING_STOPPED;
        if (current == null) {
            return target == null;
        }
        return current.equals(target);
    }

    /**
     * Synthetic predicate: true when the topic image has a mirror topic whose state is
     * {@code PENDING_STOPPED}.
     */
    public static final /* synthetic */ boolean $anonfun$checkForPendingStoppedMirrorsAndExecuteTasks$4(TopicImage x$4) {
        Option mirrorTopic = OptionConverters.RichOptionalGeneric$.MODULE$.asScala$extension(OptionConverters$.MODULE$.RichOptionalGeneric(x$4.mirrorTopic()));
        return mirrorTopic.exists((Function1 & Serializable & scala.Serializable)x$5 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkMetadataImageListener.$anonfun$checkForPendingStoppedMirrorsAndExecuteTasks$5(x$5)));
    }

    /**
     * Synthetic predicate: true when the mirror topic's state differs from
     * {@code PENDING_STOPPED} (null-safe comparison).
     */
    public static final /* synthetic */ boolean $anonfun$checkIfMirrorStateChangedToPendingStopped$2(MirrorTopic x$7) {
        MirrorTopic.State current = x$7.mirrorState();
        MirrorTopic.State target = MirrorTopic.State.PENDING_STOPPED;
        if (current == null) {
            return target != null;
        }
        return !current.equals(target);
    }

    /**
     * Creates the listener, runs the trait initialisers, and captures the scheduler and the
     * optional KRaft metadata cache.
     *
     * @param linkManager     link manager; also the source of the scheduler
     * @param metadataManager metadata manager; its metadata cache is kept only when it is a
     *                        {@code KRaftMetadataCache}
     */
    public ClusterLinkMetadataImageListener(ClusterLinkManager linkManager, ClusterLinkMetadataManager metadataManager) {
        this.linkManager = linkManager;
        this.metadataManager = metadataManager;
        ClusterLinkFactory.LinkMetadataImageListener.$init$(this);
        Logging.$init$(this);
        this.scheduler = linkManager.scheduler();
        MetadataCache metadataCache = metadataManager.metadataCache();
        // Typed as Option so that both the Some and the empty branch are assignable.
        Option<KRaftMetadataCache> resolvedCache;
        if (metadataCache instanceof KRaftMetadataCache) {
            resolvedCache = new Some<KRaftMetadataCache>((KRaftMetadataCache)metadataCache);
        } else {
            // Option.empty() yields the None singleton.
            resolvedCache = Option.empty();
        }
        this.kraftMetadataCache = resolvedCache;
    }
}

