/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import kafka.server.link.ClusterLinkAutoMirroring$;
import kafka.server.link.ClusterLinkClientManager;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFilterInfo;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkScheduler;
import kafka.server.link.ClusterLinkUtils$;
import kafka.server.link.SourceCluster$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListMirrorsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewMirrorTopic;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.GenSetLike;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\t=c\u0001B\u001b7\u0001uB\u0001B\u0012\u0001\u0003\u0006\u0004%\ta\u0012\u0005\t\u0017\u0002\u0011\t\u0011)A\u0005\u0011\"AA\n\u0001BC\u0002\u0013\u0005Q\n\u0003\u0005R\u0001\t\u0005\t\u0015!\u0003O\u0011!\u0011\u0006A!A!\u0002\u0013\u0019\u0006\u0002C-\u0001\u0005\u000b\u0007I\u0011\u0001.\t\u00119\u0004!\u0011!Q\u0001\nmC\u0001b\u001c\u0001\u0003\u0002\u0003\u0006I\u0001\u001d\u0005\u0006g\u0002!\t\u0001\u001e\u0005\bw\u0002\u0001\r\u0011\"\u0003}\u0011%\t\t\u0001\u0001a\u0001\n\u0013\t\u0019\u0001C\u0004\u0002\u0010\u0001\u0001\u000b\u0015B?\t\u0013\u0005E\u0001A1A\u0005\n\u0005M\u0001\u0002CA\u001e\u0001\u0001\u0006I!!\u0006\t\u0013\u0005u\u0002A1A\u0005\n\u0005M\u0001\u0002CA \u0001\u0001\u0006I!!\u0006\t\u0013\u0005\u0005\u0003\u00011A\u0005\n\u0005\r\u0003\"CA&\u0001\u0001\u0007I\u0011BA'\u0011!\t\t\u0006\u0001Q!\n\u0005\u0015\u0003\"CA*\u0001\t\u0007I\u0011BA+\u0011\u001d\t9\u0006\u0001Q\u0001\n\u0005D\u0011\"!\u0017\u0001\u0005\u0004%\t!a\u0011\t\u0011\u0005m\u0003\u0001)A\u0005\u0003\u000bB\u0011\"!\u0018\u0001\u0001\u0004%I!a\u0018\t\u0013\u0005\u001d\u0004\u00011A\u0005\n\u0005%\u0004\u0002CA7\u0001\u0001\u0006K!!\u0019\t\u0017\u0005=\u0004\u00011AA\u0002\u0013%\u0011q\f\u0005\f\u0003c\u0002\u0001\u0019!a\u0001\n\u0013\t\u0019\bC\u0006\u0002x\u0001\u0001\r\u0011!Q!\n\u0005\u0005\u0004bBA=\u0001\u0011E\u00131\u0010\u0005\b\u0003{\u0002A\u0011BA>\u0011\u001d\ty\b\u0001C\u0005\u0003wBq!!!\u0001\t\u0013\t\u0019\tC\u0004\u0002\u0010\u0002!I!!%\t\u000f\u0005m\u0006\u0001\"\u0003\u0002>\"9\u00111\u0019\u0001\u0005\n\u0005\u0015\u0007bBAf\u0001\u0011%\u0011Q\u001a\u0005\b\u0003#\u0004A\u0011BAj\u0011\u001d\t9\u000e\u0001C\u0005\u00033Dq!a8\u0001\t\u0013\t\t\u000fC\u0004\u0002|\u0002!I!!@\t\u000f\t\u001d\u0001\u0001\"\u0003\u0003\n!9!1\u0002\u0001\u0005\n\t5\u0001\u0002\u0003B\b\u0001\u0011\u0005a'a\u0005\t\u0011\tE\u0001\u0001\"\u00017\u0003'A\u0001Ba\u0005\u0001\t\u00031\u0014qL\u0004\b\u0005+1\u0004\u0012\u0001B\f\r\u0019)d\u0007#\u0001\u0003\u001a!11\u000f\rC\u0001\u0005CA\u0011Ba\t1\u0005\u0004%\tA!\n\t\u0011\tU\u0002\u0007)A\u0005\u0005OAqAa\u000e1\t\u0003\u0011ID\u0001\rDYV\u001cH/\u001a:MS:\\\u0017)\u001e;p\u001b&\u0014(o\u001c:j]\u001eT!a\u000e\u001d\u0002\t1Lgn\u001b\u0006\u0003si\naa]3sm\u0016\u0014(\"A\u001e\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001A\u0010\t\u0003\u007f\rs!\u0001Q!\u000e\u0003YJ!A\u0011\u001c\u0002)\rcWo\u001d;fe2Kgn[*dQ\u0016$W\u000f\\3s\u0013\t!UI\u0001\u0007QKJLw\u000eZ5d)\u0006\u001c8N\u0003\u0002Cm\u0005i1\r\\5f]Rl\u0015M\\1hKJ,\u0012\u0001\u0013\t\u0003\u0001&K!A\u0013\u001c\u00031\rcWo\u001d;fe2Kgn[\"mS\u0016tG/T1oC\u001e,'/\u0001\bdY&,g\u000e^'b]\u0006<WM\u001d\u0011\u0002\u001f5,G/\u00193bi\u0006l\u0015M\\1hKJ,\u0012A\u0014\t\u0003\u0001>K!\u0001\u0015\u001c\u00035\rcWo\u001d;fe2Kgn['fi\u0006$\u0017\r^1NC:\fw-\u001a:\u0002!5,G/\u00193bi\u0006l\u0015M\\1hKJ\u0004\u0013\u0001\u00037j].$\u0015\r^1\u0011\u0005Q;V\"A+\u000b\u0005YS\u0014A\u0001>l\u0013\tAVKA\bDYV\u001cH/\u001a:MS:\\G)\u0019;b\u0003A!Wm\u001d;BI6LgNR1di>\u0014\u00180F\u0001\\!\rav,Y\u0007\u0002;*\ta,A\u0003tG\u0006d\u0017-\u0003\u0002a;\nIa)\u001e8di&|g\u000e\r\t\u0003E2l\u0011a\u0019\u0006\u0003I\u0016\fQ!\u00193nS:T!AZ4\u0002\u000f\rd\u0017.\u001a8ug*\u00111\b\u001b\u0006\u0003S*\fa!\u00199bG\",'\"A6\u0002\u0007=\u0014x-\u0003\u0002nG\nq1i\u001c8gYV,g\u000e^!e[&t\u0017!\u00053fgR\fE-\\5o\r\u0006\u001cGo\u001c:zA\u00059Q.\u001a;sS\u000e\u001c\bC\u0001!r\u0013\t\u0011hG\u0001\nDYV\u001cH/\u001a:MS:\\W*\u001a;sS\u000e\u001c\u0018A\u0002\u001fj]&$h\b\u0006\u0004vm^D\u0018P\u001f\t\u0003\u0001\u0002AQAR\u0005A\u0002!CQ\u0001T\u0005A\u00029CQAU\u0005A\u0002MCQ!W\u0005A\u0002mCQa\\\u0005A\u0002A\faaY8oM&<W#A?\u0011\u0005\u0001s\u0018BA@7\u0005E\u0019E.^:uKJd\u0015N\\6D_:4\u0017nZ\u0001\u000bG>tg-[4`I\u0015\fH\u0003BA\u0003\u0003\u0017\u00012\u0001XA\u0004\u0013\r\tI!\u0018\u0002\u0005+:LG\u000f\u0003\u0005\u0002\u000e-\t\t\u00111\u0001~\u0003\rAH%M\u0001\bG>tg-[4!\u00031i\u0017N\u001d:peR{\u0007/[2t+\t\t)\u0002\u0005\u0004\u0002\u0018\u0005\u0005\u0012QE\u0007\u0003\u00033QA!a\u0007\u0002\u001e\u00059Q.\u001e;bE2,'bAA\u0010;\u0006Q1m\u001c7mK\u000e$\u0018n\u001c8\n\t\u0005\r\u0012\u0011\u0004\u0002\u0004'\u0016$\b\u0003BA\u0014\u0003kqA!!\u000b\u00022A\u0019\u00111F/\u000e\u0005\u00055\"bAA\u0018y\u00051AH]8pizJ1!a\r^\u0003\u0019\u0001&/\u001a3fM&!\u0011qGA\u001d\u0005\u0019\u0019FO]5oO*\u0019\u00111G/\u0002\u001b5L'O]8s)>\u0004\u0018nY:!\u0003U\u0019wN\u001c4mS\u000e$\u0018N\\4EKN$Hk\u001c9jGN\facY8oM2L7\r^5oO\u0012+7\u000f\u001e+pa&\u001c7\u000fI\u0001\u0011i\u0006\u001c8n](viN$\u0018M\u001c3j]\u001e,\"!!\u0012\u0011\u0007q\u000b9%C\u0002\u0002Ju\u00131!\u00138u\u0003Q!\u0018m]6t\u001fV$8\u000f^1oI&twm\u0018\u0013fcR!\u0011QAA(\u0011%\tiAEA\u0001\u0002\u0004\t)%A\tuCN\\7oT;ugR\fg\u000eZ5oO\u0002\n\u0011\u0002Z3ti\u0006#W.\u001b8\u0016\u0003\u0005\f!\u0002Z3ti\u0006#W.\u001b8!\u0003a\u0019%+R!U\u000b~#v\nU%D'~\u0013\u0015\tV\"I?NK%,R\u0001\u001a\u0007J+\u0015\tV#`)>\u0003\u0016jQ*`\u0005\u0006#6\tS0T\u0013j+\u0005%A\u0006j]&$\u0018.\u00197ju\u0016$WCAA1!\ra\u00161M\u0005\u0004\u0003Kj&a\u0002\"p_2,\u0017M\\\u0001\u0010S:LG/[1mSj,Gm\u0018\u0013fcR!\u0011QAA6\u0011%\ti!GA\u0001\u0002\u0004\t\t'\u0001\u0007j]&$\u0018.\u00197ju\u0016$\u0007%\u0001\rtQ>,H\u000e\u001a$jYR,'/T5se>\u0014Hk\u001c9jGN\fAd\u001d5pk2$g)\u001b7uKJl\u0015N\u001d:peR{\u0007/[2t?\u0012*\u0017\u000f\u0006\u0003\u0002\u0006\u0005U\u0004\"CA\u00079\u0005\u0005\t\u0019AA1\u0003e\u0019\bn\\;mI\u001aKG\u000e^3s\u001b&\u0014(o\u001c:U_BL7m\u001d\u0011\u0002\u0007I,h\u000e\u0006\u0002\u0002b\u00059\u0012N\\5uS\u0006d\u0017N_3B]\u0012\fU\u000f^8NSJ\u0014xN]\u0001\u000bCV$x.T5se>\u0014\u0018A\u00065b]\u0012dW\rT5tiN{WO]2f)>\u0004\u0018nY:\u0015\t\u0005\u0005\u0014Q\u0011\u0005\b\u0003\u000f\u000b\u0003\u0019AAE\u0003\u0019\u0011Xm];miB\u0019!-a#\n\u0007\u000555M\u0001\tMSN$Hk\u001c9jGN\u0014Vm];mi\u0006a\u0002.\u00198eY\u0016d\u0015n\u001d;T_V\u00148-Z'jeJ|'\u000fV8qS\u000e\u001cHCBA1\u0003'\u000bY\nC\u0004\u0002\u0016\n\u0002\r!a&\u0002\rQ|\u0007/[2t!\u0019\t9#!'\u0002&%!\u00111EA\u001d\u0011\u001d\tiJ\ta\u0001\u0003?\u000b\u0011\u0003\\5ti6K'O]8sg\u001a+H/\u001e:f!\u0019\t\t+a*\u0002,6\u0011\u00111\u0015\u0006\u0004\u0003K;\u0017AB2p[6|g.\u0003\u0003\u0002*\u0006\r&aC&bM.\fg)\u001e;ve\u0016\u0004b!!,\u00028\u0006\u0015RBAAX\u0015\u0011\t\t,a-\u0002\tU$\u0018\u000e\u001c\u0006\u0003\u0003k\u000bAA[1wC&!\u0011\u0011XAX\u0005)\u0019u\u000e\u001c7fGRLwN\\\u0001\u0015Q\u0006tG\r\\3GS2$XM]3e)>\u0004\u0018nY:\u0015\t\u0005\u0005\u0014q\u0018\u0005\b\u0003\u0003\u001c\u0003\u0019AAL\u000391\u0017\u000e\u001c;fe\u0016$Gk\u001c9jGN\fA\u0003[1oI2,G*[:u\t\u0016\u001cH\u000fV8qS\u000e\u001cHCBA1\u0003\u000f\fI\rC\u0004\u0002\b\u0012\u0002\r!!#\t\u000f\u0005\u0005G\u00051\u0001\u0002\u0018\u0006aa-\u001b7uKJ$v\u000e]5dgR!\u0011qSAh\u0011\u001d\t)*\na\u0001\u0003/\u000bAb\u0019:fCR,Gk\u001c9jGN$B!!\u0019\u0002V\"9\u0011Q\u0013\u0014A\u0002\u0005]\u0015\u0001\u0007:f[>4XmU8ve\u000e,W*\u001b:s_J$v\u000e]5dgR1\u0011qSAn\u0003;Dq!!&(\u0001\u0004\t9\nC\u0004\u0002\u0012\u001d\u0002\r!a&\u0002/!\fg\u000e\u001a7f\u0005\u0006$8\r[\"sK\u0006$X\rV8qS\u000e\u001cH\u0003BA1\u0003GDq!!:)\u0001\u0004\t9/A\u0007u_BL7\rT5ti&twm\u001d\t\t\u0003[\u000bI/!\n\u0002n&!\u00111^AX\u0005\ri\u0015\r\u001d\t\u0007\u0003C\u000b9+a<\u0011\t\u0005E\u0018q_\u0007\u0003\u0003gTA!!>\u00024\u0006!A.\u00198h\u0013\u0011\tI0a=\u0003\tY{\u0017\u000eZ\u0001\u001fC\u0012$W*\u001b:s_J,G\rV8qS\u000e\fe\u000e\u001a'pO^\u000b'O\\5oON$b!!\u0002\u0002\u0000\n\r\u0001b\u0002B\u0001S\u0001\u0007\u0011QE\u0001\u0006i>\u0004\u0018n\u0019\u0005\b\u0005\u000bI\u0003\u0019AAw\u0003\u00191W\u000f^;sK\u0006y\u0011\r\u001c7MS:\\W\r\u001a+pa&\u001c7\u000f\u0006\u0002\u0002\u0018\u0006Y#/Z2pe\u00124\u0015-\u001b7fIB\u0013XMZ5yK\u0012\fU\u000f^8NSJ\u0014xN]\"sK\u0006$X-T3ue&\u001c7\u000f\u0006\u0002\u0002\u0006\u0005yq-\u001a;NSJ\u0014xN\u001d+pa&\u001c7/\u0001\rhKR\u001cuN\u001c4mS\u000e$\u0018N\\4EKN$Hk\u001c9jGN\fabZ3u\u0013:LG/[1mSj,G-\u0001\rDYV\u001cH/\u001a:MS:\\\u0017)\u001e;p\u001b&\u0014(o\u001c:j]\u001e\u0004\"\u0001\u0011\u0019\u0014\u0007A\u0012Y\u0002E\u0002]\u0005;I1Aa\b^\u0005\u0019\te.\u001f*fMR\u0011!qC\u0001\u0019C\u0012$\u0017\u000e^5p]\u0006d\u0017J\u001c;fe:\fG\u000eV8qS\u000e\u001cXC\u0001B\u0014!\u0019\u0011ICa\f\u000325\u0011!1\u0006\u0006\u0005\u0005[\ti\"A\u0005j[6,H/\u00192mK&!\u00111\u0005B\u0016!\u0011\t\tPa\r\n\t\u0005]\u00121_\u0001\u001aC\u0012$\u0017\u000e^5p]\u0006d\u0017J\u001c;fe:\fG\u000eV8qS\u000e\u001c\b%A\u0006gS2$XM\u001d+pa&\u001cG\u0003CA1\u0005w\u0011IE!\u0014\t\u000f\tuB\u00071\u0001\u0003@\u0005ya.Y7fgR{G*[:uS:<7\u000f\u0005\u0005\u0002\u0018\t\u0005\u0013Q\u0005B\"\u0013\u0011\tY/!\u0007\u0011\u0007\t\u0014)%C\u0002\u0003H\r\u0014A\u0002V8qS\u000ed\u0015n\u001d;j]\u001eDqAa\u00135\u0001\u0004\t)%A\u0005qe\u00164\u0017\u000e\u001f'f]\"9!\u0011\u0001\u001bA\u0002\u0005\u0015\u0002")
public class ClusterLinkAutoMirroring
extends ClusterLinkScheduler.PeriodicTask {
    private final ClusterLinkClientManager clientManager;
    private final ClusterLinkMetadataManager metadataManager;
    private final ClusterLinkData linkData;
    private final Function0<ConfluentAdmin> destAdminFactory;
    private final ClusterLinkMetrics metrics;
    private ClusterLinkConfig config;
    private final scala.collection.mutable.Set<String> mirrorTopics;
    private final scala.collection.mutable.Set<String> conflictingDestTopics;
    private int tasksOutstanding;
    private final ConfluentAdmin destAdmin;
    private final int CREATE_TOPICS_BATCH_SIZE;
    private boolean initialized;
    private boolean shouldFilterMirrorTopics;

    public static boolean filterTopic(scala.collection.mutable.Map<String, TopicListing> map, int n, String string) {
        return ClusterLinkAutoMirroring$.MODULE$.filterTopic(map, n, string);
    }

    public static scala.collection.immutable.Set<String> additionalInternalTopics() {
        return ClusterLinkAutoMirroring$.MODULE$.additionalInternalTopics();
    }

    public ClusterLinkClientManager clientManager() {
        return this.clientManager;
    }

    public ClusterLinkMetadataManager metadataManager() {
        return this.metadataManager;
    }

    public Function0<ConfluentAdmin> destAdminFactory() {
        return this.destAdminFactory;
    }

    private ClusterLinkConfig config() {
        return this.config;
    }

    private void config_$eq(ClusterLinkConfig x$1) {
        this.config = x$1;
    }

    private scala.collection.mutable.Set<String> mirrorTopics() {
        return this.mirrorTopics;
    }

    private scala.collection.mutable.Set<String> conflictingDestTopics() {
        return this.conflictingDestTopics;
    }

    private int tasksOutstanding() {
        return this.tasksOutstanding;
    }

    private void tasksOutstanding_$eq(int x$1) {
        this.tasksOutstanding = x$1;
    }

    private ConfluentAdmin destAdmin() {
        return this.destAdmin;
    }

    public int CREATE_TOPICS_BATCH_SIZE() {
        return this.CREATE_TOPICS_BATCH_SIZE;
    }

    private boolean initialized() {
        return this.initialized;
    }

    private void initialized_$eq(boolean x$1) {
        this.initialized = x$1;
    }

    private boolean shouldFilterMirrorTopics() {
        return this.shouldFilterMirrorTopics;
    }

    private void shouldFilterMirrorTopics_$eq(boolean x$1) {
        this.shouldFilterMirrorTopics = x$1;
    }

    @Override
    public boolean run() {
        if (!this.initialized()) {
            return this.initializeAndAutoMirror();
        }
        return this.autoMirror();
    }

    private boolean initializeAndAutoMirror() {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(28).append("Initializing auto-mirroring ").append($this.linkData.linkId()).toString());
        if (this.config().clusterLinkPrefix().isEmpty() || ((String)this.config().clusterLinkPrefix().get()).isEmpty()) {
            this.shouldFilterMirrorTopics_$eq(false);
            this.initialized_$eq(true);
            this.autoMirror();
        } else {
            KafkaFuture listMirrorsFuture = this.clientManager().getAdmin().listMirrors(new ListMirrorsOptions()).result();
            this.scheduleWhenComplete(listMirrorsFuture, (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
                this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
                this.shouldFilterMirrorTopics_$eq(SourceCluster$.MODULE$.canContainMirrorTopics($this.metrics, (KafkaFuture<Collection<String>>)listMirrorsFuture));
                this.initialized_$eq(true);
                this.autoMirror();
                return this.tasksOutstanding() == 0;
            });
            this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
        }
        return this.tasksOutstanding() == 0;
    }

    private boolean autoMirror() {
        this.mirrorTopics().clear();
        this.config_$eq(this.clientManager().currentConfig());
        if (this.metadataManager().isLinkCoordinator(this.linkData.linkName(), true) && this.config().autoMirroringEnable()) {
            if (this.tasksOutstanding() != 0) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Number of outstanding tasks was not 0 at the beginning of run. Resetting to 0 and continuing on.");
                this.tasksOutstanding_$eq(0);
            }
            if (this.config().topicFilters().isEmpty()) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(72).append(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()).append(" is true but no topic filters are specified. No topics will be mirrored.").toString());
            } else {
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Attempting to list topics from source cluster");
                ListTopicsResult listTopicsResult = this.clientManager().getAdmin().listTopics();
                this.scheduleWhenComplete(listTopicsResult.namesToListings(), (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.handleListSourceTopics(listTopicsResult));
                this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
            }
        }
        return this.tasksOutstanding() == 0;
    }

    private boolean handleListSourceTopics(ListTopicsResult result) {
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        try {
            ObjectRef filteredTopics = ObjectRef.create((Object)((scala.collection.immutable.Set)this.filterTopics((scala.collection.immutable.Set<String>)((TraversableOnce)CollectionConverters$.MODULE$.asScalaSetConverter((java.util.Set)result.names().get()).asScala()).toSet()).$minus$minus(this.allLinkedTopics())));
            Map namesToListings = (Map)result.namesToListings().get();
            int prefixLength = ((String)this.linkData.tenantPrefix().getOrElse((Function0 & Serializable & scala.Serializable)() -> "")).length() + ((String)this.config().clusterLinkPrefix().getOrElse((Function0 & Serializable & scala.Serializable)() -> "")).length();
            ((scala.collection.immutable.Set)filteredTopics.elem).foreach((Function1 & Serializable & scala.Serializable)topicName -> {
                ClusterLinkAutoMirroring.$anonfun$handleListSourceTopics$3(this, namesToListings, prefixLength, filteredTopics, topicName);
                return BoxedUnit.UNIT;
            });
            if (!this.shouldFilterMirrorTopics()) {
                this.handleFilteredTopics((scala.collection.immutable.Set<String>)((scala.collection.immutable.Set)filteredTopics.elem));
            } else {
                KafkaFuture listMirrorsFuture = this.clientManager().getAdmin().listMirrors(new ListMirrorsOptions()).result();
                this.scheduleWhenComplete(listMirrorsFuture, (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.handleListSourceMirrorTopics((scala.collection.immutable.Set<String>)((scala.collection.immutable.Set)filteredTopics$1.elem), (KafkaFuture<Collection<String>>)listMirrorsFuture));
                this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
            }
        }
        catch (ExecutionException ex) {
            this.metrics.autoMirrorListTopicsFromSourceFailedSensor().record();
            Throwable throwable = ex.getCause();
            if (throwable instanceof AuthorizationException) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(107).append("Unable to list topics on the source cluster. Please enable DESCRIBE ACLs on the source cluster to proceed. ").append(ex.getCause()).toString());
            }
            if (throwable != null) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(102).append("Unable to list topics on the source cluster due to unexpected exception. Topics will not be mirrored. ").append(ex).toString());
            }
            throw new MatchError(null);
        }
        catch (Throwable ex) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(102).append("Unable to list topics on the source cluster due to unexpected exception. Topics will not be mirrored. ").append(ex).toString());
            this.metrics.autoMirrorListTopicsFromSourceFailedSensor().record();
        }
        return this.tasksOutstanding() == 0;
    }

    private boolean handleListSourceMirrorTopics(scala.collection.immutable.Set<String> topics, KafkaFuture<Collection<String>> listMirrorsFuture) {
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        try {
            scala.collection.immutable.Set<String> filteredTopics = this.removeSourceMirrorTopics(topics, (scala.collection.immutable.Set<String>)((TraversableOnce)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)listMirrorsFuture.get()).asScala()).toSet());
            this.handleFilteredTopics(filteredTopics);
        }
        catch (ExecutionException ex) {
            this.metrics.autoMirrorListMirrorsFromSourceFailedSensor().record();
            Throwable throwable = ex.getCause();
            if (throwable instanceof AuthorizationException) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(116).append("Unable to list mirrors on the source cluster. Please enable Describe:Cluster ACLs on the source cluster to proceed. ").append(ex.getCause()).toString());
            }
            if (throwable != null) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(103).append("Unable to list mirrors on the source cluster due to unexpected exception. Topics will not be mirrored. ").append(ex).toString());
            }
            throw new MatchError(null);
        }
        catch (Throwable ex) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(103).append("Unable to list mirrors on the source cluster due to unexpected exception. Topics will not be mirrored. ").append(ex).toString());
            this.metrics.autoMirrorListMirrorsFromSourceFailedSensor().record();
        }
        return this.tasksOutstanding() == 0;
    }

    private boolean handleFilteredTopics(scala.collection.immutable.Set<String> filteredTopics) {
        this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(64).append("Will attempt to mirror following topics on destination cluster: ").append(filteredTopics).toString());
        if (((TraversableOnce)this.conflictingDestTopics().intersect(filteredTopics)).nonEmpty()) {
            ListTopicsResult listTopicsResult = this.destAdmin().listTopics();
            this.scheduleWhenComplete(listTopicsResult.namesToListings(), (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.handleListDestTopics(listTopicsResult, filteredTopics));
            this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
        } else {
            if (this.conflictingDestTopics().nonEmpty()) {
                this.conflictingDestTopics().clear();
            }
            this.createTopics(filteredTopics);
        }
        return this.tasksOutstanding() == 0;
    }

    private boolean handleListDestTopics(ListTopicsResult result, scala.collection.immutable.Set<String> filteredTopics) {
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        try {
            java.util.Set destTopics = (java.util.Set)result.names().get();
            if (this.conflictingDestTopics().nonEmpty()) {
                this.conflictingDestTopics().clear();
            }
            this.conflictingDestTopics().$plus$plus$eq((TraversableOnce)((GenSetLike)CollectionConverters$.MODULE$.asScalaSetConverter(destTopics).asScala()).intersect(filteredTopics));
            if (this.conflictingDestTopics().nonEmpty()) {
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(51).append("Found following conflicting topics on destination: ").append(this.conflictingDestTopics()).toString());
            }
            this.conflictingDestTopics().foreach((Function1 & Serializable & scala.Serializable)topic -> {
                ClusterLinkAutoMirroring.$anonfun$handleListDestTopics$2(this, topic);
                return BoxedUnit.UNIT;
            });
            scala.collection.immutable.Set topicsToCreate = (scala.collection.immutable.Set)filteredTopics.$minus$minus(this.conflictingDestTopics());
            this.createTopics((scala.collection.immutable.Set<String>)topicsToCreate);
        }
        catch (Throwable ex) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(78).append("Unable to list topics on destination cluster to check for conflicting topics. ").append(ex).toString());
            this.metrics.autoMirrorListTopicsFromDestinationFailedSensor().record();
            this.createTopics(filteredTopics);
        }
        return this.tasksOutstanding() == 0;
    }

    /*
     * WARNING - void declaration
     */
    private scala.collection.immutable.Set<String> filterTopics(scala.collection.immutable.Set<String> topics) {
        void var4_4;
        void var3_3;
        this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(52).append("Filtering source topics ").append(topics).append(" to match topic filters JSON").toString());
        Tuple2<scala.collection.immutable.Set<String>, Seq<ClusterLinkFilterInfo>> tuple2 = ClusterLinkUtils$.MODULE$.doFilter(topics, this.config().topicFilters(), this.linkData.tenantPrefix(), this.config().clusterLinkPrefix());
        if (tuple2 == null) {
            throw new MatchError(null);
        }
        scala.collection.immutable.Set filtered = (scala.collection.immutable.Set)tuple2._1();
        Seq unusedFilters = (Seq)tuple2._2();
        void filtered2 = var3_3;
        var4_4.foreach((Function1 & Serializable & scala.Serializable)unusedFilter -> {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(197).append("The filter ").append(unusedFilter).append(" does not match any source topic. This filter may not be ").append("required or the topics it referred to may not have the correct DESCRIBE ACL ").append("for the cluster link principal on the source cluster.").toString());
            return BoxedUnit.UNIT;
        });
        return filtered2;
    }

    private boolean createTopics(scala.collection.immutable.Set<String> topics) {
        int clusterLinkPrefixLength = ((String)this.config().clusterLinkPrefix().getOrElse((Function0 & Serializable & scala.Serializable)() -> "")).length();
        int tenantPrefixLength = ((String)this.linkData.tenantPrefix().getOrElse((Function0 & Serializable & scala.Serializable)() -> "")).length();
        ((scala.collection.immutable.Set)topics.map((Function1 & Serializable & scala.Serializable)topic -> {
            String mirrorTopicName = clusterLinkPrefixLength > 0 ? new StringBuilder(0).append(topic.substring(0, tenantPrefixLength)).append(topic.substring(tenantPrefixLength + clusterLinkPrefixLength)).toString() : topic;
            return new NewTopic(topic, Optional.empty(), Optional.empty()).mirror(Optional.of(new NewMirrorTopic($this.linkData.linkName(), mirrorTopicName)));
        }, Set$.MODULE$.canBuildFrom())).grouped(this.CREATE_TOPICS_BATCH_SIZE()).foreach((Function1 & Serializable & scala.Serializable)batch -> {
            ClusterLinkAutoMirroring.$anonfun$createTopics$4(this, batch);
            return BoxedUnit.UNIT;
        });
        return this.tasksOutstanding() == 0;
    }

    /*
     * WARNING - void declaration
     */
    private scala.collection.immutable.Set<String> removeSourceMirrorTopics(scala.collection.immutable.Set<String> topics, scala.collection.immutable.Set<String> mirrorTopics) {
        void var3_3;
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(24).append("Mirror topics at source ").append(mirrorTopics).toString());
        scala.collection.immutable.Set filteredTopics = (scala.collection.immutable.Set)topics.filter((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkAutoMirroring.$anonfun$removeSourceMirrorTopics$2(mirrorTopics, x$2)));
        scala.collection.immutable.Set filteredOutTopics = (scala.collection.immutable.Set)topics.$minus$minus((GenTraversableOnce)filteredTopics);
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(67).append("Filtering out ").append(filteredOutTopics).append(" because they are mirror topics at the source cluster").toString());
        if (filteredOutTopics.nonEmpty()) {
            this.metrics.prefixedAutoMirrorTopicFilteredSensor().record((double)filteredOutTopics.size());
        } else {
            this.metrics.prefixedAutoMirrorTopicFilteredSensor().record(0.0);
        }
        return var3_3;
    }

    private boolean handleBatchCreateTopics(Map<String, KafkaFuture<Void>> topicListings) {
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        topicListings.forEach((topic, future) -> this.addMirroredTopicAndLogWarnings((String)topic, (KafkaFuture<Void>)future));
        return this.tasksOutstanding() == 0;
    }

    private void addMirroredTopicAndLogWarnings(String topic, KafkaFuture<Void> future) {
        try {
            future.get();
            this.mirrorTopics().$plus$eq((Object)topic);
            this.metrics.autoMirrorCreateSensor().record();
            if (!this.config().clusterLinkPrefix().forall((Function1 & Serializable & scala.Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)x$3.isEmpty()))) {
                this.metrics.prefixedAutoMirrorCreateSensor().record();
            }
        }
        catch (ExecutionException ex) {
            Throwable throwable = ex.getCause();
            if (throwable instanceof TopicExistsException) {
                this.conflictingDestTopics().add((Object)topic);
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(76).append("Topic ").append(topic).append(" already exists on destination cluster. Will not mirror source").append(" topic. ").append(ex.getCause()).toString());
                this.metrics.autoMirrorCreateFailedSensor().record();
                this.recordFailedPrefixedAutoMirrorCreateMetrics();
                return;
            }
            if (throwable instanceof AuthorizationException) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(87).append("Unable to create topic ").append(topic).append(". Please allow CREATE access on destination cluster to proceed. ").append(ex.getCause()).toString());
                this.metrics.autoMirrorCreateFailedSensor().record();
                this.recordFailedPrefixedAutoMirrorCreateMetrics();
                return;
            }
            if (throwable != null) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(48).append("Unable to create topic ").append(topic).append(" on destination cluster. ").append(ex).toString());
                this.metrics.autoMirrorCreateFailedSensor().record();
                this.recordFailedPrefixedAutoMirrorCreateMetrics();
                return;
            }
            throw new MatchError(null);
        }
        catch (Throwable ex) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(48).append("Unable to create topic ").append(topic).append(" on destination cluster. ").append(ex).toString());
            this.metrics.autoMirrorCreateFailedSensor().record();
            this.recordFailedPrefixedAutoMirrorCreateMetrics();
        }
    }

    private scala.collection.immutable.Set<String> allLinkedTopics() {
        return this.metadataManager().mirrorTopicsForLink(this.linkData.linkName()).keySet().toSet();
    }

    private void recordFailedPrefixedAutoMirrorCreateMetrics() {
        if (!this.config().clusterLinkPrefix().forall((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)x$4.isEmpty()))) {
            this.metrics.prefixedAutoMirrorCreateFailedSensor().record();
        }
    }

    public scala.collection.mutable.Set<String> getMirrorTopics() {
        return this.mirrorTopics();
    }

    public scala.collection.mutable.Set<String> getConflictingDestTopics() {
        return this.conflictingDestTopics();
    }

    public boolean getInitialized() {
        return this.initialized();
    }

    public static final /* synthetic */ void $anonfun$handleListSourceTopics$3(ClusterLinkAutoMirroring $this, Map namesToListings$1, int prefixLength$1, ObjectRef filteredTopics$1, String topicName) {
        if (ClusterLinkAutoMirroring$.MODULE$.filterTopic((scala.collection.mutable.Map<String, TopicListing>)((scala.collection.mutable.Map)CollectionConverters$.MODULE$.mapAsScalaMapConverter(namesToListings$1).asScala()), prefixLength$1, topicName)) {
            $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(50).append("Internal or Confluent topic ").append(topicName).append(" will not be mirrored.").toString());
            filteredTopics$1.elem = (scala.collection.immutable.Set)((scala.collection.immutable.Set)filteredTopics$1.elem).$minus((Object)topicName);
        }
    }

    public static final /* synthetic */ void $anonfun$handleListDestTopics$2(ClusterLinkAutoMirroring $this, String topic) {
        $this.metrics.autoMirrorCreateFailedSensor().record();
        $this.recordFailedPrefixedAutoMirrorCreateMetrics();
    }

    public static final /* synthetic */ void $anonfun$createTopics$4(ClusterLinkAutoMirroring $this, scala.collection.immutable.Set batch) {
        $this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(50).append("Creating following topics on destination cluster: ").append(batch).toString());
        CreateTopicsResult result = $this.destAdmin().createTopics((Collection)CollectionConverters$.MODULE$.setAsJavaSetConverter((Set)batch).asJava());
        $this.scheduleWhenComplete(result.all(), (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> $this.handleBatchCreateTopics(result.values()));
        $this.tasksOutstanding_$eq($this.tasksOutstanding() + 1);
    }

    public static final /* synthetic */ boolean $anonfun$removeSourceMirrorTopics$2(scala.collection.immutable.Set mirrorTopics$1, String x$2) {
        return !mirrorTopics$1.contains((Object)x$2);
    }

    public ClusterLinkAutoMirroring(ClusterLinkClientManager clientManager, ClusterLinkMetadataManager metadataManager, ClusterLinkData linkData, Function0<ConfluentAdmin> destAdminFactory, ClusterLinkMetrics metrics) {
        this.clientManager = clientManager;
        this.metadataManager = metadataManager;
        this.linkData = linkData;
        this.destAdminFactory = destAdminFactory;
        this.metrics = metrics;
        super(clientManager.scheduler(), "ClusterLinkAutoMirroring", (int)Predef$.MODULE$.Long2long(clientManager.currentConfig().metadataMaxAgeMs()));
        this.config = clientManager.currentConfig();
        this.mirrorTopics = scala.collection.mutable.Set$.MODULE$.empty();
        this.conflictingDestTopics = scala.collection.mutable.Set$.MODULE$.empty();
        this.tasksOutstanding = 0;
        this.destAdmin = (ConfluentAdmin)destAdminFactory.apply();
        this.CREATE_TOPICS_BATCH_SIZE = 100;
        this.initialized = false;
    }
}

