package kafka.server;

import com.yammer.metrics.core.Meter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.LogManager;
import kafka.log.UnifiedLog;
import kafka.server.AbstractFetcherThread;
import kafka.server.epoch.util.MockBlockingSender;
import kafka.server.metadata.ZkMetadataCache;
import kafka.server.metadata.ZkMetadataCache$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordConversionStats;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.storage.internals.log.LogAppendInfo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

/* compiled from: ReplicaFetcherThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\rub\u0001\u0002\u001e<\u0001\u0001CQa\u0012\u0001\u0005\u0002!Cqa\u0013\u0001C\u0002\u0013%A\n\u0003\u0004Y\u0001\u0001\u0006I!\u0014\u0005\b3\u0002\u0011\r\u0011\"\u0003M\u0011\u0019Q\u0006\u0001)A\u0005\u001b\"91\f\u0001b\u0001\n\u0013a\u0005B\u0002/\u0001A\u0003%Q\nC\u0004^\u0001\t\u0007I\u0011\u00020\t\r\t\u0004\u0001\u0015!\u0003`\u0011\u001d\u0019\u0007A1A\u0005\nyCa\u0001\u001a\u0001!\u0002\u0013y\u0006bB3\u0001\u0005\u0004%IA\u001a\u0005\u0007k\u0002\u0001\u000b\u0011B4\t\u000fY\u0004!\u0019!C\u0005o\"1a\u0010\u0001Q\u0001\naD\u0001b \u0001C\u0002\u0013%\u0011\u0011\u0001\u0005\t\u0003\u0013\u0001\u0001\u0015!\u0003\u0002\u0004!I\u00111\u0002\u0001C\u0002\u0013%\u0011Q\u0002\u0005\t\u0003_\u0001\u0001\u0015!\u0003\u0002\u0010!I\u0011\u0011\u0007\u0001C\u0002\u0013%\u00111\u0007\u0005\t\u0003\u0003\u0002\u0001\u0015!\u0003\u00026!I\u00111\t\u0001A\u0002\u0013%\u0011Q\t\u0005\n\u0003'\u0002\u0001\u0019!C\u0005\u0003+B\u0001\"!\u0019\u0001A\u0003&\u0011q\t\u0005\b\u0003G\u0002A\u0011BA3\u0011%\tY\tAI\u0001\n\u0013\ti\tC\u0004\u0002$\u0002!\t!!*\t\u000f\u0005u\u0006\u0001\"\u0003\u0002@\"9!Q\u0002\u0001\u0005\u0002\u0005\u0015\u0006b\u0002B\f\u0001\u0011\u0005\u0011Q\u0015\u0005\b\u00057\u0001A\u0011\u0001B\u000f\u0011\u001d\u0011Y\u0004\u0001C\u0001\u0003KCqAa\u0010\u0001\t\u0003\t)\u000bC\u0004\u0003D\u0001!\t!!*\t\u000f\t\u001d\u0003\u0001\"\u0003\u0003J!I!q\f\u0001\u0012\u0002\u0013%\u0011Q\u0012\u0005\b\u0005C\u0002A\u0011AAS\u0011\u001d\u0011)\u0007\u0001C\u0001\u0003KCqA!\u001b\u0001\t\u0003\t)\u000bC\u0004\u0003n\u0001!\t!!*\t\u000f\tE\u0004\u0001\"\u0001\u0002&\"9!Q\u000f\u0001\u0005\u0002\u0005\u0015\u0006b\u0002B=\u0001\u0011\u0005\u0011Q\u0015\u0005\b\u0005{\u0002A\u0011AAS\u0011\u001d\u0011\t\t\u0001C\u0001\u0003KCqA!\"\u0001\t\u0003\t)\u000bC\u0004\u0003\n\u0002!\t!!*\t\u000f\t5\u0005\u0001\"\u0001\u0002&\"9!\u0011\u0013\u0001\u0005\u0002\u0005\u0015\u0006b\u0002BK\u0001\u0011\u0005\u0011Q\u0015\u0005\b\u00053\
u0003A\u0011AAS\u0011\u001d\u0011i\n\u0001C\u0001\u0005?CqA!3\u0001\t\u0013\u0011Y\rC\u0004\u0003J\u0002!IA!?\t\u000f\rE\u0001\u0001\"\u0003\u0004\u0014!91\u0011\u0004\u0001\u0005\u0002\rm\u0001bBB\u001d\u0001\u0011%11\b\u0002\u0019%\u0016\u0004H.[2b\r\u0016$8\r[3s)\"\u0014X-\u00193UKN$(B\u0001\u001f>\u0003\u0019\u0019XM\u001d<fe*\ta(A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001\t\u0005C\u0001\"F\u001b\u0005\u0019%\"\u0001#\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0019\u001b%AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002\u0013B\u0011!\nA\u0007\u0002w\u0005!A/\r91+\u0005i\u0005C\u0001(W\u001b\u0005y%B\u0001)R\u0003\u0019\u0019w.\\7p]*\u0011aH\u0015\u0006\u0003'R\u000ba!\u00199bG\",'\"A+\u0002\u0007=\u0014x-\u0003\u0002X\u001f\nqAk\u001c9jGB\u000b'\u000f^5uS>t\u0017!\u0002;2aB\u0002\u0013\u0001\u0002;2aF\nQ\u0001^\u0019qc\u0001\nA\u0001\u001e\u001aqc\u0005)AO\r92A\u0005AAo\u001c9jG&#\u0017'F\u0001`!\tq\u0005-\u0003\u0002b\u001f\n!Q+^5e\u0003%!x\u000e]5d\u0013\u0012\f\u0004%\u0001\u0005u_BL7-\u001333\u0003%!x\u000e]5d\u0013\u0012\u0014\u0004%\u0001\u0005u_BL7-\u00133t+\u00059\u0007\u0003\u00025l[~k\u0011!\u001b\u0006\u0003U\u000e\u000b!bY8mY\u0016\u001cG/[8o\u0013\ta\u0017NA\u0002NCB\u0004\"A\\:\u000e\u0003=T!\u0001]9\u0002\t1\fgn\u001a\u0006\u0002e\u0006!!.\u0019<b\u0013\t!xN\u0001\u0004TiJLgnZ\u0001\ni>\u0004\u0018nY%eg\u0002\naB\u0019:pW\u0016\u0014XI\u001c3Q_&tG/F\u0001y!\tIH0D\u0001{\u0015\tYX(A\u0004dYV\u001cH/\u001a:\n\u0005uT(A\u0004\"s_.,'/\u00128e!>Lg\u000e^\u0001\u0010EJ|7.\u001a:F]\u0012\u0004v.\u001b8uA\u0005\u0001b-Y5mK\u0012\u0004\u0016M\u001d;ji&|gn]\u000b\u0003\u0003\u0007\u00012ASA\u0003\u0013\r\t9a\u000f\u0002\u0011\r\u0006LG.\u001a3QCJ$\u0018\u000e^5p]N\f\u0011CZ1jY\u0016$\u0007+\u0019:uSRLwN\\:!\u0003=\u0001\u0018M\u001d;ji&|gn\u0015;bi\u0016\u001cXCAA\b!\u0019\t\t\"a\u0006\u0002\u001c5\u0011\u00111\u0003\u0006\u0004\u0003+\t\u0018\u0001B;uS2LA!!\u0007\u0002\u0014\t!A*[:u!\u0011\ti\"!\u000b\u000f\t\u0005}\u0011QE\u0007\u0003\u0003CQ1!a\tP\u00
03\u001diWm]:bO\u0016LA!a\n\u0002\"\u0005IR\u000b\u001d3bi\u0016lU\r^1eCR\f'+Z9vKN$H)\u0019;b\u0013\u0011\tY#!\f\u00039U\u0003H-\u0019;f\u001b\u0016$\u0018\rZ1uCB\u000b'\u000f^5uS>t7\u000b^1uK*!\u0011qEA\u0011\u0003A\u0001\u0018M\u001d;ji&|gn\u0015;bi\u0016\u001c\b%A\u000bva\u0012\fG/Z'fi\u0006$\u0017\r^1SKF,Xm\u001d;\u0016\u0005\u0005U\u0002\u0003BA\u001c\u0003{i!!!\u000f\u000b\u0007\u0005mr*\u0001\u0005sKF,Xm\u001d;t\u0013\u0011\ty$!\u000f\u0003+U\u0003H-\u0019;f\u001b\u0016$\u0018\rZ1uCJ+\u0017/^3ti\u00061R\u000f\u001d3bi\u0016lU\r^1eCR\f'+Z9vKN$\b%A\u0007nKR\fG-\u0019;b\u0007\u0006\u001c\u0007.Z\u000b\u0003\u0003\u000f\u0002B!!\u0013\u0002P5\u0011\u00111\n\u0006\u0004\u0003\u001bZ\u0014\u0001C7fi\u0006$\u0017\r^1\n\t\u0005E\u00131\n\u0002\u00105.lU\r^1eCR\f7)Y2iK\u0006\tR.\u001a;bI\u0006$\u0018mQ1dQ\u0016|F%Z9\u0015\t\u0005]\u0013Q\f\t\u0004\u0005\u0006e\u0013bAA.\u0007\n!QK\\5u\u0011%\tyfFA\u0001\u0002\u0004\t9%A\u0002yIE\na\"\\3uC\u0012\fG/Y\"bG\",\u0007%A\tj]&$\u0018.\u00197GKR\u001c\u0007n\u0015;bi\u0016$\u0002\"a\u001a\u0002n\u0005]\u0014\u0011\u0011\t\u0004\u0015\u0006%\u0014bAA6w\t\t\u0012J\\5uS\u0006dg)\u001a;dQN#\u0018\r^3\t\u000f\u0005=\u0014\u00041\u0001\u0002r\u00059Ao\u001c9jG&#\u0007\u0003\u0002\"\u0002t}K1!!\u001eD\u0005\u0019y\u0005\u000f^5p]\"9\u0011\u0011P\rA\u0002\u0005m\u0014a\u00034fi\u000eDwJ\u001a4tKR\u00042AQA?\u0013\r\tyh\u0011\u0002\u0005\u0019>tw\rC\u0005\u0002\u0004f\u0001\n\u00111\u0001\u0002\u0006\u0006YA.Z1eKJ,\u0005o\\2i!\r\u0011\u0015qQ\u0005\u0004\u0003\u0013\u001b%aA%oi\u0006Y\u0012N\\5uS\u0006dg)\u001a;dQN#\u0018\r^3%I\u00164\u0017-\u001e7uIM*\"!a$+\t\u0005\u0015\u0015\u0011S\u0016\u0003\u0003'\u0003B!!&\u0002 
6\u0011\u0011q\u0013\u0006\u0005\u00033\u000bY*A\u0005v]\u000eDWmY6fI*\u0019\u0011QT\"\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0002\"\u0006]%!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u000691\r\\3b]V\u0004HCAA,Q\rY\u0012\u0011\u0016\t\u0005\u0003W\u000bI,\u0004\u0002\u0002.*!\u0011qVAY\u0003\r\t\u0007/\u001b\u0006\u0005\u0003g\u000b),A\u0004kkBLG/\u001a:\u000b\u0007\u0005]F+A\u0003kk:LG/\u0003\u0003\u0002<\u00065&!C!gi\u0016\u0014X)Y2i\u0003i\u0019'/Z1uKJ+\u0007\u000f\\5dC\u001a+Go\u00195feRC'/Z1e)A\t\t-a2\u0002`\u0006\r\u0018Q^Ax\u0003s\u0014\u0019\u0001E\u0002K\u0003\u0007L1!!2<\u0005Q\u0011V\r\u001d7jG\u00064U\r^2iKJ$\u0006N]3bI\"9\u0011\u0011\u001a\u000fA\u0002\u0005-\u0017\u0001\u00028b[\u0016\u0004B!!4\u0002\\:!\u0011qZAl!\r\t\tnQ\u0007\u0003\u0003'T1!!6@\u0003\u0019a$o\\8u}%\u0019\u0011\u0011\\\"\u0002\rA\u0013X\rZ3g\u0013\r!\u0018Q\u001c\u0006\u0004\u00033\u001c\u0005bBAq9\u0001\u0007\u0011QQ\u0001\nM\u0016$8\r[3s\u0013\u0012Dq!!:\u001d\u0001\u0004\t9/\u0001\u0007ce>\\WM]\"p]\u001aLw\rE\u0002K\u0003SL1!a;<\u0005-Y\u0015MZ6b\u0007>tg-[4\t\r}d\u0002\u0019AA\u0002\u0011\u001d\t\t\u0010\ba\u0001\u0003g\f!B]3qY&\u001c\u0017-T4s!\rQ\u0015Q_\u0005\u0004\u0003o\\$A\u0004*fa2L7-Y'b]\u0006<WM\u001d\u0005\b\u0003wd\u0002\u0019AA\u007f\u0003\u0015\tXo\u001c;b!\rQ\u0015q`\u0005\u0004\u0005\u0003Y$\u0001\u0004*fa2L7-Y)v_R\f\u0007b\u0002B\u00039\u0001\u0007!qA\u0001\u001bY\u0016\fG-\u001a:F]\u0012\u0004x.\u001b8u\u00052|7m[5oON+g\u000e\u001a\t\u0004\u0015\n%\u0011b\u0001B\u0006w\ta!\t\\8dW&twmU3oI\u0006A3\u000f[8vY\u0012\u001cVM\u001c3MCR,7\u000f\u001e*fcV,7\u000f\u001e,feNLwN\\:Cs\u0012+g-Y;mi\"\u001aQD!\u0005\u0011\t\u0005-&1C\u0005\u0005\u0005+\tiK\u0001\u0003UKN$\u0018A\u0010;fgR4U\r^2i\u0019\u0016\fG-\u001a:Fa>\u001c\u0007NU3rk\u0016\u001cH/\u00134MCN$X\t]8dQ\u0012+g-\u001b8fI\u001a{'oU8nKB\u000b'\u000f^5uS>t7\u000fK\u0002\u001f\u0005#\tQ#Y:tKJ$\b+\u0019:uSRLwN\\*uCR,7\u000f\u0006\u0006\u0002X\t}!\u0011\u0006B\u001a\u0005oAqA!\t 
\u0001\u0004\u0011\u0019#A\u0004gKR\u001c\u0007.\u001a:\u0011\u0007)\u0013)#C\u0002\u0003(m\u0012Q#\u00112tiJ\f7\r\u001e$fi\u000eDWM\u001d+ie\u0016\fG\rC\u0004\u0003,}\u0001\rA!\f\u0002+MDw.\u001e7e\u0005\u0016\u0014V-\u00193z\r>\u0014h)\u001a;dQB\u0019!Ia\f\n\u0007\tE2IA\u0004C_>dW-\u00198\t\u000f\tUr\u00041\u0001\u0003.\u0005)2\u000f[8vY\u0012\u0014U\r\u0016:v]\u000e\fG/\u001b8h\u0019><\u0007b\u0002B\u001d?\u0001\u0007!QF\u0001\u0010g\"|W\u000f\u001c3CK\u0012+G.Y=fI\u0006)3\u000f[8vY\u0012D\u0015M\u001c3mK\u0016C8-\u001a9uS>tgI]8n\u00052|7m[5oON+g\u000e\u001a\u0015\u0004A\tE\u0011aQ:i_VdGMR3uG\"dU-\u00193fe\u0016\u0003xn\u00195P]\u001aK'o\u001d;GKR\u001c\u0007n\u00148ms&3G*Z1eKJ,\u0005o\\2i\u0017:|wO\u001c+p\u0005>$\b.\u00132qeYB3!\tB\t\u0003a\u001a\bn\\;mI:{GOR3uG\"dU-\u00193fe\u0016\u0003xn\u00195P]\u001aK'o\u001d;GKR\u001c\u0007nV5uQR\u0013XO\\2bi\u0016|eNR3uG\"D3A\tB\t\u0003\t2XM]5gs\u001a+Go\u00195MK\u0006$WM]#q_\u000eDwJ\u001c$jeN$h)\u001a;dQR1\u0011q\u000bB&\u00057BqA!\u0014$\u0001\u0004\u0011y%A\u0002jEB\u0004BA!\u0015\u0003X5\u0011!1\u000b\u0006\u0004!\nU#B\u0001\u001fR\u0013\u0011\u0011IFa\u0015\u0003\u001f5+G/\u00193bi\u00064VM]:j_:D\u0011B!\u0018$!\u0003\u0005\r!!\"\u0002\u001f\u0015\u0004xn\u00195GKR\u001c\u0007nQ8v]R\fAF^3sS\u001aLh)\u001a;dQ2+\u0017\rZ3s\u000bB|7\r[(o\r&\u00148\u000f\u001e$fi\u000eDG\u0005Z3gCVdG\u000f\n\u001a\u0002iMDw.\u001e7e)J,hnY1uKR{wJ\u001a4tKR\u001c\u0006/Z2jM&,G-\u00138Fa>\u001c\u0007n\u00144gg\u0016$(+Z:q_:\u001cX\rK\u0002&\u0005#\tQj\u001d5pk2$GK];oG\u0006$X\rV8PM\u001a\u001cX\r^*qK\u000eLg-[3e\u0013:,\u0005o\\2i\u001f\u001a47/\u001a;SKN\u0004xN\\:f\u0013\u001a4u\u000e\u001c7po\u0016\u0014\b*Y:O_6{'/Z#q_\u000eD7\u000fK\u0002'\u0005#\t!j\u001d5pk2$g)\u001a;dQ2+\u0017\rZ3s\u000bB|7\r[*fG>tG\rV5nK&3G*Z1eKJ\u0014V\r\u001d7jKN<\u0016\u000e\u001e5Fa>\u001c\u0007NT8u\u0017:|wO\u001c+p\r>dGn\\<fe\"\u001aqE!\u0005\u0002\u0003NDw.\u001e7e)J,hnY1uK&3G*Z1eKJ\u0014V\r\u001d7jKN<\u0016\u000e\u001e5ESZ,'oZ5oO\u0016\u0003xn\u00195O_R\\en
\\<o)>4u\u000e\u001c7po\u0016\u0014\bf\u0001\u0015\u0003\u0012\u0005iC/Z:u)J,hnY1uK>sg)\u001a;dQ\u0012{Wm\u001d(piV\u0003H-\u0019;f\u0011&<\u0007nV1uKJl\u0017M]6)\u0007%\u0012\t\"A\u000fuKN$H*Y4JgV\u0003H-\u0019;fI^CWM\u001c(p%\u0016\u001cwN\u001d3tQ\rQ#\u0011C\u00014g\"|W\u000f\u001c3Vg\u0016dU-\u00193fe\u0016sGm\u00144gg\u0016$\u0018JZ%oi\u0016\u0014(I]8lKJ4VM]:j_:\u0014U\r\\8xeAB3a\u000bB\t\u0003\u0001\u001b\bn\\;mIR\u0013XO\\2bi\u0016$v.\u00138ji&\fGNR3uG\"|eMZ:fi&3G*Z1eKJ\u0014V\r^;s]N,f\u000eZ3gS:,Gm\u00144gg\u0016$\bf\u0001\u0017\u0003\u0012\u0005\t4\u000f[8vY\u0012\u0004v\u000e\u001c7J]\u0012,g-\u001b8ji\u0016d\u00170\u00134MK\u0006$WM\u001d*fiV\u0014hn]!os\u0016C8-\u001a9uS>t\u0007fA\u0017\u0003\u0012\u0005Y3\u000f[8vY\u0012luN^3QCJ$\u0018\u000e^5p]N|U\u000f^(g)J,hnY1uS:<Gj\\4Ti\u0006$X\rK\u0002/\u0005#\t\u0001h\u001d5pk2$g)\u001b7uKJ\u0004\u0016M\u001d;ji&|gn]'bI\u0016dU-\u00193fe\u0012+(/\u001b8h\u0019\u0016\fG-\u001a:Fa>\u001c\u0007NU3rk\u0016\u001cH\u000fK\u00020\u0005#\t\u0001j\u001d5pk2$7)\u0019;dQ\u0016C8-\u001a9uS>tgI]8n\u00052|7m[5oON+g\u000eZ,iK:\u001c\u0006.\u001e;uS:<Gi\\<o%\u0016\u0004H.[2b\r\u0016$8\r[3s)\"\u0014X-\u00193)\u0007A\u0012\t\"\u0001\u0014tQ>,H\u000eZ+qI\u0006$XMU3bgNLwM\\7f]R\u0014\u0015\u0010^3t\u0013:lU\r\u001e:jGND3!\rB\t\u0003\u0019\u001b\bn\\;mI:{G/\u00169eCR,'+Z1tg&<g.\\3oi\nKH/Z:J]6+GO]5dg^CWM\u001c(p%\u0016\f7o]5h]6,g\u000e^:J]B\u0013xn\u001a:fgND3A\rB\t\u00039!Xm\u001d;Ck&dGMR3uG\"D3a\rB\t\u00039\"Xm\u001d;M_\u000e\fGNR3uG\"\u001cu.\u001c9mKRLwN\\%g\u0011&<\u0007nV1uKJl\u0017M]6Va\u0012\fG/\u001a3\u0015\t\u0005]#\u0011\u0015\u0005\b\u0005G#\u0004\u0019\u0001B\u0017\u0003QA\u0017n\u001a5XCR,'/\\1sWV\u0003H-\u0019;fI\":AGa*\u00038\ne\u0006\u0003\u0002BU\u0005gk!Aa+\u000b\t\t5&qV\u0001\taJ|g/\u001b3fe*!!\u0011WAY\u0003\u0019\u0001\u0018M]1ng&!!Q\u0017BV\u0005-1\u0016\r\\;f'>,(oY3\u0002\u0011\t|w\u000e\\3b]NdCAa/\u0003>f\t\u0011!G\u0001\u0001Q\r!$\u0011\u0019\t\u0005\u0005\u0007\u0014)-\u0004\u0002\u00030&!!q\u0019BX\u0005E\u
0001\u0016M]1nKR,'/\u001b>fIR+7\u000f^\u0001\"]\u0016<xJ\u001a4tKR4uN\u001d'fC\u0012,'\u000fU1si&$\u0018n\u001c8SKN,H\u000e\u001e\u000b\t\u0005\u001b\u0014yOa=\u0003vB!!q\u001aBu\u001d\u0011\u0011\tN!:\u000f\t\tM'1\u001d\b\u0005\u0005+\u0014\tO\u0004\u0003\u0003X\n}g\u0002\u0002Bm\u0005;tA!!5\u0003\\&\tQ+\u0003\u0002T)&\u0011aHU\u0005\u0003!FK1!a\tP\u0013\u0011\u00119/!\t\u0002A=3gm]3u\r>\u0014H*Z1eKJ,\u0005o\\2i%\u0016\u001c\bo\u001c8tK\u0012\u000bG/Y\u0005\u0005\u0005W\u0014iO\u0001\bFa>\u001c\u0007.\u00128e\u001f\u001a47/\u001a;\u000b\t\t\u001d\u0018\u0011\u0005\u0005\u0007\u0005c,\u0004\u0019A'\u0002\u0005Q\u0004\bbBABk\u0001\u0007\u0011Q\u0011\u0005\b\u0005o,\u0004\u0019AA>\u0003%)g\u000eZ(gMN,G\u000f\u0006\u0006\u0003N\nm(Q`B\u0007\u0007\u001fAaA!=7\u0001\u0004i\u0005b\u0002B��m\u0001\u00071\u0011A\u0001\u0006KJ\u0014xN\u001d\t\u0005\u0007\u0007\u0019I!\u0004\u0002\u0004\u0006)\u00191qA(\u0002\u0011A\u0014x\u000e^8d_2LAaa\u0003\u0004\u0006\t1QI\u001d:peNDq!a!7\u0001\u0004\t)\tC\u0004\u0003xZ\u0002\r!a\u001f\u0002=\u0005\u001c8/\u001a:u!J|7-Z:t!\u0006\u0014H/\u001b;j_:$\u0015\r^1XQ\u0016tG\u0003BA,\u0007+Aqaa\u00068\u0001\u0004\u0011i#A\u0007jgJ+\u0017m]:jO:LgnZ\u0001\u0005gR,(\r\u0006\u0005\u0002X\ru1qEB\u0016\u0011\u001d\u0019y\u0002\u000fa\u0001\u0007C\t\u0011\u0002]1si&$\u0018n\u001c8\u0011\u0007e\u001c\u0019#C\u0002\u0004&i\u0014\u0011\u0002U1si&$\u0018n\u001c8\t\u000f\r%\u0002\b1\u0001\u0002t\u0006q!/\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\bbBB\u0017q\u0001\u00071qF\u0001\u0004Y><\u0007\u0003BB\u0019\u0007ki!aa\r\u000b\u0007\r5R(\u0003\u0003\u00048\rM\"AC+oS\u001aLW\r\u001a'pO\u0006a2.\u00194lC\u000e{gNZ5h\u001d>$&/\u001e8dCR,wJ\u001c$fi\u000eDWCAAt\u0001")
/* loaded from: input_file:kafka/server/ReplicaFetcherThreadTest.class */
public class ReplicaFetcherThreadTest {
    // Partition fixtures shared by the tests: two partitions of "topic1" and one of "topic2".
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    // Random topic ids, regenerated for every instance of this test class.
    private final Uuid topicId1 = Uuid.randomUuid();
    private final Uuid topicId2 = Uuid.randomUuid();
    // Scala map "topic1" -> topicId1, "topic2" -> topicId2 (decompiled form of Map("topic1" -> ..., "topic2" -> ...)).
    private final Map<String, Uuid> topicIds = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic1"), topicId1()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic2"), topicId2())}));
    // Endpoint (broker id 0, localhost:1000) handed to the mock senders as the leader endpoint.
    private final BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000);
    // Failed-partition tracker passed to every fetcher thread created by createReplicaFetcherThread.
    private final FailedPartitions kafka$server$ReplicaFetcherThreadTest$$failedPartitions = new FailedPartitions();
    // Partition states for partition 0 of "topic1" and "topic2"; controller epoch, leader and leader epoch are all 0.
    private final List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates = (List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName("topic1").setPartitionIndex(0).setControllerEpoch(0).setLeader(0).setLeaderEpoch(0), new $colon.colon(new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName("topic2").setPartitionIndex(0).setControllerEpoch(0).setLeader(0).setLeaderEpoch(0), Nil$.MODULE$))).asJava();
    // UpdateMetadata request (latest version) built from partitionStates, no live brokers, and the topicIds map.
    private final UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(), 0, 0, 0, partitionStates(), Collections.emptyList(), (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(topicIds()).asJava()).build();
    // Not final: tests replace the cache through metadataCache_$eq (e.g. to pin a specific MetadataVersion).
    private ZkMetadataCache metadataCache = new ZkMetadataCache(0, MetadataVersion.latest(), BrokerFeatures$.MODULE$.createEmpty(), ZkMetadataCache$.MODULE$.$lessinit$greater$default$4());

    /** Returns the fixture for partition 0 of "topic1". */
    private TopicPartition t1p0() {
        return t1p0;
    }

    /** Returns the fixture for partition 1 of "topic1". */
    private TopicPartition t1p1() {
        return t1p1;
    }

    /** Returns the fixture for partition 1 of "topic2". */
    private TopicPartition t2p1() {
        return t2p1;
    }

    /** Returns the randomly generated id that the {@code topicIds} map associates with "topic1". */
    private Uuid topicId1() {
        return topicId1;
    }

    /** Returns the randomly generated id that the {@code topicIds} map associates with "topic2". */
    private Uuid topicId2() {
        return topicId2;
    }

    /** Returns the Scala map from topic name to topic id used by the fixtures. */
    private Map<String, Uuid> topicIds() {
        return topicIds;
    }

    /** Returns the shared broker endpoint fixture (broker id 0, localhost:1000). */
    private BrokerEndPoint brokerEndPoint() {
        return brokerEndPoint;
    }

    /**
     * Returns the shared {@link FailedPartitions} instance handed to the fetcher threads under test.
     *
     * <p>NOTE(review): the mangled public name looks compiler-generated (a private Scala member
     * widened for access from synthetic code) — confirm before renaming.
     */
    public FailedPartitions kafka$server$ReplicaFetcherThreadTest$$failedPartitions() {
        return kafka$server$ReplicaFetcherThreadTest$$failedPartitions;
    }

    /** Returns the partition states used to build the {@code updateMetadataRequest} fixture. */
    private List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates() {
        return partitionStates;
    }

    /** Returns the pre-built UpdateMetadata request fixture. */
    private UpdateMetadataRequest updateMetadataRequest() {
        return updateMetadataRequest;
    }

    /** Returns the metadata cache currently in use; it may have been replaced via {@code metadataCache_$eq}. */
    private ZkMetadataCache metadataCache() {
        return metadataCache;
    }

    /**
     * Replaces the metadata cache used by subsequently configured mocks.
     *
     * @param zkMetadataCache the cache instance to install
     */
    private void metadataCache_$eq(ZkMetadataCache zkMetadataCache) {
        metadataCache = zkMetadataCache;
    }

    /**
     * Creates an {@link InitialFetchState} whose leader is always broker 0 at localhost:9092.
     *
     * @param option optional topic id, passed through as the first constructor argument
     * @param j      passed as the last constructor argument (presumably the initial fetch offset — confirm)
     * @param i      passed as the third constructor argument (presumably the leader epoch — confirm)
     * @return the newly built fetch state
     */
    private InitialFetchState initialFetchState(Option<Uuid> option, long j, int i) {
        BrokerEndPoint leader = new BrokerEndPoint(0, "localhost", 9092);
        return new InitialFetchState(option, leader, i, j);
    }

    /** Returns the default value (1) for the third parameter of {@code initialFetchState}. */
    private int initialFetchState$default$3() {
        final int defaultThirdArgument = 1;
        return defaultThirdArgument;
    }

    /** Runs after every test and clears the Yammer metrics through the test utilities. */
    @AfterEach
    public void cleanup() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        testUtils.clearYammerMetrics();
    }

    /**
     * Builds a {@link ReplicaFetcherThread} backed by a {@link RemoteLeaderEndPoint} that talks
     * through the supplied {@code blockingSend}.
     *
     * <p>The log prefix has the form
     * {@code "[ReplicaFetcher replicaId=<brokerId>, leaderId=<leaderId>, fetcherId=<i>] "} and is
     * shared by the leader endpoint and the thread.
     *
     * @param str              thread name
     * @param i                fetcher id, used only in the log prefix
     * @param kafkaConfig      broker configuration; also supplies the inter-broker protocol version
     * @param failedPartitions failed-partition tracker passed to the thread
     * @param replicaManager   replica manager passed to both the endpoint and the thread
     * @param replicaQuota     quota passed to both the endpoint and the thread
     * @param blockingSend     sender used by the leader endpoint; its broker endpoint id is the leader id
     * @return the configured fetcher thread (not started)
     */
    private ReplicaFetcherThread createReplicaFetcherThread(String str, int i, KafkaConfig kafkaConfig, FailedPartitions failedPartitions, ReplicaManager replicaManager, ReplicaQuota replicaQuota, BlockingSend blockingSend) {
        String prefix = "[ReplicaFetcher replicaId=" + kafkaConfig.brokerId()
                + ", leaderId=" + blockingSend.brokerEndPoint().id()
                + ", fetcherId=" + i + "] ";
        LogContext logContext = new LogContext(prefix);
        RemoteLeaderEndPoint leaderEndPoint = new RemoteLeaderEndPoint(
                logContext.logPrefix(),
                blockingSend,
                new FetchSessionHandler(logContext, blockingSend.brokerEndPoint().id()),
                kafkaConfig,
                replicaManager,
                replicaQuota,
                () -> kafkaConfig.interBrokerProtocolVersion(),
                () -> 1L);
        return new ReplicaFetcherThread(
                str,
                leaderEndPoint,
                kafkaConfig,
                failedPartitions,
                replicaManager,
                replicaQuota,
                logContext.logPrefix(),
                () -> kafkaConfig.interBrokerProtocolVersion());
    }

    /**
     * With a default broker config, the inter-broker protocol version must map to the latest
     * FETCH, OFFSET_FOR_LEADER_EPOCH and LIST_OFFSETS request versions.
     */
    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        // Only the broker id and the ZooKeeper connect string are explicit; the rest are Scala defaults.
        Properties brokerProps = testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20(),
                testUtils.createBrokerConfig$default$21());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // The mocked replica manager is stubbed but never used by the assertions below.
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));

        Assertions.assertEquals(
                ApiKeys.FETCH.latestVersion(),
                config.interBrokerProtocolVersion().fetchRequestVersion());
        Assertions.assertEquals(
                ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(),
                config.interBrokerProtocolVersion().offsetForLeaderEpochRequestVersion());
        Assertions.assertEquals(
                ApiKeys.LIST_OFFSETS.latestVersion(),
                config.interBrokerProtocolVersion().listOffsetRequestVersion());
    }

    /**
     * Drives a fetcher thread through three {@code doWork()} rounds and checks that exactly one
     * leader-epoch request is sent (in the first round) while the fetch count grows by one per
     * round, and that {@code Partition.truncateTo} ends up being invoked three times.
     */
    @Test
    public void testFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions() {
        // Config helper defined elsewhere in this class; presumably disables truncate-on-fetch — confirm.
        KafkaConfig kafkaConfigNoTruncateOnFetch = kafkaConfigNoTruncateOnFetch();
        ReplicaQuota replicaQuota = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        UnifiedLog unifiedLog = (UnifiedLog) Mockito.mock(UnifiedLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        // The mocked log is empty: log end offset and high watermark are both 0.
        Mockito.when(partition.localLogOrException()).thenReturn(unifiedLog);
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        // Consecutive stubbing: latestEpoch() yields Some(5) for the first two calls and None afterwards.
        Mockito.when(unifiedLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5))).thenReturn(new Some(BoxesRunTime.boxToInteger(5))).thenReturn(None$.MODULE$);
        Mockito.when(unifiedLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(0L, 5)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, unifiedLog);
        // Canned epoch-end-offset answers for t1p0 and t1p1 only (arguments 5 and 1L; presumably leader epoch and end offset — confirm).
        MockBlockingSender mockBlockingSender = new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), newOffsetForLeaderPartitionResult(t1p0(), 5, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 1L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, kafkaConfigNoTruncateOnFetch, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, replicaQuota, mockBlockingSender);
        // NOTE(review): t1p1 is a "topic1" partition but is registered with topicId2, whereas topicIds maps "topic1" to topicId1 — confirm this is intentional.
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some<>(topicId2()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some<>(topicId2()), 0L, initialFetchState$default$3()))})));
        // State check right after adding the partitions, before any work has been done.
        assertPartitionStates(createReplicaFetcherThread, false, true, false);
        // Round 1: one epoch request and one fetch are sent.
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        // Round 2: no further epoch request; only the fetch count increases.
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(2, mockBlockingSender.fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        // Round 3: same expectation as round 2.
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(3, mockBlockingSender.fetchCount());
        assertPartitionStates(createReplicaFetcherThread, true, false, false);
        // Across all rounds the mocked partition must have been truncated exactly three times.
        ((Partition) Mockito.verify(partition, Mockito.times(3))).truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
    }

    /**
     * Applies the per-partition state assertion to each of the three fixture partitions
     * ({@code t1p0}, {@code t1p1}, {@code t2p1}) in that order.
     *
     * @param abstractFetcherThread fetcher whose partition states are inspected
     * @param z  first expected flag, forwarded unchanged (presumably "ready for fetch" — confirm)
     * @param z2 second expected flag, forwarded unchanged (presumably "truncating log" — confirm)
     * @param z3 third expected flag, forwarded unchanged (presumably "delayed" — confirm)
     */
    public void assertPartitionStates(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3) {
        new $colon.colon(
                t1p0(),
                new $colon.colon(
                        t1p1(),
                        new $colon.colon(t2p1(), Nil$.MODULE$)))
                .foreach(tp -> {
                    // The actual assertions live in the synthetic helper compiled from the Scala closure.
                    $anonfun$assertPartitionStates$1(abstractFetcherThread, z, z2, z3, tp);
                    return BoxedUnit.UNIT;
                });
    }

    /**
     * When the sender throws on {@code sendRequest}, {@code fetchEpochEndOffsets} must not propagate
     * the exception; it must return an {@code UNKNOWN_SERVER_ERROR} result with -1 / -1L for every
     * requested partition, and the sender must have been invoked exactly once.
     */
    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        // Sender mock: reports the shared broker endpoint and throws a NullPointerException on every request.
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        Mockito.when(blockingSend.sendRequest((AbstractRequest.Builder) ArgumentMatchers.any())).thenThrow(new Throwable[]{new NullPointerException()});
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        // Expected map first, then the actual call: epoch-0 requests for t1p0 and t1p1 through a thread built with a null quota.
        Assertions.assertEquals(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), newOffsetForLeaderPartitionResult(t1p0(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))})), createReplicaFetcherThread("bob", 0, fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, null, blockingSend).leader().fetchEpochEndOffsets(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p0().partition()).setLeaderEpoch(0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p1().partition()).setLeaderEpoch(0))}))), "results from leader epoch request should have undefined offset");
        // Mockito.verify without a mode checks for exactly one invocation.
        ((BlockingSend) Mockito.verify(blockingSend)).sendRequest((AbstractRequest.Builder) ArgumentMatchers.any());
    }

    /**
     * Runs the shared first-fetch verification with inter-broker protocol {@code IBP_2_6_IV0} and
     * the helper's default value for its second argument.
     */
    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBothIbp26() {
        MetadataVersion ibp = MetadataVersion.IBP_2_6_IV0;
        int defaultSecondArgument = verifyFetchLeaderEpochOnFirstFetch$default$2();
        verifyFetchLeaderEpochOnFirstFetch(ibp, defaultSecondArgument);
    }

    /**
     * Runs the shared first-fetch verification with the latest metadata version and 0 as the
     * second argument.
     */
    @Test
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        MetadataVersion latest = MetadataVersion.latest();
        verifyFetchLeaderEpochOnFirstFetch(latest, 0);
    }

    /**
     * Shared scenario: builds a replica fetcher on top of mocked log, partition and
     * replica-manager objects, registers t1p0 and t1p1 at fetch offset 0 and runs three
     * fetcher iterations.
     *
     * <p>After every iteration the mock sender must report exactly {@code i} leader-epoch
     * requests, and a fetch count equal to the number of iterations run so far.
     *
     * @param metadataVersion version written into the broker config as the inter-broker
     *                        protocol version and handed to the metadata cache
     * @param i               expected number of leader-epoch requests after each iteration
     */
    private void verifyFetchLeaderEpochOnFirstFetch(MetadataVersion metadataVersion, int i) {
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        Properties brokerProps = testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20(),
                testUtils.createBrokerConfig$default$21());
        brokerProps.setProperty(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), metadataVersion.version());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // Replace the fixture's metadata cache with one built for the requested version.
        metadataCache_$eq(new ZkMetadataCache(0, metadataVersion, BrokerFeatures$.MODULE$.createEmpty(), ZkMetadataCache$.MODULE$.$lessinit$greater$default$4()));
        metadataCache().updateMetadata(0, updateMetadataRequest());

        LogManager logManager = Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsManager = Mockito.mock(ReplicaAlterLogDirsManager.class);
        UnifiedLog log = Mockito.mock(UnifiedLog.class);
        Partition partition = Mockito.mock(Partition.class);
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);

        // Local log: high watermark 0, latest epoch 5, and epoch 5 ends at offset 0.
        Mockito.when(partition.localLogOrException()).thenReturn(log);
        Mockito.when(BoxesRunTime.boxToLong(log.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(log.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(log.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(0L, 5)));

        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(alterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);

        // Leader-epoch answers the mock sender hands out: epoch 5 / end offset 1 for both partitions.
        Tuple2[] leaderEpochOffsets = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), newOffsetForLeaderPartitionResult(t1p0(), 5, 1L)),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 1L))
        };
        MockBlockingSender mockNetwork = new MockBlockingSender(
                (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(leaderEpochOffsets))).asJava(),
                brokerEndPoint(),
                new SystemTime());
        ReplicaFetcherThread thread = createReplicaFetcherThread("bob", 0, config, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, QuotaFactory$UnboundedQuota$.MODULE$, mockNetwork);

        Tuple2[] initialStates = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3())),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3()))
        };
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(initialStates)));

        // The epoch-request count must stay at i while the fetch count grows by one per iteration.
        for (int iteration = 1; iteration <= 3; iteration++) {
            thread.doWork();
            Assertions.assertEquals(i, mockNetwork.epochFetchCount());
            Assertions.assertEquals(iteration, mockNetwork.fetchCount());
        }
    }

    /**
     * Supplies the value that the IBP 2.6 test passes as the second argument of
     * {@code verifyFetchLeaderEpochOnFirstFetch}.
     *
     * @return 1
     */
    private int verifyFetchLeaderEpochOnFirstFetch$default$2() {
        final int defaultEpochFetchCount = 1;
        return defaultEpochFetchCount;
    }

    /**
     * The mock sender answers the leader-epoch request with epoch 5 and end offsets 156
     * (t1p0) and 172 (t2p1). After a single fetcher iteration {@code truncateTo} must have
     * been invoked twice on the partition mock, with 156 and 172 among the captured offsets.
     */
    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        final int initialLeo = 200;
        ArgumentCaptor truncateToCapture = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = kafkaConfigNoTruncateOnFetch();

        ReplicationQuotaManager quota = Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsManager = Mockito.mock(ReplicaAlterLogDirsManager.class);
        UnifiedLog log = Mockito.mock(UnifiedLog.class);
        Partition partition = Mockito.mock(Partition.class);
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);

        // Local log: latest epoch 5 ending at the log end offset, high watermark one below it.
        Mockito.when(partition.localLogOrException()).thenReturn(log);
        Mockito.when(BoxesRunTime.boxToLong(log.highWatermark())).thenReturn(BoxesRunTime.boxToLong(initialLeo - 1));
        Mockito.when(log.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(log.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(initialLeo, 5)));
        Mockito.when(BoxesRunTime.boxToLong(log.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(initialLeo));

        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(log);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(alterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);

        Tuple2[] leaderEpochOffsets = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), newOffsetForLeaderPartitionResult(t1p0(), 5, 156L)),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), newOffsetForLeaderPartitionResult(t2p1(), 5, 172L))
        };
        MockBlockingSender mockNetwork = new MockBlockingSender(
                (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(leaderEpochOffsets))).asJava(),
                brokerEndPoint(),
                new SystemTime());
        ReplicaFetcherThread thread = createReplicaFetcherThread("bob", 0, config, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, quota, mockNetwork);

        Tuple2[] initialStates = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some(topicId2()), 0L, initialFetchState$default$3()))
        };
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(initialStates)));

        thread.doWork();

        Mockito.verify(partition, Mockito.times(2)).truncateTo(BoxesRunTime.unboxToLong(truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        List truncationOffsets = truncateToCapture.getAllValues();
        SeqLike capturedOffsets = (SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(truncationOffsets).asScala();
        Assertions.assertTrue(
                capturedOffsets.contains(BoxesRunTime.boxToInteger(156)),
                "Expected " + t1p0() + " to truncate to offset 156 (truncation offsets: " + truncationOffsets + ")");
        Assertions.assertTrue(
                capturedOffsets.contains(BoxesRunTime.boxToInteger(172)),
                "Expected " + t2p1() + " to truncate to offset 172 (truncation offsets: " + truncationOffsets + ")");
    }

    /**
     * The log mock reports latest epoch 5 but has no end offset for epoch 4, while the mock
     * sender answers with epoch 4 and end offsets 156 (t1p0) and 202 (t2p1). After a single
     * fetcher iteration {@code truncateTo} must have been invoked twice, with 156 and 200
     * (the stubbed log end offset) among the captured offsets.
     */
    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs() {
        final int initialLeo = 200;
        ArgumentCaptor truncateToCapture = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = kafkaConfigNoTruncateOnFetch();

        ReplicationQuotaManager quota = Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsManager = Mockito.mock(ReplicaAlterLogDirsManager.class);
        UnifiedLog log = Mockito.mock(UnifiedLog.class);
        Partition partition = Mockito.mock(Partition.class);
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);

        // Local log: latest epoch 5, nothing recorded for epoch 4, log end offset 200.
        Mockito.when(partition.localLogOrException()).thenReturn(log);
        Mockito.when(BoxesRunTime.boxToLong(log.highWatermark())).thenReturn(BoxesRunTime.boxToLong(initialLeo - 3));
        Mockito.when(log.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(log.endOffsetForEpoch(4)).thenReturn(None$.MODULE$);
        Mockito.when(BoxesRunTime.boxToLong(log.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(initialLeo));

        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(log);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(alterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);

        Tuple2[] leaderEpochOffsets = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), newOffsetForLeaderPartitionResult(t1p0(), 4, 156L)),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), newOffsetForLeaderPartitionResult(t2p1(), 4, 202L))
        };
        MockBlockingSender mockNetwork = new MockBlockingSender(
                (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(leaderEpochOffsets))).asJava(),
                brokerEndPoint(),
                new SystemTime());
        ReplicaFetcherThread thread = createReplicaFetcherThread("bob", 0, config, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, quota, mockNetwork);

        Tuple2[] initialStates = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), initialFetchState(new Some(topicId2()), 0L, initialFetchState$default$3()))
        };
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(initialStates)));

        thread.doWork();

        Mockito.verify(partition, Mockito.times(2)).truncateTo(BoxesRunTime.unboxToLong(truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        List truncationOffsets = truncateToCapture.getAllValues();
        SeqLike capturedOffsets = (SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(truncationOffsets).asScala();
        Assertions.assertTrue(
                capturedOffsets.contains(BoxesRunTime.boxToInteger(156)),
                "Expected " + t1p0() + " to truncate to offset 156 (truncation offsets: " + truncationOffsets + ")");
        Assertions.assertTrue(
                capturedOffsets.contains(BoxesRunTime.boxToInteger(initialLeo)),
                "Expected " + t2p1() + " to truncate to offset " + initialLeo + " (truncation offsets: " + truncationOffsets + ")");
    }

    /**
     * Scripts two rounds of leader-epoch answers and checks the request counters after
     * each of three fetcher iterations.
     *
     * <ul>
     *   <li>Iteration 1 (leader answers epoch 4 with 155/143): one epoch request, no fetch.</li>
     *   <li>Iteration 2 (leader answers epoch 3 with 101/102): two epoch requests, one fetch,
     *       and the last OffsetsForLeaderEpoch request version is at least 3.</li>
     *   <li>Iteration 3: still two epoch requests, two fetches.</li>
     * </ul>
     * Overall {@code truncateTo} must have been invoked four times, with 102 and 101 among
     * the captured offsets.
     */
    @Test
    public void shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower() {
        final int initialLeo = 200;
        ArgumentCaptor truncateToCapture = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = kafkaConfigNoTruncateOnFetch();

        ReplicaQuota quota = Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsManager = Mockito.mock(ReplicaAlterLogDirsManager.class);
        UnifiedLog log = Mockito.mock(UnifiedLog.class);
        Partition partition = Mockito.mock(Partition.class);
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);

        // Local log: latest epoch 5; lookups for epochs 4 and 3 both resolve to (120, epoch 3).
        Mockito.when(partition.localLogOrException()).thenReturn(log);
        Mockito.when(BoxesRunTime.boxToLong(log.highWatermark())).thenReturn(BoxesRunTime.boxToLong(initialLeo - 2));
        Mockito.when(log.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(log.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(log.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(BoxesRunTime.boxToLong(log.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(initialLeo));

        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(log);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(alterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, log);

        // First leader answer: epoch 4.
        Tuple2[] firstAnswer = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), newOffsetForLeaderPartitionResult(t1p0(), 4, 155L)),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 4, 143L))
        };
        MockBlockingSender mockNetwork = new MockBlockingSender(
                (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(firstAnswer))).asJava(),
                brokerEndPoint(),
                new SystemTime());
        ReplicaFetcherThread thread = createReplicaFetcherThread("bob", 0, config, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, quota, mockNetwork);

        Tuple2[] initialStates = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3())),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3()))
        };
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(initialStates)));

        // Iteration 1.
        thread.doWork();
        Assertions.assertEquals(1, mockNetwork.epochFetchCount());
        Assertions.assertEquals(0, mockNetwork.fetchCount());

        // Second leader answer: epoch 3.
        Tuple2[] secondAnswer = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), newOffsetForLeaderPartitionResult(t1p0(), 3, 101L)),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 3, 102L))
        };
        mockNetwork.setOffsetsForNextResponse((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(secondAnswer))).asJava());

        // Iteration 2.
        thread.doWork();
        Assertions.assertEquals(2, mockNetwork.epochFetchCount());
        Assertions.assertEquals(1, mockNetwork.fetchCount());
        Assertions.assertTrue(mockNetwork.lastUsedOffsetForLeaderEpochVersion() >= 3, "OffsetsForLeaderEpochRequest version.");

        // Iteration 3.
        thread.doWork();
        Assertions.assertEquals(2, mockNetwork.epochFetchCount());
        Assertions.assertEquals(2, mockNetwork.fetchCount());

        Mockito.verify(partition, Mockito.times(4)).truncateTo(BoxesRunTime.unboxToLong(truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        List truncationOffsets = truncateToCapture.getAllValues();
        SeqLike capturedOffsets = (SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(truncationOffsets).asScala();
        Assertions.assertTrue(
                capturedOffsets.contains(BoxesRunTime.boxToInteger(102)),
                "Expected " + t1p1() + " to truncate to offset 102 (truncation offsets: " + truncationOffsets + ")");
        Assertions.assertTrue(
                capturedOffsets.contains(BoxesRunTime.boxToInteger(101)),
                "Expected " + t1p0() + " to truncate to offset 101 (truncation offsets: " + truncationOffsets + ")");
    }

    /**
     * Drives a fetcher through successive fetch responses that carry a diverging epoch and
     * checks, after every iteration, the request counters and the offsets passed to
     * {@code Partition.truncateTo}.
     *
     * <ol>
     *   <li>Initial iteration: one fetch.</li>
     *   <li>Leader reports diverging epoch 4 ending at 140/141: two truncations so far,
     *       captured offsets include 140 and 141.</li>
     *   <li>Leader reports diverging epoch 3 ending at 130/131, while the log mock resolves
     *       epoch 3 to (129, epoch 2): four truncations so far, offsets include 129.</li>
     *   <li>Leader reports diverging epoch 2 ending at 120/121, while the log mock resolves
     *       epoch 2 to (119, epoch 1): six truncations so far, offsets include 119.</li>
     * </ol>
     * No leader-epoch request is expected at any point.
     */
    @Test
    public void shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower() {
        ArgumentCaptor truncateToCapture = ArgumentCaptor.forClass(Long.TYPE);
        final KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        final ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        UnifiedLog unifiedLog = (UnifiedLog) Mockito.mock(UnifiedLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        final ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);

        // Mutable holder so the value answered by latestEpoch() can be changed between iterations.
        ObjectRef latestLogEpoch = ObjectRef.create(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(partition.localLogOrException()).thenReturn(unifiedLog);
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(115L));
        Mockito.when(unifiedLog.latestEpoch()).thenAnswer(invocationOnMock -> {
            return (Option) latestLogEpoch.elem;
        });
        Mockito.when(unifiedLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(149L, 4)));
        Mockito.when(unifiedLog.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(129L, 2)));
        Mockito.when(unifiedLog.endOffsetForEpoch(2)).thenReturn(new Some(new OffsetAndEpoch(119L, 1)));
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(200));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(unifiedLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, unifiedLog);

        MockBlockingSender mockBlockingSender = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint(), new SystemTime());
        final LogContext logContext = new LogContext(new StringBuilder(52).append("[ReplicaFetcher replicaId=").append(fromProps.brokerId()).append(", leaderId=").append(brokerEndPoint().id()).append(", fetcherId=0] ").toString());
        final RemoteLeaderEndPoint remoteLeaderEndPoint = new RemoteLeaderEndPoint(logContext.logPrefix(), mockBlockingSender, new FetchSessionHandler(logContext, brokerEndPoint().id()), fromProps, replicaManager, replicationQuotaManager, () -> {
            return fromProps.interBrokerProtocolVersion();
        }, () -> {
            return 1L;
        });

        // The superclass arguments follow the eight-argument constructor form used by
        // testTruncateOnFetchDoesNotUpdateHighWatermark: failed partitions, log prefix and the
        // inter-broker-protocol supplier are passed directly instead of being left as dead
        // locals in an instance initializer.
        // NOTE(review): the thread name is not recoverable from this method; "fetcher-thread"
        // mirrors that sibling test - confirm.
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("fetcher-thread", remoteLeaderEndPoint, fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, replicationQuotaManager, logContext.logPrefix(), () -> {
            return fromProps.interBrokerProtocolVersion();
        }) {
            // Always answers None instead of processing the fetched partition data.
            @Override
            public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long j, FetchResponseData.PartitionData partitionData) {
                return None$.MODULE$;
            }
        };
        replicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(new Some(topicId1()), 200, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 200, initialFetchState$default$3()))})));
        Set partitions = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{t1p0(), t1p1()}));

        // Iteration 1: plain fetch, no epoch request.
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        // Per-partition check implemented by a synthetic helper whose body is outside this method.
        partitions.foreach(topicPartition -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(replicaFetcherThread, topicPartition);
            return BoxedUnit.UNIT;
        });

        // Iteration 2: leader reports diverging epoch 4; the log's latest epoch becomes 4.
        mockBlockingSender.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), partitionData$1(t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(141L)))})));
        mockBlockingSender.setIdsForNextResponse(topicIds());
        latestLogEpoch.elem = new Some(BoxesRunTime.boxToInteger(4));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(2, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(140)), new StringBuilder(58).append("Expected ").append(t1p0()).append(" to truncate to offset 140 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(141)), new StringBuilder(58).append("Expected ").append(t1p1()).append(" to truncate to offset 141 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
        partitions.foreach(topicPartition2 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(replicaFetcherThread, topicPartition2);
            return BoxedUnit.UNIT;
        });

        // Iteration 3: leader reports diverging epoch 3.
        mockBlockingSender.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), partitionData$1(t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(130L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(131L)))})));
        mockBlockingSender.setIdsForNextResponse(topicIds());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(3, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(4))).truncateTo(BoxesRunTime.unboxToLong(truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(129)), new StringBuilder(57).append("Expected to truncate to offset 129 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
        partitions.foreach(topicPartition3 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$6(replicaFetcherThread, topicPartition3);
            return BoxedUnit.UNIT;
        });

        // Iteration 4: leader reports diverging epoch 2; the log no longer reports a latest epoch.
        mockBlockingSender.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), partitionData$1(t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(120L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(121L)))})));
        mockBlockingSender.setIdsForNextResponse(topicIds());
        latestLogEpoch.elem = None$.MODULE$;
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(4, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(6))).truncateTo(BoxesRunTime.unboxToLong(truncateToCapture.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(truncateToCapture.getAllValues()).asScala()).contains(BoxesRunTime.boxToInteger(119)), new StringBuilder(57).append("Expected to truncate to offset 119 (truncation offsets: ").append(truncateToCapture.getAllValues()).append(")").toString());
        partitions.foreach(topicPartition4 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$7(replicaFetcherThread, topicPartition4);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Feeds the fetcher one fetch response for t1p0 that carries high watermark 160 and a
     * diverging epoch (epoch 4, end offset 140). After a single iteration the partition
     * mock must have seen exactly one {@code truncateTo(140, false)} and the log mock must
     * never have seen {@code maybeUpdateHighWatermark}.
     */
    @Test
    public void testTruncateOnFetchDoesNotUpdateHighWatermark() {
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20(),
                testUtils.createBrokerConfig$default$21()));

        ReplicationQuotaManager quota = Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = Mockito.mock(LogManager.class);
        UnifiedLog log = Mockito.mock(UnifiedLog.class);
        Partition partition = Mockito.mock(Partition.class);
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);

        // Local log: high watermark 130, latest epoch 5, epoch 4 ends at 149, log end offset 150.
        Mockito.when(BoxesRunTime.boxToLong(log.highWatermark())).thenReturn(BoxesRunTime.boxToLong(130));
        Mockito.when(log.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(log.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(149L, 4)));
        Mockito.when(BoxesRunTime.boxToLong(log.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(150));

        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.localLogOrException(t1p0())).thenReturn(log);
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(partition.localLogOrException()).thenReturn(log);
        Mockito.when(partition.appendRecordsToFollowerOrFutureReplica((MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToBoolean(ArgumentMatchers.any()))).thenReturn(None$.MODULE$);

        LogContext logContext = new LogContext("[ReplicaFetcher replicaId=" + config.brokerId() + ", leaderId=" + brokerEndPoint().id() + ", fetcherId=0] ");
        MockBlockingSender mockNetwork = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint(), new SystemTime());
        RemoteLeaderEndPoint leader = new RemoteLeaderEndPoint(
                logContext.logPrefix(),
                mockNetwork,
                new FetchSessionHandler(logContext, brokerEndPoint().id()),
                config,
                replicaManager,
                quota,
                () -> config.interBrokerProtocolVersion(),
                () -> 1L);
        ReplicaFetcherThread thread = new ReplicaFetcherThread(
                "fetcher-thread",
                leader,
                config,
                kafka$server$ReplicaFetcherThreadTest$$failedPartitions(),
                replicaManager,
                quota,
                logContext.logPrefix(),
                () -> config.interBrokerProtocolVersion());

        Tuple2[] initialStates = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(new Some(topicId1()), 150, initialFetchState$default$3()))
        };
        thread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(initialStates)));

        // Next fetch response: high watermark 160 together with a diverging epoch 4 ending at 140.
        Tuple2[] fetchResponse = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new FetchResponseData.PartitionData().setPartitionIndex(t1p0().partition()).setLastStableOffset(0L).setLogStartOffset(0L).setHighWatermark(160L).setDivergingEpoch(new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L)))
        };
        mockNetwork.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(fetchResponse)));
        mockNetwork.setIdsForNextResponse(topicIds());

        thread.doWork();

        Assertions.assertEquals(1, mockNetwork.fetchCount());
        Mockito.verify(partition).truncateTo(140L, false);
        Mockito.verify(log, Mockito.never()).maybeUpdateHighWatermark(ArgumentMatchers.anyLong());
    }

    /**
     * Verifies that the lag of a partition is filled in even when a fetch response has no records.
     *
     * <p>The lag of {@code t1p0} is asserted to be undefined right after the partition is added,
     * and {@code Some(0)} after a single {@code doWork()} round that is answered with
     * {@code MemoryRecords.EMPTY} and a high watermark of 0.
     */
    @Test
    public void testLagIsUpdatedWhenNoRecords() {
        Properties brokerProps = TestUtils$.MODULE$.createBrokerConfig(
                1,
                "localhost:1234",
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                TestUtils$.MODULE$.createBrokerConfig$default$5(),
                TestUtils$.MODULE$.createBrokerConfig$default$6(),
                TestUtils$.MODULE$.createBrokerConfig$default$7(),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16(),
                TestUtils$.MODULE$.createBrokerConfig$default$17(),
                TestUtils$.MODULE$.createBrokerConfig$default$18(),
                TestUtils$.MODULE$.createBrokerConfig$default$19(),
                TestUtils$.MODULE$.createBrokerConfig$default$20(),
                TestUtils$.MODULE$.createBrokerConfig$default$21());
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // Collaborators are all mocks; only the calls stubbed below return meaningful values.
        ReplicationQuotaManager quota = Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManagerMock = Mockito.mock(LogManager.class);
        UnifiedLog log = Mockito.mock(UnifiedLog.class);
        Partition partitionMock = Mockito.mock(Partition.class);
        ReplicaManager replicaManagerMock = Mockito.mock(ReplicaManager.class);

        // An empty local log: epoch 0, end offset 0, high watermark 0.
        Mockito.when(BoxesRunTime.boxToLong(log.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(log.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(0)));
        Mockito.when(log.endOffsetForEpoch(0)).thenReturn(new Some(new OffsetAndEpoch(0L, 0)));
        Mockito.when(BoxesRunTime.boxToLong(log.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(log.maybeUpdateHighWatermark(0L)).thenReturn(None$.MODULE$);

        Mockito.when(replicaManagerMock.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManagerMock.logManager()).thenReturn(logManagerMock);
        Mockito.when(replicaManagerMock.localLogOrException(t1p0())).thenReturn(log);
        Mockito.when(replicaManagerMock.getPartitionOrException(t1p0())).thenReturn(partitionMock);
        Mockito.when(replicaManagerMock.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));

        Mockito.when(partitionMock.localLogOrException()).thenReturn(log);
        Mockito.when(
                partitionMock.appendRecordsToFollowerOrFutureReplica(
                        (MemoryRecords) ArgumentMatchers.any(),
                        BoxesRunTime.unboxToBoolean(ArgumentMatchers.any())))
                .thenReturn(new Some(new LogAppendInfo(
                        Optional.empty(),
                        0L,
                        OptionalInt.empty(),
                        -1L,
                        -1L,
                        -1L,
                        -1L,
                        RecordConversionStats.EMPTY,
                        CompressionType.NONE,
                        CompressionType.NONE,
                        -1,
                        0,
                        false,
                        -1L)));

        String prefix = "[ReplicaFetcher replicaId=" + config.brokerId()
                + ", leaderId=" + brokerEndPoint().id()
                + ", fetcherId=0] ";
        LogContext context = new LogContext(prefix);
        MockBlockingSender sender = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint(), new SystemTime());
        RemoteLeaderEndPoint leader = new RemoteLeaderEndPoint(
                context.logPrefix(),
                sender,
                new FetchSessionHandler(context, brokerEndPoint().id()),
                config,
                replicaManagerMock,
                quota,
                () -> config.interBrokerProtocolVersion(),
                () -> 1L);
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread(
                "fetcher-thread",
                leader,
                config,
                kafka$server$ReplicaFetcherThreadTest$$failedPartitions(),
                replicaManagerMock,
                quota,
                context.logPrefix(),
                () -> config.interBrokerProtocolVersion());

        fetcher.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p0()),
                        initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3()))
        })));

        // No fetch has happened yet, so no lag has been computed.
        Assertions.assertEquals(None$.MODULE$, fetcher.fetchState(t1p0()).flatMap(state -> state.lag()));

        // Queue a response for t1p0 that contains no records at all.
        sender.setFetchPartitionDataForNextResponse((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p0()),
                        new FetchResponseData.PartitionData()
                                .setPartitionIndex(t1p0().partition())
                                .setLastStableOffset(0L)
                                .setLogStartOffset(0L)
                                .setHighWatermark(0L)
                                .setRecords(MemoryRecords.EMPTY))
        })));
        sender.setIdsForNextResponse(topicIds());

        fetcher.doWork();

        Assertions.assertEquals(1, sender.fetchCount());
        Assertions.assertEquals(
                new Some(BoxesRunTime.boxToInteger(0)),
                fetcher.fetchState(t1p0()).flatMap(state -> state.lag()));
    }

    /**
     * Verifies truncation when the inter-broker protocol version is pinned to {@code "0.11.0"}.
     *
     * <p>The mocked leader answers the epoch request with an undefined epoch ({@code -1}) and end
     * offsets 155 ({@code t1p0}) and 143 ({@code t1p1}). The test expects exactly one epoch request,
     * sent with OffsetsForLeaderEpoch version 0, and one truncation per partition to the offset the
     * leader returned.
     */
    @Test
    public void shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21());
        // Pin the inter-broker protocol to an old version for this test.
        createBrokerConfig.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.11.0");
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(createBrokerConfig);
        ReplicaQuota replicaQuota = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        UnifiedLog unifiedLog = (UnifiedLog) Mockito.mock(UnifiedLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(unifiedLog);
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(200 - 2));
        Mockito.when(unifiedLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(unifiedLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(unifiedLog.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(120L, 3)));
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(200));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(unifiedLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, unifiedLog);
        // Leader replies with an undefined epoch (-1) and the end offsets to truncate to.
        MockBlockingSender mockBlockingSender = new MockBlockingSender((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), newOffsetForLeaderPartitionResult(t1p0(), -1, 155L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), -1, 143L))}))).asJava(), brokerEndPoint(), new SystemTime());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, replicaQuota, mockBlockingSender);
        createReplicaFetcherThread.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3()))})));
        // First round: one epoch request and one fetch.
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        Assertions.assertEquals(0, mockBlockingSender.lastUsedOffsetForLeaderEpochVersion(), "OffsetsForLeaderEpochRequest version.");
        // Second round: only another fetch, no further epoch request.
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(2, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        // The captor holds boxed Long offsets, so look them up as Long values directly on the Java
        // list. The earlier Integer lookup only matched through Scala's numeric-aware equality.
        java.util.List truncationOffsets = forClass.getAllValues();
        Assertions.assertTrue(truncationOffsets.contains(155L), "Expected " + t1p0() + " to truncate to offset 155 (truncation offsets: " + truncationOffsets + ")");
        Assertions.assertTrue(truncationOffsets.contains(143L), "Expected " + t1p1() + " to truncate to offset 143 (truncation offsets: " + truncationOffsets + ")");
    }

    /**
     * Verifies the fallback when the leader answers the epoch request with an undefined epoch and
     * an undefined offset (both {@code -1}): the single truncation performed for {@code t1p0} is
     * expected to target the initial fetch offset, which is 100 here.
     */
    @Test
    public void shouldTruncateToInitialFetchOffsetIfLeaderReturnsUndefinedOffset() {
        ArgumentCaptor truncationOffset = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = kafkaConfigNoTruncateOnFetch();

        ReplicationQuotaManager quota = Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManagerMock = Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsManager = Mockito.mock(ReplicaAlterLogDirsManager.class);
        UnifiedLog log = Mockito.mock(UnifiedLog.class);
        Partition partitionMock = Mockito.mock(Partition.class);
        ReplicaManager replicaManagerMock = Mockito.mock(ReplicaManager.class);

        Mockito.when(partitionMock.localLogOrException()).thenReturn(log);
        Mockito.when(BoxesRunTime.boxToLong(log.highWatermark())).thenReturn(BoxesRunTime.boxToLong(100));
        Mockito.when(log.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(replicaManagerMock.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManagerMock.logManager()).thenReturn(logManagerMock);
        Mockito.when(replicaManagerMock.replicaAlterLogDirsManager()).thenReturn(alterLogDirsManager);
        Mockito.when(replicaManagerMock.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partitionMock, replicaManagerMock, log);

        // The leader reports "undefined" (-1 / -1) for the only partition.
        ReplicaFetcherThread fetcher = createReplicaFetcherThread(
                "bob",
                0,
                config,
                kafka$server$ReplicaFetcherThreadTest$$failedPartitions(),
                replicaManagerMock,
                quota,
                new MockBlockingSender(
                        (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(
                                Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                                Predef$.MODULE$.ArrowAssoc(t1p0()),
                                                newOffsetForLeaderPartitionResult(t1p0(), -1, -1L))
                                }))).asJava(),
                        brokerEndPoint(),
                        new SystemTime()));
        fetcher.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p0()),
                        initialFetchState(new Some(topicId1()), 100, initialFetchState$default$3()))
        })));

        fetcher.doWork();

        Mockito.verify(partitionMock).truncateTo(
                BoxesRunTime.unboxToLong(truncationOffset.capture()),
                ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(100, BoxesRunTime.unboxToLong(truncationOffset.getValue()));
    }

    /**
     * Verifies that no truncation happens while the leader keeps answering the epoch request with
     * errors, and that truncation proceeds once a valid answer is available.
     *
     * <p>Both partitions start with error responses ({@code NOT_LEADER_OR_FOLLOWER} and
     * {@code UNKNOWN_SERVER_ERROR}). After four {@code doWork()} rounds nothing may have been
     * truncated. The response for {@code t1p0} is then replaced by epoch 5 / offset 156, and one
     * more round is expected to truncate to 156.
     */
    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        ArgumentCaptor truncationOffset = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = kafkaConfigNoTruncateOnFetch();

        ReplicationQuotaManager quota = Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManagerMock = Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsManager = Mockito.mock(ReplicaAlterLogDirsManager.class);
        UnifiedLog log = Mockito.mock(UnifiedLog.class);
        Partition partitionMock = Mockito.mock(Partition.class);
        ReplicaManager replicaManagerMock = Mockito.mock(ReplicaManager.class);

        Mockito.when(BoxesRunTime.boxToLong(log.highWatermark())).thenReturn(BoxesRunTime.boxToLong(100));
        Mockito.when(partitionMock.localLogOrException()).thenReturn(log);
        Mockito.when(log.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(log.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(300, 5)));
        Mockito.when(BoxesRunTime.boxToLong(log.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(300));
        Mockito.when(replicaManagerMock.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManagerMock.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(log);
        Mockito.when(replicaManagerMock.logManager()).thenReturn(logManagerMock);
        Mockito.when(replicaManagerMock.replicaAlterLogDirsManager()).thenReturn(alterLogDirsManager);
        Mockito.when(replicaManagerMock.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partitionMock, replicaManagerMock, log);

        // Java view over a mutable Scala map, so the canned responses can be changed mid-test.
        java.util.Map epochResponses = (java.util.Map) CollectionConverters$.MODULE$.mutableMapAsJavaMapConverter(
                scala.collection.mutable.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(t1p0()),
                                newOffsetForLeaderPartitionResult(t1p0(), Errors.NOT_LEADER_OR_FOLLOWER, -1, -1L)),
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(t1p1()),
                                newOffsetForLeaderPartitionResult(t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))
                }))).asJava();

        ReplicaFetcherThread fetcher = createReplicaFetcherThread(
                "bob",
                0,
                config,
                kafka$server$ReplicaFetcherThreadTest$$failedPartitions(),
                replicaManagerMock,
                quota,
                new MockBlockingSender(epochResponses, brokerEndPoint(), new SystemTime()));
        fetcher.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p0()),
                        initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())),
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p1()),
                        initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3()))
        })));

        // Rounds 0..3 inclusive: every one of them only sees error responses.
        for (int round = 0; round <= 3; round++) {
            fetcher.doWork();
        }
        Mockito.verify(partitionMock, Mockito.never())
                .truncateTo(ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());

        // Let the leader finally answer for t1p0 and run one more round.
        epochResponses.put(t1p0(), newOffsetForLeaderPartitionResult(t1p0(), 5, 156L));
        fetcher.doWork();

        Mockito.verify(partitionMock).truncateTo(
                BoxesRunTime.unboxToLong(truncationOffset.capture()),
                ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(156L, BoxesRunTime.unboxToLong(truncationOffset.getValue()));
    }

    /**
     * Verifies the state transition of freshly added partitions: both {@code t1p0} and
     * {@code t1p1} are expected to be in the {@code Truncating} state before the first
     * {@code doWork()} round and in the {@code Fetching} state after it, with
     * {@code truncateTo(0, false)} having been invoked twice on the partition mock.
     */
    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        KafkaConfig config = kafkaConfigNoTruncateOnFetch();

        ReplicationQuotaManager quota = Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManagerMock = Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsManager = Mockito.mock(ReplicaAlterLogDirsManager.class);
        UnifiedLog log = Mockito.mock(UnifiedLog.class);
        Partition partitionMock = Mockito.mock(Partition.class);
        ReplicaManager replicaManagerMock = Mockito.mock(ReplicaManager.class);

        Mockito.when(partitionMock.localLogOrException()).thenReturn(log);
        Mockito.when(BoxesRunTime.boxToLong(log.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(log.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(4)));
        Mockito.when(log.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(0L, 4)));
        Mockito.when(replicaManagerMock.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManagerMock.logManager()).thenReturn(logManagerMock);
        Mockito.when(replicaManagerMock.replicaAlterLogDirsManager()).thenReturn(alterLogDirsManager);
        stub(partitionMock, replicaManagerMock, log);

        // The leader reports epoch 4 with end offset 1 for both partitions.
        ReplicaFetcherThread fetcher = createReplicaFetcherThread(
                "bob",
                0,
                config,
                kafka$server$ReplicaFetcherThreadTest$$failedPartitions(),
                replicaManagerMock,
                quota,
                new MockBlockingSender(
                        (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(
                                Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                                Predef$.MODULE$.ArrowAssoc(t1p0()),
                                                newOffsetForLeaderPartitionResult(t1p0(), 4, 1L)),
                                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                                Predef$.MODULE$.ArrowAssoc(t1p1()),
                                                newOffsetForLeaderPartitionResult(t1p1(), 4, 1L))
                                }))).asJava(),
                        brokerEndPoint(),
                        new SystemTime()));
        fetcher.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p0()),
                        initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3())),
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p1()),
                        initialFetchState(new Some(topicId1()), 0L, initialFetchState$default$3()))
        })));

        // Before any work is done both partitions are still truncating.
        Assertions.assertEquals(
                Option$.MODULE$.apply(Truncating$.MODULE$),
                fetcher.fetchState(t1p0()).map(state -> state.state()));
        Assertions.assertEquals(
                Option$.MODULE$.apply(Truncating$.MODULE$),
                fetcher.fetchState(t1p1()).map(state -> state.state()));

        fetcher.doWork();

        // One round later both have moved on to fetching.
        Assertions.assertEquals(
                Option$.MODULE$.apply(Fetching$.MODULE$),
                fetcher.fetchState(t1p0()).map(state -> state.state()));
        Assertions.assertEquals(
                Option$.MODULE$.apply(Fetching$.MODULE$),
                fetcher.fetchState(t1p1()).map(state -> state.state()));
        Mockito.verify(partitionMock, Mockito.times(2)).truncateTo(0L, false);
    }

    /**
     * Verifies that a partition removed from the fetcher while the leader-epoch request is in
     * flight is not truncated.
     *
     * <p>The sender's epoch-request callback removes {@code t1p0} from the fetcher. After one
     * {@code doWork()} round exactly one truncation is expected, to offset 49, which is the end
     * offset the leader returned for {@code t1p1}.
     */
    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        KafkaConfig config = kafkaConfigNoTruncateOnFetch();
        ArgumentCaptor truncationOffset = ArgumentCaptor.forClass(Long.TYPE);

        ReplicaQuota quota = Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManagerMock = Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager alterLogDirsManager = Mockito.mock(ReplicaAlterLogDirsManager.class);
        UnifiedLog log = Mockito.mock(UnifiedLog.class);
        Partition partitionMock = Mockito.mock(Partition.class);
        ReplicaManager replicaManagerMock = Mockito.mock(ReplicaManager.class);

        Mockito.when(partitionMock.localLogOrException()).thenReturn(log);
        Mockito.when(BoxesRunTime.boxToLong(log.highWatermark())).thenReturn(BoxesRunTime.boxToLong(100 - 2));
        Mockito.when(log.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(log.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(100, 5)));
        Mockito.when(BoxesRunTime.boxToLong(log.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(100));
        Mockito.when(replicaManagerMock.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManagerMock.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(log);
        Mockito.when(replicaManagerMock.logManager()).thenReturn(logManagerMock);
        Mockito.when(replicaManagerMock.replicaAlterLogDirsManager()).thenReturn(alterLogDirsManager);
        stub(partitionMock, replicaManagerMock, log);

        // Leader end offsets for epoch 5: 52 for t1p0, 49 for t1p1.
        MockBlockingSender sender = new MockBlockingSender(
                (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(
                        Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                        Predef$.MODULE$.ArrowAssoc(t1p0()),
                                        newOffsetForLeaderPartitionResult(t1p0(), 5, 52L)),
                                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                        Predef$.MODULE$.ArrowAssoc(t1p1()),
                                        newOffsetForLeaderPartitionResult(t1p1(), 5, 49L))
                        }))).asJava(),
                brokerEndPoint(),
                new SystemTime());
        ReplicaFetcherThread fetcher = createReplicaFetcherThread(
                "bob",
                0,
                config,
                kafka$server$ReplicaFetcherThreadTest$$failedPartitions(),
                replicaManagerMock,
                quota,
                sender);
        fetcher.addPartitions(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p0()),
                        initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3())),
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p1()),
                        initialFetchState(new Some<>(topicId1()), 0L, initialFetchState$default$3()))
        })));

        // Drop t1p0 from the fetcher from within the epoch-request callback.
        TopicPartition removedPartition = t1p0();
        sender.setEpochRequestCallback(() -> {
            fetcher.removePartitions(
                    Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{removedPartition})));
        });

        fetcher.doWork();

        // Only the partition that is still owned by the fetcher gets truncated.
        Mockito.verify(partitionMock).truncateTo(
                BoxesRunTime.unboxToLong(truncationOffset.capture()),
                ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(49L, BoxesRunTime.unboxToLong(truncationOffset.getValue()));
    }

    /**
     * Verifies that shutting down the fetcher thread completes even though the blocking sender
     * throws from both {@code initiateClose()} and {@code close()}.
     *
     * <p>The thread is started, asked to shut down and awaited; afterwards both sender methods
     * must have been invoked exactly once.
     */
    @Test
    public void shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        // Void methods have no return value to hand to Mockito.when(...), so they are stubbed with
        // the doThrow(...).when(mock).method() form.
        Mockito.doThrow(new IllegalArgumentException()).when(blockingSend).initiateClose();
        Mockito.doThrow(new IllegalStateException()).when(blockingSend).close();
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        // No quota is supplied for this shutdown-only scenario.
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, null, blockingSend);
        createReplicaFetcherThread.start();
        createReplicaFetcherThread.initiateShutdown();
        createReplicaFetcherThread.awaitShutdown();
        ((BlockingSend) Mockito.verify(blockingSend)).initiateClose();
        ((BlockingSend) Mockito.verify(blockingSend)).close();
    }

    /**
     * Runs the shared {@code assertProcessPartitionDataWhen} scenario with the flag set to
     * {@code true}. Judging by the test name, the flag presumably marks a reassignment as being in
     * progress; the helper is not visible here, so confirm against its definition.
     */
    @Test
    public void shouldUpdateReassignmentBytesInMetrics() {
        final boolean flag = true;
        assertProcessPartitionDataWhen(flag);
    }

    /**
     * Runs the shared {@code assertProcessPartitionDataWhen} scenario with the flag set to
     * {@code false}. Judging by the test name, this presumably means no reassignment is in
     * progress; the helper is not visible here, so confirm against its definition.
     */
    @Test
    public void shouldNotUpdateReassignmentBytesInMetricsWhenNoReassignmentsInProgress() {
        final boolean flag = false;
        assertProcessPartitionDataWhen(flag);
    }

    /**
     * Exercises {@code buildFetch} on the leader end point across two consecutive requests.
     *
     * <p>The first request covers three partitions and is expected to list all of them with nothing
     * replaced or removed. After a successful response has been handed to the fetch session
     * handler, the second request omits {@code t1p0} and gives {@code t2p1} a new topic id. It is
     * expected to carry only {@code t2p1} as fetch data, report {@code t2p1} as replaced and
     * {@code t1p0} as removed.
     */
    @Test
    public void testBuildFetch() {
        // Topic-id-qualified views of the three partitions, used for the response and the final assertions.
        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId1(), t1p0());
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(topicId1(), t1p1());
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(topicId2(), t2p1());
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        ReplicaQuota replicaQuota = (ReplicaQuota) Mockito.mock(ReplicaQuota.class);
        UnifiedLog unifiedLog = (UnifiedLog) Mockito.mock(UnifiedLog.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(unifiedLog);
        // No partition is throttled, and every local log starts at offset 0.
        Mockito.when(BoxesRunTime.boxToBoolean(replicaQuota.isThrottled((TopicPartition) ArgumentMatchers.any()))).thenReturn(BoxesRunTime.boxToBoolean(false));
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        LogContext logContext = new LogContext(new StringBuilder(52).append("[ReplicaFetcher replicaId=").append(fromProps.brokerId()).append(", leaderId=").append(brokerEndPoint().id()).append(", fetcherId=0] ").toString());
        // The end point is kept in a local so its fetch session handler can be driven directly below.
        RemoteLeaderEndPoint remoteLeaderEndPoint = new RemoteLeaderEndPoint(logContext.logPrefix(), blockingSend, new FetchSessionHandler(logContext, brokerEndPoint().id()), fromProps, replicaManager, replicaQuota, () -> {
            return fromProps.interBrokerProtocolVersion();
        }, () -> {
            return 1L;
        });
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", remoteLeaderEndPoint, fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, replicaQuota, logContext.logPrefix(), () -> {
            return fromProps.interBrokerProtocolVersion();
        });
        // First request: t1p0, t1p1 and t2p1, all in the Fetching state at offsets 150, 155 and 160.
        Map apply = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new PartitionFetchState(new Some(topicId1()), 150L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new PartitionFetchState(new Some(topicId1()), 155L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), new PartitionFetchState(new Some(topicId2()), 160L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$))}));
        AbstractFetcherThread.ResultWithPartitions buildFetch = replicaFetcherThread.leader().buildFetch(apply);
        // Null guard left behind by the decompiled Scala pattern match on the result.
        if (buildFetch == null) {
            throw new MatchError((Object) null);
        }
        Option option = (Option) buildFetch.result();
        Assertions.assertTrue(option.isDefined());
        FetchRequest.Builder fetchRequest = ((AbstractFetcherThread.ReplicaFetch) option.get()).fetchRequest();
        // Expected fetch data: one PartitionData per requested partition, derived from its fetch state.
        Assertions.assertEquals(CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map) apply.map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError((Object) null);
            }
            TopicPartition topicPartition = (TopicPartition) tuple2._1();
            PartitionFetchState partitionFetchState = (PartitionFetchState) tuple2._2();
            return new Tuple2(topicPartition, new FetchRequest.PartitionData((Uuid) partitionFetchState.topicId().get(), partitionFetchState.fetchOffset(), 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(partitionFetchState.currentLeaderEpoch())), Optional.empty()));
        }, Map$.MODULE$.canBuildFrom())).asJava(), fetchRequest.fetchData());
        // The very first request has nothing to replace or remove.
        Assertions.assertEquals(0, fetchRequest.replaced().size());
        Assertions.assertEquals(0, fetchRequest.removed().size());
        // Hand the session handler an error-free response covering all three partitions
        // (the literal 123 is presumably the fetch session id - confirm against FetchResponse.of).
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new FetchResponseData.PartitionData());
        linkedHashMap.put(topicIdPartition2, new FetchResponseData.PartitionData());
        linkedHashMap.put(topicIdPartition3, new FetchResponseData.PartitionData());
        remoteLeaderEndPoint.fetchSessionHandler().handleResponse(FetchResponse.of(Errors.NONE, 0, 123, linkedHashMap), ApiKeys.FETCH.latestVersion());
        // Second request: t1p0 is omitted, t1p1 is unchanged, and t2p1 now carries a random topic id.
        Map apply2 = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new PartitionFetchState(new Some(topicId1()), 155L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), new PartitionFetchState(new Some(Uuid.randomUuid()), 160L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$))}));
        AbstractFetcherThread.ResultWithPartitions buildFetch2 = replicaFetcherThread.leader().buildFetch(apply2);
        if (buildFetch2 == null) {
            throw new MatchError((Object) null);
        }
        Option option2 = (Option) buildFetch2.result();
        // drop(1) skips the first entry of the map (t1p1 as constructed above), so the expected
        // fetch data contains only the t2p1 entry.
        Map map = (Map) ((TraversableLike) apply2.drop(1)).map(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError((Object) null);
            }
            TopicPartition topicPartition = (TopicPartition) tuple22._1();
            PartitionFetchState partitionFetchState = (PartitionFetchState) tuple22._2();
            return new Tuple2(topicPartition, new FetchRequest.PartitionData((Uuid) partitionFetchState.topicId().get(), partitionFetchState.fetchOffset(), 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(partitionFetchState.currentLeaderEpoch())), Optional.empty()));
        }, Map$.MODULE$.canBuildFrom());
        Assertions.assertTrue(option2.isDefined());
        FetchRequest.Builder fetchRequest2 = ((AbstractFetcherThread.ReplicaFetch) option2.get()).fetchRequest();
        Assertions.assertEquals(CollectionConverters$.MODULE$.mapAsJavaMapConverter(map).asJava(), fetchRequest2.fetchData());
        // t2p1 is reported as replaced under its original topic id, and t1p0 as removed.
        Assertions.assertEquals(Collections.singletonList(topicIdPartition3), fetchRequest2.replaced());
        Assertions.assertEquals(Collections.singletonList(topicIdPartition), fetchRequest2.removed());
    }

    /**
     * Checks when the fetcher thread hands partitions to
     * {@code ReplicaManager.completeDelayedFetchRequests} after processing fetched data.
     *
     * <p>Two partitions of "testTopic" are processed with a fetched high watermark of 100. Nothing may be
     * completed before {@code doWork()} runs. After {@code doWork()}, both partitions are expected to
     * be passed in a single call when the log reported a high watermark update, and no call is
     * expected otherwise. In both cases {@code partitionsWithNewHighWatermark()} must be empty at the end.
     *
     * @param z whether the mocked log's {@code maybeUpdateHighWatermark(100L)} returns
     *          {@code Some(100L)} (true) or {@code None} (false)
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLocalFetchCompletionIfHighWatermarkUpdated(boolean z) {
        // Broker config from createBrokerConfig(1, "localhost:1234", ...); every remaining argument
        // comes from the createBrokerConfig$default$N accessors.
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21()));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        // The parameter decides what the mocked log answers for a high watermark of 100.
        Some some = z ? new Some(BoxesRunTime.boxToLong(100L)) : None$.MODULE$;
        UnifiedLog unifiedLog = (UnifiedLog) Mockito.mock(UnifiedLog.class);
        Mockito.when(unifiedLog.maybeUpdateHighWatermark(100L)).thenReturn(some);
        // Every append on the mocked partition returns Some(mock LogAppendInfo).
        Some some2 = new Some(Mockito.mock(LogAppendInfo.class));
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Mockito.when(partition.localLogOrException()).thenReturn(unifiedLog);
        Mockito.when(partition.appendRecordsToFollowerOrFutureReplica((MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToBoolean(ArgumentMatchers.any()))).thenReturn(some2);
        // Collects whatever is passed to completeDelayedFetchRequests.
        Buffer empty = Buffer$.MODULE$.empty();
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.getPartitionOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(partition);
        // NOTE(review): decompiled form of stubbing a Unit-returning method. The mock call and the
        // following Mockito.when(BoxedUnit.UNIT) presumably must stay adjacent so the answer attaches
        // to that call - confirm before reordering.
        replicaManager.completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
            // Append the first call argument to the capture buffer.
            return empty.$plus$plus$eq((Seq) invocationOnMock.getArguments()[0]);
        });
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(new BrokerTopicStats(BrokerTopicStats$.MODULE$.$lessinit$greater$default$1()));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("replica-fetcher", 0, fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, (ReplicaQuota) Mockito.mock(ReplicaQuota.class), blockingSend);
        TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        TopicPartition topicPartition2 = new TopicPartition("testTopic", 1);
        // One record and a high watermark of 100. The same PartitionData instance is reused for both
        // partitions; only its partition index is changed between the two calls.
        FetchResponseData.PartitionData highWatermark = new FetchResponseData.PartitionData().setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))})).setHighWatermark(100L);
        createReplicaFetcherThread.processPartitionData(topicPartition, 0L, highWatermark.setPartitionIndex(0));
        createReplicaFetcherThread.processPartitionData(topicPartition2, 0L, highWatermark.setPartitionIndex(1));
        // Processing alone must not complete any delayed fetch requests.
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(0))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        createReplicaFetcherThread.doWork();
        if (z) {
            // Both partitions are expected in the capture buffer, delivered in exactly one call.
            Assertions.assertEquals(new $colon.colon(topicPartition, new $colon.colon(topicPartition2, Nil$.MODULE$)), empty);
            ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(1))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        } else {
            // Without a high watermark update there must still be no completion call.
            ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(0))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        }
        // The thread must not be holding any partitions with a new high watermark at this point.
        Assertions.assertEquals(Buffer$.MODULE$.empty(), createReplicaFetcherThread.partitionsWithNewHighWatermark());
    }

    /**
     * Builds an {@code EpochEndOffset} with error {@link Errors#NONE} by delegating to the
     * four-argument overload.
     *
     * @param topicPartition partition passed through to the four-argument overload
     * @param i leader epoch passed through to the four-argument overload
     * @param j end offset passed through to the four-argument overload
     * @return the result of the four-argument overload
     */
    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition topicPartition, int i, long j) {
        final Errors noError = Errors.NONE;
        return this.newOffsetForLeaderPartitionResult(topicPartition, noError, i, j);
    }

    /**
     * Builds an {@code EpochEndOffset} from the given values.
     *
     * @param topicPartition partition whose {@code partition()} number is stored in the result
     * @param errors error whose {@code code()} is stored in the result
     * @param i leader epoch to store
     * @param j end offset to store
     * @return a new {@code EpochEndOffset} carrying the four values
     */
    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition topicPartition, Errors errors, int i, long j) {
        OffsetForLeaderEpochResponseData.EpochEndOffset endOffset = new OffsetForLeaderEpochResponseData.EpochEndOffset();
        endOffset.setPartition(topicPartition.partition());
        endOffset.setErrorCode(errors.code());
        endOffset.setLeaderEpoch(i);
        endOffset.setEndOffset(j);
        return endOffset;
    }

    /**
     * Runs {@code processPartitionData} for {@code t1p0()} against mocked collaborators and checks
     * the byte meters exposed through {@code BrokerTopicStats.allTopicsStats()}.
     *
     * @param z value the mocked partition reports for both {@code isReassigning()} and
     *          {@code isAddingLocalReplica()}; when true the reassignment meter is expected to equal
     *          the size of the fetched records, otherwise zero
     */
    private void assertProcessPartitionDataWhen(boolean z) {
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(
                TestUtils$.MODULE$.createBrokerConfig(
                        1,
                        "localhost:1234",
                        TestUtils$.MODULE$.createBrokerConfig$default$3(),
                        TestUtils$.MODULE$.createBrokerConfig$default$4(),
                        TestUtils$.MODULE$.createBrokerConfig$default$5(),
                        TestUtils$.MODULE$.createBrokerConfig$default$6(),
                        TestUtils$.MODULE$.createBrokerConfig$default$7(),
                        TestUtils$.MODULE$.createBrokerConfig$default$8(),
                        TestUtils$.MODULE$.createBrokerConfig$default$9(),
                        TestUtils$.MODULE$.createBrokerConfig$default$10(),
                        TestUtils$.MODULE$.createBrokerConfig$default$11(),
                        TestUtils$.MODULE$.createBrokerConfig$default$12(),
                        TestUtils$.MODULE$.createBrokerConfig$default$13(),
                        TestUtils$.MODULE$.createBrokerConfig$default$14(),
                        TestUtils$.MODULE$.createBrokerConfig$default$15(),
                        TestUtils$.MODULE$.createBrokerConfig$default$16(),
                        TestUtils$.MODULE$.createBrokerConfig$default$17(),
                        TestUtils$.MODULE$.createBrokerConfig$default$18(),
                        TestUtils$.MODULE$.createBrokerConfig$default$19(),
                        TestUtils$.MODULE$.createBrokerConfig$default$20(),
                        TestUtils$.MODULE$.createBrokerConfig$default$21()));

        // Sender mock that only answers brokerEndPoint().
        BlockingSend sender = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(sender.brokerEndPoint()).thenReturn(brokerEndPoint());

        // Log mock plus the single-record batch that will be "fetched".
        UnifiedLog log = (UnifiedLog) Mockito.mock(UnifiedLog.class);
        SimpleRecord[] batch = {new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))};
        MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, batch);
        Mockito.when(log.maybeUpdateHighWatermark(0L)).thenReturn(None$.MODULE$);

        // Partition mock: both reassignment-related flags follow the parameter.
        final Boolean stubbedFlag = BoxesRunTime.boxToBoolean(z);
        Partition mockedPartition = (Partition) Mockito.mock(Partition.class);
        Mockito.when(mockedPartition.localLogOrException()).thenReturn(log);
        Mockito.when(BoxesRunTime.boxToBoolean(mockedPartition.isReassigning())).thenReturn(stubbedFlag);
        Mockito.when(BoxesRunTime.boxToBoolean(mockedPartition.isAddingLocalReplica())).thenReturn(stubbedFlag);
        Mockito.when(mockedPartition.appendRecordsToFollowerOrFutureReplica(records, false)).thenReturn(None$.MODULE$);

        // Replica manager mock backed by a real BrokerTopicStats so the meters can be read back.
        ReplicaManager manager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(manager.getPartitionOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(mockedPartition);
        BrokerTopicStats stats = new BrokerTopicStats(BrokerTopicStats$.MODULE$.$lessinit$greater$default$1());
        Mockito.when(manager.brokerTopicStats()).thenReturn(stats);

        ReplicaFetcherThread fetcher = createReplicaFetcherThread(
                "bob",
                0,
                brokerConfig,
                kafka$server$ReplicaFetcherThreadTest$$failedPartitions(),
                manager,
                (ReplicaQuota) Mockito.mock(ReplicaQuota.class),
                sender);
        fetcher.processPartitionData(
                t1p0(),
                0L,
                new FetchResponseData.PartitionData()
                        .setPartitionIndex(t1p0().partition())
                        .setLastStableOffset(0L)
                        .setLogStartOffset(0L)
                        .setRecords(records));

        // Reassignment bytes are only expected when the partition reported the flags as true.
        long expectedReassignmentBytes = z ? records.sizeInBytes() : 0L;
        Meter reassignmentMeter = (Meter) stats.allTopicsStats().reassignmentBytesInPerSec().get();
        Assertions.assertEquals(expectedReassignmentBytes, reassignmentMeter.count());

        // Replication bytes are expected regardless of the flag.
        Meter replicationMeter = (Meter) stats.allTopicsStats().replicationBytesInRate().get();
        Assertions.assertEquals(records.sizeInBytes(), replicationMeter.count());
    }

    /**
     * Stubs {@code replicaManager} so that, for each of {@code t1p0()}, {@code t1p1()} and
     * {@code t2p1()}, {@code localLogOrException} returns {@code unifiedLog} and
     * {@code getPartitionOrException} returns {@code partition}.
     *
     * @param partition value returned by the stubbed {@code getPartitionOrException}
     * @param replicaManager mock to configure
     * @param unifiedLog value returned by the stubbed {@code localLogOrException}
     */
    public void stub(Partition partition, ReplicaManager replicaManager, UnifiedLog unifiedLog) {
        TopicPartition[] stubbedPartitions = {t1p0(), t1p1(), t2p1()};
        for (TopicPartition stubbed : stubbedPartitions) {
            Mockito.when(replicaManager.localLogOrException(stubbed)).thenReturn(unifiedLog);
            Mockito.when(replicaManager.getPartitionOrException(stubbed)).thenReturn(partition);
        }
    }

    /**
     * Builds a {@code KafkaConfig} from the default test broker properties with the inter-broker
     * protocol version set to {@code IBP_2_6_IV0}.
     *
     * <p>NOTE(review): going by the method name, this version is presumably one where
     * truncate-on-fetch is not in effect - confirm against {@code MetadataVersion}.
     *
     * @return the resulting broker configuration
     */
    private KafkaConfig kafkaConfigNoTruncateOnFetch() {
        Properties brokerProps = TestUtils$.MODULE$.createBrokerConfig(
                1,
                "localhost:1234",
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                TestUtils$.MODULE$.createBrokerConfig$default$5(),
                TestUtils$.MODULE$.createBrokerConfig$default$6(),
                TestUtils$.MODULE$.createBrokerConfig$default$7(),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16(),
                TestUtils$.MODULE$.createBrokerConfig$default$17(),
                TestUtils$.MODULE$.createBrokerConfig$default$18(),
                TestUtils$.MODULE$.createBrokerConfig$default$19(),
                TestUtils$.MODULE$.createBrokerConfig$default$20(),
                TestUtils$.MODULE$.createBrokerConfig$default$21());
        String protocolVersionKey = KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp();
        String protocolVersion = MetadataVersion.IBP_2_6_IV0.version();
        brokerProps.setProperty(protocolVersionKey, protocolVersion);
        return KafkaConfig$.MODULE$.fromProps(brokerProps);
    }

    /**
     * Asserts that the fetcher has a fetch state for {@code topicPartition} and that its
     * ready-for-fetch, truncating and delayed flags match the expected values.
     *
     * @param abstractFetcherThread fetcher whose state is inspected
     * @param z expected value of {@code isReadyForFetch()}
     * @param z2 expected value of {@code isTruncating()}
     * @param z3 expected value of {@code isDelayed()}
     * @param topicPartition partition to check
     */
    public static final /* synthetic */ void $anonfun$assertPartitionStates$1(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3, TopicPartition topicPartition) {
        Assertions.assertTrue(abstractFetcherThread.fetchState(topicPartition).isDefined());
        PartitionFetchState observed = (PartitionFetchState) abstractFetcherThread.fetchState(topicPartition).get();

        Assertions.assertEquals(
                BoxesRunTime.boxToBoolean(z),
                BoxesRunTime.boxToBoolean(observed.isReadyForFetch()),
                "Partition " + topicPartition + " should" + (z ? "" : " NOT") + " be ready for fetching");
        Assertions.assertEquals(
                BoxesRunTime.boxToBoolean(z2),
                BoxesRunTime.boxToBoolean(observed.isTruncating()),
                "Partition " + topicPartition + " should" + (z2 ? "" : " NOT") + " be truncating its log");
        Assertions.assertEquals(
                BoxesRunTime.boxToBoolean(z3),
                BoxesRunTime.boxToBoolean(observed.isDelayed()),
                "Partition " + topicPartition + " should" + (z3 ? "" : " NOT") + " be delayed");
    }

    /**
     * Asserts that the fetch state of {@code topicPartition} on the given fetcher is {@code Fetching}.
     *
     * @param replicaFetcherThread fetcher whose state is inspected
     * @param topicPartition partition to check
     */
    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        PartitionFetchState current = (PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get();
        Assertions.assertEquals(Fetching$.MODULE$, current.state());
    }

    /**
     * Builds fetch response partition data carrying a diverging epoch, with last stable offset and
     * log start offset both set to 0.
     *
     * @param i partition index to store
     * @param epochEndOffset diverging epoch to store
     * @return the populated {@code PartitionData}
     */
    private static final FetchResponseData.PartitionData partitionData$1(int i, FetchResponseData.EpochEndOffset epochEndOffset) {
        FetchResponseData.PartitionData response = new FetchResponseData.PartitionData();
        response.setPartitionIndex(i);
        response.setLastStableOffset(0L);
        response.setLogStartOffset(0L);
        response.setDivergingEpoch(epochEndOffset);
        return response;
    }

    /**
     * Asserts that the fetch state of {@code topicPartition} on the given fetcher is {@code Fetching}.
     *
     * @param replicaFetcherThread fetcher whose state is inspected
     * @param topicPartition partition to check
     */
    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        PartitionFetchState current = (PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get();
        Assertions.assertEquals(Fetching$.MODULE$, current.state());
    }

    /**
     * Asserts that the fetch state of {@code topicPartition} on the given fetcher is {@code Fetching}.
     *
     * @param replicaFetcherThread fetcher whose state is inspected
     * @param topicPartition partition to check
     */
    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$6(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        PartitionFetchState current = (PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get();
        Assertions.assertEquals(Fetching$.MODULE$, current.state());
    }

    /**
     * Asserts that the fetch state of {@code topicPartition} on the given fetcher is {@code Fetching}.
     *
     * @param replicaFetcherThread fetcher whose state is inspected
     * @param topicPartition partition to check
     */
    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$7(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        PartitionFetchState current = (PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get();
        Assertions.assertEquals(Fetching$.MODULE$, current.state());
    }

    /**
     * Creates the test instance and applies {@code updateMetadataRequest()} to
     * {@code metadataCache()}, passing 0 as the first argument of {@code updateMetadata}.
     */
    public ReplicaFetcherThreadTest() {
        this.metadataCache().updateMetadata(
                0,
                this.updateMetadataRequest());
    }
}
