/*
 * Decompiled with CFR 0.152.
 */
package kafka.link;

import java.io.Serializable;
import java.util.Properties;
import kafka.link.ClusterLinkIntegrationTest;
import kafka.link.ClusterLinkTestHarness;
import kafka.link.ClusterLinkTestHarness$;
import kafka.log.Defaults$;
import kafka.log.LogConfig$;
import kafka.server.KafkaBroker;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkConfigDefaults$;
import kafka.utils.TestInfoUtils$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.ClusterLinkListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Function1;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.java8.JFunction1;

@Tag(value="integration")
@ScalaSignature(bytes="\u0006\u0001\u0005%a\u0001B\u0006\r\u0001EAQA\u0006\u0001\u0005\u0002]AQ!\u0007\u0001\u0005BiAQa\r\u0001\u0005\u0002QBQa\u0016\u0001\u0005\u0002aCQ!\u0018\u0001\u0005\nyCQa\u0018\u0001\u0005\u0002\u0001DQ!\u001a\u0001\u0005\u0002\u0019DQa\u001b\u0001\u0005\u00021DQ!\u001d\u0001\u0005\u0002IDQa\u001e\u0001\u0005\u0002a\u0014!eU8ve\u000e,\u0017J\\5uS\u0006$X\r\u001a'j].Le\u000e^3he\u0006$\u0018n\u001c8UKN$(BA\u0007\u000f\u0003\u0011a\u0017N\\6\u000b\u0003=\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001%A\u00111\u0003F\u0007\u0002\u0019%\u0011Q\u0003\u0004\u0002\u001b\u00072,8\u000f^3s\u0019&t7.\u00138uK\u001e\u0014\u0018\r^5p]R+7\u000f^\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003a\u0001\"a\u0005\u0001\u0002\u000bM,G/\u00169\u0015\u0005m\t\u0003C\u0001\u000f \u001b\u0005i\"\"\u0001\u0010\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0001j\"\u0001B+oSRDQA\t\u0002A\u0002\r\n\u0001\u0002^3ti&sgm\u001c\t\u0003I5j\u0011!\n\u0006\u0003M\u001d\n1!\u00199j\u0015\tA\u0013&A\u0004kkBLG/\u001a:\u000b\u0005)Z\u0013!\u00026v]&$(\"\u0001\u0017\u0002\u0007=\u0014x-\u0003\u0002/K\tAA+Z:u\u0013:4w\u000e\u000b\u0002\u0003aA\u0011A%M\u0005\u0003e\u0015\u0012!BQ3g_J,W)Y2i\u0003)\"Xm\u001d;D_:$(o\u001c7mKJ\u001c\u0005.\u00198hK^KG\u000f\u001b*fm\u0016\u00148/Z\"p]:,7\r^5p]N$\"aG\u001b\t\u000bY\u001a\u0001\u0019A\u001c\u0002\rE,xN];n!\tAtH\u0004\u0002:{A\u0011!(H\u0007\u0002w)\u0011A\bE\u0001\u0007yI|w\u000e\u001e 
\n\u0005yj\u0012A\u0002)sK\u0012,g-\u0003\u0002A\u0003\n11\u000b\u001e:j]\u001eT!AP\u000f)\t\r\u00195\n\u0014\t\u0003\t&k\u0011!\u0012\u0006\u0003\r\u001e\u000b\u0001\u0002\u001d:pm&$WM\u001d\u0006\u0003\u0011\u001e\na\u0001]1sC6\u001c\u0018B\u0001&F\u0005-1\u0016\r\\;f'>,(oY3\u0002\u000fM$(/\u001b8hg2\nQ*I\u0001O\u0003\tQ8\u000e\u000b\u0003\u0004!R+\u0006CA)S\u001b\u00059\u0015BA*H\u0005E\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f^\u0001\u0005]\u0006lW-I\u0001W\u0003aYH-[:qY\u0006Lh*Y7f{:\nXo\u001c:v[vZ\b'`\u0001\u0019i\u0016\u001cH\u000fV8qS\u000e\u001cuN\u001c4jONKhn\u0019*vY\u0016\u001cHCA\u000eZ\u0011\u00151D\u00011\u00018Q\u0011!1iS.-\u00035CC\u0001\u0002)U+\u0006qb/\u001a:jMf\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\'fiJL7m\u001d\u000b\u00027\u0005AB/Z:u'>,(oY3DYV\u001cH/\u001a:SKN$\u0018M\u001d;\u0015\u0005m\t\u0007\"\u0002\u001c\u0007\u0001\u00049\u0004\u0006\u0002\u0004D\u0017\u000ed\u0013!\u0014\u0015\u0005\rA#V+\u0001\ruKN$H)Z:u\u0007>tGO]8mY\u0016\u00148\t[1oO\u0016$\"aG4\t\u000bY:\u0001\u0019A\u001c)\t\u001d\u00195*\u001b\u0017\u0002\u001b\"\"q\u0001\u0015+V\u0003e!Xm\u001d;M_\u000e\fG\u000eT5ti\u0016tWM](wKJ\u0014\u0018\u000eZ3\u0015\u0005mi\u0007\"\u0002\u001c\t\u0001\u00049\u0004\u0006\u0002\u0005D\u0017>d\u0013!\u0014\u0015\u0005\u0011A#V+\u0001\u0015uKN$H*\u001b8l-\u0006d\u0017\u000eZ1uS>tg)Y5mkJ,wJ\\*pkJ\u001cWm\u00117vgR,'\u000f\u0006\u0002\u001cg\")a'\u0003a\u0001o!\"\u0011bQ&vY\u0005i\u0005\u0006B\u0005Q)V\u000b\u0001\u0005^3ti\u0012+7o\u0019:jE\u0016\u001cv.\u001e:dKNKG-\u001a'j].\u001cuN\u001c4jOR\u00111$\u001f\u0005\u0006m)\u0001\ra\u000e\u0015\u0005\u0015\r[5\u0010L\u0001NQ\u0011Q\u0001\u000bV+)\r\u0001q\u00181AA\u0003!\t!s0C\u0002\u0002\u0002\u0015\u00121\u0001V1h\u0003\u00151\u0018\r\\;fC\t\t9!A\u0006j]R,wM]1uS>t\u0007")
public class SourceInitiatedLinkIntegrationTest
extends ClusterLinkIntegrationTest {
    /**
     * Creates the source and destination cluster harnesses when neither has been assigned yet,
     * then delegates to the parent set-up.
     *
     * <p>For KRaft runs (as reported by {@code TestInfoUtils.isKRaft}) both harnesses are built
     * with {@code PLAINTEXT} as the first constructor argument; otherwise the source harness gets
     * {@code SASL_SSL} and the destination harness gets {@code SASL_PLAINTEXT}. The remaining
     * constructor arguments ({@code Some(PLAINTEXT)}, {@code 0}/{@code 100}, and the harness's
     * default fourth argument) are the same in both modes.
     * NOTE(review): the meaning of the numeric third argument is not visible here — presumably a
     * broker-id offset; confirm against {@code ClusterLinkTestHarness}.
     *
     * @param testInfo JUnit test metadata, used to detect KRaft mode and passed to the parent
     */
    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        boolean kraftMode = TestInfoUtils$.MODULE$.isKRaft(testInfo);
        boolean clustersUnset = this.sourceCluster() == null && this.destCluster() == null;
        if (clustersUnset) {
            // Only the first constructor argument differs between KRaft and non-KRaft runs.
            SecurityProtocol sourceProtocol = kraftMode ? SecurityProtocol.PLAINTEXT : SecurityProtocol.SASL_SSL;
            SecurityProtocol destProtocol = kraftMode ? SecurityProtocol.PLAINTEXT : SecurityProtocol.SASL_PLAINTEXT;
            this.sourceCluster_$eq(new ClusterLinkTestHarness(sourceProtocol, (Option<SecurityProtocol>)new Some((Object)SecurityProtocol.PLAINTEXT), 0, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4()));
            this.destCluster_$eq(new ClusterLinkTestHarness(destProtocol, (Option<SecurityProtocol>)new Some((Object)SecurityProtocol.PLAINTEXT), 100, ClusterLinkTestHarness$.MODULE$.$lessinit$greater$default$4()));
        }
        super.setUp(testInfo);
    }

    /**
     * Creates a cluster link and a mirrored topic, then removes first the destination controller
     * and then the source controller, checking after each step that mirroring still catches up.
     *
     * <p>After the destination controller is killed, mirroring is awaited only on the destination
     * brokers that differ from the broker registered under the killed controller's broker id.
     * Reverse-connection metrics are verified before the first kill and after the second one.
     *
     * @param quorum value supplied by {@code @ValueSource} (only {@code "zk"}); not read by the body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk"})
    public void testControllerChangeWithReverseConnections(String quorum) {
        this.createClusterLink(this.linkName(), this.createClusterLink$default$2(), this.createClusterLink$default$3(), this.createClusterLink$default$4());

        // Create the topic on the source cluster using the harness defaults for arguments 4 and 5.
        ClusterLinkTestHarness source = this.sourceCluster();
        String sourceTopic = this.topic();
        int partitionCount = this.numPartitions();
        short sourceReplication = this.replicationFactor();
        Properties defaultTopicProps = source.createTopic$default$4();
        ListenerName defaultListener = source.createTopic$default$5();
        source.createTopic(sourceTopic, partitionCount, sourceReplication, defaultTopicProps, defaultListener);

        // Link the same topic on the destination cluster.
        ClusterLinkTestHarness dest = this.destCluster();
        String mirrorTopic = this.topic();
        short mirrorReplication = this.replicationFactor();
        String link = this.linkName();
        Map<String, String> defaultLinkTopicArg4 = dest.linkTopic$default$4();
        String defaultLinkTopicArg5 = dest.linkTopic$default$5();
        dest.linkTopic(mirrorTopic, mirrorReplication, link, defaultLinkTopicArg4, defaultLinkTopicArg5);

        this.produceToSourceCluster(10);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.verifyReverseConnectionMetrics();

        // Kill the destination controller and keep only the other destination brokers.
        KafkaBroker destController = this.destCluster().controller();
        this.destCluster().killBroker(this.destCluster().brokers().indexOf((Object)destController));
        this.produceToSourceCluster(10);
        Buffer remainingDestBrokers = (Buffer)this.destCluster().brokers().filter((Function1 & Serializable & scala.Serializable)broker -> BoxesRunTime.boxToBoolean((boolean)SourceInitiatedLinkIntegrationTest.$anonfun$testControllerChangeWithReverseConnections$1(this, destController, broker)));
        this.waitForMirror((Seq<KafkaBroker>)remainingDestBrokers, this.waitForMirror$default$2());

        // Kill the source controller as well and check that the mirror still converges.
        KafkaBroker sourceController = this.sourceCluster().controller();
        this.sourceCluster().killBroker(this.sourceCluster().brokers().indexOf((Object)sourceController));
        this.produceToSourceCluster(10);
        this.waitForMirror((Seq<KafkaBroker>)remainingDestBrokers, this.waitForMirror$default$2());
        this.verifyReverseConnectionMetrics();
        this.verifyMirror(this.topic(), (Seq<KafkaBroker>)remainingDestBrokers, this.verifyMirror$default$3());
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk"})
    public void testTopicConfigSyncRules(String quorum) {
        String topicConfigSyncIncludeOverride = ((TraversableOnce)((TraversableOnce)CollectionConverters$.MODULE$.asScalaBufferConverter(ClusterLinkConfigDefaults$.MODULE$.TopicConfigSyncIncludeDefault()).asScala()).toSet().$plus$plus((GenTraversableOnce)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{LogConfig$.MODULE$.MinCompactionLagMsProp(), LogConfig$.MODULE$.CompressionTypeProp()}))).$minus$minus((GenTraversableOnce)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{LogConfig$.MODULE$.MaxCompactionLagMsProp()})))).mkString(",");
        Map linkConfigsToUpdate = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.TopicConfigSyncIncludeProp()), (Object)topicConfigSyncIncludeOverride), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.TopicConfigSyncMsProp()), (Object)"100")}));
        this.createClusterLink(this.linkName(), this.createClusterLink$default$2(), this.createClusterLink$default$3(), this.createClusterLink$default$4());
        ClusterLinkTestHarness qual$1 = this.sourceCluster();
        String x$1 = this.topic();
        int x$2 = this.numPartitions();
        short x$3 = this.replicationFactor();
        Properties x$4 = new Properties(null){
            {
                this.put(LogConfig$.MODULE$.MinCompactionLagMsProp(), "142857");
                this.put(LogConfig$.MODULE$.CompressionTypeProp(), "snappy");
            }
        };
        ListenerName x$5 = qual$1.createTopic$default$5();
        qual$1.createTopic(x$1, x$2, x$3, x$4, x$5);
        ClusterLinkTestHarness qual$2 = this.destCluster();
        String x$6 = this.linkName();
        Seq<KafkaBroker> x$8 = qual$2.alterClusterLink$default$3();
        qual$2.alterClusterLink(x$6, (Map<String, String>)linkConfigsToUpdate, x$8);
        ClusterLinkTestHarness qual$3 = this.destCluster();
        String x$9 = this.topic();
        short x$10 = this.replicationFactor();
        String x$11 = this.linkName();
        Map<String, String> x$12 = qual$3.linkTopic$default$4();
        String x$13 = qual$3.linkTopic$default$5();
        qual$3.linkTopic(x$9, x$10, x$11, x$12, x$13);
        this.produceToSourceCluster(10);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
        this.verifyReverseConnectionMetrics();
        Map expect = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)LogConfig$.MODULE$.MinCompactionLagMsProp()), (Object)"142857"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)LogConfig$.MODULE$.CompressionTypeProp()), (Object)"snappy"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)LogConfig$.MODULE$.MaxCompactionLagMsProp()), (Object)Long.toString(Defaults$.MODULE$.MaxCompactionLagMs()))}));
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!SourceInitiatedLinkIntegrationTest.$anonfun$testTopicConfigSyncRules$1(this, expect)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)SourceInitiatedLinkIntegrationTest.$anonfun$testTopicConfigSyncRules$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    private void verifyReverseConnectionMetrics() {
        this.verifyMetricRange$1((Seq)new .colon.colon((Object)this.sourceCluster().controller(), (List)Nil$.MODULE$), "controller-reverse-connection-count", "source", 1.0, 2.0);
        this.verifyMetricRange$1((Seq)new .colon.colon((Object)this.destCluster().controller(), (List)Nil$.MODULE$), "controller-reverse-connection-count", "destination", 1.0, 2.0);
        this.verifyMetricRange$1(this.sourceCluster().aliveServers(), "reverse-connection-count", "source", 2.0, 10.0);
        this.verifyMetricRange$1(this.destCluster().aliveServers(), "reverse-connection-count", "destination", 2.0, 10.0);
        this.verifyMetricRange$1(this.sourceCluster().aliveServers(), "reverse-connection-created-total", "source", 2.0, 1000.0);
        this.verifyMetricRange$1(this.destCluster().aliveServers(), "reverse-connection-created-total", "destination", 2.0, 1000.0);
        this.verifyMetricRange$1(this.sourceCluster().aliveServers(), "reverse-connection-closed-total", "source", 0.0, 1000.0);
        this.verifyMetricRange$1(this.destCluster().aliveServers(), "reverse-connection-closed-total", "destination", 0.0, 1000.0);
        String x$1 = "reverse-connection-failed-total";
        Seq<KafkaBroker> x$2 = this.sourceCluster().aliveServers();
        boolean x$3 = false;
        String x$4 = this.verifyKafkaMetric$default$2();
        Option<String> x$5 = this.verifyKafkaMetric$default$4();
        Map<String, String> x$6 = this.verifyKafkaMetric$default$5();
        boolean x$7 = this.verifyKafkaMetric$default$7();
        this.verifyKafkaMetric(x$1, x$4, x$3, x$5, x$6, x$2, x$7);
        Map sourceTag = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"mode"), (Object)"source")}));
        Map dstTag = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"mode"), (Object)"destination")}));
        double sourceLinks = this.totalKafkaMetricValue(this.sourceCluster().aliveServers(), "link-count", (Map<String, String>)sourceTag, this.totalKafkaMetricValue$default$4(), this.totalKafkaMetricValue$default$5());
        double sourceConns = this.totalKafkaMetricValue(this.sourceCluster().aliveServers(), "reverse-connection-count", (Map<String, String>)sourceTag, this.totalKafkaMetricValue$default$4(), this.totalKafkaMetricValue$default$5());
        double sourceCreated = this.totalKafkaMetricValue(this.sourceCluster().aliveServers(), "reverse-connection-created-total", (Map<String, String>)sourceTag, this.totalKafkaMetricValue$default$4(), this.totalKafkaMetricValue$default$5());
        double sourceClosed = this.totalKafkaMetricValue(this.sourceCluster().aliveServers(), "reverse-connection-closed-total", (Map<String, String>)sourceTag, this.totalKafkaMetricValue$default$4(), this.totalKafkaMetricValue$default$5());
        double destLinks = this.totalKafkaMetricValue(this.destCluster().aliveServers(), "link-count", (Map<String, String>)dstTag, this.totalKafkaMetricValue$default$4(), this.totalKafkaMetricValue$default$5());
        double destConns = this.totalKafkaMetricValue(this.destCluster().aliveServers(), "reverse-connection-count", (Map<String, String>)dstTag, this.totalKafkaMetricValue$default$4(), this.totalKafkaMetricValue$default$5());
        double destCreated = this.totalKafkaMetricValue(this.destCluster().aliveServers(), "reverse-connection-created-total", (Map<String, String>)dstTag, this.totalKafkaMetricValue$default$4(), this.totalKafkaMetricValue$default$5());
        double destClosed = this.totalKafkaMetricValue(this.destCluster().aliveServers(), "reverse-connection-closed-total", (Map<String, String>)dstTag, this.totalKafkaMetricValue$default$4(), this.totalKafkaMetricValue$default$5());
        SourceInitiatedLinkIntegrationTest.verifyRange$1(sourceLinks, 1.0, 0.0, "Source links vs source alive servers");
        SourceInitiatedLinkIntegrationTest.verifyRange$1(sourceConns, destConns, 2.0, "Dest vs source active connections");
        SourceInitiatedLinkIntegrationTest.verifyRange$1(sourceConns, sourceCreated - sourceClosed, 2.0, "Source active connections vs created-closed");
        SourceInitiatedLinkIntegrationTest.verifyRange$1(destLinks, 1.0, 0.0, "Dest links vs dest alive servers");
        SourceInitiatedLinkIntegrationTest.verifyRange$1(destConns, destCreated - destClosed, 2.0, "Dest active connections vs created-closed");
    }

    /**
     * Checks that a mirror topic can be re-created and catches up after the source cluster has
     * been shut down and restarted.
     *
     * <p>Sequence: create the source topic (third argument {@code 2}) and produce 100 records,
     * create the link and the mirror topic, run the {@code shutdownSource$1} and
     * {@code restartSource$1} helpers, delete the destination topic, link it again, and wait for
     * the mirror.
     *
     * @param quorum value supplied by {@code @ValueSource} (only {@code "zk"}); not read by the body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk"})
    public void testSourceClusterRestart(String quorum) {
        // Source topic with an explicit third argument of 2 instead of replicationFactor().
        ClusterLinkTestHarness source = this.sourceCluster();
        String sourceTopic = this.topic();
        int partitionCount = this.numPartitions();
        int sourceReplication = 2;
        Properties defaultTopicProps = source.createTopic$default$4();
        ListenerName defaultListener = source.createTopic$default$5();
        source.createTopic(sourceTopic, partitionCount, sourceReplication, defaultTopicProps, defaultListener);
        this.produceToSourceCluster(100);

        this.createClusterLink(this.linkName(), this.createClusterLink$default$2(), this.createClusterLink$default$3(), this.createClusterLink$default$4());

        // First mirror of the topic.
        ClusterLinkTestHarness destBeforeRestart = this.destCluster();
        String firstMirrorTopic = this.topic();
        short firstMirrorReplication = 2;
        String firstMirrorLink = this.linkName();
        Map<String, String> firstLinkTopicArg4 = destBeforeRestart.linkTopic$default$4();
        String firstLinkTopicArg5 = destBeforeRestart.linkTopic$default$5();
        destBeforeRestart.linkTopic(firstMirrorTopic, firstMirrorReplication, firstMirrorLink, firstLinkTopicArg4, firstLinkTopicArg5);

        this.shutdownSource$1();
        this.restartSource$1();

        // Drop the mirror and link the topic again after the source is back.
        this.destCluster().deleteTopic(this.topic(), true);
        ClusterLinkTestHarness destAfterRestart = this.destCluster();
        String secondMirrorTopic = this.topic();
        short secondMirrorReplication = 2;
        String secondMirrorLink = this.linkName();
        Map<String, String> secondLinkTopicArg4 = destAfterRestart.linkTopic$default$4();
        String secondLinkTopicArg5 = destAfterRestart.linkTopic$default$5();
        destAfterRestart.linkTopic(secondMirrorTopic, secondMirrorReplication, secondMirrorLink, secondLinkTopicArg4, secondLinkTopicArg5);

        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
    }

    /**
     * Checks that a topic can be mirrored after the destination cluster's controller has changed.
     *
     * <p>The link and the source topic are created first; {@code changeController()} is then
     * invoked on the destination harness before the mirror topic is linked, records are produced,
     * and the mirror is awaited.
     *
     * @param quorum value supplied by {@code @ValueSource} (only {@code "zk"}); not read by the body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk"})
    public void testDestControllerChange(String quorum) {
        this.createClusterLink(this.linkName(), this.createClusterLink$default$2(), this.createClusterLink$default$3(), this.createClusterLink$default$4());

        ClusterLinkTestHarness source = this.sourceCluster();
        String sourceTopic = this.topic();
        int partitionCount = this.numPartitions();
        short sourceReplication = this.replicationFactor();
        Properties defaultTopicProps = source.createTopic$default$4();
        ListenerName defaultListener = source.createTopic$default$5();
        source.createTopic(sourceTopic, partitionCount, sourceReplication, defaultTopicProps, defaultListener);

        // Controller change happens before the mirror topic exists.
        this.destCluster().changeController();

        ClusterLinkTestHarness dest = this.destCluster();
        String mirrorTopic = this.topic();
        short mirrorReplication = this.replicationFactor();
        String link = this.linkName();
        Map<String, String> defaultLinkTopicArg4 = dest.linkTopic$default$4();
        String defaultLinkTopicArg5 = dest.linkTopic$default$5();
        dest.linkTopic(mirrorTopic, mirrorReplication, link, defaultLinkTopicArg4, defaultLinkTopicArg5);

        this.produceToSourceCluster(10);
        this.waitForMirror(this.waitForMirror$default$1(), this.waitForMirror$default$2());
    }

    /**
     * Checks mirroring when the source-side link properties override the local listener.
     *
     * <p>Starting from {@code sourceLinkProps}, every property whose name starts with
     * {@code "local."} is removed. The local listener name is then set to the source cluster's
     * inter-broker listener, and {@code local.security.protocol} to the source cluster's
     * inter-broker security protocol name. The link is created with those properties, the topic
     * is mirrored, and the mirror is verified.
     *
     * @param quorum value supplied by {@code @ValueSource} (only {@code "zk"}); not read by the body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk"})
    public void testLocalListenerOverride(String quorum) {
        ClusterLinkTestHarness source = this.sourceCluster();
        String sourceTopic = this.topic();
        int partitionCount = this.numPartitions();
        short sourceReplication = this.replicationFactor();
        Properties defaultTopicProps = source.createTopic$default$4();
        ListenerName defaultListener = source.createTopic$default$5();
        source.createTopic(sourceTopic, partitionCount, sourceReplication, defaultTopicProps, defaultListener);
        this.produceToSourceCluster(20);

        // Strip all existing "local."-prefixed entries before applying the overrides.
        Properties sourceProps = (Properties)this.sourceLinkProps(this.sourceLinkProps$default$1()).get();
        ((IterableLike)((TraversableLike)CollectionConverters$.MODULE$.asScalaSetConverter(sourceProps.stringPropertyNames()).asScala()).filter((Function1 & Serializable & scala.Serializable)propName -> BoxesRunTime.boxToBoolean((boolean)propName.startsWith("local.")))).foreach((Function1 & Serializable & scala.Serializable)localProp -> sourceProps.remove(localProp));
        sourceProps.setProperty(ClusterLinkConfig$.MODULE$.LocalListenerNameProp(), this.sourceCluster().interBrokerListenerName().value());
        sourceProps.setProperty("local." + "security.protocol", this.sourceCluster().interBrokerSecurityProtocol().name);

        // Create the link with the customised source-side properties.
        String link = this.linkName();
        Some sourcePropsOpt = new Some((Object)sourceProps);
        Properties defaultLinkArg2 = this.createClusterLink$default$2();
        boolean defaultLinkArg4 = this.createClusterLink$default$4();
        this.createClusterLink(link, defaultLinkArg2, (Option<Properties>)sourcePropsOpt, defaultLinkArg4);

        ClusterLinkTestHarness dest = this.destCluster();
        String mirrorTopic = this.topic();
        short mirrorReplication = this.replicationFactor();
        String mirrorLink = this.linkName();
        Map<String, String> defaultLinkTopicArg4 = dest.linkTopic$default$4();
        String defaultLinkTopicArg5 = dest.linkTopic$default$5();
        dest.linkTopic(mirrorTopic, mirrorReplication, mirrorLink, defaultLinkTopicArg4, defaultLinkTopicArg5);

        this.verifyMirror(this.topic(), this.verifyMirror$default$2(), this.verifyMirror$default$3());
    }

    /**
     * Creates two links whose source-side properties enable ACL sync, consumer-offset sync and
     * auto mirroring, then toggles each of those three settings off and on again on the source
     * cluster for both links.
     *
     * <p>Finally every link listed by the destination cluster is passed to the
     * {@code $anonfun$testLinkValidationFailureOnSourceCluster$1} helper, whose body is not
     * visible in this part of the file.
     *
     * @param quorum value supplied by {@code @ValueSource} (only {@code "zk"}); not read by the body
     */
    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk"})
    public void testLinkValidationFailureOnSourceCluster(String quorum) {
        {
            ClusterLinkTestHarness source = this.sourceCluster();
            String sourceTopic = this.topic();
            int partitionCount = this.numPartitions();
            short sourceReplication = this.replicationFactor();
            Properties defaultTopicProps = source.createTopic$default$4();
            ListenerName defaultListener = source.createTopic$default$5();
            source.createTopic(sourceTopic, partitionCount, sourceReplication, defaultTopicProps, defaultListener);
        }

        // Source-side link properties shared by both links.
        Properties sourceProps = (Properties)this.sourceLinkProps(this.sourceLinkProps$default$1()).get();
        sourceProps.setProperty(ClusterLinkConfig$.MODULE$.AclSyncEnableProp(), "true");
        sourceProps.setProperty(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp(), "true");
        sourceProps.setProperty(ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp(), "true");
        sourceProps.setProperty(ClusterLinkConfig$.MODULE$.TopicFiltersProp(), this.topicFilter());
        String link2Name = "testLink2";

        // First link, under the default link name.
        {
            String firstLink = this.linkName();
            Some propsOpt = new Some((Object)sourceProps);
            Properties defaultLinkArg2 = this.createClusterLink$default$2();
            boolean defaultLinkArg4 = this.createClusterLink$default$4();
            this.createClusterLink(firstLink, defaultLinkArg2, (Option<Properties>)propsOpt, defaultLinkArg4);
        }
        // Second link, reusing the same properties.
        {
            Some propsOpt = new Some((Object)sourceProps);
            Properties defaultLinkArg2 = this.createClusterLink$default$2();
            boolean defaultLinkArg4 = this.createClusterLink$default$4();
            this.createClusterLink(link2Name, defaultLinkArg2, (Option<Properties>)propsOpt, defaultLinkArg4);
        }

        // Toggle each setting off then on for the first link.
        {
            ClusterLinkTestHarness source = this.sourceCluster();
            String targetLink = this.linkName();
            Map override = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), (Object)"false")}));
            Seq<KafkaBroker> defaultBrokers = source.alterClusterLink$default$3();
            source.alterClusterLink(targetLink, (Map<String, String>)override, defaultBrokers);
        }
        {
            ClusterLinkTestHarness source = this.sourceCluster();
            String targetLink = this.linkName();
            Map override = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), (Object)"true")}));
            Seq<KafkaBroker> defaultBrokers = source.alterClusterLink$default$3();
            source.alterClusterLink(targetLink, (Map<String, String>)override, defaultBrokers);
        }
        {
            ClusterLinkTestHarness source = this.sourceCluster();
            String targetLink = this.linkName();
            Map override = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.AclSyncEnableProp()), (Object)"false")}));
            Seq<KafkaBroker> defaultBrokers = source.alterClusterLink$default$3();
            source.alterClusterLink(targetLink, (Map<String, String>)override, defaultBrokers);
        }
        {
            ClusterLinkTestHarness source = this.sourceCluster();
            String targetLink = this.linkName();
            Map override = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.AclSyncEnableProp()), (Object)"true")}));
            Seq<KafkaBroker> defaultBrokers = source.alterClusterLink$default$3();
            source.alterClusterLink(targetLink, (Map<String, String>)override, defaultBrokers);
        }
        {
            ClusterLinkTestHarness source = this.sourceCluster();
            String targetLink = this.linkName();
            Map override = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), (Object)"false")}));
            Seq<KafkaBroker> defaultBrokers = source.alterClusterLink$default$3();
            source.alterClusterLink(targetLink, (Map<String, String>)override, defaultBrokers);
        }
        {
            ClusterLinkTestHarness source = this.sourceCluster();
            String targetLink = this.linkName();
            Map override = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), (Object)"true")}));
            Seq<KafkaBroker> defaultBrokers = source.alterClusterLink$default$3();
            source.alterClusterLink(targetLink, (Map<String, String>)override, defaultBrokers);
        }

        // Same off/on sequence for the second link.
        {
            ClusterLinkTestHarness source = this.sourceCluster();
            Map override = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), (Object)"false")}));
            Seq<KafkaBroker> defaultBrokers = source.alterClusterLink$default$3();
            source.alterClusterLink(link2Name, (Map<String, String>)override, defaultBrokers);
        }
        {
            ClusterLinkTestHarness source = this.sourceCluster();
            Map override = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.AutoMirroringEnableProp()), (Object)"true")}));
            Seq<KafkaBroker> defaultBrokers = source.alterClusterLink$default$3();
            source.alterClusterLink(link2Name, (Map<String, String>)override, defaultBrokers);
        }
        {
            ClusterLinkTestHarness source = this.sourceCluster();
            Map override = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.AclSyncEnableProp()), (Object)"false")}));
            Seq<KafkaBroker> defaultBrokers = source.alterClusterLink$default$3();
            source.alterClusterLink(link2Name, (Map<String, String>)override, defaultBrokers);
        }
        {
            ClusterLinkTestHarness source = this.sourceCluster();
            Map override = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.AclSyncEnableProp()), (Object)"true")}));
            Seq<KafkaBroker> defaultBrokers = source.alterClusterLink$default$3();
            source.alterClusterLink(link2Name, (Map<String, String>)override, defaultBrokers);
        }
        {
            ClusterLinkTestHarness source = this.sourceCluster();
            Map override = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), (Object)"false")}));
            Seq<KafkaBroker> defaultBrokers = source.alterClusterLink$default$3();
            source.alterClusterLink(link2Name, (Map<String, String>)override, defaultBrokers);
        }
        {
            ClusterLinkTestHarness source = this.sourceCluster();
            Map override = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()), (Object)"true")}));
            Seq<KafkaBroker> defaultBrokers = source.alterClusterLink$default$3();
            source.alterClusterLink(link2Name, (Map<String, String>)override, defaultBrokers);
        }

        // Hand every link listed by the destination cluster to the per-link check helper.
        ClusterLinkTestHarness dest = this.destCluster();
        boolean defaultListArg = dest.listClusterLinks$default$1();
        dest.listClusterLinks(defaultListArg).foreach((Function1 & Serializable & scala.Serializable)link -> {
            SourceInitiatedLinkIntegrationTest.$anonfun$testLinkValidationFailureOnSourceCluster$1(link);
            return BoxedUnit.UNIT;
        });
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk"})
    public void testDescribeSourceSideLinkConfig(String quorum) {
        Map sourcePropOverrides = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"custom.credential"), (Object)"secret"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"local.custom.credential"), (Object)"secret")}));
        String x$1 = this.linkName();
        Option<Properties> x$2 = this.sourceLinkProps((Map<String, String>)sourcePropOverrides);
        Properties x$3 = this.createClusterLink$default$2();
        boolean x$4 = this.createClusterLink$default$4();
        this.createClusterLink(x$1, x$3, x$2, x$4);
        Config linkConfig = this.sourceCluster().describeClusterLink(this.linkName());
        ((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"sasl.jaas.config", "ssl.keystore.password", "ssl.key.password", "ssl.keystore.key", "ssl.keystore.certificate.chain", "ssl.truststore.certificates", "ssl.truststore.password", "custom.credential"}))).foreach((Function1 & Serializable & scala.Serializable)config -> {
            SourceInitiatedLinkIntegrationTest.$anonfun$testDescribeSourceSideLinkConfig$1(linkConfig, config);
            return BoxedUnit.UNIT;
        });
        new .colon.colon((Object)"ssl.truststore.type", (List)new .colon.colon((Object)"security.protocol", (List)Nil$.MODULE$)).foreach((Function1 & Serializable & scala.Serializable)config -> {
            SourceInitiatedLinkIntegrationTest.$anonfun$testDescribeSourceSideLinkConfig$2(linkConfig, config);
            return BoxedUnit.UNIT;
        });
        linkConfig.entries().forEach(entry -> {
            if (ClusterLinkConfig$.MODULE$.configKeys().get((Object)entry.name()).forall((Function1 & Serializable & scala.Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)SourceInitiatedLinkIntegrationTest.$anonfun$testDescribeSourceSideLinkConfig$4(x$3)))) {
                SourceInitiatedLinkIntegrationTest.verifySensitive$1(entry.name(), linkConfig);
            }
        });
    }

    /**
     * Tells whether {@code x$1} differs from the destination-cluster server that is looked up by
     * the broker id of {@code destController$1}.
     *
     * @param $this            test instance whose destination cluster is queried
     * @param destController$1 broker whose configured broker id selects the server to compare against
     * @param x$1              candidate broker; may be {@code null}
     * @return {@code true} when the two brokers are not equal (null-safe), {@code false} otherwise
     */
    public static final /* synthetic */ boolean $anonfun$testControllerChangeWithReverseConnections$1(SourceInitiatedLinkIntegrationTest $this, KafkaBroker destController$1, KafkaBroker x$1) {
        KafkaBroker controllerServer = $this.destCluster().serverWithBrokerId(destController$1.config().brokerId());
        if (x$1 == null) {
            // A null candidate differs from anything except another null.
            return controllerServer != null;
        }
        return !x$1.equals(controllerServer);
    }

    /**
     * Asks the destination cluster whether the described config of {@code $this.topic()} equals
     * the expected map.
     *
     * @param $this    test instance providing the destination cluster and the topic name
     * @param expect$1 expected topic configs, passed through as a {@code Map<String, String>}
     * @return the result of {@code describeTopicConfigEquals}
     */
    public static final /* synthetic */ boolean $anonfun$testTopicConfigSyncRules$1(SourceInitiatedLinkIntegrationTest $this, Map expect$1) {
        Map<String, String> expectedConfigs = (Map<String, String>)expect$1;
        boolean configsMatch = $this.destCluster().describeTopicConfigEquals($this.topic(), expectedConfigs);
        return configsMatch;
    }

    /**
     * Returns a fixed description of which topic configs should and should not sync.
     * NOTE(review): presumably used as the failure message of the topic config sync check;
     * confirm against the caller.
     *
     * @return the constant description string
     */
    public static final /* synthetic */ String $anonfun$testTopicConfigSyncRules$2() {
        final String syncExpectation = "min.compaction.lag.ms, compression.type should sync, max.compaction.lag.ms shouldn't sync";
        return syncExpectation;
    }

    /**
     * Reads a Kafka metric tagged with the given {@code mode} and asserts that its value lies in
     * the closed range {@code [minValue, maxValue]}.
     *
     * @param servers  brokers passed to {@code kafkaMetricValue}
     * @param name     metric name, also used in the failure messages
     * @param mode     value of the {@code "mode"} tag, also used in the failure messages
     * @param minValue inclusive lower bound
     * @param maxValue inclusive upper bound
     */
    private final void verifyMetricRange$1(Seq servers, String name, String mode, double minValue, double maxValue) {
        Map<String, String> modeTag = (Map<String, String>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"mode"), (Object)mode)})));
        double observed = this.kafkaMetricValue((Seq<KafkaBroker>)servers, name, modeTag, this.kafkaMetricValue$default$4(), this.kafkaMetricValue$default$5());

        boolean atLeastMin = observed >= minValue;
        Assertions.assertTrue(atLeastMin, "Metric " + name + " too low for " + mode + ": " + observed);

        boolean atMostMax = observed <= maxValue;
        Assertions.assertTrue(atMostMax, "Metric " + name + " too high for " + mode + ": " + observed);
    }

    /**
     * Asserts that two values are no further apart than {@code maxDiff}.
     *
     * @param first   first value
     * @param second  second value
     * @param maxDiff largest allowed absolute difference (inclusive)
     * @param desc    prefix of the failure message
     */
    private static final void verifyRange$1(double first, double second, double maxDiff, String desc) {
        double distance = Math.abs(first - second);
        boolean withinRange = distance <= maxDiff;
        String failureMessage = desc + " : (" + first + ", " + second + ") not within " + maxDiff + " range";
        Assertions.assertTrue(withinRange, failureMessage);
    }

    /**
     * Calls {@code killBroker} on the source cluster once for every index in
     * {@code [0, brokers().length())}, passing the index minus {@code firstBrokerId()}.
     * NOTE(review): confirm that {@code i - firstBrokerId()} is the argument {@code killBroker}
     * expects.
     */
    private final void shutdownSource$1() {
        // The upper bound is read once, before any broker is killed.
        int brokerCount = this.sourceCluster().brokers().length();
        for (int i = 0; i < brokerCount; i++) {
            this.sourceCluster().killBroker(i - this.sourceCluster().firstBrokerId());
        }
    }

    /**
     * Restarts the dead brokers of the source cluster, using the default argument of
     * {@code restartDeadBrokers}, and then calls {@code updateBootstrapServers()} on it.
     */
    private final void restartSource$1() {
        ClusterLinkTestHarness source = this.sourceCluster();
        source.restartDeadBrokers(source.restartDeadBrokers$default$1());
        this.sourceCluster().updateBootstrapServers();
    }

    /**
     * Asserts that the given cluster link listing reports itself as available.
     *
     * @param link listing to check
     */
    public static final /* synthetic */ void $anonfun$testLinkValidationFailureOnSourceCluster$1(ClusterLinkListing link) {
        boolean linkAvailable = (boolean)link.available();
        Assertions.assertTrue(linkAvailable);
    }

    /**
     * Asserts that a config is reported as sensitive: the entry must exist, its value must be
     * {@code null}, and {@code isSensitive()} must be {@code true}.
     *
     * @param configName   name of the config to look up
     * @param linkConfig$1 described link config to look it up in
     */
    private static final void verifySensitive$1(String configName, Config linkConfig$1) {
        ConfigEntry entry = linkConfig$1.get(configName);
        Assertions.assertNotNull((Object)entry, "Config not found " + configName);

        String sensitivePrefix = "Sensitive config " + configName;
        Assertions.assertNull((Object)entry.value(), sensitivePrefix + " returned");
        Assertions.assertTrue((boolean)entry.isSensitive(), sensitivePrefix + " not marked as sensitive");
    }

    /**
     * Asserts that a config is reported as non-sensitive: the entry must exist, its value must be
     * non-null, and {@code isSensitive()} must be {@code false}.
     *
     * @param configName   name of the config to look up
     * @param linkConfig$1 described link config to look it up in
     */
    private static final void verifyNotSensitive$1(String configName, Config linkConfig$1) {
        ConfigEntry entry = linkConfig$1.get(configName);
        Assertions.assertNotNull((Object)entry, "Config not found " + configName);

        String configPrefix = "Config " + configName;
        Assertions.assertNotNull((Object)entry.value(), configPrefix + " returned null, even though not sensitive");
        Assertions.assertFalse((boolean)entry.isSensitive(), configPrefix + " marked sensitive incorrectly");
    }

    /**
     * Runs {@code verifySensitive$1} for a config name and then for the same name prefixed with
     * {@code ClusterLinkConfig$.MODULE$.LocalPrefix()}.
     *
     * @param linkConfig$1 described link config to check
     * @param config       unprefixed config name
     */
    public static final /* synthetic */ void $anonfun$testDescribeSourceSideLinkConfig$1(Config linkConfig$1, String config) {
        SourceInitiatedLinkIntegrationTest.verifySensitive$1(config, linkConfig$1);

        String prefixedName = ClusterLinkConfig$.MODULE$.LocalPrefix() + config;
        SourceInitiatedLinkIntegrationTest.verifySensitive$1(prefixedName, linkConfig$1);
    }

    /**
     * Runs {@code verifyNotSensitive$1} for a config name and then for the same name prefixed
     * with {@code ClusterLinkConfig$.MODULE$.LocalPrefix()}.
     *
     * @param linkConfig$1 described link config to check
     * @param config       unprefixed config name
     */
    public static final /* synthetic */ void $anonfun$testDescribeSourceSideLinkConfig$2(Config linkConfig$1, String config) {
        SourceInitiatedLinkIntegrationTest.verifyNotSensitive$1(config, linkConfig$1);

        String prefixedName = ClusterLinkConfig$.MODULE$.LocalPrefix() + config;
        SourceInitiatedLinkIntegrationTest.verifyNotSensitive$1(prefixedName, linkConfig$1);
    }

    /**
     * Tells whether a config key is declared with type {@code ConfigDef.Type.PASSWORD}.
     *
     * @param x$3 config key whose {@code type} field is inspected
     * @return {@code true} when the key's type equals {@code PASSWORD}; {@code false} otherwise,
     *         including when the type is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$testDescribeSourceSideLinkConfig$4(ConfigDef.ConfigKey x$3) {
        ConfigDef.Type declaredType = x$3.type;
        // Constant-first comparison keeps the check null-safe for the declared type.
        return ConfigDef.Type.PASSWORD.equals(declaredType);
    }

    public SourceInitiatedLinkIntegrationTest() {
        this.useSourceInitiatedLink_$eq(true);
    }
}

