package kafka.server;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import kafka.api.Request$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.log.LogManager;
import kafka.log.UnifiedLog;
import kafka.server.AbstractFetcherThread;
import kafka.server.metadata.ZkMetadataCache;
import kafka.utils.DelayedItem;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.SeqLike;
import scala.collection.Set;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.MapLike;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: ReplicaAlterLogDirsThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\tUc\u0001B\u0014)\u00015BQ\u0001\u000e\u0001\u0005\u0002UBq\u0001\u000f\u0001C\u0002\u0013%\u0011\b\u0003\u0004F\u0001\u0001\u0006IA\u000f\u0005\b\r\u0002\u0011\r\u0011\"\u0003:\u0011\u00199\u0005\u0001)A\u0005u!9\u0001\n\u0001b\u0001\n\u0013I\u0005BB'\u0001A\u0003%!\nC\u0004O\u0001\t\u0007I\u0011B(\t\r\u0001\u0004\u0001\u0015!\u0003Q\u0011\u001d\t\u0007A1A\u0005\n\tDa\u0001\u001a\u0001!\u0002\u0013\u0019\u0007bB3\u0001\u0005\u0004%IA\u001a\u0005\u0007U\u0002\u0001\u000b\u0011B4\t\u000f-\u0004!\u0019!C\u0005Y\"1\u0001\u000f\u0001Q\u0001\n5Dq!\u001d\u0001C\u0002\u0013%!\u000fC\u0004\u0002\b\u0001\u0001\u000b\u0011B:\t\u0013\u0005%\u0001A1A\u0005\n\u0005-\u0001\u0002CA\r\u0001\u0001\u0006I!!\u0004\t\u0013\u0005m\u0001A1A\u0005\n\u0005u\u0001\u0002CA\u0016\u0001\u0001\u0006I!a\b\t\u000f\u00055\u0002\u0001\"\u0003\u00020!I\u00111\n\u0001\u0012\u0002\u0013%\u0011Q\n\u0005\b\u0003G\u0002A\u0011AA3\u0011\u001d\t\u0019\t\u0001C\u0001\u0003KBq!a\"\u0001\t\u0003\t)\u0007C\u0004\u0002\f\u0002!I!!$\t\u000f\u0005\r\u0007\u0001\"\u0001\u0002f!9\u0011q\u0019\u0001\u0005\u0002\u0005\u0015\u0004bBAf\u0001\u0011\u0005\u0011Q\r\u0005\b\u0003\u001f\u0004A\u0011AA3\u0011\u001d\t\u0019\u000e\u0001C\u0001\u0003KBq!a6\u0001\t\u0003\t)\u0007C\u0004\u0002\\\u0002!\t!!\u001a\t\u000f\u0005}\u0007\u0001\"\u0001\u0002f!9\u00111\u001d\u0001\u0005\u0002\u0005\u0015\u0004bBAt\u0001\u0011\u0005\u0011\u0011\u001e\u0005\b\u0005O\u0001A\u0011\u0001B\u0015\u0005u\u0011V\r\u001d7jG\u0006\fE\u000e^3s\u0019><G)\u001b:t)\"\u0014X-\u00193UKN$(BA\u0015+\u0003\u0019\u0019XM\u001d<fe*\t1&A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001q\u0003CA\u00183\u001b\u0005\u0001$\"A\u0019\u0002\u000bM\u001c\u0017\r\\1\n\u0005M\u0002$AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002mA\u0011q\u0007A\u0007\u0002Q\u0005!A/\r91+\u0005Q\u0004CA\u001eD\u001b\u0005a$BA\u001f?\u0003\u0019\u0019w.\\7p]*\u00111f\u0010\u0006\u0003\u0001\u0006\u000ba!\u00199bG\",'\"\u0001\"\u0002\u0007=\u0014x-
\u0003\u0002Ey\tqAk\u001c9jGB\u000b'\u000f^5uS>t\u0017!\u0002;2aB\u0002\u0013\u0001\u0002;2aF\nQ\u0001^\u0019qc\u0001\nq\u0001^8qS\u000eLE-F\u0001K!\tY4*\u0003\u0002My\t!Q+^5e\u0003!!x\u000e]5d\u0013\u0012\u0004\u0013\u0001\u0003;pa&\u001c\u0017\nZ:\u0016\u0003A\u0003B!\u0015,Y\u00156\t!K\u0003\u0002T)\u0006I\u0011.\\7vi\u0006\u0014G.\u001a\u0006\u0003+B\n!bY8mY\u0016\u001cG/[8o\u0013\t9&KA\u0002NCB\u0004\"!\u00170\u000e\u0003iS!a\u0017/\u0002\t1\fgn\u001a\u0006\u0002;\u0006!!.\u0019<b\u0013\ty&L\u0001\u0004TiJLgnZ\u0001\ni>\u0004\u0018nY%eg\u0002\n!\u0002^8qS\u000et\u0015-\\3t+\u0005\u0019\u0007\u0003B)W\u0015b\u000b1\u0002^8qS\u000et\u0015-\\3tA\u00051A/\u001b32aB*\u0012a\u001a\t\u0003w!L!!\u001b\u001f\u0003!Q{\u0007/[2JIB\u000b'\u000f^5uS>t\u0017a\u0002;jIF\u0002\b\u0007I\u0001\u0011M\u0006LG.\u001a3QCJ$\u0018\u000e^5p]N,\u0012!\u001c\t\u0003o9L!a\u001c\u0015\u0003!\u0019\u000b\u0017\u000e\\3e!\u0006\u0014H/\u001b;j_:\u001c\u0018!\u00054bS2,G\rU1si&$\u0018n\u001c8tA\u0005y\u0001/\u0019:uSRLwN\\*uCR,7/F\u0001t!\r!x/_\u0007\u0002k*\u0011a\u000fX\u0001\u0005kRLG.\u0003\u0002yk\n!A*[:u!\rQ\u0018\u0011\u0001\b\u0003wzl\u0011\u0001 
\u0006\u0003{r\nq!\\3tg\u0006<W-\u0003\u0002��y\u0006IR\u000b\u001d3bi\u0016lU\r^1eCR\f'+Z9vKN$H)\u0019;b\u0013\u0011\t\u0019!!\u0002\u00039U\u0003H-\u0019;f\u001b\u0016$\u0018\rZ1uCB\u000b'\u000f^5uS>t7\u000b^1uK*\u0011q\u0010`\u0001\u0011a\u0006\u0014H/\u001b;j_:\u001cF/\u0019;fg\u0002\nQ#\u001e9eCR,W*\u001a;bI\u0006$\u0018MU3rk\u0016\u001cH/\u0006\u0002\u0002\u000eA!\u0011qBA\u000b\u001b\t\t\tBC\u0002\u0002\u0014q\n\u0001B]3rk\u0016\u001cHo]\u0005\u0005\u0003/\t\tBA\u000bVa\u0012\fG/Z'fi\u0006$\u0017\r^1SKF,Xm\u001d;\u0002-U\u0004H-\u0019;f\u001b\u0016$\u0018\rZ1uCJ+\u0017/^3ti\u0002\nQ\"\\3uC\u0012\fG/Y\"bG\",WCAA\u0010!\u0011\t\t#a\n\u000e\u0005\u0005\r\"bAA\u0013Q\u0005AQ.\u001a;bI\u0006$\u0018-\u0003\u0003\u0002*\u0005\r\"a\u0004.l\u001b\u0016$\u0018\rZ1uC\u000e\u000b7\r[3\u0002\u001d5,G/\u00193bi\u0006\u001c\u0015m\u00195fA\u0005\t\u0012N\\5uS\u0006dg)\u001a;dQN#\u0018\r^3\u0015\r\u0005E\u0012qGA!!\r9\u00141G\u0005\u0004\u0003kA#!E%oSRL\u0017\r\u001c$fi\u000eD7\u000b^1uK\"9\u0011\u0011\b\fA\u0002\u0005m\u0012a\u00034fi\u000eDwJ\u001a4tKR\u00042aLA\u001f\u0013\r\ty\u0004\r\u0002\u0005\u0019>tw\rC\u0005\u0002DY\u0001\n\u00111\u0001\u0002F\u0005YA.Z1eKJ,\u0005o\\2i!\ry\u0013qI\u0005\u0004\u0003\u0013\u0002$aA%oi\u0006Y\u0012N\\5uS\u0006dg)\u001a;dQN#\u0018\r^3%I\u00164\u0017-\u001e7uII*\"!a\u0014+\t\u0005\u0015\u0013\u0011K\u0016\u0003\u0003'\u0002B!!\u0016\u0002`5\u0011\u0011q\u000b\u0006\u0005\u00033\nY&A\u0005v]\u000eDWmY6fI*\u0019\u0011Q\f\u0019\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0002b\u0005]#!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0006a3\u000f[8vY\u0012tu\u000e^!eIB\u000b'\u000f^5uS>t\u0017J\u001a$viV\u0014X\rT8h\u0013Ntu\u000e\u001e#fM&tW\r\u001a\u000b\u0003\u0003O\u00022aLA5\u0013\r\tY\u0007\r\u0002\u0005+:LG\u000fK\u0002\u0019\u0003_\u0002B!!\u001d\u0002��5\u0011\u00111\u000f\u0006\u0005\u0003k\n9(A\u0002ba&TA!!\u001f\u0002|\u00059!.\u001e9ji\u0016\u0014(bAA?\u0003\u0006)!.\u001e8ji&!\u0011\u0011QA:\u0005\u0011!Vm\u001d;\u0002YMDw.\u001e
7e+B$\u0017\r^3MK\u0006$WM]#q_\u000eD\u0017I\u001a;fe\u001a+gnY3e\u000bB|7\r[#se>\u0014\bfA\r\u0002p\u000513\u000f[8vY\u0012\u0014V\r\u001d7bG\u0016\u001cUO\u001d:f]Rdun\u001a#je^CWM\\\"bk\u001eDG/\u00169)\u0007i\ty'A\fn_\u000e\\g)\u001a;dQ\u001a\u0013x.\\\"veJ,g\u000e\u001e'pORa\u0011qMAH\u0003'\u000b)+a,\u0002:\"1\u0011\u0011S\u000eA\u0002\u001d\f\u0001\u0003^8qS\u000eLE\rU1si&$\u0018n\u001c8\t\u000f\u0005U5\u00041\u0001\u0002\u0018\u0006Y!/Z9vKN$H)\u0019;b!\u0011\tI*a(\u000f\t\u0005=\u00111T\u0005\u0005\u0003;\u000b\t\"\u0001\u0007GKR\u001c\u0007NU3rk\u0016\u001cH/\u0003\u0003\u0002\"\u0006\r&!\u0004)beRLG/[8o\t\u0006$\u0018M\u0003\u0003\u0002\u001e\u0006E\u0001bBAT7\u0001\u0007\u0011\u0011V\u0001\u0007G>tg-[4\u0011\u0007]\nY+C\u0002\u0002.\"\u00121bS1gW\u0006\u001cuN\u001c4jO\"9\u0011\u0011W\u000eA\u0002\u0005M\u0016A\u0004:fa2L7-Y'b]\u0006<WM\u001d\t\u0004o\u0005U\u0016bAA\\Q\tq!+\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\bbBA^7\u0001\u0007\u0011QX\u0001\re\u0016\u001c\bo\u001c8tK\u0012\u000bG/\u0019\t\u0004o\u0005}\u0016bAAaQ\t\u0011b)\u001a;dQB\u000b'\u000f^5uS>tG)\u0019;b\u0003\tJ7o];fg\u0016\u0003xn\u00195SKF,Xm\u001d;Ge>lGj\\2bYJ+\u0007\u000f\\5dC\"\u001aA$a\u001c\u0002{\u0019,Go\u00195Fa>\u001c\u0007n\u001d$s_6dU-\u00193feNCw.\u001e7e\u0011\u0006tG\r\\3Fq\u000e,\u0007\u000f^5p]\u001a\u0013x.\\$fi2{7-\u00197SKBd\u0017nY1)\u0007u\ty'A\u000ftQ>,H\u000e\u001a+sk:\u001c\u0017\r^3U_J+\u0007\u000f\\5dC>3gm]3uQ\rq\u0012qN\u0001.g\"|W\u000f\u001c3UeVt7-\u0019;f)>,e\u000eZ(gMN,Go\u00144MCJ<Wm\u001d;D_6lwN\\#q_\u000eD\u0007fA\u0010\u0002p\u0005\t5\u000f[8vY\u0012$&/\u001e8dCR,Gk\\%oSRL\u0017\r\u001c$fi\u000eDwJ\u001a4tKRLeMU3qY&\u001c\u0017MU3ukJt7/\u00168eK\u001aLg.\u001a3PM\u001a\u001cX\r\u001e\u0015\u0004A\u0005=\u0014aK:i_VdG\rU8mY&sG-\u001a4j]&$X\r\\=JMJ+\u0007\u000f\\5dC:{G/\u0011<bS2\f'\r\\3)\u0007\u0005\ny'\u0001\u0014tQ>,H\u000e\u001a$fi\u000eDG*Z1eKJ,\u0005o\\2i\u001f:4\u0015N]:u\r\u0016$8\r[(oYfD3AIA8\u0003q\u0019\bn\\;mI\u001a+Go\u00195P]\u0016\u0014V\r\u0
01d7jG\u0006\fE/\u0011+j[\u0016D3aIA8\u00035\u001a\bn\\;mI\u001a+Go\u00195O_:$U\r\\1zK\u0012\fe\u000e\u001a(p]R\u0013XO\\2bi&twMU3qY&\u001c\u0017m\u001d\u0015\u0004I\u0005=\u0014\u0001B:uk\n$B\"a;\u0003\n\te!Q\u0004B\u0011\u0005K\u0001b!!<\u0002t\u0006]XBAAx\u0015\r\t\t0Q\u0001\tK\u0006\u001c\u00180\\8dW&!\u0011Q_Ax\u0005MIU\t\u001f9fGR\fG/[8o'\u0016$H/\u001a:t!\u0015y\u0013\u0011`A\u007f\u0013\r\tY\u0010\r\u0002\u0007\u001fB$\u0018n\u001c8\u0011\t\u0005}(QA\u0007\u0003\u0005\u0003Q1Aa\u0001+\u0003\u001d\u0019G.^:uKJLAAa\u0002\u0003\u0002\tI\u0001+\u0019:uSRLwN\u001c\u0005\b\u0005\u0017)\u0003\u0019\u0001B\u0007\u0003\u001dawn\u001a+2aB\u0002BAa\u0004\u0003\u00165\u0011!\u0011\u0003\u0006\u0004\u0005'Q\u0013a\u00017pO&!!q\u0003B\t\u0005))f.\u001b4jK\u0012dun\u001a\u0005\b\u00057)\u0003\u0019\u0001B\u0007\u0003\u001dawn\u001a+2aFBqAa\b&\u0001\u0004\u0011i!A\u0005gkR,(/\u001a'pO\"9!1E\u0013A\u0002\u0005u\u0018!\u00039beRLG/[8o\u0011\u001d\t\t,\na\u0001\u0003g\u000bQc\u001d;vE^KG\u000f\u001b$fi\u000eDW*Z:tC\u001e,7\u000f\u0006\b\u0003,\t5\"q\u0006B\u0019\u0005g\u0011)Da\u000e\u0011\r\u00055\u00181_A4\u0011\u001d\u0011YA\na\u0001\u0005\u001bAqAa\u0007'\u0001\u0004\u0011i\u0001C\u0004\u0003 \u0019\u0002\rA!\u0004\t\u000f\t\rb\u00051\u0001\u0002~\"9\u0011\u0011\u0017\u0014A\u0002\u0005M\u0006b\u0002B\u001dM\u0001\u0007!1H\u0001\u0011e\u0016\u001c\bo\u001c8tK\u000e\u000bG\u000e\u001c2bG.\u0004b!!<\u0003>\t\u0005\u0013\u0002\u0002B \u0003_\u0014qaQ1qiV\u0014X\rE\u00040\u0005\u0007\u00129%a\u001a\n\u0007\t\u0015\u0003GA\u0005Gk:\u001cG/[8ocA1!\u0011\nB&\u0005\u001fj\u0011\u0001V\u0005\u0004\u0005\u001b\"&aA*fcB1qF!\u0015h\u0003{K1Aa\u00151\u0005\u0019!V\u000f\u001d7fe\u0001")
/* loaded from: input_file:kafka/server/ReplicaAlterLogDirsThreadTest.class */
public class ReplicaAlterLogDirsThreadTest {
    // Shared fixtures for the tests in this class. The initialisers below depend on
    // declaration order (topicId -> topicIds/topicNames -> tid1p0; partitionStates and
    // topicIds -> updateMetadataRequest), so do not reorder these fields.

