package kafka.server;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import kafka.api.LeaderAndIsr$;
import kafka.cluster.Broker;
import kafka.controller.ControllerChannelManager;
import kafka.controller.ControllerChannelManager$;
import kafka.controller.ControllerContext;
import kafka.controller.StateChangeLogger;
import kafka.utils.TestUtils$;
import kafka.zk.ZooKeeperTestHarness;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.message.StopReplicaRequestData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractControlRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.requests.StopReplicaRequest;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.VolatileBooleanRef;

/* compiled from: BrokerEpochIntegrationTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]c\u0001B\u000b\u0017\u0001mAQA\t\u0001\u0005\u0002\rBqA\n\u0001C\u0002\u0013\u0005q\u0005\u0003\u0004/\u0001\u0001\u0006I\u0001\u000b\u0005\b_\u0001\u0011\r\u0011\"\u0001(\u0011\u0019\u0001\u0004\u0001)A\u0005Q!9\u0011\u0007\u0001a\u0001\n\u0003\u0011\u0004b\u0002\"\u0001\u0001\u0004%\ta\u0011\u0005\u0007\u0013\u0002\u0001\u000b\u0015B\u001a\t\u000b)\u0003A\u0011I&\t\u000be\u0003A\u0011I&\t\u000by\u0003A\u0011A&\t\u000b\r\u0004A\u0011A&\t\u000b\u0015\u0004A\u0011A&\t\u000b\u001d\u0004A\u0011A&\t\u000b%\u0004A\u0011A&\t\u000b-\u0004A\u0011\u00027\t\u000bI\u0004A\u0011B:\t\u000bQ\u0004A\u0011B;\t\u000by\u0004A\u0011B@\t\u000f\u0005\u0015\u0003\u0001\"\u0003\u0002H\tQ\"I]8lKJ,\u0005o\\2i\u0013:$Xm\u001a:bi&|g\u000eV3ti*\u0011q\u0003G\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003e\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u00019A\u0011Q\u0004I\u0007\u0002=)\u0011q\u0004G\u0001\u0003u.L!!\t\u0010\u0003)i{wnS3fa\u0016\u0014H+Z:u\u0011\u0006\u0014h.Z:t\u0003\u0019a\u0014N\\5u}Q\tA\u0005\u0005\u0002&\u00015\ta#A\u0005ce>\\WM]%ecU\t\u0001\u0006\u0005\u0002*Y5\t!FC\u0001,\u0003\u0015\u00198-\u00197b\u0013\ti#FA\u0002J]R\f!B\u0019:pW\u0016\u0014\u0018\nZ\u0019!\u0003%\u0011'o\\6fe&#''\u0001\u0006ce>\\WM]%ee\u0001\nqa]3sm\u0016\u00148/F\u00014!\r!Dh\u0010\b\u0003kir!AN\u001d\u000e\u0003]R!\u0001\u000f\u000e\u0002\rq\u0012xn\u001c;?\u0013\u0005Y\u0013BA\u001e+\u0003\u001d\u0001\u0018mY6bO\u0016L!!\u0010 
\u0003\u0007M+\u0017O\u0003\u0002<UA\u0011Q\u0005Q\u0005\u0003\u0003Z\u00111bS1gW\u0006\u001cVM\u001d<fe\u0006Y1/\u001a:wKJ\u001cx\fJ3r)\t!u\t\u0005\u0002*\u000b&\u0011aI\u000b\u0002\u0005+:LG\u000fC\u0004I\u000f\u0005\u0005\t\u0019A\u001a\u0002\u0007a$\u0013'\u0001\u0005tKJ4XM]:!\u0003\u0015\u0019X\r^+q)\u0005!\u0005FA\u0005N!\tqu+D\u0001P\u0015\t\u0001\u0016+A\u0002ba&T!AU*\u0002\u000f),\b/\u001b;fe*\u0011A+V\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0002-\u0006\u0019qN]4\n\u0005a{%A\u0003\"fM>\u0014X-R1dQ\u0006AA/Z1s\t><h\u000e\u000b\u0002\u000b7B\u0011a\nX\u0005\u0003;>\u0013\u0011\"\u00114uKJ,\u0015m\u00195\u0002UQ,7\u000f\u001e*fa2L7-Y'b]\u0006<WM\u001d\"s_.,'/\u00129pG\"l\u0015\r^2iKN<\u0016\u000e\u001e5[W\"\u00121\u0002\u0019\t\u0003\u001d\u0006L!AY(\u0003\tQ+7\u000f^\u0001,i\u0016\u001cHoQ8oiJ|G\u000e\\3s\u0005J|7.\u001a:Fa>\u001c\u0007nQ1dQ\u0016l\u0015\r^2iKN<\u0016\u000e\u001e5[W\"\u0012A\u0002Y\u0001)i\u0016\u001cHoQ8oiJ|GNU3rk\u0016\u001cHoV5uQ\u000e{'O]3di\n\u0013xn[3s\u000bB|7\r\u001b\u0015\u0003\u001b\u0001\fa\u0005^3ti\u000e{g\u000e\u001e:pYJ+\u0017/^3ti^KG\u000f[*uC2,'I]8lKJ,\u0005o\\2iQ\tq\u0001-\u0001\u0014uKN$8i\u001c8ue>d'+Z9vKN$x+\u001b;i\u001d\u0016<XM\u001d\"s_.,'/\u00129pG\"D#a\u00041\u0002CQ,7\u000f^\"p]R\u0014x\u000e\u001c*fcV,7\u000f^,ji\"\u0014%o\\6fe\u0016\u0003xn\u00195\u0015\u0005\u0011k\u0007\"\u00028\u0011\u0001\u0004y\u0017AI3q_\u000eD\u0017J\u001c*fcV,7\u000f\u001e#jM\u001a4%o\\7DkJ\u0014XM\u001c;Fa>\u001c\u0007\u000e\u0005\u0002*a&\u0011\u0011O\u000b\u0002\u0005\u0019>tw-A\u0007hKR\u001cuN\u001c;s_2dWM]\u000b\u0002\u007f\u0005i3\r[3dW\u000e{g\u000e\u001e:pY2,'O\u0011:pW\u0016\u0014X\t]8dQN\u001c\u0015m\u00195f\u001b\u0006$8\r[3t/&$\bNW6\u0015\u0005\u00113\b\"B<\u0013\u0001\u0004A\u0018!E2p]R\u0014x\u000e\u001c7fe\u000e{g\u000e^3yiB\u0011\u0011\u0010`\u0007\u0002u*\u00111\u0010G\u0001\u000bG>tGO]8mY\u0016\u0014\u0018BA?{\u0005E\u0019uN\u001c;s_2dWM]\"p]R,\u0007\u0010^\u0001(g\u0016tG-\u00118e-\u0016\u0014\u0018NZ=Ti\u0006dWM\u0011
:pW\u0016\u0014X\t]8dQ&s'+Z:q_:\u001cX\rF\u0003E\u0003\u0003\tY\u0001C\u0004\u0002\u0004M\u0001\r!!\u0002\u00021\r|g\u000e\u001e:pY2,'o\u00115b]:,G.T1oC\u001e,'\u000fE\u0002z\u0003\u000fI1!!\u0003{\u0005a\u0019uN\u001c;s_2dWM]\"iC:tW\r\\'b]\u0006<WM\u001d\u0005\b\u0003\u001b\u0019\u0002\u0019AA\b\u0003\u001d\u0011W/\u001b7eKJ\u0004D!!\u0005\u00024A1\u00111CA\u0015\u0003_qA!!\u0006\u0002&5\u0011\u0011q\u0003\u0006\u0005\u00033\tY\"\u0001\u0005sKF,Xm\u001d;t\u0015\u0011\ti\"a\b\u0002\r\r|W.\\8o\u0015\rI\u0012\u0011\u0005\u0006\u0004\u0003G)\u0016AB1qC\u000eDW-\u0003\u0003\u0002(\u0005]\u0011AF!cgR\u0014\u0018m\u0019;D_:$(o\u001c7SKF,Xm\u001d;\n\t\u0005-\u0012Q\u0006\u0002\b\u0005VLG\u000eZ3s\u0015\u0011\t9#a\u0006\u0011\t\u0005E\u00121\u0007\u0007\u0001\t1\t)$a\u0003\u0002\u0002\u0003\u0005)\u0011AA\u001c\u0005\ryF%M\t\u0005\u0003s\ty\u0004E\u0002*\u0003wI1!!\u0010+\u0005\u001dqu\u000e\u001e5j]\u001e\u0004B!!\u0006\u0002B%!\u00111IA\f\u0005Y\t%m\u001d;sC\u000e$8i\u001c8ue>d'+Z9vKN$\u0018aH:f]\u0012\fe\u000e\u001a,fe&4\u0017pU;dG\u0016\u001c8OZ;m%\u0016\u001c\bo\u001c8tKR)A)!\u0013\u0002L!9\u00111\u0001\u000bA\u0002\u0005\u0015\u0001bBA\u0007)\u0001\u0007\u0011Q\n\u0019\u0005\u0003\u001f\n\u0019\u0006\u0005\u0004\u0002\u0014\u0005%\u0012\u0011\u000b\t\u0005\u0003c\t\u0019\u0006\u0002\u0007\u0002V\u0005-\u0013\u0011!A\u0001\u0006\u0003\t9DA\u0002`II\u0002")
/* loaded from: input_file:kafka/server/BrokerEpochIntegrationTest.class */
public class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
    // Id of the first test broker; setUp() passes it to createBrokerConfig.
    private final int brokerId1 = 0;
    // Id of the second test broker; the control requests in this class are sent to this id.
    private final int brokerId2 = 1;
    // Brokers created by setUp() and shut down by tearDown(); starts out as the empty list.
    private Seq<KafkaServer> servers = Nil$.MODULE$;

    /**
     * Accessor for the id of the first test broker.
     *
     * @return the value of the {@code brokerId1} field
     */
    public int brokerId1() {
        return brokerId1;
    }

    /**
     * Accessor for the id of the second test broker.
     *
     * @return the value of the {@code brokerId2} field
     */
    public int brokerId2() {
        return brokerId2;
    }

    /**
     * Accessor for the brokers currently held by this test instance.
     *
     * @return the current value of the {@code servers} field
     */
    public Seq<KafkaServer> servers() {
        return servers;
    }

    /**
     * Setter for the {@code servers} field (Scala-style {@code servers_=} name).
     *
     * @param seq the new sequence of brokers to store
     */
    public void servers_$eq(Seq<KafkaServer> seq) {
        servers = seq;
    }

    /**
     * Runs the harness set-up, then builds one broker config per test broker id, disables
     * automatic leader rebalancing in each config, and creates a server from every config.
     * The created servers are stored via {@link #servers_$eq}.
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    @BeforeEach
    public void setUp() {
        super.setUp();
        $colon.colon brokerConfigs = new $colon.colon(defaultBrokerConfig(brokerId1()), new $colon.colon(defaultBrokerConfig(brokerId2()), Nil$.MODULE$));
        // Turn off automatic leader rebalancing in every broker config before the brokers are created.
        brokerConfigs.foreach(props -> props.setProperty(KafkaConfig$.MODULE$.AutoLeaderRebalanceEnableProp(), Boolean.toString(false)));
        Seq<KafkaServer> createdServers = (Seq) brokerConfigs.map(
            props -> TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(props), TestUtils$.MODULE$.createServer$default$2()),
            Seq$.MODULE$.canBuildFrom());
        servers_$eq(createdServers);
    }

    /**
     * Builds a broker config for {@code brokerId} against this harness's ZooKeeper connect
     * string, using the {@code createBrokerConfig} default for every remaining argument.
     */
    private java.util.Properties defaultBrokerConfig(int brokerId) {
        TestUtils$ utils = TestUtils$.MODULE$;
        return utils.createBrokerConfig(
            brokerId,
            zkConnect(),
            utils.createBrokerConfig$default$3(),
            utils.createBrokerConfig$default$4(),
            utils.createBrokerConfig$default$5(),
            utils.createBrokerConfig$default$6(),
            utils.createBrokerConfig$default$7(),
            utils.createBrokerConfig$default$8(),
            utils.createBrokerConfig$default$9(),
            utils.createBrokerConfig$default$10(),
            utils.createBrokerConfig$default$11(),
            utils.createBrokerConfig$default$12(),
            utils.createBrokerConfig$default$13(),
            utils.createBrokerConfig$default$14(),
            utils.createBrokerConfig$default$15(),
            utils.createBrokerConfig$default$16(),
            utils.createBrokerConfig$default$17(),
            utils.createBrokerConfig$default$18(),
            utils.createBrokerConfig$default$19(),
            utils.createBrokerConfig$default$20());
    }

    /**
     * Shuts down the brokers held in {@link #servers()} and then runs the harness tear-down.
     *
     * <p>The harness tear-down is placed in a {@code finally} block so that it still runs when
     * shutting the brokers down throws; otherwise a failed broker shutdown would skip the
     * superclass clean-up entirely. Any exception from the broker shutdown still propagates
     * (unless the harness tear-down itself throws, in which case that exception wins).
     */
    @Override // kafka.zk.ZooKeeperTestHarness
    @AfterEach
    public void tearDown() {
        try {
            TestUtils$.MODULE$.shutdownServers(servers());
        } finally {
            super.tearDown();
        }
    }

    /**
     * Reads the broker-to-epoch map from ZooKeeper, asserts that it has as many entries as
     * there are servers in this test, and runs the per-entry epoch check on every entry.
     */
    @Test
    public void testReplicaManagerBrokerEpochMatchesWithZk() {
        Map zkBrokerEpochs = zkClient().getAllBrokerAndEpochsInCluster();
        int zkBrokerCount = zkBrokerEpochs.size();
        int serverCount = servers().size();
        Assertions.assertEquals(zkBrokerCount, serverCount);
        zkBrokerEpochs.foreach(brokerAndEpoch -> {
            $anonfun$testReplicaManagerBrokerEpochMatchesWithZk$1(this, brokerAndEpoch);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Verifies the controller's broker-epoch cache against ZooKeeper three times: initially,
     * after shutting down a broker other than the controller, and after starting that broker
     * up again.
     */
    @Test
    public void testControllerBrokerEpochCacheMatchesWithZk() {
        KafkaServer controllerServer = getController();
        // Pick the first server whose broker id differs from the controller's.
        Option<KafkaServer> nonControllerOpt = servers().find(
            candidate -> BoxesRunTime.boxToBoolean($anonfun$testControllerBrokerEpochCacheMatchesWithZk$1(controllerServer, candidate)));
        KafkaServer nonController = (KafkaServer) nonControllerOpt.get();

        checkControllerBrokerEpochsCacheMatchesWithZk(controllerServer.kafkaController().controllerContext());

        nonController.shutdown();
        checkControllerBrokerEpochsCacheMatchesWithZk(controllerServer.kafkaController().controllerContext());

        nonController.startup();
        checkControllerBrokerEpochsCacheMatchesWithZk(controllerServer.kafkaController().controllerContext());
    }

    /** Runs the control-request scenario with an epoch offset of zero. */
    @Test
    public void testControlRequestWithCorrectBrokerEpoch() {
        final long epochOffset = 0L;
        testControlRequestWithBrokerEpoch(epochOffset);
    }

    /** Runs the control-request scenario with an epoch offset of minus one. */
    @Test
    public void testControlRequestWithStaleBrokerEpoch() {
        final long epochOffset = -1L;
        testControlRequestWithBrokerEpoch(epochOffset);
    }

    /** Runs the control-request scenario with an epoch offset of plus one. */
    @Test
    public void testControlRequestWithNewerBrokerEpoch() {
        final long epochOffset = 1L;
        testControlRequestWithBrokerEpoch(epochOffset);
    }

    /**
     * Sends a LeaderAndIsr, an UpdateMetadata and a StopReplica request to the broker with id
     * {@code brokerId2()}, each carrying that broker's current epoch plus {@code j}, and checks
     * the outcome of every request.
     *
     * <p>When {@code j} is negative each request is expected to come back with a stale broker
     * epoch error. Otherwise each request is expected to succeed and its effect on the broker is
     * checked (leader known, partition metadata present with the expected leader, partition no
     * longer hosted).
     *
     * @param j offset added to the target broker's current epoch to form the epoch in the requests
     */
    private void testControlRequestWithBrokerEpoch(long j) {
        TopicPartition topicPartition = new TopicPartition("new-topic", 0);
        // Create the topic with partition 0 assigned to both test brokers.
        TestUtils$.MODULE$.createTopic(zkClient(), topicPartition.topic(), (Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0)), Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{brokerId1(), brokerId2()})))})), servers());
        // Topic ids as currently held by the elected controller's context, converted to a Java map.
        java.util.Map map = (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(getController().kafkaController().controllerContext().topicIds().toMap(Predef$.MODULE$.$conforms())).asJava();
        // Controller epoch as read from ZooKeeper (first element of the returned tuple).
        int _1$mcI$sp = ((Tuple2) zkClient().getControllerEpoch().get())._1$mcI$sp();
        // Config for id 2, which is not one of the two brokers started by setUp(); it is used
        // below as the config of the channel manager and as the controller id in the requests.
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(2, zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        ListenerName forSecurityProtocol = ListenerName.forSecurityProtocol(securityProtocol);
        // Map every running server to (Broker on localhost at its bound port -> its current broker epoch).
        scala.collection.immutable.Map map2 = ((TraversableOnce) servers().map(kafkaServer -> {
            return new Tuple2(new Broker(kafkaServer.config().brokerId(), "localhost", TestUtils$.MODULE$.boundPort(kafkaServer, TestUtils$.MODULE$.boundPort$default$2()), forSecurityProtocol, securityProtocol), BoxesRunTime.boxToLong(kafkaServer.kafkaController().brokerEpoch()));
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        // Node view of the same brokers for the PLAINTEXT listener; passed to the LeaderAndIsr builder.
        Iterable iterable = (Iterable) map2.keys().map(broker -> {
            return broker.node(forSecurityProtocol);
        }, Iterable$.MODULE$.canBuildFrom());
        // Stand-alone controller context and channel manager, separate from the brokers' own controllers.
        ControllerContext controllerContext = new ControllerContext();
        controllerContext.setLiveBrokers(map2);
        Metrics metrics = new Metrics();
        ControllerChannelManager controllerChannelManager = new ControllerChannelManager(controllerContext, fromProps, Time.SYSTEM, metrics, new StateChangeLogger(2, true, None$.MODULE$), ControllerChannelManager$.MODULE$.$lessinit$greater$default$6());
        controllerChannelManager.startup();
        // NOTE(review): the server is looked up by position in servers() using the broker id as index;
        // this lines up only because the ids are 0 and 1 in creation order.
        KafkaServer kafkaServer2 = (KafkaServer) servers().apply(brokerId2());
        // Epoch placed in every request: the target broker's current epoch shifted by j.
        long brokerEpoch = kafkaServer2.kafkaController().brokerEpoch() + j;
        try {
            // Step 1: LeaderAndIsr request making brokerId2 the leader of the partition.
            LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(), 2, _1$mcI$sp, brokerEpoch, (List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(new LeaderAndIsrRequestData.LeaderAndIsrPartitionState().setTopicName(topicPartition.topic()).setPartitionIndex(topicPartition.partition()).setControllerEpoch(_1$mcI$sp).setLeader(brokerId2()).setLeaderEpoch(LeaderAndIsr$.MODULE$.initialLeaderEpoch() + 1).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{brokerId1(), brokerId2()})).map(obj -> {
                return Integer.valueOf(BoxesRunTime.unboxToInt(obj));
            }, Seq$.MODULE$.canBuildFrom())).asJava()).setZkVersion(LeaderAndIsr$.MODULE$.initialZKVersion()).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1})).map(obj2 -> {
                return Integer.valueOf(BoxesRunTime.unboxToInt(obj2));
            }, Seq$.MODULE$.canBuildFrom())).asJava()).setIsNew(false), Nil$.MODULE$)).asJava(), map, (Collection) CollectionConverters$.MODULE$.setAsJavaSetConverter(iterable.toSet()).asJava());
            if (j < 0) {
                sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, builder);
            } else {
                sendAndVerifySuccessfulResponse(controllerChannelManager, builder);
                // On success, wait (up to 10000 ms) until the target broker knows the partition leader.
                TestUtils$.MODULE$.waitUntilLeaderIsKnown(new $colon.colon(kafkaServer2, Nil$.MODULE$), topicPartition, 10000L);
            }
            // Step 2: UpdateMetadata request carrying the same partition state plus one
            // PLAINTEXT endpoint per live broker.
            UpdateMetadataRequest.Builder builder2 = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(), 2, _1$mcI$sp, brokerEpoch, (List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName(topicPartition.topic()).setPartitionIndex(topicPartition.partition()).setControllerEpoch(_1$mcI$sp).setLeader(brokerId2()).setLeaderEpoch(LeaderAndIsr$.MODULE$.initialLeaderEpoch() + 1).setIsr((List) CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{brokerId1(), brokerId2()})).map(obj3 -> {
                return Integer.valueOf(BoxesRunTime.unboxToInt(obj3));
            }, Seq$.MODULE$.canBuildFrom())).asJava()).setZkVersion(LeaderAndIsr$.MODULE$.initialZKVersion()).setReplicas((List) CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1})).map(obj4 -> {
                return Integer.valueOf(BoxesRunTime.unboxToInt(obj4));
            }, Seq$.MODULE$.canBuildFrom())).asJava()), Nil$.MODULE$)).asJava(), (List) CollectionConverters$.MODULE$.bufferAsJavaListConverter(((TraversableOnce) map2.map(tuple2 -> {
                if (tuple2 == null) {
                    throw new MatchError((Object) null);
                }
                Broker broker2 = (Broker) tuple2._1();
                SecurityProtocol securityProtocol2 = SecurityProtocol.PLAINTEXT;
                ListenerName forSecurityProtocol2 = ListenerName.forSecurityProtocol(securityProtocol2);
                Node node = broker2.node(forSecurityProtocol2);
                return new UpdateMetadataRequestData.UpdateMetadataBroker().setId(broker2.id()).setEndpoints((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(new UpdateMetadataRequestData.UpdateMetadataEndpoint().setHost(node.host()).setPort(node.port()).setSecurityProtocol(securityProtocol2.id).setListener(forSecurityProtocol2.value()), Nil$.MODULE$)).asJava()).setRack((String) broker2.rack().orNull(Predef$.MODULE$.$conforms()));
            }, scala.collection.immutable.Iterable$.MODULE$.canBuildFrom())).toBuffer()).asJava(), Collections.emptyMap());
            if (j < 0) {
                sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, builder2);
            } else {
                sendAndVerifySuccessfulResponse(controllerChannelManager, builder2);
                // On success, the target broker's metadata cache must contain the partition with brokerId2 as leader.
                TestUtils$.MODULE$.waitForPartitionMetadata(new $colon.colon(kafkaServer2, Nil$.MODULE$), topicPartition.topic(), topicPartition.partition(), 10000L);
                Assertions.assertEquals(brokerId2(), ((UpdateMetadataRequestData.UpdateMetadataPartitionState) kafkaServer2.metadataCache().getPartitionInfo(topicPartition.topic(), topicPartition.partition()).get()).leader());
            }
            // Step 3: StopReplica request for the partition with deletePartition set to true.
            StopReplicaRequest.Builder builder3 = new StopReplicaRequest.Builder(ApiKeys.STOP_REPLICA.latestVersion(), 2, _1$mcI$sp, brokerEpoch, false, (List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(new StopReplicaRequestData.StopReplicaTopicState().setTopicName(topicPartition.topic()).setPartitionStates((List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(new StopReplicaRequestData.StopReplicaPartitionState().setPartitionIndex(topicPartition.partition()).setLeaderEpoch(LeaderAndIsr$.MODULE$.initialLeaderEpoch() + 2).setDeletePartition(true), Nil$.MODULE$)).asJava()), Nil$.MODULE$)).asJava());
            if (j < 0) {
                sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, builder3);
            } else {
                sendAndVerifySuccessfulResponse(controllerChannelManager, builder3);
                // On success, the replica manager must no longer host the partition.
                Assertions.assertEquals(HostedPartition$None$.MODULE$, kafkaServer2.replicaManager().getPartition(topicPartition));
            }
        } finally {
            // Always release the channel manager and the metrics registry created above.
            controllerChannelManager.shutdown();
            metrics.close();
        }
    }

    /**
     * Waits until a controller has been elected and returns the server from {@link #servers()}
     * whose broker id equals the elected controller id.
     *
     * @return the first server matching the elected controller id
     */
    private KafkaServer getController() {
        final int electedControllerId = TestUtils$.MODULE$.waitUntilControllerElected(zkClient(), TestUtils$.MODULE$.waitUntilControllerElected$default$2());
        Object matchingServers = servers().filter(
            server -> BoxesRunTime.boxToBoolean($anonfun$getController$1(electedControllerId, server)));
        return (KafkaServer) ((IterableLike) matchingServers).head();
    }

    /**
     * Takes one snapshot of the broker-to-epoch map from ZooKeeper and polls until the given
     * controller context's live broker epochs match it, failing the test with
     * "Broker epoch mismatches" once the wait time is exceeded.
     *
     * @param controllerContext the controller context whose cached broker epochs are compared
     */
    private void checkControllerBrokerEpochsCacheMatchesWithZk(ControllerContext controllerContext) {
        Map zkBrokerEpochs = zkClient().getAllBrokerAndEpochsInCluster();
        TestUtils$ utils = TestUtils$.MODULE$;
        long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        // Null guard kept from the original; it throws a NullPointerException if the singleton is null.
        if (utils == null) {
            throw null;
        }
        long startedAtMs = System.currentTimeMillis();
        for (;;) {
            if ($anonfun$checkControllerBrokerEpochsCacheMatchesWithZk$1(controllerContext, zkBrokerEpochs)) {
                return;
            }
            if (System.currentTimeMillis() > startedAtMs + maxWaitMs) {
                Assertions.fail($anonfun$checkControllerBrokerEpochsCacheMatchesWithZk$3());
            }
            // Sleep for the shorter of the total wait time and the pause interval.
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
    }

    /**
     * Sends the request built by {@code builder} to broker {@code brokerId2()} and polls until
     * the response callback has recorded a {@code STALE_BROKER_EPOCH} error, failing the test
     * with "Broker epoch should be stale" once the wait time is exceeded.
     *
     * <p>The flag is a {@link VolatileBooleanRef}, matching
     * {@code sendAndVerifySuccessfulResponse}: it is written from the response callback and read
     * by this polling loop, and a plain {@code BooleanRef} gives no visibility guarantee if the
     * callback runs on another thread (presumably the channel manager's sender thread — confirm).
     *
     * @param controllerChannelManager channel manager used to send the request
     * @param builder builder of the control request to send
     */
    private void sendAndVerifyStaleBrokerEpochInResponse(ControllerChannelManager controllerChannelManager, AbstractControlRequest.Builder<? extends AbstractControlRequest> builder) {
        VolatileBooleanRef staleEpochDetected = VolatileBooleanRef.create(false);
        controllerChannelManager.sendRequest(brokerId2(), builder, abstractResponse -> {
            // Same check as the synthetic callback helper, written inline so the volatile ref can be used.
            staleEpochDetected.elem = abstractResponse.errorCounts().containsKey(Errors.STALE_BROKER_EPOCH);
            return BoxedUnit.UNIT;
        });
        long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long startedAtMs = System.currentTimeMillis();
        while (!staleEpochDetected.elem) {
            if (System.currentTimeMillis() > startedAtMs + maxWaitMs) {
                Assertions.fail($anonfun$sendAndVerifyStaleBrokerEpochInResponse$3());
            }
            // Sleep for the shorter of the total wait time and the pause interval.
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(maxWaitMs), pauseMs));
        }
        Assertions.assertTrue(staleEpochDetected.elem, "Stale broker epoch not detected by the broker");
    }

    /**
     * Sends the request built by {@code builder} to broker {@code brokerId2()} and polls until
     * the response callback has recorded a response without errors, failing the test with
     * "Should receive response with no errors" once the wait time is exceeded.
     *
     * @param controllerChannelManager channel manager used to send the request
     * @param builder builder of the control request to send
     */
    private void sendAndVerifySuccessfulResponse(ControllerChannelManager controllerChannelManager, AbstractControlRequest.Builder<? extends AbstractControlRequest> builder) {
        VolatileBooleanRef succeeded = VolatileBooleanRef.create(false);
        controllerChannelManager.sendRequest(brokerId2(), builder, response -> {
            $anonfun$sendAndVerifySuccessfulResponse$1(succeeded, response);
            return BoxedUnit.UNIT;
        });
        TestUtils$ utils = TestUtils$.MODULE$;
        long maxWaitMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        // Null guard kept from the original; it throws a NullPointerException if the singleton is null.
        if (utils == null) {
            throw null;
        }
        long startedAtMs = System.currentTimeMillis();
        for (;;) {
            if (succeeded.elem) {
                return;
            }
            if (System.currentTimeMillis() > startedAtMs + maxWaitMs) {
                Assertions.fail($anonfun$sendAndVerifySuccessfulResponse$3());
            }
            // Sleep for the shorter of the total wait time and the pause interval.
            Thread.sleep(Math.min(maxWaitMs, pauseMs));
        }
    }

    /** Predicate body: true when the server's configured broker id equals the broker's id. */
    public static final /* synthetic */ boolean $anonfun$testReplicaManagerBrokerEpochMatchesWithZk$2(Broker broker, KafkaServer kafkaServer) {
        int serverBrokerId = kafkaServer.config().brokerId();
        int wantedBrokerId = broker.id();
        return serverBrokerId == wantedBrokerId;
    }

    /**
     * Per-entry check for a (broker, epoch) pair: asserts that the test has a server with the
     * broker's id and that this server's broker epoch equals the epoch in the pair.
     *
     * @throws MatchError if {@code tuple2} is null
     */
    public static final /* synthetic */ void $anonfun$testReplicaManagerBrokerEpochMatchesWithZk$1(BrokerEpochIntegrationTest brokerEpochIntegrationTest, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Broker zkBroker = (Broker) tuple2._1();
        long zkEpoch = tuple2._2$mcJ$sp();
        Option matchingServer = brokerEpochIntegrationTest.servers().find(
            candidate -> BoxesRunTime.boxToBoolean($anonfun$testReplicaManagerBrokerEpochMatchesWithZk$2(zkBroker, candidate)));
        Assertions.assertTrue(matchingServer.isDefined());
        long serverEpoch = ((KafkaServer) matchingServer.get()).kafkaController().brokerEpoch();
        Assertions.assertEquals(zkEpoch, serverEpoch);
    }

    /** Predicate body: true when the second server's broker id differs from the first server's. */
    public static final /* synthetic */ boolean $anonfun$testControllerBrokerEpochCacheMatchesWithZk$1(KafkaServer kafkaServer, KafkaServer kafkaServer2) {
        int candidateId = kafkaServer2.config().brokerId();
        int referenceId = kafkaServer.config().brokerId();
        return candidateId != referenceId;
    }

    /** Predicate body: true when the server's configured broker id equals {@code i}. */
    public static final /* synthetic */ boolean $anonfun$getController$1(int i, KafkaServer kafkaServer) {
        int serverBrokerId = kafkaServer.config().brokerId();
        return serverBrokerId == i;
    }

    /**
     * Predicate body for one (broker, epoch) pair: true when {@code map} holds, under the
     * broker's id, exactly the epoch carried by the pair.
     *
     * @throws MatchError if {@code tuple2} is null
     */
    public static final /* synthetic */ boolean $anonfun$checkControllerBrokerEpochsCacheMatchesWithZk$2(Map map, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        int brokerId = ((Broker) tuple2._1()).id();
        Option cachedEpoch = map.get(BoxesRunTime.boxToInteger(brokerId));
        return cachedEpoch.contains(BoxesRunTime.boxToLong(tuple2._2$mcJ$sp()));
    }

    /**
     * Poll condition: true when the controller context's live broker epochs have the same size
     * as {@code map} and every entry of {@code map} passes the per-entry epoch comparison.
     */
    public static final /* synthetic */ boolean $anonfun$checkControllerBrokerEpochsCacheMatchesWithZk$1(ControllerContext controllerContext, Map map) {
        Map cachedEpochs = controllerContext.liveBrokerIdAndEpochs();
        // The size comparison comes first so the per-entry check is skipped when the sizes differ.
        return map.size() == cachedEpochs.size()
            && map.forall(entry -> BoxesRunTime.boxToBoolean($anonfun$checkControllerBrokerEpochsCacheMatchesWithZk$2(cachedEpochs, entry)));
    }

    /** Failure message used when the controller's epoch cache never matches ZooKeeper. */
    public static final /* synthetic */ String $anonfun$checkControllerBrokerEpochsCacheMatchesWithZk$3() {
        final String failureMessage = "Broker epoch mismatches";
        return failureMessage;
    }

    /** Callback body: stores whether the response's error counts include STALE_BROKER_EPOCH. */
    public static final /* synthetic */ void $anonfun$sendAndVerifyStaleBrokerEpochInResponse$1(BooleanRef booleanRef, AbstractResponse abstractResponse) {
        boolean sawStaleEpoch = abstractResponse.errorCounts().containsKey(Errors.STALE_BROKER_EPOCH);
        booleanRef.elem = sawStaleEpoch;
    }

    /** Failure message used when no stale-epoch response arrives within the wait time. */
    public static final /* synthetic */ String $anonfun$sendAndVerifyStaleBrokerEpochInResponse$3() {
        final String failureMessage = "Broker epoch should be stale";
        return failureMessage;
    }

    /**
     * Callback body: stores true when the response's error counts are empty, or when they
     * contain {@code Errors.NONE} and have exactly one entry; stores false otherwise.
     */
    public static final /* synthetic */ void $anonfun$sendAndVerifySuccessfulResponse$1(VolatileBooleanRef volatileBooleanRef, AbstractResponse abstractResponse) {
        boolean noErrors;
        if (abstractResponse.errorCounts().isEmpty()) {
            noErrors = true;
        } else if (!abstractResponse.errorCounts().containsKey(Errors.NONE)) {
            noErrors = false;
        } else {
            noErrors = abstractResponse.errorCounts().size() == 1;
        }
        volatileBooleanRef.elem = noErrors;
    }

    /** Failure message used when no error-free response arrives within the wait time. */
    public static final /* synthetic */ String $anonfun$sendAndVerifySuccessfulResponse$3() {
        final String failureMessage = "Should receive response with no errors";
        return failureMessage;
    }
}
