/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import com.yammer.metrics.core.Meter;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import kafka.api.ApiVersion;
import kafka.api.ApiVersion$;
import kafka.api.KAFKA_2_6_IV0$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogAppendInfo;
import kafka.log.LogManager;
import kafka.server.AbstractFetcherThread;
import kafka.server.BlockingSend;
import kafka.server.BrokerTopicStats;
import kafka.server.DiskUsageBasedThrottleListener;
import kafka.server.DiskUsageBasedThrottler$;
import kafka.server.FailedPartitions;
import kafka.server.Fetching$;
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.OffsetAndEpoch;
import kafka.server.PartitionFetchState;
import kafka.server.PartitionFetchState$;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaAlterLogDirsManager;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaFetcherThread$;
import kafka.server.ReplicaFetcherThreadTest$Quota$1$;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.ReplicaState;
import kafka.server.ReplicationQuotaManager;
import kafka.server.Truncating$;
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend;
import kafka.server.metadata.ZkMetadataCache;
import kafka.server.metadata.ZkMetadataCache$;
import kafka.tier.fetcher.TierStateFetcher;
import kafka.utils.DelayedItem;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.SeqLike;
import scala.collection.Set;
import scala.collection.TraversableLike;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\r\u001df\u0001B A\u0001\u0015CQ\u0001\u0014\u0001\u0005\u00025Cq\u0001\u0015\u0001C\u0002\u0013%\u0011\u000b\u0003\u0004^\u0001\u0001\u0006IA\u0015\u0005\b=\u0002\u0011\r\u0011\"\u0003R\u0011\u0019y\u0006\u0001)A\u0005%\"9\u0001\r\u0001b\u0001\n\u0013\t\u0006BB1\u0001A\u0003%!\u000bC\u0004c\u0001\t\u0007I\u0011C2\t\r)\u0004\u0001\u0015!\u0003e\u0011\u001dY\u0007A1A\u0005\u00121Da\u0001\u001d\u0001!\u0002\u0013i\u0007bB9\u0001\u0005\u0004%IA\u001d\u0005\u0007m\u0002\u0001\u000b\u0011B:\t\u000f]\u0004!\u0019!C\u0005e\"1\u0001\u0010\u0001Q\u0001\nMDq!\u001f\u0001C\u0002\u0013%!\u0010C\u0004\u0002\u0014\u0001\u0001\u000b\u0011B>\t\u0013\u0005U\u0001A1A\u0005\n\u0005]\u0001\u0002CA\u001d\u0001\u0001\u0006I!!\u0007\t\u0013\u0005m\u0002A1A\u0005\n\u0005u\u0002\u0002CA&\u0001\u0001\u0006I!a\u0010\t\u0013\u00055\u0003A1A\u0005\n\u0005=\u0003\u0002CA/\u0001\u0001\u0006I!!\u0015\t\u000f\u0005}\u0003\u0001\"\u0003\u0002b!I\u0011q\u0011\u0001\u0012\u0002\u0013%\u0011\u0011\u0012\u0005\b\u0003?\u0003A\u0011AAQ\u0011\u001d\ty\f\u0001C\t\u0003\u0003D\u0011Ba\u0018\u0001#\u0003%\tB!\u0019\t\u0013\t\u0015\u0004!%A\u0005\u0012\t\u001d\u0004\"\u0003B6\u0001E\u0005I\u0011\u0003B7\u0011%\u0011\t\bAI\u0001\n#\u0011i\u0007C\u0004\u0003t\u0001!\t!!)\t\u000f\tu\u0004\u0001\"\u0001\u0002\"\"9!\u0011\u0011\u0001\u0005\u0002\t\r\u0005b\u0002BP\u0001\u0011\u0005\u0011\u0011\u0015\u0005\b\u0005G\u0003A\u0011AAQ\u0011\u001d\u00119\u000b\u0001C\u0001\u0003CCqAa+\u0001\t#\u0011i\u000bC\u0005\u0003B\u0002\t\n\u0011\"\u0005\u0002\n\"9!1\u0019\u0001\u0005\u0012\t\u0015\u0007b\u0002Bv\u0001\u0011\u0005!Q\u001e\u0005\n\u0005o\u0004\u0011\u0013!C\u0001\u0003\u0013CqA!?\u0001\t\u0003\t\t\u000bC\u0004\u0003~\u0002!\t!!)\t\u000f\r\u0005\u0001\u0001\"\u0001\u0002\"\"91Q\u0001\u0001\u0005\u0002\u0005\u0005\u0006bBB\u0005\u0001\u0011\u0005\u0011\u0011\u0015\u0005\b\u0007\u001b\u0001A\u0011AAQ\u0011\u001d\u0019\t\u0002\u0001C\u0001\u0003CCqa!\u0006\u0001\t\u0003\t\t\u000bC\u0004\u0004\u001a\u0001!\t!!)\t\u000f\ru\u0001\u0001\"\u0001\u0002\"\"91\u0011\u0005\u0001\u0005\u0002\u0005\u0005\u0006bBB\u0013\u0001\u0011\u0005\u0011\u0011\u0015\u0005\b\u0007S\u0001A\u0011AAQ\u0011\u001d\u0019i\u0003\u0001C\u0001\u0003CCqa!\r\u0001\t\u0003\t\t\u000bC\u0004\u00046\u0001!Iaa\u000e\t\u000f\rU\u0002\u0001\"\u0003\u0004f!91Q\u0010\u0001\u0005\n\r}\u0004bBBC\u0001\u0011\u00051q\u0011\u0005\b\u0007G\u0003A\u0011BBS\u0005a\u0011V\r\u001d7jG\u00064U\r^2iKJ$\u0006N]3bIR+7\u000f\u001e\u0006\u0003\u0003\n\u000baa]3sm\u0016\u0014(\"A\"\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001A\u0012\t\u0003\u000f*k\u0011\u0001\u0013\u0006\u0002\u0013\u0006)1oY1mC&\u00111\n\u0013\u0002\u0007\u0003:L(+\u001a4\u0002\rqJg.\u001b;?)\u0005q\u0005CA(\u0001\u001b\u0005\u0001\u0015\u0001\u0002;2aB*\u0012A\u0015\t\u0003'nk\u0011\u0001\u0016\u0006\u0003+Z\u000baaY8n[>t'BA\"X\u0015\tA\u0016,\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u00025\u0006\u0019qN]4\n\u0005q#&A\u0004+pa&\u001c\u0007+\u0019:uSRLwN\\\u0001\u0006iF\u0002\b\u0007I\u0001\u0005iF\u0002\u0018'A\u0003ucA\f\u0004%\u0001\u0003ueA\f\u0014!\u0002;3aF\u0002\u0013A\u00042s_.,'/\u00128e!>Lg\u000e^\u000b\u0002IB\u0011Q\r[\u0007\u0002M*\u0011qMQ\u0001\bG2,8\u000f^3s\u0013\tIgM\u0001\bCe>\\WM]#oIB{\u0017N\u001c;\u0002\u001f\t\u0014xn[3s\u000b:$\u0007k\\5oi\u0002\n\u0001CZ1jY\u0016$\u0007+\u0019:uSRLwN\\:\u0016\u00035\u0004\"a\u00148\n\u0005=\u0004%\u0001\u0005$bS2,G\rU1si&$\u0018n\u001c8t\u0003E1\u0017-\u001b7fIB\u000b'\u000f^5uS>t7\u000fI\u0001\ti>\u0004\u0018nY%ecU\t1\u000f\u0005\u0002Ti&\u0011Q\u000f\u0016\u0002\u0005+VLG-A\u0005u_BL7-\u001332A\u0005AAo\u001c9jG&#''A\u0005u_BL7-\u001333A\u0005AAo\u001c9jG&#7/F\u0001|!\u0015ax0a\u0001t\u001b\u0005i(B\u0001@I\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0004\u0003\u0003i(aA'baB!\u0011QAA\b\u001b\t\t9A\u0003\u0003\u0002\n\u0005-\u0011\u0001\u00027b]\u001eT!!!\u0004\u0002\t)\fg/Y\u0005\u0005\u0003#\t9A\u0001\u0004TiJLgnZ\u0001\ni>\u0004\u0018nY%eg\u0002\nq\u0002]1si&$\u0018n\u001c8Ti\u0006$Xm]\u000b\u0003\u00033\u0001b!a\u0007\u0002\"\u0005\u0015RBAA\u000f\u0015\u0011\ty\"a\u0003\u0002\tU$\u0018\u000e\\\u0005\u0005\u0003G\tiB\u0001\u0003MSN$\b\u0003BA\u0014\u0003gqA!!\u000b\u000205\u0011\u00111\u0006\u0006\u0004\u0003[!\u0016aB7fgN\fw-Z\u0005\u0005\u0003c\tY#A\rVa\u0012\fG/Z'fi\u0006$\u0017\r^1SKF,Xm\u001d;ECR\f\u0017\u0002BA\u001b\u0003o\u0011A$\u00169eCR,W*\u001a;bI\u0006$\u0018\rU1si&$\u0018n\u001c8Ti\u0006$XM\u0003\u0003\u00022\u0005-\u0012\u0001\u00059beRLG/[8o'R\fG/Z:!\u0003U)\b\u000fZ1uK6+G/\u00193bi\u0006\u0014V-];fgR,\"!a\u0010\u0011\t\u0005\u0005\u0013qI\u0007\u0003\u0003\u0007R1!!\u0012U\u0003!\u0011X-];fgR\u001c\u0018\u0002BA%\u0003\u0007\u0012Q#\u00169eCR,W*\u001a;bI\u0006$\u0018MU3rk\u0016\u001cH/\u0001\fva\u0012\fG/Z'fi\u0006$\u0017\r^1SKF,Xm\u001d;!\u00035iW\r^1eCR\f7)Y2iKV\u0011\u0011\u0011\u000b\t\u0005\u0003'\nI&\u0004\u0002\u0002V)\u0019\u0011q\u000b!\u0002\u00115,G/\u00193bi\u0006LA!a\u0017\u0002V\ty!l['fi\u0006$\u0017\r^1DC\u000eDW-\u0001\bnKR\fG-\u0019;b\u0007\u0006\u001c\u0007.\u001a\u0011\u0002#%t\u0017\u000e^5bY\u001a+Go\u00195Ti\u0006$X\r\u0006\u0005\u0002d\u0005%\u00141OA?!\ry\u0015QM\u0005\u0004\u0003O\u0002%!E%oSRL\u0017\r\u001c$fi\u000eD7\u000b^1uK\"9\u00111\u000e\rA\u0002\u00055\u0014a\u0002;pa&\u001c\u0017\n\u001a\t\u0005\u000f\u0006=4/C\u0002\u0002r!\u0013aa\u00149uS>t\u0007bBA;1\u0001\u0007\u0011qO\u0001\fM\u0016$8\r[(gMN,G\u000fE\u0002H\u0003sJ1!a\u001fI\u0005\u0011auN\\4\t\u0013\u0005}\u0004\u0004%AA\u0002\u0005\u0005\u0015a\u00037fC\u0012,'/\u00129pG\"\u00042aRAB\u0013\r\t)\t\u0013\u0002\u0004\u0013:$\u0018aG5oSRL\u0017\r\u001c$fi\u000eD7\u000b^1uK\u0012\"WMZ1vYR$3'\u0006\u0002\u0002\f*\"\u0011\u0011QAGW\t\ty\t\u0005\u0003\u0002\u0012\u0006mUBAAJ\u0015\u0011\t)*a&\u0002\u0013Ut7\r[3dW\u0016$'bAAM\u0011\u0006Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005u\u00151\u0013\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017aB2mK\u0006tW\u000f\u001d\u000b\u0003\u0003G\u00032aRAS\u0013\r\t9\u000b\u0013\u0002\u0005+:LG\u000fK\u0002\u001b\u0003W\u0003B!!,\u0002<6\u0011\u0011q\u0016\u0006\u0005\u0003c\u000b\u0019,A\u0002ba&TA!!.\u00028\u00069!.\u001e9ji\u0016\u0014(bAA]3\u0006)!.\u001e8ji&!\u0011QXAX\u0005%\te\r^3s\u000b\u0006\u001c\u0007.\u0001\u000ede\u0016\fG/\u001a*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\r\u0006\u0010\u0002D\u0006%\u0017\u0011]As\u0003S\f\u00190!>\u0002\u0000\n5!Q\u0004B\u0014\u0005{\u0011IE!\u0016\u0003\\A\u0019q*!2\n\u0007\u0005\u001d\u0007I\u0001\u000bSKBd\u0017nY1GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a\u0005\b\u0003\u0017\\\u0002\u0019AAg\u0003\u0011q\u0017-\\3\u0011\t\u0005=\u0017Q\u001c\b\u0005\u0003#\fI\u000eE\u0002\u0002T\"k!!!6\u000b\u0007\u0005]G)\u0001\u0004=e>|GOP\u0005\u0004\u00037D\u0015A\u0002)sK\u0012,g-\u0003\u0003\u0002\u0012\u0005}'bAAn\u0011\"9\u00111]\u000eA\u0002\u0005\u0005\u0015!\u00034fi\u000eDWM]%e\u0011\u0019\t9o\u0007a\u0001I\u0006a1o\\;sG\u0016\u0014%o\\6fe\"9\u00111^\u000eA\u0002\u00055\u0018\u0001\u00042s_.,'oQ8oM&<\u0007cA(\u0002p&\u0019\u0011\u0011\u001f!\u0003\u0017-\u000bgm[1D_:4\u0017n\u001a\u0005\u0006Wn\u0001\r!\u001c\u0005\b\u0003o\\\u0002\u0019AA}\u0003)\u0011X\r\u001d7jG\u0006luM\u001d\t\u0004\u001f\u0006m\u0018bAA\u007f\u0001\nq!+\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\bb\u0002B\u00017\u0001\u0007!1A\u0001\b[\u0016$(/[2t!\u0011\u0011)A!\u0003\u000e\u0005\t\u001d!b\u0001B\u0001)&!!1\u0002B\u0004\u0005\u001diU\r\u001e:jGNDqAa\u0004\u001c\u0001\u0004\u0011\t\"\u0001\u0003uS6,\u0007\u0003\u0002B\n\u00053i!A!\u0006\u000b\u0007\t]A+A\u0003vi&d7/\u0003\u0003\u0003\u001c\tU!\u0001\u0002+j[\u0016DqAa\b\u001c\u0001\u0004\u0011\t#A\u0003rk>$\u0018\rE\u0002P\u0005GI1A!\nA\u00051\u0011V\r\u001d7jG\u0006\fVo\u001c;b\u0011\u001d\u0011Ic\u0007a\u0001\u0005W\t\u0001\u0003^5feN#\u0018\r^3GKR\u001c\u0007.\u001a:\u0011\u000b\u001d\u000byG!\f\u0011\t\t=\"\u0011H\u0007\u0003\u0005cQAAa\r\u00036\u00059a-\u001a;dQ\u0016\u0014(b\u0001B\u001c\u0005\u0006!A/[3s\u0013\u0011\u0011YD!\r\u0003!QKWM]*uCR,g)\u001a;dQ\u0016\u0014\b\"\u0003B 7A\u0005\t\u0019\u0001B!\u0003iaW-\u00193fe\u0016sG\r]8j]R\u0014En\\2lS:<7+\u001a8e!\u00159\u0015q\u000eB\"!\ry%QI\u0005\u0004\u0005\u000f\u0002%\u0001\u0004\"m_\u000e\\\u0017N\\4TK:$\u0007\"\u0003B&7A\u0005\t\u0019\u0001B'\u00035awnZ\"p]R,\u0007\u0010^(qiB)q)a\u001c\u0003PA!!1\u0003B)\u0013\u0011\u0011\u0019F!\u0006\u0003\u00151{wmQ8oi\u0016DH\u000fC\u0005\u0003Xm\u0001\n\u00111\u0001\u0003Z\u0005\tb-\u001a;dQ\n\u000b7m[(gM6\u001bx\n\u001d;\u0011\u000b\u001d\u000by'a\u001e\t\u0013\tu3\u0004%AA\u0002\te\u0013\u0001\u00064fi\u000eD')Y2l\u001f\u001a4Wj]'bq>\u0003H/A\u0013de\u0016\fG/\u001a*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\r\n3fM\u0006,H\u000e\u001e\u00132cU\u0011!1\r\u0016\u0005\u0005\u0003\ni)A\u0013de\u0016\fG/\u001a*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\r\n3fM\u0006,H\u000e\u001e\u00132eU\u0011!\u0011\u000e\u0016\u0005\u0005\u001b\ni)A\u0013de\u0016\fG/\u001a*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\r\n3fM\u0006,H\u000e\u001e\u00132gU\u0011!q\u000e\u0016\u0005\u00053\ni)A\u0013de\u0016\fG/\u001a*fa2L7-\u0019$fi\u000eDWM\u001d+ie\u0016\fG\r\n3fM\u0006,H\u000e\u001e\u00132i\u0005A3\u000f[8vY\u0012\u001cVM\u001c3MCR,7\u000f\u001e*fcV,7\u000f\u001e,feNLwN\\:Cs\u0012+g-Y;mi\"\u001a\u0001Ea\u001e\u0011\t\u00055&\u0011P\u0005\u0005\u0005w\nyK\u0001\u0003UKN$\u0018A\u0010;fgR4U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007NU3rk\u0016\u001cH/\u00134MCN$X\t]8dQ\u0012+g-\u001b8fI\u001a{'oU8nKB\u000b'\u000f^5uS>t7\u000fK\u0002\"\u0005o\nQ#Y:tKJ$\b+\u0019:uSRLwN\\*uCR,7\u000f\u0006\u0006\u0002$\n\u0015%Q\u0012BL\u00057CqAa\r#\u0001\u0004\u00119\tE\u0002P\u0005\u0013K1Aa#A\u0005U\t%m\u001d;sC\u000e$h)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012DqAa$#\u0001\u0004\u0011\t*A\u000btQ>,H\u000e\u001a\"f%\u0016\fG-\u001f$pe\u001a+Go\u00195\u0011\u0007\u001d\u0013\u0019*C\u0002\u0003\u0016\"\u0013qAQ8pY\u0016\fg\u000eC\u0004\u0003\u001a\n\u0002\rA!%\u0002+MDw.\u001e7e\u0005\u0016$&/\u001e8dCRLgn\u001a'pO\"9!Q\u0014\u0012A\u0002\tE\u0015aD:i_VdGMQ3EK2\f\u00170\u001a3\u0002KMDw.\u001e7e\u0011\u0006tG\r\\3Fq\u000e,\u0007\u000f^5p]\u001a\u0013x.\u001c\"m_\u000e\\\u0017N\\4TK:$\u0007fA\u0012\u0003x\u0005\u00195\u000f[8vY\u00124U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00148GSJ\u001cHOR3uG\"|e\u000e\\=JM2+\u0017\rZ3s\u000bB|7\r[&o_^tGk\u001c\"pi\"L%\r\u001d\u001a7Q\r!#qO\u00019g\"|W\u000f\u001c3O_R4U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007n\u00148GSJ\u001cHOR3uG\"<\u0016\u000e\u001e5UeVt7-\u0019;f\u001f:4U\r^2iQ\r)#qO\u0001#m\u0016\u0014\u0018NZ=GKR\u001c\u0007\u000eT3bI\u0016\u0014X\t]8dQ>sg)\u001b:ti\u001a+Go\u00195\u0015\r\u0005\r&q\u0016B_\u0011\u001d\u0011\tL\na\u0001\u0005g\u000b1!\u001b2q!\u0011\u0011)L!/\u000e\u0005\t]&bAAY\u0005&!!1\u0018B\\\u0005)\t\u0005/\u001b,feNLwN\u001c\u0005\n\u0005\u007f3\u0003\u0013!a\u0001\u0003\u0003\u000bq\"\u001a9pG\"4U\r^2i\u0007>,h\u000e^\u0001-m\u0016\u0014\u0018NZ=GKR\u001c\u0007\u000eT3bI\u0016\u0014X\t]8dQ>sg)\u001b:ti\u001a+Go\u00195%I\u00164\u0017-\u001e7uII\n!D^3sS\u001aLxJ\u001a4tKR\u0014V-];fgR4VM]:j_:$\u0002\"a)\u0003H\n%'1\u001c\u0005\b\u0005cC\u0003\u0019\u0001BZ\u0011\u001d\u0011Y\r\u000ba\u0001\u0005\u001b\f1d\u001c4gg\u0016$hi\u001c:MK\u0006$WM]#q_\u000eD'+Z9vKN$\b\u0003\u0002Bh\u0005+tA!!\u0011\u0003R&!!1[A\"\u0003qyeMZ:fiN4uN\u001d'fC\u0012,'/\u00129pG\"\u0014V-];fgRLAAa6\u0003Z\n9!)^5mI\u0016\u0014(\u0002\u0002Bj\u0003\u0007BqA!8)\u0001\u0004\u0011y.\u0001\nmSN$xJ\u001a4tKR\u001c(+Z9vKN$\b\u0003\u0002Bq\u0005OtA!!\u0011\u0003d&!!Q]A\"\u0003Ia\u0015n\u001d;PM\u001a\u001cX\r^:SKF,Xm\u001d;\n\t\t]'\u0011\u001e\u0006\u0005\u0005K\f\u0019%A\rwKJLg-_'be.\u0014V\r\u001d7jG\u0006$\u0006N]8ui2,GCBAR\u0005_\u0014\u0019\u0010C\u0004\u0003r&\u0002\r!!?\u0002\u001dI,\u0007\u000f\\5dC6\u000bg.Y4fe\"I!Q_\u0015\u0011\u0002\u0003\u0007\u0011\u0011Q\u0001\u0006i&lWm]\u0001$m\u0016\u0014\u0018NZ=NCJ\\'+\u001a9mS\u000e\fG\u000b\u001b:piRdW\r\n3fM\u0006,H\u000e\u001e\u00133\u0003u\u0019\bn\\;mIRC'o\u001c;uY\u00164u\u000e\u001c7po\u0016\u0014(+\u001a9mS\u000e\f\u0007fA\u0016\u0003x\u0005\u0001C/Z:u\r>dGn\\<fe&\u001bH\u000b\u001b:piRdW\rZ(o\u0019><H)[:lQ\ra#qO\u00015g\"|W\u000f\u001c3UeVt7-\u0019;f)>|eMZ:fiN\u0003XmY5gS\u0016$\u0017J\\#q_\u000eDwJ\u001a4tKR\u0014Vm\u001d9p]N,\u0007fA\u0017\u0003x\u0005i5\u000f[8vY\u0012$&/\u001e8dCR,Gk\\(gMN,Go\u00159fG&4\u0017.\u001a3J]\u0016\u0003xn\u00195PM\u001a\u001cX\r\u001e*fgB|gn]3JM\u001a{G\u000e\\8xKJD\u0015m\u001d(p\u001b>\u0014X-\u00129pG\"\u001c\bf\u0001\u0018\u0003x\u0005Q5\u000f[8vY\u00124U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007nU3d_:$G+[7f\u0013\u001adU-\u00193feJ+\u0007\u000f\\5fg^KG\u000f[#q_\u000eDgj\u001c;L]><h\u000eV8G_2dwn^3sQ\ry#qO\u0001Bg\"|W\u000f\u001c3UeVt7-\u0019;f\u0013\u001adU-\u00193feJ+\u0007\u000f\\5fg^KG\u000f\u001b#jm\u0016\u0014x-\u001b8h\u000bB|7\r\u001b(pi.swn\u001e8U_\u001a{G\u000e\\8xKJD3\u0001\rB<\u0003M\u001a\bn\\;mIV\u001bX\rT3bI\u0016\u0014XI\u001c3PM\u001a\u001cX\r^%g\u0013:$XM\u001d\"s_.,'OV3sg&|gNQ3m_^\u0014\u0004\u0007K\u00022\u0005o\n\u0001i\u001d5pk2$GK];oG\u0006$X\rV8J]&$\u0018.\u00197GKR\u001c\u0007n\u00144gg\u0016$\u0018J\u001a'fC\u0012,'OU3ukJt7/\u00168eK\u001aLg.\u001a3PM\u001a\u001cX\r\u001e\u0015\u0004e\t]\u0014!M:i_VdG\rU8mY&sG-\u001a4j]&$X\r\\=JM2+\u0017\rZ3s%\u0016$XO\u001d8t\u0003:LX\t_2faRLwN\u001c\u0015\u0004g\t]\u0014aK:i_VdG-T8wKB\u000b'\u000f^5uS>t7oT;u\u001f\u001a$&/\u001e8dCRLgn\u001a'pON#\u0018\r^3)\u0007Q\u00129(\u0001\u001dtQ>,H\u000e\u001a$jYR,'\u000fU1si&$\u0018n\u001c8t\u001b\u0006$W\rT3bI\u0016\u0014H)\u001e:j]\u001edU-\u00193fe\u0016\u0003xn\u00195SKF,Xm\u001d;)\u0007U\u00129(\u0001%tQ>,H\u000eZ\"bi\u000eDW\t_2faRLwN\u001c$s_6\u0014En\\2lS:<7+\u001a8e/\",gn\u00155viRLgn\u001a#po:\u0014V\r\u001d7jG\u00064U\r^2iKJ$\u0006N]3bI\"\u001aaGa\u001e\u0002MMDw.\u001e7e+B$\u0017\r^3SK\u0006\u001c8/[4o[\u0016tGOQ=uKNLe.T3ue&\u001c7\u000fK\u00028\u0005o\nai\u001d5pk2$gj\u001c;Va\u0012\fG/\u001a*fCN\u001c\u0018n\u001a8nK:$()\u001f;fg&sW*\u001a;sS\u000e\u001cx\u000b[3o\u001d>\u0014V-Y:tS\u001etW.\u001a8ug&s\u0007K]8he\u0016\u001c8\u000fK\u00029\u0005o\na\u0002^3ti\n+\u0018\u000e\u001c3GKR\u001c\u0007\u000eK\u0002:\u0005o\n\u0011E\\3x\u001f\u001a47/\u001a;G_JdU-\u00193feB\u000b'\u000f^5uS>t'+Z:vYR$\u0002b!\u000f\u0004\\\r}3\u0011\r\t\u0005\u0007w\u0019)F\u0004\u0003\u0004>\rEc\u0002BB \u0007\u001frAa!\u0011\u0004N9!11IB&\u001d\u0011\u0019)e!\u0013\u000f\t\u0005M7qI\u0005\u00025&\u0011\u0001,W\u0005\u0003\u0007^K!!\u0016,\n\u0007\u00055B+\u0003\u0003\u0004T\u0005-\u0012\u0001I(gMN,GOR8s\u0019\u0016\fG-\u001a:Fa>\u001c\u0007NU3ta>t7/\u001a#bi\u0006LAaa\u0016\u0004Z\tqQ\t]8dQ\u0016sGm\u00144gg\u0016$(\u0002BB*\u0003WAaa!\u0018;\u0001\u0004\u0011\u0016A\u0001;q\u0011\u001d\tyH\u000fa\u0001\u0003\u0003Cqaa\u0019;\u0001\u0004\t9(A\u0005f]\u0012|eMZ:fiRQ1\u0011HB4\u0007S\u001aIha\u001f\t\r\ru3\b1\u0001S\u0011\u001d\u0019Yg\u000fa\u0001\u0007[\nQ!\u001a:s_J\u0004Baa\u001c\u0004v5\u00111\u0011\u000f\u0006\u0004\u0007g\"\u0016\u0001\u00039s_R|7m\u001c7\n\t\r]4\u0011\u000f\u0002\u0007\u000bJ\u0014xN]:\t\u000f\u0005}4\b1\u0001\u0002\u0002\"911M\u001eA\u0002\u0005]\u0014AH1tg\u0016\u0014H\u000f\u0015:pG\u0016\u001c8\u000fU1si&$\u0018n\u001c8ECR\fw\u000b[3o)\u0011\t\u0019k!!\t\u000f\r\rE\b1\u0001\u0003\u0012\u0006i\u0011n\u001d*fCN\u001c\u0018n\u001a8j]\u001e\fAa\u001d;vERA\u00111UBE\u0007'\u001b)\nC\u0004\u0004\fv\u0002\ra!$\u0002\u0013A\f'\u000f^5uS>t\u0007cA3\u0004\u0010&\u00191\u0011\u00134\u0003\u0013A\u000b'\u000f^5uS>t\u0007b\u0002By{\u0001\u0007\u0011\u0011 \u0005\b\u0007/k\u0004\u0019ABM\u0003\rawn\u001a\t\u0005\u00077\u001by*\u0004\u0002\u0004\u001e*\u00191q\u0013\"\n\t\r\u00056Q\u0014\u0002\f\u0003\n\u001cHO]1di2{w-\u0001\u000flC\u001a\\\u0017mQ8oM&<gj\u001c+sk:\u001c\u0017\r^3P]\u001a+Go\u00195\u0016\u0005\u00055\b")
public class ReplicaFetcherThreadTest {
    private final TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000);
    private final FailedPartitions failedPartitions = new FailedPartitions();
    private final Uuid topicId1 = Uuid.randomUuid();
    private final Uuid topicId2 = Uuid.randomUuid();
    private final Map<String, Uuid> topicIds = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"topic1"), (Object)this.topicId1()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"topic2"), (Object)this.topicId2())}));
    private final java.util.List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates = (java.util.List)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)new .colon.colon((Object)new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName("topic1").setPartitionIndex(0).setControllerEpoch(0).setLeader(0).setLeaderEpoch(0), (List)new .colon.colon((Object)new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName("topic2").setPartitionIndex(0).setControllerEpoch(0).setLeader(0).setLeaderEpoch(0), (List)Nil$.MODULE$))).asJava();
    private final UpdateMetadataRequest updateMetadataRequest = (UpdateMetadataRequest)new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(), 0, 0, 0L, this.partitionStates(), Collections.emptyList(), (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter(this.topicIds()).asJava()).build();
    private final ZkMetadataCache metadataCache = new ZkMetadataCache(0, ZkMetadataCache$.MODULE$.$lessinit$greater$default$2());

    public TopicPartition kafka$server$ReplicaFetcherThreadTest$$t1p0() {
        return this.kafka$server$ReplicaFetcherThreadTest$$t1p0;
    }

    private TopicPartition t1p1() {
        return this.t1p1;
    }

    private TopicPartition t2p1() {
        return this.t2p1;
    }

    public BrokerEndPoint brokerEndPoint() {
        return this.brokerEndPoint;
    }

    public FailedPartitions failedPartitions() {
        return this.failedPartitions;
    }

    private Uuid topicId1() {
        return this.topicId1;
    }

    private Uuid topicId2() {
        return this.topicId2;
    }

    private Map<String, Uuid> topicIds() {
        return this.topicIds;
    }

    private java.util.List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates() {
        return this.partitionStates;
    }

    private UpdateMetadataRequest updateMetadataRequest() {
        return this.updateMetadataRequest;
    }

    private ZkMetadataCache metadataCache() {
        return this.metadataCache;
    }

    private InitialFetchState initialFetchState(Option<Uuid> topicId, long fetchOffset, int leaderEpoch) {
        BrokerEndPoint x$2 = new BrokerEndPoint(0, "localhost", 9092);
        return new InitialFetchState(topicId, x$2, leaderEpoch, fetchOffset);
    }

    private int initialFetchState$default$3() {
        return 1;
    }

    @AfterEach
    public void cleanup() {
        TestUtils$.MODULE$.clearYammerMetrics();
    }

    public ReplicaFetcherThread createReplicaFetcherThread(String name, int fetcherId, BrokerEndPoint sourceBroker, KafkaConfig brokerConfig, FailedPartitions failedPartitions, ReplicaManager replicaMgr, Metrics metrics, Time time, ReplicaQuota quota, Option<TierStateFetcher> tierStateFetcher, Option<BlockingSend> leaderEndpointBlockingSend, Option<LogContext> logContextOpt, Option<Object> fetchBackOffMsOpt, Option<Object> fetchBackOffMsMaxOpt) {
        Map x$14 = ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$12();
        return new ReplicaFetcherThread(name, fetcherId, sourceBroker, brokerConfig, failedPartitions, replicaMgr, metrics, time, quota, leaderEndpointBlockingSend, logContextOpt, x$14, fetchBackOffMsOpt, fetchBackOffMsMaxOpt);
    }

    public Option<BlockingSend> createReplicaFetcherThread$default$11() {
        return None$.MODULE$;
    }

    public Option<LogContext> createReplicaFetcherThread$default$12() {
        return None$.MODULE$;
    }

    public Option<Object> createReplicaFetcherThread$default$13() {
        return None$.MODULE$;
    }

    public Option<Object> createReplicaFetcherThread$default$14() {
        return None$.MODULE$;
    }

    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        String x$1 = "bob";
        int x$2 = 0;
        BrokerEndPoint x$3 = this.brokerEndPoint();
        FailedPartitions x$5 = this.failedPartitions();
        None$ x$7 = None$.MODULE$;
        Metrics x$8 = new Metrics();
        SystemTime x$9 = new SystemTime();
        QuotaFactory.UnboundedQuota$ x$10 = QuotaFactory.UnboundedQuota$.MODULE$;
        None$ x$11 = None$.MODULE$;
        Option<LogContext> x$12 = this.createReplicaFetcherThread$default$12();
        Option<Object> x$13 = this.createReplicaFetcherThread$default$13();
        Option<Object> x$14 = this.createReplicaFetcherThread$default$14();
        ReplicaFetcherThread thread = this.createReplicaFetcherThread(x$1, x$2, x$3, config, x$5, replicaManager, x$8, (Time)x$9, (ReplicaQuota)x$10, (Option<TierStateFetcher>)x$7, (Option<BlockingSend>)x$11, x$12, x$13, x$14);
        Assertions.assertEquals((short)ApiKeys.FETCH.latestVersion(), (short)thread.fetchRequestVersion());
        Assertions.assertEquals((short)ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), (short)thread.offsetForLeaderEpochRequestVersion());
        Assertions.assertEquals((short)ApiKeys.LIST_OFFSETS.latestVersion(), (short)thread.listOffsetRequestVersion());
    }

    @Test
    public void testFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions() {
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int leaderEpoch = 5;
        Mockito.when((Object)partition.localLogOrException()).thenReturn((Object)log);
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)log.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).thenReturn((Object)None$.MODULE$);
        Mockito.when((Object)log.endOffsetForEpoch(leaderEpoch)).thenReturn((Object)new Some((Object)new OffsetAndEpoch(0L, leaderEpoch)));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.replicaAlterLogDirsManager()).thenReturn((Object)replicaAlterLogDirsManager);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when((Object)quota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference<None$>(None$.MODULE$));
        this.stub(partition, replicaManager, log);
        java.util.Map offsets = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), leaderEpoch, 1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), leaderEpoch, 1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12(), this.createReplicaFetcherThread$default$13(), this.createReplicaFetcherThread$default$14());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId2()), 0L, this.initialFetchState$default$3())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId2()), 0L, this.initialFetchState$default$3()))})));
        this.assertPartitionStates((AbstractFetcherThread)thread, false, true, false);
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        this.assertPartitionStates((AbstractFetcherThread)thread, true, false, false);
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        this.assertPartitionStates((AbstractFetcherThread)thread, true, false, false);
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)3, (int)mockNetwork.fetchCount());
        this.assertPartitionStates((AbstractFetcherThread)thread, true, false, false);
        ((Partition)Mockito.verify((Object)partition, (VerificationMode)Mockito.times((int)3))).truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
    }

    public void assertPartitionStates(AbstractFetcherThread fetcher, boolean shouldBeReadyForFetch, boolean shouldBeTruncatingLog, boolean shouldBeDelayed) {
        new .colon.colon((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), (List)new .colon.colon((Object)this.t1p1(), (List)new .colon.colon((Object)this.t2p1(), (List)Nil$.MODULE$))).foreach((Function1 & Serializable & scala.Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$assertPartitionStates$1(fetcher, shouldBeReadyForFetch, shouldBeTruncatingLog, shouldBeDelayed, tp);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        BlockingSend mockBlockingSend = (BlockingSend)Mockito.mock(BlockingSend.class);
        Mockito.when((Object)mockBlockingSend.sendRequest((AbstractRequest.Builder)ArgumentMatchers.any())).thenThrow(new Throwable[]{new NullPointerException()});
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        String x$1 = "bob";
        int x$2 = 0;
        BrokerEndPoint x$3 = this.brokerEndPoint();
        FailedPartitions x$5 = this.failedPartitions();
        None$ x$7 = None$.MODULE$;
        Metrics x$8 = new Metrics();
        SystemTime x$9 = new SystemTime();
        Some x$11 = new Some((Object)mockBlockingSend);
        Option<LogContext> x$12 = this.createReplicaFetcherThread$default$12();
        Option<Object> x$13 = this.createReplicaFetcherThread$default$13();
        Option<Object> x$14 = this.createReplicaFetcherThread$default$14();
        Map result = this.createReplicaFetcherThread(x$1, x$2, x$3, config, x$5, replicaManager, x$8, (Time)x$9, null, (Option<TierStateFetcher>)x$7, (Option<BlockingSend>)x$11, x$12, x$13, x$14).fetchEpochEndOffsets((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(this.kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLeaderEpoch(0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(this.t1p1().partition()).setLeaderEpoch(0))})));
        Assertions.assertEquals((Object)((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))}))), (Object)result, (String)"results from leader epoch request should have undefined offset");
        ((BlockingSend)Mockito.verify((Object)mockBlockingSend)).sendRequest((AbstractRequest.Builder)ArgumentMatchers.any());
    }

    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBothIbp26() {
        this.verifyFetchLeaderEpochOnFirstFetch((ApiVersion)KAFKA_2_6_IV0$.MODULE$, this.verifyFetchLeaderEpochOnFirstFetch$default$2());
    }

    @Test
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        this.verifyFetchLeaderEpochOnFirstFetch(ApiVersion$.MODULE$.latestVersion(), 0);
    }

    public void verifyFetchLeaderEpochOnFirstFetch(ApiVersion ibp, int epochFetchCount) {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        props.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), ibp.version());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int leaderEpoch = 5;
        Mockito.when((Object)partition.localLogOrException()).thenReturn((Object)log);
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)log.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch)));
        Mockito.when((Object)log.endOffsetForEpoch(leaderEpoch)).thenReturn((Object)new Some((Object)new OffsetAndEpoch(0L, leaderEpoch)));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.replicaAlterLogDirsManager()).thenReturn((Object)replicaAlterLogDirsManager);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        this.stub(partition, replicaManager, log);
        java.util.Map offsets = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), leaderEpoch, 1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), leaderEpoch, 1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)QuotaFactory.UnboundedQuota$.MODULE$, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12(), this.createReplicaFetcherThread$default$13(), this.createReplicaFetcherThread$default$14());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3()))})));
        thread.doWork();
        Assertions.assertEquals((int)epochFetchCount, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        thread.doWork();
        Assertions.assertEquals((int)epochFetchCount, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        thread.doWork();
        Assertions.assertEquals((int)epochFetchCount, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)3, (int)mockNetwork.fetchCount());
        if (epochFetchCount > 0) {
            ListOffsetsRequestData.ListOffsetsTopic topic = (ListOffsetsRequestData.ListOffsetsTopic)EasyMock.createMock(ListOffsetsRequestData.ListOffsetsTopic.class);
            ListOffsetsRequest.Builder listOffsetsRequest = thread.listOffsetRequestBuilder(topic);
            this.verifyOffsetRequestVersion(ibp, (OffsetsForLeaderEpochRequest.Builder)mockNetwork.lastUsedOffsetsForLeaderEpochRequest().get(), listOffsetsRequest);
        }
    }

    public int verifyFetchLeaderEpochOnFirstFetch$default$2() {
        return 1;
    }

    public void verifyOffsetRequestVersion(ApiVersion ibp, OffsetsForLeaderEpochRequest.Builder offsetForLeaderEpochRequest, ListOffsetsRequest.Builder listOffsetsRequest) {
        ApiVersion apiVersion = ibp;
        KAFKA_2_6_IV0$ kAFKA_2_6_IV0$ = KAFKA_2_6_IV0$.MODULE$;
        if (!(apiVersion != null ? !apiVersion.equals(kAFKA_2_6_IV0$) : kAFKA_2_6_IV0$ != null)) {
            Assertions.assertEquals((int)3, (int)offsetForLeaderEpochRequest.oldestAllowedVersion());
            Assertions.assertEquals((int)3, (int)offsetForLeaderEpochRequest.latestAllowedVersion());
            Assertions.assertEquals((int)0, (int)listOffsetsRequest.oldestAllowedVersion());
            Assertions.assertEquals((int)5, (int)listOffsetsRequest.latestAllowedVersion());
            return;
        }
        Assertions.assertEquals((short)ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), (short)offsetForLeaderEpochRequest.oldestAllowedVersion());
        Assertions.assertEquals((short)ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), (short)offsetForLeaderEpochRequest.latestAllowedVersion());
        Assertions.assertEquals((int)0, (int)listOffsetsRequest.oldestAllowedVersion());
        Assertions.assertEquals((short)ApiKeys.LIST_OFFSETS.latestVersion(), (short)listOffsetsRequest.latestAllowedVersion());
    }

    public void verifyMarkReplicaThrottle(ReplicaManager replicaManager, int times) {
        ((ReplicaManager)Mockito.verify((Object)replicaManager, (VerificationMode)Mockito.times((int)times))).markFollowerReplicaThrottle();
    }

    public int verifyMarkReplicaThrottle$default$2() {
        return 1;
    }

    @Test
    public void shouldThrottleFollowerReplica() {
        LazyRef Quota$module = new LazyRef();
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        props.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), KAFKA_2_6_IV0$.MODULE$.version());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int leaderEpoch = 5;
        Mockito.when((Object)partition.localLogOrException()).thenReturn((Object)log);
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)log.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch)));
        Mockito.when((Object)log.endOffsetForEpoch(leaderEpoch)).thenReturn((Object)new Some((Object)new OffsetAndEpoch(0L, leaderEpoch)));
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.replicaAlterLogDirsManager()).thenReturn((Object)replicaAlterLogDirsManager);
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        this.stub(partition, replicaManager, log);
        java.util.Map offsets = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setLeaderEpoch(leaderEpoch).setEndOffset(100L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(1).setLeaderEpoch(leaderEpoch).setEndOffset(1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), this.Quota$2(Quota$module), (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12(), this.createReplicaFetcherThread$default$13(), this.createReplicaFetcherThread$default$14());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3()))})));
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        Assertions.assertEquals((Object)new Some((Object)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{this.t1p1()}))), (Object)mockNetwork.lastFetchRequest().map((Function1 & Serializable & scala.Serializable)x$1 -> (scala.collection.mutable.Set)CollectionConverters$.MODULE$.asScalaSetConverter(x$1.fetchData().keySet()).asScala()));
        ((Partition)Mockito.verify((Object)partition, (VerificationMode)Mockito.times((int)2))).truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
        this.verifyMarkReplicaThrottle(replicaManager, 1);
    }

    @Test
    public void testFollowerIsThrottledOnLowDisk() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int leaderEpoch = 5;
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.replicaAlterLogDirsManager()).thenReturn((Object)replicaAlterLogDirsManager);
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        this.stub(partition, replicaManager, log);
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)quotaManager.isQuotaExceeded())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)quotaManager.isThrottled((TopicPartition)EasyMock.anyObject(TopicPartition.class)))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        AtomicReference<Some> retVal = new AtomicReference<Some>(new Some((Object)BoxesRunTime.boxToLong((long)42L)));
        Mockito.when((Object)quotaManager.lastSignalledQuotaOptRef()).thenReturn(retVal, (Object[])new AtomicReference[]{retVal, retVal, null});
        java.util.Map offsets = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(leaderEpoch).setEndOffset(100L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(leaderEpoch).setEndOffset(1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("audi", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quotaManager, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12(), this.createReplicaFetcherThread$default$13(), this.createReplicaFetcherThread$default$14());
        Map partitionMap = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)PartitionFetchState$.MODULE$.apply((Option)new Some((Object)this.topicId1()), 0L, (Option)new Some((Object)BoxesRunTime.boxToLong((long)0L)), leaderEpoch, (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$, 0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)PartitionFetchState$.MODULE$.apply((Option)new Some((Object)this.topicId1()), 0L, (Option)None$.MODULE$, leaderEpoch, (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$, 0))}));
        thread.buildFetch(partitionMap);
        DiskUsageBasedThrottler$.MODULE$.registerListener((DiskUsageBasedThrottleListener)quotaManager);
        thread.buildFetch(partitionMap);
        DiskUsageBasedThrottler$.MODULE$.deRegisterListener((DiskUsageBasedThrottleListener)quotaManager);
        thread.buildFetch(partitionMap);
        this.verifyMarkReplicaThrottle(replicaManager, 2);
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        ArgumentCaptor truncateToCapture = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int leaderEpoch = 5;
        int initialLEO = 200;
        Mockito.when((Object)partition.localLogOrException()).thenReturn((Object)log);
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).thenReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 1)));
        Mockito.when((Object)log.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch)));
        Mockito.when((Object)log.endOffsetForEpoch(leaderEpoch)).thenReturn((Object)new Some((Object)new OffsetAndEpoch((long)initialLEO, leaderEpoch)));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)initialLEO));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.localLogOrException((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)log);
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.replicaAlterLogDirsManager()).thenReturn((Object)replicaAlterLogDirsManager);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when((Object)quota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference<None$>(None$.MODULE$));
        this.stub(partition, replicaManager, log);
        java.util.Map offsetsReply = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), leaderEpoch, 156L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t2p1(), leaderEpoch, 172L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12(), this.createReplicaFetcherThread$default$13(), this.createReplicaFetcherThread$default$14());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId2()), 0L, this.initialFetchState$default$3()))})));
        thread.doWork();
        ((Partition)Mockito.verify((Object)partition, (VerificationMode)Mockito.times((int)2))).truncateTo(BoxesRunTime.unboxToLong((Object)truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)156)), (String)new StringBuilder(58).append("Expected ").append(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)172)), (String)new StringBuilder(58).append("Expected ").append(this.t2p1()).append(" to truncate to offset 172 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs() {
        ArgumentCaptor truncateToCapture = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int leaderEpochAtFollower = 5;
        int leaderEpochAtLeader = 4;
        int initialLEO = 200;
        Mockito.when((Object)partition.localLogOrException()).thenReturn((Object)log);
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).thenReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 3)));
        Mockito.when((Object)log.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpochAtFollower)));
        Mockito.when((Object)log.endOffsetForEpoch(leaderEpochAtLeader)).thenReturn((Object)None$.MODULE$);
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)initialLEO));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.localLogOrException((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)log);
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.replicaAlterLogDirsManager()).thenReturn((Object)replicaAlterLogDirsManager);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when((Object)quota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference<None$>(None$.MODULE$));
        this.stub(partition, replicaManager, log);
        java.util.Map offsetsReply = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), leaderEpochAtLeader, 156L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t2p1(), leaderEpochAtLeader, 202L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12(), this.createReplicaFetcherThread$default$13(), this.createReplicaFetcherThread$default$14());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId2()), 0L, this.initialFetchState$default$3()))})));
        thread.doWork();
        ((Partition)Mockito.verify((Object)partition, (VerificationMode)Mockito.times((int)2))).truncateTo(BoxesRunTime.unboxToLong((Object)truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)156)), (String)new StringBuilder(58).append("Expected ").append(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 156 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)initialLEO)), (String)new StringBuilder(55).append("Expected ").append(this.t2p1()).append(" to truncate to offset ").append(initialLEO).append(" (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower() {
        ArgumentCaptor truncateToCapture = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int initialLEO = 200;
        Mockito.when((Object)partition.localLogOrException()).thenReturn((Object)log);
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).thenReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 2)));
        Mockito.when((Object)log.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)));
        Mockito.when((Object)log.endOffsetForEpoch(4)).thenReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3)));
        Mockito.when((Object)log.endOffsetForEpoch(3)).thenReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3)));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)initialLEO));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.localLogOrException((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)log);
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.replicaAlterLogDirsManager()).thenReturn((Object)replicaAlterLogDirsManager);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when((Object)quota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference<None$>(None$.MODULE$));
        this.stub(partition, replicaManager, log);
        java.util.Map offsets = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), 4, 155L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), 4, 143L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12(), this.createReplicaFetcherThread$default$13(), this.createReplicaFetcherThread$default$14());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3()))})));
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)0, (int)mockNetwork.fetchCount());
        java.util.Map nextOffsets = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), 3, 101L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), 3, 102L))}))).asJava();
        mockNetwork.setOffsetsForNextResponse(nextOffsets);
        thread.doWork();
        Assertions.assertEquals((int)2, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        Assertions.assertTrue((mockNetwork.lastUsedOffsetForLeaderEpochVersion() >= 3 ? 1 : 0) != 0, (String)"OffsetsForLeaderEpochRequest version.");
        thread.doWork();
        Assertions.assertEquals((int)2, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        ((Partition)Mockito.verify((Object)partition, (VerificationMode)Mockito.times((int)4))).truncateTo(BoxesRunTime.unboxToLong((Object)truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)102)), (String)new StringBuilder(58).append("Expected ").append(this.t1p1()).append(" to truncate to offset 102 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)101)), (String)new StringBuilder(58).append("Expected ").append(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 101 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower() {
        ArgumentCaptor truncateToCapture = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        ReplicationQuotaManager quota = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int initialLEO = 200;
        ObjectRef latestLogEpoch = ObjectRef.create((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)));
        Mockito.when((Object)partition.localLogOrException()).thenReturn((Object)log);
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).thenReturn((Object)BoxesRunTime.boxToLong((long)115L));
        Mockito.when((Object)log.latestEpoch()).thenAnswer(x$2 -> (Option)latestLogEpoch$1.elem);
        Mockito.when((Object)log.endOffsetForEpoch(4)).thenReturn((Object)new Some((Object)new OffsetAndEpoch(149L, 4)));
        Mockito.when((Object)log.endOffsetForEpoch(3)).thenReturn((Object)new Some((Object)new OffsetAndEpoch(129L, 2)));
        Mockito.when((Object)log.endOffsetForEpoch(2)).thenReturn((Object)new Some((Object)new OffsetAndEpoch(119L, 1)));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)initialLEO));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.localLogOrException((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)log);
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.replicaAlterLogDirsManager()).thenReturn((Object)replicaAlterLogDirsManager);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when((Object)quota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference<None$>(None$.MODULE$));
        this.stub(partition, replicaManager, log);
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(Collections.emptyMap(), this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = new ReplicaFetcherThread(this, config, replicaManager, quota, mockNetwork){

            public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long fetchOffset, FetchResponseData.PartitionData partitionData) {
                return None$.MODULE$;
            }
        };
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), initialLEO, this.initialFetchState$default$3())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), initialLEO, this.initialFetchState$default$3()))})));
        scala.collection.immutable.Set partitions = (scala.collection.immutable.Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), this.t1p1()}));
        thread.doWork();
        Assertions.assertEquals((int)0, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        partitions.foreach((Function1 & Serializable & scala.Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$2(thread, tp);
            return BoxedUnit.UNIT;
        });
        mockNetwork.setFetchPartitionDataForNextResponse((Map<TopicPartition, FetchResponseData.PartitionData>)((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)ReplicaFetcherThreadTest.partitionData$1(this.kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)ReplicaFetcherThreadTest.partitionData$1(this.t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(141L)))}))));
        mockNetwork.setIdsForNextResponse(this.topicIds());
        latestLogEpoch.elem = new Some((Object)BoxesRunTime.boxToInteger((int)4));
        thread.doWork();
        Assertions.assertEquals((int)0, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        ((Partition)Mockito.verify((Object)partition, (VerificationMode)Mockito.times((int)2))).truncateTo(BoxesRunTime.unboxToLong((Object)truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)140)), (String)new StringBuilder(58).append("Expected ").append(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 140 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)141)), (String)new StringBuilder(58).append("Expected ").append(this.t1p1()).append(" to truncate to offset 141 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
        partitions.foreach((Function1 & Serializable & scala.Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$3(thread, tp);
            return BoxedUnit.UNIT;
        });
        mockNetwork.setFetchPartitionDataForNextResponse((Map<TopicPartition, FetchResponseData.PartitionData>)((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)ReplicaFetcherThreadTest.partitionData$1(this.kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(130L))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)ReplicaFetcherThreadTest.partitionData$1(this.t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(131L)))}))));
        mockNetwork.setIdsForNextResponse(this.topicIds());
        thread.doWork();
        Assertions.assertEquals((int)0, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)3, (int)mockNetwork.fetchCount());
        ((Partition)Mockito.verify((Object)partition, (VerificationMode)Mockito.times((int)4))).truncateTo(BoxesRunTime.unboxToLong((Object)truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)129)), (String)new StringBuilder(57).append("Expected to truncate to offset 129 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
        partitions.foreach((Function1 & Serializable & scala.Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(thread, tp);
            return BoxedUnit.UNIT;
        });
        mockNetwork.setFetchPartitionDataForNextResponse((Map<TopicPartition, FetchResponseData.PartitionData>)((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)ReplicaFetcherThreadTest.partitionData$1(this.kafka$server$ReplicaFetcherThreadTest$$t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(120L))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)ReplicaFetcherThreadTest.partitionData$1(this.t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(121L)))}))));
        mockNetwork.setIdsForNextResponse(this.topicIds());
        latestLogEpoch.elem = None$.MODULE$;
        thread.doWork();
        Assertions.assertEquals((int)0, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)4, (int)mockNetwork.fetchCount());
        ((Partition)Mockito.verify((Object)partition, (VerificationMode)Mockito.times((int)6))).truncateTo(BoxesRunTime.unboxToLong((Object)truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)119)), (String)new StringBuilder(57).append("Expected to truncate to offset 119 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
        partitions.foreach((Function1 & Serializable & scala.Serializable)tp -> {
            ReplicaFetcherThreadTest.$anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(thread, tp);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        ArgumentCaptor truncateToCapture = ArgumentCaptor.forClass(Long.TYPE);
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        props.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        ReplicationQuotaManager quota = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int initialLEO = 200;
        Mockito.when((Object)partition.localLogOrException()).thenReturn((Object)log);
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).thenReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 2)));
        Mockito.when((Object)log.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)));
        Mockito.when((Object)log.endOffsetForEpoch(4)).thenReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3)));
        Mockito.when((Object)log.endOffsetForEpoch(3)).thenReturn((Object)new Some((Object)new OffsetAndEpoch(120L, 3)));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)initialLEO));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.localLogOrException((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)log);
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.replicaAlterLogDirsManager()).thenReturn((Object)replicaAlterLogDirsManager);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when((Object)quota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference<None$>(None$.MODULE$));
        this.stub(partition, replicaManager, log);
        java.util.Map offsets = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), -1, 155L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), -1, 143L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12(), this.createReplicaFetcherThread$default$13(), this.createReplicaFetcherThread$default$14());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3()))})));
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)1, (int)mockNetwork.fetchCount());
        Assertions.assertEquals((int)0, (int)mockNetwork.lastUsedOffsetForLeaderEpochVersion(), (String)"OffsetsForLeaderEpochRequest version.");
        thread.doWork();
        Assertions.assertEquals((int)1, (int)mockNetwork.epochFetchCount());
        Assertions.assertEquals((int)2, (int)mockNetwork.fetchCount());
        ((Partition)Mockito.verify((Object)partition, (VerificationMode)Mockito.times((int)2))).truncateTo(BoxesRunTime.unboxToLong((Object)truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)155)), (String)new StringBuilder(58).append("Expected ").append(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).append(" to truncate to offset 155 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
        Assertions.assertTrue((boolean)((SeqLike)CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains((Object)BoxesRunTime.boxToInteger((int)143)), (String)new StringBuilder(58).append("Expected ").append(this.t1p1()).append(" to truncate to offset 143 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
    }

    @Test
    public void shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset() {
        ArgumentCaptor truncated = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int initialFetchOffset = 100;
        Mockito.when((Object)partition.localLogOrException()).thenReturn((Object)log);
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).thenReturn((Object)BoxesRunTime.boxToLong((long)initialFetchOffset));
        Mockito.when((Object)log.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.replicaAlterLogDirsManager()).thenReturn((Object)replicaAlterLogDirsManager);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when((Object)quota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference<None$>(None$.MODULE$));
        this.stub(partition, replicaManager, log);
        java.util.Map offsetsReply = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), -1, -1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12(), this.createReplicaFetcherThread$default$13(), this.createReplicaFetcherThread$default$14());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), initialFetchOffset, this.initialFetchState$default$3()))})));
        thread.doWork();
        ((Partition)Mockito.verify((Object)partition)).truncateTo(BoxesRunTime.unboxToLong((Object)truncated.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals((long)initialFetchOffset, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()));
    }

    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        ArgumentCaptor truncated = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int leaderEpoch = 5;
        int highWatermark = 100;
        int initialLeo = 300;
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).thenReturn((Object)BoxesRunTime.boxToLong((long)highWatermark));
        Mockito.when((Object)partition.localLogOrException()).thenReturn((Object)log);
        Mockito.when((Object)log.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch)));
        Mockito.when((Object)log.endOffsetForEpoch(leaderEpoch)).thenReturn((Object)new Some((Object)new OffsetAndEpoch((long)initialLeo, leaderEpoch)));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)initialLeo));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.localLogOrException((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)log);
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.replicaAlterLogDirsManager()).thenReturn((Object)replicaAlterLogDirsManager);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when((Object)quota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference<None$>(None$.MODULE$));
        this.stub(partition, replicaManager, log);
        java.util.Map offsetsReply = (java.util.Map)CollectionConverters$.MODULE$.mutableMapAsJavaMapConverter((scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), Errors.NOT_LEADER_OR_FOLLOWER, -1, -1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), (Option<LogContext>)None$.MODULE$, (Option<Object>)new Some((Object)BoxesRunTime.boxToLong((long)100L)), this.createReplicaFetcherThread$default$14());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3()))})));
        int[] expectedBackoffTimes = new int[]{100, 200, 400, 800};
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)x -> {
            thread.doWork();
            Assertions.assertEquals((long)expectedBackoffTimes[x], (long)((DelayedItem)((PartitionFetchState)thread.fetchState(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).get()).delay().get()).delayMs());
        });
        ((Partition)Mockito.verify((Object)partition, (VerificationMode)Mockito.never())).truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
        offsetsReply.put(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), leaderEpoch, 156L));
        thread.doWork();
        ((Partition)Mockito.verify((Object)partition)).truncateTo(BoxesRunTime.unboxToLong((Object)truncated.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals((long)156L, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()));
    }

    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ReplicationQuotaManager quota = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int leaderEpoch = 4;
        Mockito.when((Object)partition.localLogOrException()).thenReturn((Object)log);
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)log.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch)));
        Mockito.when((Object)log.endOffsetForEpoch(leaderEpoch)).thenReturn((Object)new Some((Object)new OffsetAndEpoch(0L, leaderEpoch)));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.replicaAlterLogDirsManager()).thenReturn((Object)replicaAlterLogDirsManager);
        Mockito.when((Object)quota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference<None$>(None$.MODULE$));
        this.stub(partition, replicaManager, log);
        java.util.Map offsetsReply = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), leaderEpoch, 1L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), leaderEpoch, 1L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12(), this.createReplicaFetcherThread$default$13(), this.createReplicaFetcherThread$default$14());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3()))})));
        Assertions.assertEquals((Object)Option$.MODULE$.apply((Object)Truncating$.MODULE$), (Object)thread.fetchState(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).map((Function1 & Serializable & scala.Serializable)x$3 -> x$3.state()));
        Assertions.assertEquals((Object)Option$.MODULE$.apply((Object)Truncating$.MODULE$), (Object)thread.fetchState(this.t1p1()).map((Function1 & Serializable & scala.Serializable)x$4 -> x$4.state()));
        thread.doWork();
        Assertions.assertEquals((Object)Option$.MODULE$.apply((Object)Fetching$.MODULE$), (Object)thread.fetchState(this.kafka$server$ReplicaFetcherThreadTest$$t1p0()).map((Function1 & Serializable & scala.Serializable)x$5 -> x$5.state()));
        Assertions.assertEquals((Object)Option$.MODULE$.apply((Object)Fetching$.MODULE$), (Object)thread.fetchState(this.t1p1()).map((Function1 & Serializable & scala.Serializable)x$6 -> x$6.state()));
        ((Partition)Mockito.verify((Object)partition, (VerificationMode)Mockito.times((int)2))).truncateTo(0L, false);
    }

    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        KafkaConfig config = this.kafkaConfigNoTruncateOnFetch();
        ArgumentCaptor truncateToCapture = ArgumentCaptor.forClass(Long.TYPE);
        int initialLEO = 100;
        ReplicationQuotaManager quota = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager)Mockito.mock(ReplicaAlterLogDirsManager.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Mockito.when((Object)partition.localLogOrException()).thenReturn((Object)log);
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.highWatermark())).thenReturn((Object)BoxesRunTime.boxToLong((long)(initialLEO - 2)));
        Mockito.when((Object)log.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)5)));
        Mockito.when((Object)log.endOffsetForEpoch(5)).thenReturn((Object)new Some((Object)new OffsetAndEpoch((long)initialLEO, 5)));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)initialLEO));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.localLogOrException((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)log);
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.replicaAlterLogDirsManager()).thenReturn((Object)replicaAlterLogDirsManager);
        Mockito.when((Object)quota.lastSignalledQuotaOptRef()).thenReturn(new AtomicReference<None$>(None$.MODULE$));
        this.stub(partition, replicaManager, log);
        java.util.Map offsetsReply = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.newOffsetForLeaderPartitionResult(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), 5, 52L)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.newOffsetForLeaderPartitionResult(this.t1p1(), 5, 49L))}))).asJava();
        ReplicaFetcherMockBlockingSend mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, this.brokerEndPoint(), (Time)new SystemTime());
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), (ReplicaQuota)quota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockNetwork), this.createReplicaFetcherThread$default$12(), this.createReplicaFetcherThread$default$13(), this.createReplicaFetcherThread$default$14());
        thread.addPartitions((Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState((Option<Uuid>)new Some((Object)this.topicId1()), 0L, this.initialFetchState$default$3()))})));
        TopicPartition partitionThatBecameLeader = this.kafka$server$ReplicaFetcherThreadTest$$t1p0();
        mockNetwork.setEpochRequestCallback((Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> thread.removePartitions((Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{partitionThatBecameLeader}))));
        thread.doWork();
        ((Partition)Mockito.verify((Object)partition)).truncateTo(BoxesRunTime.unboxToLong((Object)truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals((long)49L, (long)BoxesRunTime.unboxToLong((Object)truncateToCapture.getValue()));
    }

    @Test
    public void shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        BlockingSend mockBlockingSend = (BlockingSend)Mockito.mock(BlockingSend.class);
        mockBlockingSend.initiateClose();
        Mockito.when((Object)BoxedUnit.UNIT).thenThrow(new Throwable[]{new IllegalArgumentException()});
        mockBlockingSend.close();
        Mockito.when((Object)BoxedUnit.UNIT).thenThrow(new Throwable[]{new IllegalStateException()});
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        String x$1 = "bob";
        int x$2 = 0;
        BrokerEndPoint x$3 = this.brokerEndPoint();
        FailedPartitions x$5 = this.failedPartitions();
        None$ x$7 = None$.MODULE$;
        Metrics x$8 = new Metrics();
        SystemTime x$9 = new SystemTime();
        Some x$11 = new Some((Object)mockBlockingSend);
        Option<LogContext> x$12 = this.createReplicaFetcherThread$default$12();
        Option<Object> x$13 = this.createReplicaFetcherThread$default$13();
        Option<Object> x$14 = this.createReplicaFetcherThread$default$14();
        ReplicaFetcherThread thread = this.createReplicaFetcherThread(x$1, x$2, x$3, config, x$5, replicaManager, x$8, (Time)x$9, null, (Option<TierStateFetcher>)x$7, (Option<BlockingSend>)x$11, x$12, x$13, x$14);
        thread.start();
        thread.initiateShutdown();
        thread.awaitShutdown();
        ((BlockingSend)Mockito.verify((Object)mockBlockingSend)).initiateClose();
        ((BlockingSend)Mockito.verify((Object)mockBlockingSend)).close();
    }

    @Test
    public void shouldUpdateReassignmentBytesInMetrics() {
        this.assertProcessPartitionDataWhen(true);
    }

    @Test
    public void shouldNotUpdateReassignmentBytesInMetricsWhenNoReassignmentsInProgress() {
        this.assertProcessPartitionDataWhen(false);
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testBuildFetch() {
        void var22_22;
        void var14_14;
        TopicIdPartition tid1p0 = new TopicIdPartition(this.topicId1(), this.kafka$server$ReplicaFetcherThreadTest$$t1p0());
        TopicIdPartition tid1p1 = new TopicIdPartition(this.topicId1(), this.t1p1());
        TopicIdPartition tid2p1 = new TopicIdPartition(this.topicId2(), this.t2p1());
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        BlockingSend mockBlockingSend = (BlockingSend)Mockito.mock(BlockingSend.class);
        ReplicaQuota replicaQuota = (ReplicaQuota)Mockito.mock(ReplicaQuota.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when((Object)replicaManager.localLogOrException((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)log);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaQuota.isThrottled((TopicPartition)ArgumentMatchers.any()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)false));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)log.logStartOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        ReplicaFetcherThread thread = new ReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), replicaQuota, (Option)new Some((Object)mockBlockingSend), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$11(), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$12(), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$13(), ReplicaFetcherThread$.MODULE$.$lessinit$greater$default$14());
        int leaderEpoch = 1;
        Map partitionMap = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.kafka$server$ReplicaFetcherThreadTest$$t1p0()), (Object)new PartitionFetchState((Option)new Some((Object)this.topicId1()), 150L, (Option)None$.MODULE$, leaderEpoch, (Option)None$.MODULE$, (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new PartitionFetchState((Option)new Some((Object)this.topicId1()), 155L, (Option)None$.MODULE$, leaderEpoch, (Option)None$.MODULE$, (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)new PartitionFetchState((Option)new Some((Object)this.topicId2()), 160L, (Option)None$.MODULE$, leaderEpoch, (Option)None$.MODULE$, (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8()))}));
        AbstractFetcherThread.ResultWithPartitions resultWithPartitions = thread.buildFetch(partitionMap);
        if (resultWithPartitions == null) {
            throw new MatchError(null);
        }
        Option fetchRequestOpt = (Option)resultWithPartitions.result();
        Assertions.assertTrue((boolean)var14_14.isDefined());
        FetchRequest.Builder fetchRequestBuilder = ((AbstractFetcherThread.ReplicaFetch)var14_14.get()).fetchRequest();
        Map partitionDataMap = (Map)partitionMap.map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            if (x0$1 == null) {
                throw new MatchError(null);
            }
            TopicPartition tp = (TopicPartition)x0$1._1();
            PartitionFetchState state = (PartitionFetchState)x0$1._2();
            Tuple2 tuple2 = new Tuple2((Object)tp, (Object)new FetchRequest.PartitionData((Uuid)state.topicId().get(), state.fetchOffset(), 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(state.currentLeaderEpoch())), Optional.empty()));
            return tuple2;
        }, scala.collection.Map$.MODULE$.canBuildFrom());
        Assertions.assertEquals((Object)CollectionConverters$.MODULE$.mapAsJavaMapConverter(partitionDataMap).asJava(), (Object)fetchRequestBuilder.fetchData());
        Assertions.assertEquals((int)0, (int)fetchRequestBuilder.replaced().size());
        Assertions.assertEquals((int)0, (int)fetchRequestBuilder.removed().size());
        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> responseData = new LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData>();
        responseData.put(tid1p0, new FetchResponseData.PartitionData());
        responseData.put(tid1p1, new FetchResponseData.PartitionData());
        responseData.put(tid2p1, new FetchResponseData.PartitionData());
        FetchResponse fetchResponse = FetchResponse.of((Errors)Errors.NONE, (int)0, (int)123, responseData);
        thread.fetchSessionHandler().handleResponse(fetchResponse, ApiKeys.FETCH.latestVersion());
        Uuid newTopicId = Uuid.randomUuid();
        Map partitionMap2 = (Map)scala.collection.Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new PartitionFetchState((Option)new Some((Object)this.topicId1()), 155L, (Option)None$.MODULE$, leaderEpoch, (Option)None$.MODULE$, (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8())), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t2p1()), (Object)new PartitionFetchState((Option)new Some((Object)newTopicId), 160L, (Option)None$.MODULE$, leaderEpoch, (Option)None$.MODULE$, (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$, PartitionFetchState$.MODULE$.apply$default$8()))}));
        AbstractFetcherThread.ResultWithPartitions resultWithPartitions2 = thread.buildFetch(partitionMap2);
        if (resultWithPartitions2 == null) {
            throw new MatchError(null);
        }
        Option fetchRequestOpt2 = (Option)resultWithPartitions2.result();
        Map partitionDataMap2 = (Map)((TraversableLike)partitionMap2.drop(1)).map((Function1 & Serializable & scala.Serializable)x0$2 -> {
            if (x0$2 == null) {
                throw new MatchError(null);
            }
            TopicPartition tp = (TopicPartition)x0$2._1();
            PartitionFetchState state = (PartitionFetchState)x0$2._2();
            Tuple2 tuple2 = new Tuple2((Object)tp, (Object)new FetchRequest.PartitionData((Uuid)state.topicId().get(), state.fetchOffset(), 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(state.currentLeaderEpoch())), Optional.empty()));
            return tuple2;
        }, scala.collection.Map$.MODULE$.canBuildFrom());
        Assertions.assertTrue((boolean)var22_22.isDefined());
        FetchRequest.Builder fetchRequestBuilder2 = ((AbstractFetcherThread.ReplicaFetch)var22_22.get()).fetchRequest();
        Assertions.assertEquals((Object)CollectionConverters$.MODULE$.mapAsJavaMapConverter(partitionDataMap2).asJava(), (Object)fetchRequestBuilder2.fetchData());
        Assertions.assertEquals(Collections.singletonList(tid2p1), (Object)fetchRequestBuilder2.replaced());
        Assertions.assertEquals(Collections.singletonList(tid1p0), (Object)fetchRequestBuilder2.removed());
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition tp, int leaderEpoch, long endOffset) {
        return this.newOffsetForLeaderPartitionResult(tp, Errors.NONE, leaderEpoch, endOffset);
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition tp, Errors error, int leaderEpoch, long endOffset) {
        return new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(tp.partition()).setErrorCode(error.code()).setLeaderEpoch(leaderEpoch).setEndOffset(endOffset);
    }

    private void assertProcessPartitionDataWhen(boolean isReassigning) {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(props);
        BlockingSend mockBlockingSend = (BlockingSend)Mockito.mock(BlockingSend.class);
        AbstractLog log = (AbstractLog)Mockito.mock(AbstractLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        Mockito.when((Object)partition.localLogOrException()).thenReturn((Object)log);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)partition.isReassigning())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)isReassigning));
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)partition.isAddingLocalReplica())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)isReassigning));
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Mockito.when((Object)replicaManager.getPartitionOrException((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)partition);
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        Mockito.when((Object)replicaManager.brokerTopicStats()).thenReturn((Object)brokerTopicStats);
        ReplicaQuota replicaQuota = (ReplicaQuota)Mockito.mock(ReplicaQuota.class);
        ReplicaFetcherThread thread = this.createReplicaFetcherThread("bob", 0, this.brokerEndPoint(), config, this.failedPartitions(), replicaManager, new Metrics(), (Time)new SystemTime(), replicaQuota, (Option<TierStateFetcher>)None$.MODULE$, (Option<BlockingSend>)new Some((Object)mockBlockingSend), this.createReplicaFetcherThread$default$12(), this.createReplicaFetcherThread$default$13(), this.createReplicaFetcherThread$default$14());
        MemoryRecords records = MemoryRecords.withRecords((CompressionType)CompressionType.NONE, (SimpleRecord[])new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))});
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData().setPartitionIndex(this.kafka$server$ReplicaFetcherThreadTest$$t1p0().partition()).setLastStableOffset(0L).setLogStartOffset(0L).setRecords((BaseRecords)records);
        thread.processPartitionData(this.kafka$server$ReplicaFetcherThreadTest$$t1p0(), 0L, partitionData);
        if (isReassigning) {
            Assertions.assertEquals((long)records.sizeInBytes(), (long)((Meter)brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        } else {
            Assertions.assertEquals((long)0L, (long)((Meter)brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        }
        Assertions.assertEquals((long)records.sizeInBytes(), (long)((Meter)brokerTopicStats.allTopicsStats().replicationBytesInRate().get()).count());
    }

    public void stub(Partition partition, ReplicaManager replicaManager, AbstractLog log) {
        Mockito.when((Object)replicaManager.localLogOrException(this.kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn((Object)log);
        Mockito.when((Object)replicaManager.getPartitionOrException(this.kafka$server$ReplicaFetcherThreadTest$$t1p0())).thenReturn((Object)partition);
        Mockito.when((Object)replicaManager.localLogOrException(this.t1p1())).thenReturn((Object)log);
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p1())).thenReturn((Object)partition);
        Mockito.when((Object)replicaManager.localLogOrException(this.t2p1())).thenReturn((Object)log);
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t2p1())).thenReturn((Object)partition);
        Mockito.when((Object)partition.getLinkedLeaderEpoch()).thenReturn((Object)None$.MODULE$);
    }

    private KafkaConfig kafkaConfigNoTruncateOnFetch() {
        Properties props = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20());
        props.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), KAFKA_2_6_IV0$.MODULE$.version());
        return KafkaConfig$.MODULE$.fromProps(props);
    }

    public static final /* synthetic */ void $anonfun$assertPartitionStates$1(AbstractFetcherThread fetcher$1, boolean shouldBeReadyForFetch$1, boolean shouldBeTruncatingLog$1, boolean shouldBeDelayed$1, TopicPartition tp) {
        Assertions.assertTrue((boolean)fetcher$1.fetchState(tp).isDefined());
        PartitionFetchState fetchState = (PartitionFetchState)fetcher$1.fetchState(tp).get();
        Assertions.assertEquals((Object)BoxesRunTime.boxToBoolean((boolean)shouldBeReadyForFetch$1), (Object)BoxesRunTime.boxToBoolean((boolean)fetchState.isReadyForFetch()), (String)new StringBuilder(39).append("Partition ").append(tp).append(" should").append((Object)(!shouldBeReadyForFetch$1 ? " NOT" : "")).append(" be ready for fetching").toString());
        Assertions.assertEquals((Object)BoxesRunTime.boxToBoolean((boolean)shouldBeTruncatingLog$1), (Object)BoxesRunTime.boxToBoolean((boolean)fetchState.isTruncating()), (String)new StringBuilder(39).append("Partition ").append(tp).append(" should").append((Object)(!shouldBeTruncatingLog$1 ? " NOT" : "")).append(" be truncating its log").toString());
        Assertions.assertEquals((Object)BoxesRunTime.boxToBoolean((boolean)shouldBeDelayed$1), (Object)BoxesRunTime.boxToBoolean((boolean)fetchState.isDelayed()), (String)new StringBuilder(28).append("Partition ").append(tp).append(" should").append((Object)(!shouldBeDelayed$1 ? " NOT" : "")).append(" be delayed").toString());
    }

    private final /* synthetic */ ReplicaFetcherThreadTest$Quota$1$ Quota$lzycompute$1(LazyRef Quota$module$1) {
        synchronized (Quota$module$1) {
            ReplicaFetcherThreadTest$Quota$1$ replicaFetcherThreadTest$Quota$1$ = Quota$module$1.initialized() ? (ReplicaFetcherThreadTest$Quota$1$)Quota$module$1.value() : (ReplicaFetcherThreadTest$Quota$1$)Quota$module$1.initialize((Object)new ReplicaFetcherThreadTest$Quota$1$(this));
            return replicaFetcherThreadTest$Quota$1$;
        }
    }

    private final ReplicaFetcherThreadTest$Quota$1$ Quota$2(LazyRef Quota$module$1) {
        if (Quota$module$1.initialized()) {
            return (ReplicaFetcherThreadTest$Quota$1$)Quota$module$1.value();
        }
        return this.Quota$lzycompute$1(Quota$module$1);
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$2(ReplicaFetcherThread thread$1, TopicPartition tp) {
        Assertions.assertEquals((Object)Fetching$.MODULE$, (Object)((PartitionFetchState)thread$1.fetchState(tp).get()).state());
    }

    private static final FetchResponseData.PartitionData partitionData$1(int partition, FetchResponseData.EpochEndOffset divergingEpoch) {
        return new FetchResponseData.PartitionData().setPartitionIndex(partition).setLastStableOffset(0L).setLogStartOffset(0L).setDivergingEpoch(divergingEpoch);
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$3(ReplicaFetcherThread thread$1, TopicPartition tp) {
        Assertions.assertEquals((Object)Fetching$.MODULE$, (Object)((PartitionFetchState)thread$1.fetchState(tp).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(ReplicaFetcherThread thread$1, TopicPartition tp) {
        Assertions.assertEquals((Object)Fetching$.MODULE$, (Object)((PartitionFetchState)thread$1.fetchState(tp).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(ReplicaFetcherThread thread$1, TopicPartition tp) {
        Assertions.assertEquals((Object)Fetching$.MODULE$, (Object)((PartitionFetchState)thread$1.fetchState(tp).get()).state());
    }

    public ReplicaFetcherThreadTest() {
        this.metadataCache().updateMetadata(0, this.updateMetadataRequest());
    }
}