    // Partitions 0 and 1 of the topic named "topic1".
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    // Random topic id, generated once per test-class instance.
    private final Uuid topicId = Uuid.randomUuid();
    // Scala immutable map with the single entry "topic1" -> topicId.
    private final Map<String, Uuid> topicIds = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic1"), topicId())}));
    // Inverse mapping: topicId -> "topic1".
    private final Map<Uuid, String> topicNames = Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicId()), "topic1")}));
    // t1p0 combined with its topic id.
    private final TopicIdPartition tid1p0 = new TopicIdPartition(topicId(), t1p0());
    // Failed-partition registry handed to every ReplicaAlterLogDirsThread built in this class.
    private final FailedPartitions failedPartitions = new FailedPartitions();
    // Single-element Java list: the partition state for "topic1" partition 0, with
    // controller epoch, leader and leader epoch all set to 0.
    private final List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates = (List) CollectionConverters$.MODULE$.seqAsJavaListConverter(new $colon.colon(new UpdateMetadataRequestData.UpdateMetadataPartitionState().setTopicName("topic1").setPartitionIndex(0).setControllerEpoch(0).setLeader(0).setLeaderEpoch(0), Nil$.MODULE$)).asJava();
    // UpdateMetadata request at the latest API version, built from partitionStates, an empty
    // list and the Java view of topicIds.
    // NOTE(review): the meaning of the three literal 0 arguments is not visible here — confirm against UpdateMetadataRequest.Builder.
    private final UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(), 0, 0, 0, partitionStates(), Collections.emptyList(), (java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(topicIds()).asJava()).build();
    // Metadata cache returned by the mocked ReplicaManager in the Mockito-based tests.
    // NOTE(review): the constructor argument 0 is presumably the broker id — confirm.
    private final ZkMetadataCache metadataCache = new ZkMetadataCache(0);

    /** Returns the fixture for partition 0 of "topic1". */
    private TopicPartition t1p0() {
        return t1p0;
    }

    /** Returns the fixture for partition 1 of "topic1". */
    private TopicPartition t1p1() {
        return t1p1;
    }

    /** Returns the randomly generated topic id shared by the fixtures of this class. */
    private Uuid topicId() {
        return topicId;
    }

    /** Returns the Scala map from topic name to topic id. */
    private Map<String, Uuid> topicIds() {
        return topicIds;
    }

    /** Returns the Scala map from topic id to topic name. */
    private Map<Uuid, String> topicNames() {
        return topicNames;
    }

    /** Returns the {@code TopicIdPartition} fixture built from the topic id and partition 0 of "topic1". */
    private TopicIdPartition tid1p0() {
        return tid1p0;
    }

    /** Returns the failed-partition registry passed to the threads under test. */
    private FailedPartitions failedPartitions() {
        return failedPartitions;
    }

    /** Returns the partition-state list used to build the update-metadata request fixture. */
    private List<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionStates() {
        return partitionStates;
    }

    /** Returns the pre-built update-metadata request fixture. */
    private UpdateMetadataRequest updateMetadataRequest() {
        return updateMetadataRequest;
    }

    /** Returns the metadata cache fixture that the mocked replica managers hand out. */
    private ZkMetadataCache metadataCache() {
        return metadataCache;
    }

    /**
     * Builds the initial fetch state for a partition of the fixture topic, with the leader
     * fixed at broker 0 on localhost:9092.
     *
     * @param j value passed as the last constructor argument (the fetch offset at the call sites in this class)
     * @param i value passed as the third constructor argument (the leader epoch at the call sites in this class)
     * @return a new {@code InitialFetchState} carrying the fixture topic id
     */
    private InitialFetchState initialFetchState(long j, int i) {
        BrokerEndPoint leader = new BrokerEndPoint(0, "localhost", 9092);
        return new InitialFetchState(new Some(topicId()), leader, i, j);
    }

    /** Compiler-generated default for the second parameter of {@code initialFetchState}: always 1. */
    private int initialFetchState$default$2() {
        final int defaultSecondArgument = 1;
        return defaultSecondArgument;
    }

    /**
     * When the replica manager reports that no future log exists for the partition,
     * {@code addPartitions} must return an empty set and the thread must not track it.
     */
    @Test
    public void shouldNotAddPartitionIfFutureLogIsNotDefined() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));

        ReplicaManager replicaMgr = Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager quotaMgr = Mockito.mock(ReplicationQuotaManager.class);
        // The future log is reported as missing, so the partition must be rejected.
        Mockito.when(BoxesRunTime.boxToBoolean(replicaMgr.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(false));

        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", endPoint, config, failedPartitions(), replicaMgr, quotaMgr, new BrokerTopicStats());

        Tuple2[] requested = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, initialFetchState$default$2()))
        };
        Object added = thread.addPartitions(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(requested)));

        Assertions.assertEquals(Predef$.MODULE$.Set().empty(), added);
        Assertions.assertEquals(0, thread.partitionCount());
        Assertions.assertEquals(None$.MODULE$, thread.fetchState(t1p0()));
    }

    /**
     * A fetch at a stale leader epoch answered with FENCED_LEADER_EPOCH must mark the partition
     * as failed and drop it from the thread. Re-adding it with the current epoch must restore
     * it with that epoch, and the following successful fetch must complete the move, so the
     * partition is removed again and no longer marked failed.
     */
    @Test
    public void shouldUpdateLeaderEpochAfterFencedEpochError() {
        final int currentEpoch = 5;
        final int staleEpoch = currentEpoch - 1;

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));

        Partition mockPartition = Mockito.mock(Partition.class);
        ReplicaManager replicaMgr = Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager quotaMgr = Mockito.mock(ReplicationQuotaManager.class);
        UnifiedLog futureLog = Mockito.mock(UnifiedLog.class);

        // Replica manager and quota stubs.
        Mockito.when(BoxesRunTime.boxToInteger(mockPartition.partitionId())).thenReturn(BoxesRunTime.boxToInteger(0));
        Mockito.when(replicaMgr.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaMgr.futureLocalLogOrException(t1p0())).thenReturn(futureLog);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaMgr.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(replicaMgr.onlinePartition(t1p0())).thenReturn(new Some(mockPartition));
        Mockito.when(replicaMgr.getPartitionOrException(t1p0())).thenReturn(mockPartition);
        Mockito.when(BoxesRunTime.boxToBoolean(quotaMgr.isQuotaExceeded())).thenReturn(BoxesRunTime.boxToBoolean(false));

        // Partition stubs: epoch lookup, truncation and the final log swap.
        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = new OffsetForLeaderEpochResponseData.EpochEndOffset()
                .setPartition(0)
                .setErrorCode(Errors.NONE.code())
                .setLeaderEpoch(currentEpoch)
                .setEndOffset(0);
        Mockito.when(mockPartition.lastOffsetForLeaderEpoch(Optional.empty(), currentEpoch, false)).thenReturn(epochEndOffset);
        Mockito.when(mockPartition.futureLocalLogOrException()).thenReturn(futureLog);
        Mockito.doNothing().when(mockPartition).truncateTo(0L, true);
        Mockito.when(BoxesRunTime.boxToBoolean(mockPartition.maybeReplaceCurrentWithFutureReplica())).thenReturn(BoxesRunTime.boxToBoolean(true));

        // The future log is empty and has no epoch yet.
        Mockito.when(BoxesRunTime.boxToLong(futureLog.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(BoxesRunTime.boxToLong(futureLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(futureLog.latestEpoch()).thenReturn(None$.MODULE$);

        // First fetch: issued at the stale epoch and answered with FENCED_LEADER_EPOCH.
        FetchRequest.PartitionData staleRequest = new FetchRequest.PartitionData(topicId(), 0L, 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(staleEpoch)));
        FetchPartitionData fencedResponse = new FetchPartitionData(Errors.FENCED_LEADER_EPOCH, -1L, -1L, MemoryRecords.EMPTY, None$.MODULE$, None$.MODULE$, None$.MODULE$, None$.MODULE$, false);
        mockFetchFromCurrentLog(tid1p0(), staleRequest, config, replicaMgr, fencedResponse);

        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", new BrokerEndPoint(0, "localhost", 1000), config, failedPartitions(), replicaMgr, quotaMgr, new BrokerTopicStats());
        Tuple2[] staleAssignment = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, staleEpoch))
        };
        thread.addPartitions(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(staleAssignment)));
        Assertions.assertTrue(thread.fetchState(t1p0()).isDefined());
        Assertions.assertEquals(1, thread.partitionCount());

        thread.doWork();

        // The fenced fetch fails the partition and removes it from the thread.
        Assertions.assertTrue(failedPartitions().contains(t1p0()));
        Assertions.assertEquals(None$.MODULE$, thread.fetchState(t1p0()));
        Assertions.assertEquals(0, thread.partitionCount());

        // Re-add the partition, this time with the current epoch.
        Tuple2[] currentAssignment = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, currentEpoch))
        };
        thread.addPartitions(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(currentAssignment)));
        Assertions.assertEquals(
                new Some(BoxesRunTime.boxToInteger(currentEpoch)),
                thread.fetchState(t1p0()).map(state -> BoxesRunTime.boxToInteger(state.currentLeaderEpoch())));
        Assertions.assertEquals(1, thread.partitionCount());

        // Second fetch: issued at the current epoch and answered successfully.
        FetchRequest.PartitionData currentRequest = new FetchRequest.PartitionData(topicId(), 0L, 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(currentEpoch)));
        FetchPartitionData okResponse = new FetchPartitionData(Errors.NONE, 0L, 0L, MemoryRecords.EMPTY, None$.MODULE$, None$.MODULE$, None$.MODULE$, None$.MODULE$, false);
        mockFetchFromCurrentLog(tid1p0(), currentRequest, config, replicaMgr, okResponse);

        thread.doWork();

        Assertions.assertFalse(failedPartitions().contains(t1p0()));
        Assertions.assertEquals(None$.MODULE$, thread.fetchState(t1p0()));
        Assertions.assertEquals(0, thread.partitionCount());
    }

    /**
     * With an empty future log and a successful, empty fetch response, one {@code doWork()}
     * pass must let the stubbed {@code maybeReplaceCurrentWithFutureReplica()} succeed, after
     * which the partition is no longer tracked by the thread.
     */
    @Test
    public void shouldReplaceCurrentLogDirWhenCaughtUp() {
        final int leaderEpoch = 5;

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1,
                "localhost:1234",
                testUtils.createBrokerConfig$default$3(),
                testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(),
                testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(),
                testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(),
                testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(),
                testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(),
                testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(),
                testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(),
                testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(),
                testUtils.createBrokerConfig$default$20()));

        Partition mockPartition = Mockito.mock(Partition.class);
        ReplicaManager replicaMgr = Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager quotaMgr = Mockito.mock(ReplicationQuotaManager.class);
        UnifiedLog futureLog = Mockito.mock(UnifiedLog.class);

        // Replica manager and quota stubs.
        Mockito.when(BoxesRunTime.boxToInteger(mockPartition.partitionId())).thenReturn(BoxesRunTime.boxToInteger(0));
        Mockito.when(replicaMgr.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaMgr.futureLocalLogOrException(t1p0())).thenReturn(futureLog);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaMgr.futureLogExists(t1p0()))).thenReturn(BoxesRunTime.boxToBoolean(true));
        Mockito.when(replicaMgr.onlinePartition(t1p0())).thenReturn(new Some(mockPartition));
        Mockito.when(replicaMgr.getPartitionOrException(t1p0())).thenReturn(mockPartition);
        Mockito.when(BoxesRunTime.boxToBoolean(quotaMgr.isQuotaExceeded())).thenReturn(BoxesRunTime.boxToBoolean(false));

        // Partition stubs: epoch lookup, truncation and the log swap.
        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = new OffsetForLeaderEpochResponseData.EpochEndOffset()
                .setPartition(0)
                .setErrorCode(Errors.NONE.code())
                .setLeaderEpoch(leaderEpoch)
                .setEndOffset(0);
        Mockito.when(mockPartition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, false)).thenReturn(epochEndOffset);
        Mockito.when(mockPartition.futureLocalLogOrException()).thenReturn(futureLog);
        Mockito.doNothing().when(mockPartition).truncateTo(0L, true);
        Mockito.when(BoxesRunTime.boxToBoolean(mockPartition.maybeReplaceCurrentWithFutureReplica())).thenReturn(BoxesRunTime.boxToBoolean(true));

        // The future log is empty and has no epoch yet.
        Mockito.when(BoxesRunTime.boxToLong(futureLog.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(BoxesRunTime.boxToLong(futureLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(futureLog.latestEpoch()).thenReturn(None$.MODULE$);

        // The fetch from the current log succeeds with no records.
        FetchRequest.PartitionData request = new FetchRequest.PartitionData(topicId(), 0L, 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(leaderEpoch)));
        FetchPartitionData response = new FetchPartitionData(Errors.NONE, 0L, 0L, MemoryRecords.EMPTY, None$.MODULE$, None$.MODULE$, None$.MODULE$, None$.MODULE$, false);
        mockFetchFromCurrentLog(tid1p0(), request, config, replicaMgr, response);

        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", new BrokerEndPoint(0, "localhost", 1000), config, failedPartitions(), replicaMgr, quotaMgr, new BrokerTopicStats());
        Tuple2[] assignment = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, leaderEpoch))
        };
        thread.addPartitions(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(assignment)));
        Assertions.assertTrue(thread.fetchState(t1p0()).isDefined());
        Assertions.assertEquals(1, thread.partitionCount());

        thread.doWork();

        Assertions.assertEquals(None$.MODULE$, thread.fetchState(t1p0()));
        Assertions.assertEquals(0, thread.partitionCount());
    }

    /**
     * Stubs {@code replicaManager.fetchMessages(...)} for exactly one expected fetch: the given
     * partition and request data, the future-local replica id, the configured
     * {@code replicaFetchResponseMaxBytes}, an unbounded quota and READ_UNCOMMITTED isolation.
     * The response callback argument is captured, and the stubbed answer delegates to the
     * compiler-generated helper {@code $anonfun$mockFetchFromCurrentLog$1}.
     *
     * NOTE(review): the helper's body is outside this view; presumably it invokes the captured
     * callback with {@code fetchPartitionData} for {@code topicIdPartition} — confirm.
     *
     * @param topicIdPartition   partition the stubbed fetch must target
     * @param partitionData      exact fetch request data the stub matches on
     * @param kafkaConfig        supplies {@code replicaFetchResponseMaxBytes} for the matcher
     * @param replicaManager     Mockito mock whose {@code fetchMessages} is stubbed
     * @param fetchPartitionData response handed to the answer helper
     */
    private void mockFetchFromCurrentLog(TopicIdPartition topicIdPartition, FetchRequest.PartitionData partitionData, KafkaConfig kafkaConfig, ReplicaManager replicaManager, FetchPartitionData fetchPartitionData) {
        // Captures the Function1 response callback passed to fetchMessages.
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Function1.class);
        // Decompiler artefact: fetchMessages returns Unit, so the mock call is emitted as its own
        // statement. It must stay immediately before Mockito.when(...) below, which stubs the
        // most recent mock invocation.
        replicaManager.fetchMessages(ArgumentMatchers.eq(0L), ArgumentMatchers.eq(Request$.MODULE$.FutureLocalReplicaId()), ArgumentMatchers.eq(0), BoxesRunTime.unboxToInt(ArgumentMatchers.eq(BoxesRunTime.boxToInteger(Predef$.MODULE$.Integer2int(kafkaConfig.replicaFetchResponseMaxBytes())))), ArgumentMatchers.eq(false), (Seq) ArgumentMatchers.eq(new $colon.colon(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topicIdPartition), partitionData), Nil$.MODULE$)), (ReplicaQuota) ArgumentMatchers.eq(QuotaFactory$UnboundedQuota$.MODULE$), (Function1) forClass.capture(), (IsolationLevel) ArgumentMatchers.eq(IsolationLevel.READ_UNCOMMITTED), (Option) ArgumentMatchers.eq(None$.MODULE$));
        // BoxedUnit.UNIT stands in for the Unit result of the call above.
        Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
            $anonfun$mockFetchFromCurrentLog$1(forClass, topicIdPartition, fetchPartitionData, invocationOnMock);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Checks that {@code fetchEpochEndOffsets} answers an epoch request from the local replicas:
     * for each requested partition the result must be the {@code EpochEndOffset} returned by
     * that partition's {@code lastOffsetForLeaderEpoch}, as obtained from the replica manager.
     */
    @Test
    public void issuesEpochRequestFromLocalReplica() {
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        Partition partition2 = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        // Each mock partition reports its own id. The second stub previously targeted
        // `partition` again, leaving `partition2` without a partitionId stub and giving
        // `partition` two conflicting ones.
        EasyMock.expect(BoxesRunTime.boxToInteger(partition.partitionId())).andStubReturn(BoxesRunTime.boxToInteger(0));
        EasyMock.expect(BoxesRunTime.boxToInteger(partition2.partitionId())).andStubReturn(BoxesRunTime.boxToInteger(1));

        // t1p0 resolves to `partition`: epoch 2 ends at offset 13.
        EasyMock.expect(replicaManager.getPartitionOrException(t1p0())).andStubReturn(partition);
        EasyMock.expect(partition.lastOffsetForLeaderEpoch(Optional.empty(), 2, false)).andReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(0).setErrorCode(Errors.NONE.code()).setLeaderEpoch(2).setEndOffset(13)).anyTimes();

        // t1p1 resolves to `partition2`: epoch 5 ends at offset 232.
        EasyMock.expect(replicaManager.getPartitionOrException(t1p1())).andStubReturn(partition2);
        EasyMock.expect(partition2.lastOffsetForLeaderEpoch(Optional.empty(), 5, false)).andReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(1).setErrorCode(Errors.NONE.code()).setLeaderEpoch(5).setEndOffset(232)).anyTimes();
        EasyMock.replay(new Object[]{partition, partition2, replicaManager});

        Object expected = scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(t1p0().partition()).setErrorCode(Errors.NONE.code()).setLeaderEpoch(2).setEndOffset(13)),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(t1p1().partition()).setErrorCode(Errors.NONE.code()).setLeaderEpoch(5).setEndOffset(232))
        }));

        // Quota manager and broker topic stats are passed as null for this test.
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", new BrokerEndPoint(0, "localhost", 1000), fromProps, failedPartitions(), replicaManager, (ReplicationQuotaManager) null, (BrokerTopicStats) null);
        Object actual = thread.fetchEpochEndOffsets(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p0().partition()).setLeaderEpoch(2)),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p1().partition()).setLeaderEpoch(5))
        })));

        Assertions.assertEquals(expected, actual, "results from leader epoch request should have offset from local replica");
    }

    /**
     * Checks that {@code fetchEpochEndOffsets} returns a per-partition error entry when the
     * replica manager throws while resolving one of the requested partitions.
     *
     * <p>The mocked {@code ReplicaManager} hands back a partition for {@code t1p0()} and throws a
     * {@link KafkaStorageException} for {@code t1p1()}. The expected result maps {@code t1p0()}
     * to the stubbed epoch/end offset (2 / 13) and {@code t1p1()} to an entry carrying the
     * {@code KAFKA_STORAGE_ERROR} code.
     */
    @Test
    public void fetchEpochsFromLeaderShouldHandleExceptionFromGetLocalReplica() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        // Mocks
        Partition partitionT1p0 = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaMgr = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        // Stubs: t1p0 resolves normally, t1p1 fails with a storage exception
        EasyMock.expect(BoxesRunTime.boxToInteger(partitionT1p0.partitionId()))
                .andStubReturn(BoxesRunTime.boxToInteger(0));
        EasyMock.expect(replicaMgr.getPartitionOrException(t1p0())).andStubReturn(partitionT1p0);
        EasyMock.expect(partitionT1p0.lastOffsetForLeaderEpoch(Optional.empty(), 2, false))
                .andReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(0)
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(2)
                        .setEndOffset(13))
                .anyTimes();
        EasyMock.expect(replicaMgr.getPartitionOrException(t1p1()))
                .andThrow(new KafkaStorageException())
                .once();
        EasyMock.replay(partitionT1p0, replicaMgr);

        // Expected: offset for t1p0, storage error for t1p1
        Object expectedOffsets = scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p0()),
                        new OffsetForLeaderEpochResponseData.EpochEndOffset()
                                .setPartition(t1p0().partition())
                                .setErrorCode(Errors.NONE.code())
                                .setLeaderEpoch(2)
                                .setEndOffset(13)),
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p1()),
                        new OffsetForLeaderEpochResponseData.EpochEndOffset()
                                .setPartition(t1p1().partition())
                                .setErrorCode(Errors.KAFKA_STORAGE_ERROR.code()))
        }));

        ReplicaAlterLogDirsThread alterLogDirsThread = new ReplicaAlterLogDirsThread(
                "alter-logs-dirs-thread-test1", new BrokerEndPoint(0, "localhost", 1000), brokerConfig,
                failedPartitions(), replicaMgr, (ReplicationQuotaManager) null, (BrokerTopicStats) null);

        Object actualOffsets = alterLogDirsThread.fetchEpochEndOffsets(
                scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(t1p0()),
                                new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition()
                                        .setPartition(t1p0().partition())
                                        .setLeaderEpoch(2)),
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(t1p1()),
                                new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition()
                                        .setPartition(t1p1().partition())
                                        .setLeaderEpoch(2))
                })));

        Assertions.assertEquals(expectedOffsets, actualOffsets);
    }

    /**
     * Runs one {@code doWork()} pass over two partitions and checks the offsets passed to
     * {@code Partition.truncateTo}.
     *
     * <p>Both future logs are stubbed with log end offset 191 and latest epoch 2. The partition
     * for {@code t1p0()} reports end offset 190 for that epoch and the one for {@code t1p1()}
     * reports 192; the captured truncation offsets are asserted to be 190 and 191 respectively.
     */
    @Test
    public void shouldTruncateToReplicaOffset() {
        // Captures for the offset argument of truncateTo on each partition
        Capture truncateOffsetT1p0 = EasyMock.newCapture(CaptureType.ALL);
        Capture truncateOffsetT1p1 = EasyMock.newCapture(CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        // Mocks
        ReplicationQuotaManager quotaManager =
                (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        UnifiedLog logT1p0 = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        UnifiedLog logT1p1 = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        UnifiedLog futureLogT1p0 = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        UnifiedLog futureLogT1p1 = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        Partition partitionT1p0 = (Partition) EasyMock.createMock(Partition.class);
        Partition partitionT1p1 = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaMgr = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        Capture<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>> fetchCallback =
                EasyMock.newCapture();

        // Partition / replica manager lookups
        EasyMock.expect(BoxesRunTime.boxToInteger(partitionT1p0.partitionId()))
                .andStubReturn(BoxesRunTime.boxToInteger(0));
        EasyMock.expect(BoxesRunTime.boxToInteger(partitionT1p1.partitionId()))
                .andStubReturn(BoxesRunTime.boxToInteger(1));
        EasyMock.expect(replicaMgr.metadataCache()).andStubReturn(metadataCache());
        EasyMock.expect(replicaMgr.getPartitionOrException(t1p0())).andStubReturn(partitionT1p0);
        EasyMock.expect(replicaMgr.getPartitionOrException(t1p1())).andStubReturn(partitionT1p1);
        EasyMock.expect(replicaMgr.futureLocalLogOrException(t1p0())).andStubReturn(futureLogT1p0);
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaMgr.futureLogExists(t1p0())))
                .andStubReturn(BoxesRunTime.boxToBoolean(true));
        EasyMock.expect(replicaMgr.futureLocalLogOrException(t1p1())).andStubReturn(futureLogT1p1);
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaMgr.futureLogExists(t1p1())))
                .andStubReturn(BoxesRunTime.boxToBoolean(true));

        // Record truncateTo on both partitions, capturing the requested offset
        partitionT1p0.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncateOffsetT1p0)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        partitionT1p1.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncateOffsetT1p1)), EasyMock.anyBoolean());
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();

        // Both future logs end at offset 191
        EasyMock.expect(BoxesRunTime.boxToLong(futureLogT1p0.logEndOffset()))
                .andReturn(BoxesRunTime.boxToLong(191))
                .anyTimes();
        EasyMock.expect(BoxesRunTime.boxToLong(futureLogT1p1.logEndOffset()))
                .andReturn(BoxesRunTime.boxToLong(191))
                .anyTimes();

        // t1p0: epoch 2 ends at 190 according to the partition
        EasyMock.expect(futureLogT1p0.latestEpoch())
                .andReturn(new Some(BoxesRunTime.boxToInteger(2)))
                .anyTimes();
        EasyMock.expect(futureLogT1p0.endOffsetForEpoch(2))
                .andReturn(new Some(new OffsetAndEpoch(191, 2)))
                .anyTimes();
        EasyMock.expect(partitionT1p0.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 2, false))
                .andReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(0)
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(2)
                        .setEndOffset(190))
                .anyTimes();

        // t1p1: epoch 2 ends at 192 according to the partition
        EasyMock.expect(futureLogT1p1.latestEpoch())
                .andReturn(new Some(BoxesRunTime.boxToInteger(2)))
                .anyTimes();
        EasyMock.expect(futureLogT1p1.endOffsetForEpoch(2))
                .andReturn(new Some(new OffsetAndEpoch(191, 2)))
                .anyTimes();
        EasyMock.expect(partitionT1p1.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 2, false))
                .andReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(1)
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(2)
                        .setEndOffset(192))
                .anyTimes();

        EasyMock.expect(replicaMgr.logManager()).andReturn(logManager).anyTimes();
        stubWithFetchMessages(logT1p0, logT1p1, futureLogT1p0, partitionT1p0, replicaMgr, fetchCallback);
        EasyMock.replay(replicaMgr, logManager, quotaManager, partitionT1p0, partitionT1p1,
                logT1p0, logT1p1, futureLogT1p0, futureLogT1p1);

        // Create the thread and register both partitions at fetch offset 0
        ReplicaAlterLogDirsThread alterLogDirsThread = new ReplicaAlterLogDirsThread(
                "alter-logs-dirs-thread-test1", new BrokerEndPoint(0, "localhost", 1000), brokerConfig,
                failedPartitions(), replicaMgr, quotaManager, (BrokerTopicStats) null);
        alterLogDirsThread.addPartitions(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p0()),
                        initialFetchState(0L, initialFetchState$default$2())),
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p1()),
                        initialFetchState(0L, initialFetchState$default$2()))
        })));

        alterLogDirsThread.doWork();

        Assertions.assertEquals(190, BoxesRunTime.unboxToLong(truncateOffsetT1p0.getValue()));
        Assertions.assertEquals(191, BoxesRunTime.unboxToLong(truncateOffsetT1p1.getValue()));
    }

    /**
     * Runs two {@code doWork()} passes and checks that offset 190 is among the offsets passed to
     * {@code Partition.truncateTo}.
     *
     * <p>The future log reports latest epoch 5 on the first query and epoch 3 on the next three.
     * The partition answers the epoch-5 lookup with epoch 4 / end offset 200 and the epoch-3
     * lookup with epoch 3 / end offset 190.
     */
    @Test
    public void shouldTruncateToEndOffsetOfLargestCommonEpoch() {
        final int leaderEpoch = 5;
        final int expectedTruncationOffset = 190;

        // Captures every offset passed to truncateTo
        Capture truncateOffsets = EasyMock.newCapture(CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        // Mocks
        ReplicationQuotaManager quotaManager =
                (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        UnifiedLog currentLog = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        UnifiedLog futureLog = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        Partition partitionT1p0 = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaMgr = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        Capture<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>> fetchCallback =
                EasyMock.newCapture();

        // Partition / replica manager lookups
        EasyMock.expect(BoxesRunTime.boxToInteger(partitionT1p0.partitionId()))
                .andStubReturn(BoxesRunTime.boxToInteger(0));
        EasyMock.expect(replicaMgr.metadataCache()).andStubReturn(metadataCache());
        EasyMock.expect(replicaMgr.getPartitionOrException(t1p0())).andStubReturn(partitionT1p0);
        EasyMock.expect(replicaMgr.futureLocalLogOrException(t1p0())).andStubReturn(futureLog);
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaMgr.futureLogExists(t1p0())))
                .andStubReturn(BoxesRunTime.boxToBoolean(true));

        // Record truncateTo, capturing the requested offset
        partitionT1p0.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncateOffsets)), EasyMock.eq(true));
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();

        EasyMock.expect(BoxesRunTime.boxToLong(futureLog.logEndOffset()))
                .andReturn(BoxesRunTime.boxToLong(195))
                .anyTimes();

        // Latest epoch of the future log: 5 once, then 3 for the following three calls
        EasyMock.expect(futureLog.latestEpoch())
                .andReturn(new Some(BoxesRunTime.boxToInteger(leaderEpoch)))
                .once();
        EasyMock.expect(futureLog.latestEpoch())
                .andReturn(new Some(BoxesRunTime.boxToInteger(leaderEpoch - 2)))
                .times(3);

        // Lookup for epoch 5 is answered with epoch 4 / end offset 200
        EasyMock.expect(partitionT1p0.lastOffsetForLeaderEpoch(
                        Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false))
                .andReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(0)
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(leaderEpoch - 1)
                        .setEndOffset(200))
                .anyTimes();
        EasyMock.expect(futureLog.endOffsetForEpoch(leaderEpoch - 1))
                .andReturn(new Some(new OffsetAndEpoch(195, leaderEpoch - 2)))
                .anyTimes();

        // Lookup for epoch 3 is answered with epoch 3 / end offset 190
        EasyMock.expect(partitionT1p0.lastOffsetForLeaderEpoch(
                        Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch - 2, false))
                .andReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(0)
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(leaderEpoch - 2)
                        .setEndOffset(expectedTruncationOffset))
                .anyTimes();
        EasyMock.expect(futureLog.endOffsetForEpoch(leaderEpoch - 2))
                .andReturn(new Some(new OffsetAndEpoch(191, leaderEpoch - 2)))
                .anyTimes();

        EasyMock.expect(replicaMgr.logManager()).andReturn(logManager).anyTimes();
        stubWithFetchMessages(currentLog, null, futureLog, partitionT1p0, replicaMgr, fetchCallback);
        EasyMock.replay(replicaMgr, logManager, quotaManager, partitionT1p0, currentLog, futureLog);

        // Create the thread and register t1p0 at fetch offset 0
        ReplicaAlterLogDirsThread alterLogDirsThread = new ReplicaAlterLogDirsThread(
                "alter-logs-dirs-thread-test1", new BrokerEndPoint(0, "localhost", 1000), brokerConfig,
                failedPartitions(), replicaMgr, quotaManager, (BrokerTopicStats) null);
        alterLogDirsThread.addPartitions(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p0()),
                        initialFetchState(0L, initialFetchState$default$2()))
        })));

        // Two passes
        alterLogDirsThread.doWork();
        alterLogDirsThread.doWork();

        boolean sawExpectedOffset =
                ((SeqLike) CollectionConverters$.MODULE$.asScalaBufferConverter(truncateOffsets.getValues()).asScala())
                        .contains(BoxesRunTime.boxToInteger(expectedTruncationOffset));
        String failureMessage = "Expected offset " + expectedTruncationOffset
                + " in captured truncation offsets " + truncateOffsets.getValues();
        Assertions.assertTrue(sawExpectedOffset, failureMessage);
    }

    /**
     * Runs one {@code doWork()} pass with a future log whose {@code latestEpoch()} is stubbed to
     * {@code None} and checks that the offset passed to {@code Partition.truncateTo} equals the
     * initial fetch offset (100) the partition was registered with.
     */
    @Test
    public void shouldTruncateToInitialFetchOffsetIfReplicaReturnsUndefinedOffset() {
        // Captures every offset passed to truncateTo
        Capture truncateOffsets = EasyMock.newCapture(CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        // Mocks
        ReplicationQuotaManager quotaManager =
                (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        UnifiedLog currentLog = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        UnifiedLog futureLog = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        Partition partitionT1p0 = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaMgr = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        Capture<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>> fetchCallback =
                EasyMock.newCapture();

        // Stubs
        EasyMock.expect(replicaMgr.getPartitionOrException(t1p0())).andStubReturn(partitionT1p0);
        partitionT1p0.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncateOffsets)), EasyMock.eq(true));
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();
        EasyMock.expect(replicaMgr.metadataCache()).andStubReturn(metadataCache());
        EasyMock.expect(replicaMgr.futureLocalLogOrException(t1p0())).andStubReturn(futureLog);
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaMgr.futureLogExists(t1p0())))
                .andStubReturn(BoxesRunTime.boxToBoolean(true));
        EasyMock.expect(replicaMgr.logManager()).andReturn(logManager).anyTimes();

        // The future log has no latest epoch
        EasyMock.expect(futureLog.latestEpoch()).andReturn(None$.MODULE$).anyTimes();

        stubWithFetchMessages(currentLog, null, futureLog, partitionT1p0, replicaMgr, fetchCallback);
        EasyMock.replay(replicaMgr, logManager, quotaManager, partitionT1p0, currentLog, futureLog);

        // Create the thread and register t1p0 at fetch offset 100
        ReplicaAlterLogDirsThread alterLogDirsThread = new ReplicaAlterLogDirsThread(
                "alter-logs-dirs-thread-test1", new BrokerEndPoint(0, "localhost", 1000), brokerConfig,
                failedPartitions(), replicaMgr, quotaManager, (BrokerTopicStats) null);
        alterLogDirsThread.addPartitions(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p0()),
                        initialFetchState(100, initialFetchState$default$2()))
        })));

        alterLogDirsThread.doWork();

        Assertions.assertEquals(
                100,
                BoxesRunTime.unboxToLong(truncateOffsets.getValue()),
                "Expected future replica to truncate to initial fetch offset if replica returns UNDEFINED_EPOCH_OFFSET");
    }

    /**
     * Checks that no truncation happens while the epoch lookup keeps answering
     * {@code REPLICA_NOT_AVAILABLE}, and that truncation happens once the lookup succeeds.
     *
     * <p>{@code Partition.lastOffsetForLeaderEpoch} is stubbed to return the error three times
     * and then a successful entry (epoch 1, end offset 300). After three {@code doWork()} passes
     * no {@code truncateTo} offset has been captured; after a fourth pass the captured offset is
     * 290, the value stubbed for the future log's end offset.
     */
    @Test
    public void shouldPollIndefinitelyIfReplicaNotAvailable() {
        // Captures every offset passed to truncateTo
        Capture truncateOffsets = EasyMock.newCapture(CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        // Mocks
        ReplicationQuotaManager quotaManager =
                (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        UnifiedLog currentLog = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        UnifiedLog futureLog = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        Partition partitionT1p0 = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaMgr = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        Capture fetchCallback = EasyMock.newCapture();

        // Partition / replica manager lookups
        EasyMock.expect(BoxesRunTime.boxToInteger(partitionT1p0.partitionId()))
                .andStubReturn(BoxesRunTime.boxToInteger(0));
        EasyMock.expect(replicaMgr.metadataCache()).andStubReturn(metadataCache());
        EasyMock.expect(replicaMgr.getPartitionOrException(t1p0())).andStubReturn(partitionT1p0);

        // truncateTo is expected exactly once; its offset argument is captured
        partitionT1p0.truncateTo(BoxesRunTime.unboxToLong(EasyMock.capture(truncateOffsets)), EasyMock.eq(true));
        EasyMock.expect(BoxedUnit.UNIT).once();

        // Future log: end offset 290, latest epoch 1
        EasyMock.expect(replicaMgr.futureLocalLogOrException(t1p0())).andStubReturn(futureLog);
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaMgr.futureLogExists(t1p0())))
                .andStubReturn(BoxesRunTime.boxToBoolean(true));
        EasyMock.expect(BoxesRunTime.boxToLong(futureLog.logEndOffset()))
                .andReturn(BoxesRunTime.boxToLong(290))
                .anyTimes();
        EasyMock.expect(futureLog.latestEpoch()).andStubReturn(new Some(BoxesRunTime.boxToInteger(1)));
        EasyMock.expect(futureLog.endOffsetForEpoch(1)).andReturn(new Some(new OffsetAndEpoch(290, 1)));
        EasyMock.expect(replicaMgr.localLog(t1p0())).andReturn(new Some(currentLog)).anyTimes();

        // Epoch lookup: REPLICA_NOT_AVAILABLE three times, then success
        EasyMock.expect(partitionT1p0.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 1, false))
                .andReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(0)
                        .setErrorCode(Errors.REPLICA_NOT_AVAILABLE.code()))
                .times(3)
                .andReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(0)
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(1)
                        .setEndOffset(300));

        EasyMock.expect(replicaMgr.logManager()).andReturn(logManager).anyTimes();

        // Any fetchMessages call captures the response callback and is answered by the helper
        replicaMgr.fetchMessages(
                EasyMock.anyLong(),
                EasyMock.anyInt(),
                EasyMock.anyInt(),
                EasyMock.anyInt(),
                BoxesRunTime.unboxToBoolean(EasyMock.anyObject()),
                (Seq) EasyMock.anyObject(),
                (ReplicaQuota) EasyMock.anyObject(),
                (Function1) EasyMock.capture(fetchCallback),
                (IsolationLevel) EasyMock.anyObject(),
                (Option) EasyMock.anyObject());
        EasyMock.expect(BoxedUnit.UNIT).andAnswer(() -> {
            $anonfun$shouldPollIndefinitelyIfReplicaNotAvailable$1(fetchCallback);
            return BoxedUnit.UNIT;
        }).anyTimes();

        EasyMock.replay(replicaMgr, logManager, quotaManager, partitionT1p0, currentLog, futureLog);

        // Create the thread and register t1p0 at fetch offset 0
        ReplicaAlterLogDirsThread alterLogDirsThread = new ReplicaAlterLogDirsThread(
                "alter-logs-dirs-thread-test1", new BrokerEndPoint(0, "localhost", 1000), brokerConfig,
                failedPartitions(), replicaMgr, quotaManager, (BrokerTopicStats) null);
        alterLogDirsThread.addPartitions(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p0()),
                        initialFetchState(0L, initialFetchState$default$2()))
        })));

        // Three passes (0 to 2 inclusive) while the lookup still fails: nothing truncated
        for (int pass = 0; pass <= 2; pass++) {
            alterLogDirsThread.doWork();
        }
        Assertions.assertEquals(0, truncateOffsets.getValues().size());

        // Fourth pass: the lookup succeeds and truncation is recorded
        alterLogDirsThread.doWork();
        Assertions.assertEquals(290, BoxesRunTime.unboxToLong(truncateOffsets.getValue()));
    }

    /**
     * Runs four {@code doWork()} passes and verifies the expectations recorded on the partition
     * mock: a single {@code truncateTo(190, true)} call and a
     * {@code lastOffsetForLeaderEpoch} lookup recorded without an explicit call count.
     */
    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnly() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        // Mocks
        ReplicationQuotaManager quotaManager =
                (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        UnifiedLog currentLog = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        UnifiedLog futureLog = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        Partition partitionT1p0 = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaMgr = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        Capture<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>> fetchCallback =
                EasyMock.newCapture();

        // Partition / replica manager lookups
        EasyMock.expect(BoxesRunTime.boxToInteger(partitionT1p0.partitionId()))
                .andStubReturn(BoxesRunTime.boxToInteger(0));
        EasyMock.expect(replicaMgr.metadataCache()).andStubReturn(metadataCache());
        EasyMock.expect(replicaMgr.getPartitionOrException(t1p0())).andStubReturn(partitionT1p0);

        // Epoch lookup recorded without a call-count modifier
        EasyMock.expect(partitionT1p0.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), 5, false))
                .andReturn(new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(0)
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(5)
                        .setEndOffset(213));

        // Exactly one truncation, to offset 190
        partitionT1p0.truncateTo(190, true);
        EasyMock.expect(BoxedUnit.UNIT).once();

        // Future log: latest epoch 5, end offset 190
        EasyMock.expect(replicaMgr.futureLocalLogOrException(t1p0())).andStubReturn(futureLog);
        EasyMock.expect(BoxesRunTime.boxToBoolean(replicaMgr.futureLogExists(t1p0())))
                .andStubReturn(BoxesRunTime.boxToBoolean(true));
        EasyMock.expect(futureLog.latestEpoch()).andStubReturn(new Some(BoxesRunTime.boxToInteger(5)));
        EasyMock.expect(BoxesRunTime.boxToLong(futureLog.logEndOffset()))
                .andStubReturn(BoxesRunTime.boxToLong(190));
        EasyMock.expect(futureLog.endOffsetForEpoch(5)).andReturn(new Some(new OffsetAndEpoch(190, 5)));

        EasyMock.expect(replicaMgr.logManager()).andReturn(logManager).anyTimes();
        stubWithFetchMessages(currentLog, null, futureLog, partitionT1p0, replicaMgr, fetchCallback);
        EasyMock.replay(replicaMgr, logManager, quotaManager, partitionT1p0, currentLog, futureLog);

        // Create the thread and register t1p0 at fetch offset 0
        ReplicaAlterLogDirsThread alterLogDirsThread = new ReplicaAlterLogDirsThread(
                "alter-logs-dirs-thread-test1", new BrokerEndPoint(0, "localhost", 1000), brokerConfig,
                failedPartitions(), replicaMgr, quotaManager, (BrokerTopicStats) null);
        alterLogDirsThread.addPartitions(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p0()),
                        initialFetchState(0L, initialFetchState$default$2()))
        })));

        // Four passes (0 to 3 inclusive)
        for (int pass = 0; pass <= 3; pass++) {
            alterLogDirsThread.doWork();
        }

        EasyMock.verify(partitionT1p0);
    }

    /**
     * Checks that {@code buildFetch} puts only one partition into the fetch request when two
     * partitions are both in the fetching state.
     *
     * <p>The thread is given fetch states for {@code t1p0()} (offset 150) and {@code t1p1()}
     * (offset 160). The assertions require a defined result with no partitions in error, a
     * {@code minBytes} of 0, and exactly one partition in the built request: {@code t1p0()} at
     * fetch offset 150.
     */
    @Test
    public void shouldFetchOneReplicaAtATime() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17(), testUtils.createBrokerConfig$default$18(),
                testUtils.createBrokerConfig$default$19(), testUtils.createBrokerConfig$default$20()));

        // Mocks
        ReplicationQuotaManager quotaManager =
                (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        UnifiedLog currentLog = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        UnifiedLog futureLog = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        Partition partitionT1p0 = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaMgr = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        // Stubs
        EasyMock.expect(replicaMgr.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaMgr.metadataCache()).andStubReturn(metadataCache());
        stub(currentLog, null, futureLog, partitionT1p0, replicaMgr);

        // futureLog is replayed together with the other mocks so that it is not left in
        // EasyMock's record state while the code under test may call it; this matches
        // shouldFetchNonDelayedAndNonTruncatingReplicas, which replays its future log as well.
        EasyMock.replay(replicaMgr, logManager, quotaManager, partitionT1p0, currentLog, futureLog);

        // Create the thread and register both partitions at fetch offset 0
        ReplicaAlterLogDirsThread alterLogDirsThread = new ReplicaAlterLogDirsThread(
                "alter-logs-dirs-thread-test1", new BrokerEndPoint(0, "localhost", 1000), brokerConfig,
                failedPartitions(), replicaMgr, quotaManager, (BrokerTopicStats) null);
        alterLogDirsThread.addPartitions(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 1)),
                Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                        Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))
        })));

        // Build a fetch for two partitions that are both in the Fetching state
        AbstractFetcherThread.ResultWithPartitions fetchResult = alterLogDirsThread.buildFetch(
                scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(t1p0()),
                                new PartitionFetchState(new Some(topicId()), 150L, None$.MODULE$, 1,
                                        None$.MODULE$, Fetching$.MODULE$, None$.MODULE$)),
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(
                                Predef$.MODULE$.ArrowAssoc(t1p1()),
                                new PartitionFetchState(new Some(topicId()), 160L, None$.MODULE$, 1,
                                        None$.MODULE$, Fetching$.MODULE$, None$.MODULE$))
                })));
        if (fetchResult == null) {
            throw new MatchError((Object) null);
        }
        Option fetchRequestOpt = (Option) fetchResult.result();
        Set partitionsWithError = fetchResult.partitionsWithError();

        // A request was produced and no partition was reported as failed
        Assertions.assertTrue(fetchRequestOpt.isDefined());
        FetchRequest.Builder requestBuilder =
                ((AbstractFetcherThread.ReplicaFetch) fetchRequestOpt.get()).fetchRequest();
        Assertions.assertFalse(requestBuilder.fetchData().isEmpty());
        Assertions.assertFalse(partitionsWithError.nonEmpty());

        FetchRequest request = requestBuilder.build();
        Assertions.assertEquals(0, request.minBytes());

        // Only the first partition is fetched, starting at its fetch offset
        Seq fetchedPartitions = ((MapLike) CollectionConverters$.MODULE$.mapAsScalaMapConverter(
                request.fetchData((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(topicNames()).asJava()))
                .asScala()).toSeq();
        Assertions.assertEquals(1, fetchedPartitions.length());
        Tuple2 firstEntry = (Tuple2) fetchedPartitions.head();
        Assertions.assertEquals(t1p0(), ((TopicIdPartition) firstEntry._1()).topicPartition(),
                "Expected fetch request for first partition");
        Assertions.assertEquals(150L, ((FetchRequest.PartitionData) firstEntry._2()).fetchOffset);
    }

    /**
     * Checks that {@code ReplicaAlterLogDirsThread.buildFetch} only emits a fetch for
     * partitions that are in the {@code Fetching} state and carry no delay.
     *
     * <p>Three rounds are exercised against the same thread instance:
     * <ol>
     *   <li>t1p0 is {@code Fetching}, t1p1 is {@code Truncating}: only t1p0 is fetched (offset 150).</li>
     *   <li>t1p0 is {@code Fetching}, t1p1 carries a {@code DelayedItem(5000L)}: only t1p0 is fetched (offset 140).</li>
     *   <li>Both partitions carry a {@code DelayedItem(5000L)}: no fetch request is produced.</li>
     * </ol>
     * In every round the set of partitions with errors is expected to be empty.
     */
    @Test
    public void shouldFetchNonDelayedAndNonTruncatingReplicas() {
        // Config built from createBrokerConfig(1, "localhost:1234", ...); every other argument is the Scala default.
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, "localhost:1234", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
        // Collaborators: the quota manager and both logs are "nice" mocks; the others are regular mocks.
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        UnifiedLog unifiedLog = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        UnifiedLog unifiedLog2 = (UnifiedLog) EasyMock.createNiceMock(UnifiedLog.class);
        Partition partition = (Partition) EasyMock.createMock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);
        // unifiedLog2 is handed to stub() below as the future log; its log start offset is fixed at 123.
        EasyMock.expect(BoxesRunTime.boxToLong(unifiedLog2.logStartOffset())).andReturn(BoxesRunTime.boxToLong(123)).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        EasyMock.expect(replicaManager.metadataCache()).andStubReturn(metadataCache());
        // stub(t1p0 log, t1p1 log, future log, ...): the t1p1 log is deliberately null here.
        stub(unifiedLog, null, unifiedLog2, partition, replicaManager);
        // All expectations must be recorded before this point; replay switches the mocks out of record mode.
        EasyMock.replay(new Object[]{replicaManager, logManager, replicationQuotaManager, partition, unifiedLog, unifiedLog2});
        ReplicaAlterLogDirsThread replicaAlterLogDirsThread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", new BrokerEndPoint(0, "localhost", 1000), fromProps, failedPartitions(), replicaManager, replicationQuotaManager, (BrokerTopicStats) null);
        // Register both partitions with initialFetchState(0L, 1).
        replicaAlterLogDirsThread.addPartitions(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(0L, 1))})));

        // Round 1: t1p0 is Fetching at offset 150, t1p1 is Truncating at offset 160.
        AbstractFetcherThread.ResultWithPartitions buildFetch = replicaAlterLogDirsThread.buildFetch(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), PartitionFetchState$.MODULE$.apply(new Some(topicId()), 150L, None$.MODULE$, 1, Fetching$.MODULE$, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), PartitionFetchState$.MODULE$.apply(new Some(topicId()), 160L, None$.MODULE$, 1, Truncating$.MODULE$, None$.MODULE$))})));
        // NOTE(review): this null check + MatchError looks like the residue of a Scala pattern match on the result; confirm against the Scala source.
        if (buildFetch == null) {
            throw new MatchError((Object) null);
        }
        Option option = (Option) buildFetch.result();
        Set partitionsWithError = buildFetch.partitionsWithError();
        Assertions.assertTrue(option.isDefined());
        AbstractFetcherThread.ReplicaFetch replicaFetch = (AbstractFetcherThread.ReplicaFetch) option.get();
        Assertions.assertFalse(replicaFetch.partitionData().isEmpty());
        Assertions.assertFalse(partitionsWithError.nonEmpty());
        // Build the request, resolve its fetch data with topicNames(), and flatten it to a Scala Seq of (partition, data) pairs.
        Seq seq = ((MapLike) CollectionConverters$.MODULE$.mapAsScalaMapConverter(replicaFetch.fetchRequest().build().fetchData((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(topicNames()).asJava())).asScala()).toSeq();
        Assertions.assertEquals(1, seq.length());
        Assertions.assertEquals(t1p0(), ((TopicIdPartition) ((Tuple2) seq.head())._1()).topicPartition(), "Expected fetch request for non-truncating partition");
        Assertions.assertEquals(150L, ((FetchRequest.PartitionData) ((Tuple2) seq.head())._2()).fetchOffset);

        // Round 2: t1p0 is Fetching at offset 140; t1p1 is Fetching at 160 but carries a DelayedItem(5000L).
        AbstractFetcherThread.ResultWithPartitions buildFetch2 = replicaAlterLogDirsThread.buildFetch(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), PartitionFetchState$.MODULE$.apply(new Some(topicId()), 140L, None$.MODULE$, 1, Fetching$.MODULE$, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new PartitionFetchState(new Some(topicId()), 160L, None$.MODULE$, 1, new Some(new DelayedItem(5000L)), Fetching$.MODULE$, None$.MODULE$))})));
        if (buildFetch2 == null) {
            throw new MatchError((Object) null);
        }
        Option option2 = (Option) buildFetch2.result();
        Set partitionsWithError2 = buildFetch2.partitionsWithError();
        Assertions.assertTrue(option2.isDefined());
        AbstractFetcherThread.ReplicaFetch replicaFetch2 = (AbstractFetcherThread.ReplicaFetch) option2.get();
        Assertions.assertFalse(replicaFetch2.partitionData().isEmpty());
        Assertions.assertFalse(partitionsWithError2.nonEmpty());
        Seq seq2 = ((MapLike) CollectionConverters$.MODULE$.mapAsScalaMapConverter(replicaFetch2.fetchRequest().build().fetchData((java.util.Map) CollectionConverters$.MODULE$.mapAsJavaMapConverter(topicNames()).asJava())).asScala()).toSeq();
        Assertions.assertEquals(1, seq2.length());
        Assertions.assertEquals(t1p0(), ((TopicIdPartition) ((Tuple2) seq2.head())._1()).topicPartition(), "Expected fetch request for non-delayed partition");
        Assertions.assertEquals(140L, ((FetchRequest.PartitionData) ((Tuple2) seq2.head())._2()).fetchOffset);

        // Round 3: both partitions carry a DelayedItem(5000L), so no fetch request should be built at all.
        AbstractFetcherThread.ResultWithPartitions buildFetch3 = replicaAlterLogDirsThread.buildFetch(scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new PartitionFetchState(new Some(topicId()), 140L, None$.MODULE$, 1, new Some(new DelayedItem(5000L)), Fetching$.MODULE$, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new PartitionFetchState(new Some(topicId()), 160L, None$.MODULE$, 1, new Some(new DelayedItem(5000L)), Fetching$.MODULE$, None$.MODULE$))})));
        if (buildFetch3 == null) {
            throw new MatchError((Object) null);
        }
        Option option3 = (Option) buildFetch3.result();
        Set partitionsWithError3 = buildFetch3.partitionsWithError();
        Assertions.assertTrue(option3.isEmpty(), "Expected no fetch requests since all partitions are delayed");
        Assertions.assertFalse(partitionsWithError3.nonEmpty());
    }

    /**
     * Records, on a {@code ReplicaManager} mock that is still in record mode, the
     * log and partition lookups for {@code t1p0()} and {@code t1p1()}.
     *
     * <p>For each of the two partitions the mock is told to answer
     * {@code localLog}, {@code localLogOrException}, {@code futureLocalLogOrException},
     * {@code futureLogExists} (always {@code true}) and {@code onlinePartition}.
     *
     * @param unifiedLog     log returned for {@code t1p0()} by the local-log lookups
     * @param unifiedLog2    log returned for {@code t1p1()} by the local-log lookups
     * @param unifiedLog3    log returned for both partitions by {@code futureLocalLogOrException}
     * @param partition      partition returned (wrapped in {@code Some}) by {@code onlinePartition} for both partitions
     * @param replicaManager mock on which the expectations are recorded
     * @return the expectation setter of the last recorded call, {@code onlinePartition(t1p1())}
     */
    public IExpectationSetters<Option<Partition>> stub(UnifiedLog unifiedLog, UnifiedLog unifiedLog2, UnifiedLog unifiedLog3, Partition partition, ReplicaManager replicaManager) {
        // --- expectations for t1p0 ---
        EasyMock.expect(replicaManager.localLog(t1p0()))
                .andReturn(new Some<>(unifiedLog))
                .anyTimes();
        EasyMock.expect(replicaManager.localLogOrException(t1p0()))
                .andReturn(unifiedLog)
                .anyTimes();
        EasyMock.expect(replicaManager.futureLocalLogOrException(t1p0()))
                .andReturn(unifiedLog3)
                .anyTimes();
        EasyMock.expect(replicaManager.futureLogExists(t1p0()))
                .andStubReturn(true);
        EasyMock.expect(replicaManager.onlinePartition(t1p0()))
                .andReturn(new Some<>(partition))
                .anyTimes();

        // --- expectations for t1p1 ---
        EasyMock.expect(replicaManager.localLog(t1p1()))
                .andReturn(new Some<>(unifiedLog2))
                .anyTimes();
        EasyMock.expect(replicaManager.localLogOrException(t1p1()))
                .andReturn(unifiedLog2)
                .anyTimes();
        EasyMock.expect(replicaManager.futureLocalLogOrException(t1p1()))
                .andReturn(unifiedLog3)
                .anyTimes();
        EasyMock.expect(replicaManager.futureLogExists(t1p1()))
                .andStubReturn(true);

        // The last expectation is handed back to the caller.
        IExpectationSetters<Option<Partition>> onlineSecondPartition =
                EasyMock.expect(replicaManager.onlinePartition(t1p1())).andReturn(new Some<>(partition));
        return onlineSecondPartition.anyTimes();
    }

    /**
     * Records everything {@link #stub} records and additionally a
     * {@code replicaManager.fetchMessages(...)} expectation that accepts any arguments,
     * captures the callback argument into {@code capture}, and answers by applying the
     * captured callback to an empty list ({@code Nil}).
     *
     * @param unifiedLog     forwarded to {@code stub}
     * @param unifiedLog2    forwarded to {@code stub}
     * @param unifiedLog3    forwarded to {@code stub}
     * @param partition      forwarded to {@code stub}
     * @param replicaManager mock on which the expectations are recorded
     * @param capture        receives the {@code Function1} callback passed to {@code fetchMessages}
     * @return the expectation setter of the recorded {@code fetchMessages} call
     */
    public IExpectationSetters<BoxedUnit> stubWithFetchMessages(UnifiedLog unifiedLog, UnifiedLog unifiedLog2, UnifiedLog unifiedLog3, Partition partition, ReplicaManager replicaManager, Capture<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>> capture) {
        stub(unifiedLog, unifiedLog2, unifiedLog3, partition, replicaManager);

        // Record the call itself; it must stay immediately before expect(...) below,
        // which attaches to the most recently recorded invocation.
        replicaManager.fetchMessages(
                EasyMock.anyLong(),
                EasyMock.anyInt(),
                EasyMock.anyInt(),
                EasyMock.anyInt(),
                BoxesRunTime.unboxToBoolean(EasyMock.anyObject()),
                (Seq) EasyMock.anyObject(),
                (ReplicaQuota) EasyMock.anyObject(),
                (Function1) EasyMock.capture(capture),
                (IsolationLevel) EasyMock.anyObject(),
                (Option) EasyMock.anyObject());

        IExpectationSetters<BoxedUnit> fetchExpectation = EasyMock.expect(BoxedUnit.UNIT);
        return fetchExpectation
                .andAnswer(() -> {
                    $anonfun$stubWithFetchMessages$1(capture);
                    return BoxedUnit.UNIT;
                })
                .anyTimes();
    }

    /**
     * Synthetic lambda body: applies the callback held by {@code argumentCaptor} to a
     * one-element list containing the pair {@code (topicIdPartition, fetchPartitionData)}.
     * The {@code invocationOnMock} argument is not used.
     */
    public static final /* synthetic */ void $anonfun$mockFetchFromCurrentLog$1(ArgumentCaptor argumentCaptor, TopicIdPartition topicIdPartition, FetchPartitionData fetchPartitionData, InvocationOnMock invocationOnMock) {
        // Read the captured callback first, as the original expression did.
        Function1 capturedCallback = (Function1) argumentCaptor.getValue();
        Tuple2 partitionAndData = new Tuple2(topicIdPartition, fetchPartitionData);
        capturedCallback.apply(new $colon.colon(partitionAndData, Nil$.MODULE$));
    }

    /**
     * Synthetic lambda body: applies the callback held by {@code capture} to an empty
     * list ({@code Nil}).
     */
    public static final /* synthetic */ void $anonfun$shouldPollIndefinitelyIfReplicaNotAvailable$1(Capture capture) {
        Function1 capturedCallback = (Function1) capture.getValue();
        capturedCallback.apply(Nil$.MODULE$);
    }

    /**
     * Synthetic lambda body used by {@code stubWithFetchMessages}: applies the callback
     * held by {@code capture} to an empty list ({@code Nil}).
     */
    public static final /* synthetic */ void $anonfun$stubWithFetchMessages$1(Capture capture) {
        Function1 capturedCallback = (Function1) capture.getValue();
        capturedCallback.apply(Nil$.MODULE$);
    }

    /**
     * Creates the test fixture and feeds {@code updateMetadataRequest()} into
     * {@code metadataCache()} via {@code updateMetadata}, passing {@code 0} as the
     * first argument.
     */
    public ReplicaAlterLogDirsThreadTest() {
        this.metadataCache().updateMetadata(0, this.updateMetadataRequest());
    }
}
