package org.apache.kafka.server.share.persister;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.persister.PersisterStateManager;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManagerTest.class */
class PersisterStateManagerTest {
    private static final KafkaClient CLIENT = (KafkaClient) Mockito.mock(KafkaClient.class);
    private static final Time MOCK_TIME = new MockTime();
    private static final Timer MOCK_TIMER = new MockTimer(MOCK_TIME);
    private static final ShareCoordinatorMetadataCacheHelper CACHE_HELPER = (ShareCoordinatorMetadataCacheHelper) Mockito.mock(ShareCoordinatorMetadataCacheHelper.class);
    private static final int MAX_RPC_RETRY_ATTEMPTS = 5;
    public static final long REQUEST_BACKOFF_MS = 100;
    public static final long REQUEST_BACKOFF_MAX_MS = 3000;
    private static final String HOST = "localhost";
    private static final int PORT = 9092;
    private static Timer mockTimer;

    /* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManagerTest$PersisterStateManagerBuilder.class */
    private static class PersisterStateManagerBuilder {
        private KafkaClient client = PersisterStateManagerTest.CLIENT;
        private Time time = PersisterStateManagerTest.MOCK_TIME;
        private Timer timer = PersisterStateManagerTest.MOCK_TIMER;
        private ShareCoordinatorMetadataCacheHelper cacheHelper = PersisterStateManagerTest.CACHE_HELPER;

        private PersisterStateManagerBuilder() {
        }

        private PersisterStateManagerBuilder withKafkaClient(KafkaClient kafkaClient) {
            this.client = kafkaClient;
            return this;
        }

        private PersisterStateManagerBuilder withCacheHelper(ShareCoordinatorMetadataCacheHelper shareCoordinatorMetadataCacheHelper) {
            this.cacheHelper = shareCoordinatorMetadataCacheHelper;
            return this;
        }

        private PersisterStateManagerBuilder withTime(Time time) {
            this.time = time;
            return this;
        }

        private PersisterStateManagerBuilder withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        public static PersisterStateManagerBuilder builder() {
            return new PersisterStateManagerBuilder();
        }

        public PersisterStateManager build() {
            return new PersisterStateManager(this.client, this.cacheHelper, this.time, this.timer);
        }
    }

    /* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManagerTest$TestStateHandler.class */
    private abstract class TestStateHandler extends PersisterStateManager.PersisterStateManagerHandler {
        private final CompletableFuture<TestHandlerResponse> result;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManagerTest$TestStateHandler$TestHandlerResponse.class */
        public class TestHandlerResponse extends WriteShareGroupStateResponse {
            public TestHandlerResponse(WriteShareGroupStateResponseData writeShareGroupStateResponseData) {
                super(writeShareGroupStateResponseData);
            }
        }

        /* loaded from: input_file:org/apache/kafka/server/share/persister/PersisterStateManagerTest$TestStateHandler$TestHandlerResponseData.class */
        private class TestHandlerResponseData extends WriteShareGroupStateResponseData {
            private TestHandlerResponseData() {
            }
        }

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        TestStateHandler(PersisterStateManager persisterStateManager, String str, Uuid uuid, int i, CompletableFuture<TestHandlerResponse> completableFuture, long j, long j2, int i2) {
            super(persisterStateManager, str, uuid, i, j, j2, i2);
            Objects.requireNonNull(persisterStateManager);
            this.result = completableFuture;
        }

        protected void handleRequestResponse(ClientResponse clientResponse) {
            this.result.complete(new TestHandlerResponse(new TestHandlerResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partitionKey().partition()).setErrorMessage(Errors.NONE.message()).setErrorCode(Errors.NONE.code())))))));
        }

        protected boolean isResponseForRequest(ClientResponse clientResponse) {
            return true;
        }

        protected void findCoordinatorErrorResponse(Errors errors, Exception exc) {
            this.result.complete(new TestHandlerResponse(new TestHandlerResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(partitionKey().topicId()).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partitionKey().partition()).setErrorMessage(exc == null ? errors.message() : exc.getMessage()).setErrorCode(errors.code())))))));
        }

        protected String name() {
            return "TestStateHandler";
        }

        protected boolean isBatchable() {
            return false;
        }

        protected PersisterStateManager.RPCType rpcType() {
            return PersisterStateManager.RPCType.UNKNOWN;
        }

        protected CompletableFuture<TestHandlerResponse> result() {
            return this.result;
        }
    }

    PersisterStateManagerTest() {
    }

    private ShareCoordinatorMetadataCacheHelper getDefaultCacheHelper(final Node node) {
        return new ShareCoordinatorMetadataCacheHelper() { // from class: org.apache.kafka.server.share.persister.PersisterStateManagerTest.1
            public boolean containsTopic(String str) {
                return false;
            }

            public Node getShareCoordinator(SharePartitionKey sharePartitionKey, String str) {
                return Node.noNode();
            }

            public List<Node> getClusterNodes() {
                return Collections.singletonList(node);
            }
        };
    }

    private ShareCoordinatorMetadataCacheHelper getCoordinatorCacheHelper(final Node node) {
        return new ShareCoordinatorMetadataCacheHelper() { // from class: org.apache.kafka.server.share.persister.PersisterStateManagerTest.2
            public boolean containsTopic(String str) {
                return true;
            }

            public Node getShareCoordinator(SharePartitionKey sharePartitionKey, String str) {
                return node;
            }

            public List<Node> getClusterNodes() {
                return Collections.emptyList();
            }
        };
    }

    @BeforeEach
    public void setUp() {
        mockTimer = new SystemTimerReaper("persisterStateManagerTestTimer", new SystemTimer("persisterStateManagerTestTimer"));
    }

    @AfterEach
    public void tearDown() throws Exception {
        Utils.closeQuietly(mockTimer, "persisterStateManagerTestTimer");
    }

    @Test
    public void testFindCoordinatorFatalError() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        Uuid randomUuid = Uuid.randomUuid();
        Node node = new Node(0, HOST, PORT);
        String asCoordinatorKey = SharePartitionKey.asCoordinatorKey("group1", randomUuid, 10);
        mockClient.prepareResponseFrom(abstractRequest -> {
            return (abstractRequest instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setKey(asCoordinatorKey).setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()).setHost(Node.noNode().host()).setNodeId(Node.noNode().id()).setPort(Node.noNode().port())))), node);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getDefaultCacheHelper(node)).build();
        build.start();
        TestStateHandler testStateHandler = (TestStateHandler) Mockito.spy(new TestStateHandler(build, "group1", randomUuid, 10, new CompletableFuture(), 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS) { // from class: org.apache.kafka.server.share.persister.PersisterStateManagerTest.3
            protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
                return null;
            }
        });
        build.enqueue(testStateHandler);
        TestStateHandler.TestHandlerResponse testHandlerResponse = null;
        try {
            testHandlerResponse = testStateHandler.result().get();
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        Assertions.assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), ((WriteShareGroupStateResponseData.PartitionResult) ((WriteShareGroupStateResponseData.WriteStateResult) testHandlerResponse.data().results().get(0)).partitions().get(0)).errorCode());
        ((TestStateHandler) Mockito.verify(testStateHandler, Mockito.times(1))).findShareCoordinatorBuilder();
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testFindCoordinatorAttemptsExhausted() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        Uuid randomUuid = Uuid.randomUuid();
        Node node = new Node(0, HOST, PORT);
        String asCoordinatorKey = SharePartitionKey.asCoordinatorKey("group1", randomUuid, 10);
        mockClient.prepareResponseFrom(abstractRequest -> {
            return (abstractRequest instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setKey(asCoordinatorKey).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setHost(Node.noNode().host()).setNodeId(Node.noNode().id()).setPort(Node.noNode().port())))), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            return (abstractRequest2 instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest2).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest2).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setKey(asCoordinatorKey).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setHost(Node.noNode().host()).setNodeId(Node.noNode().id()).setPort(Node.noNode().port())))), node);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getDefaultCacheHelper(node)).build();
        build.start();
        TestStateHandler testStateHandler = (TestStateHandler) Mockito.spy(new TestStateHandler(build, "group1", randomUuid, 10, new CompletableFuture(), 100L, REQUEST_BACKOFF_MAX_MS, 2) { // from class: org.apache.kafka.server.share.persister.PersisterStateManagerTest.4
            protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
                return null;
            }
        });
        build.enqueue(testStateHandler);
        TestStateHandler.TestHandlerResponse testHandlerResponse = null;
        try {
            testHandlerResponse = testStateHandler.result.get();
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        Assertions.assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), ((WriteShareGroupStateResponseData.PartitionResult) ((WriteShareGroupStateResponseData.WriteStateResult) testHandlerResponse.data().results().get(0)).partitions().get(0)).errorCode());
        ((TestStateHandler) Mockito.verify(testStateHandler, Mockito.times(2))).findShareCoordinatorBuilder();
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testFindCoordinatorSuccess() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        Node node = new Node(0, HOST, PORT);
        Node node2 = new Node(1, HOST, PORT);
        String asCoordinatorKey = SharePartitionKey.asCoordinatorKey("group1", randomUuid, 10);
        mockClient.prepareResponseFrom(abstractRequest -> {
            return (abstractRequest instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(PORT).setErrorCode(Errors.NONE.code())))), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            ReadShareGroupStateRequest readShareGroupStateRequest = (ReadShareGroupStateRequest) abstractRequest2;
            return readShareGroupStateRequest.data().groupId().equals(str) && ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((ReadShareGroupStateRequestData.PartitionData) ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), node2);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getDefaultCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.ReadStateHandler readStateHandler = (PersisterStateManager.ReadStateHandler) Mockito.spy(new PersisterStateManager.ReadStateHandler(build, "group1", randomUuid, 10, 0, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS, (Consumer) null));
        build.enqueue(readStateHandler);
        try {
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(0))).requestBuilder();
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testWriteStateRequestCoordinatorFoundSuccessfully() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        List asList = Arrays.asList(new PersisterStateBatch(0L, 9L, (byte) 0, (short) 1), new PersisterStateBatch(10L, 19L, (byte) 1, (short) 1));
        Node node = new Node(0, HOST, PORT);
        Node node2 = new Node(1, HOST, PORT);
        String asCoordinatorKey = SharePartitionKey.asCoordinatorKey("group1", randomUuid, 10);
        mockClient.prepareResponseFrom(abstractRequest -> {
            return (abstractRequest instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(PORT).setErrorCode(Errors.NONE.code())))), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            WriteShareGroupStateRequest writeShareGroupStateRequest = (WriteShareGroupStateRequest) abstractRequest2;
            return writeShareGroupStateRequest.data().groupId().equals(str) && ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((WriteShareGroupStateRequestData.PartitionData) ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), node2);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getDefaultCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.WriteStateHandler writeStateHandler = (PersisterStateManager.WriteStateHandler) Mockito.spy(new PersisterStateManager.WriteStateHandler(build, "group1", randomUuid, 10, 0, 0, 0L, asList, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS));
        build.enqueue(writeStateHandler);
        WriteShareGroupStateResponse writeShareGroupStateResponse = null;
        try {
            writeShareGroupStateResponse = (WriteShareGroupStateResponse) writeStateHandler.result().get();
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult) ((WriteShareGroupStateResponseData.WriteStateResult) writeShareGroupStateResponse.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(0))).requestBuilder();
        Assertions.assertEquals(node2, writeStateHandler.getCoordinatorNode());
        Assertions.assertEquals(10, partitionResult.partition());
        Assertions.assertEquals(Errors.NONE.code(), partitionResult.errorCode());
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testWriteStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws InterruptedException, ExecutionException {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        List asList = Arrays.asList(new PersisterStateBatch(0L, 9L, (byte) 0, (short) 1), new PersisterStateBatch(10L, 19L, (byte) 1, (short) 1));
        Node node = new Node(0, HOST, PORT);
        Node node2 = new Node(1, HOST, PORT);
        String asCoordinatorKey = SharePartitionKey.asCoordinatorKey("group1", randomUuid, 10);
        mockClient.prepareResponseFrom(abstractRequest -> {
            return (abstractRequest instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.NOT_COORDINATOR.code())))), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            return (abstractRequest2 instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest2).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest2).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(PORT).setErrorCode(Errors.NONE.code())))), node);
        mockClient.prepareResponseFrom(abstractRequest3 -> {
            WriteShareGroupStateRequest writeShareGroupStateRequest = (WriteShareGroupStateRequest) abstractRequest3;
            return writeShareGroupStateRequest.data().groupId().equals(str) && ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((WriteShareGroupStateRequestData.PartitionData) ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), node2);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getDefaultCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.WriteStateHandler writeStateHandler = (PersisterStateManager.WriteStateHandler) Mockito.spy(new PersisterStateManager.WriteStateHandler(build, "group1", randomUuid, 10, 0, 0, 0L, asList, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS));
        build.enqueue(writeStateHandler);
        CompletableFuture result = writeStateHandler.result();
        Objects.requireNonNull(result);
        TestUtils.waitForCondition(result::isDone, 15000L, 10L, () -> {
            return "Failed to get result from future";
        });
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult) ((WriteShareGroupStateResponseData.WriteStateResult) ((WriteShareGroupStateResponse) result.get()).data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(0))).requestBuilder();
        Assertions.assertEquals(node2, writeStateHandler.getCoordinatorNode());
        Assertions.assertEquals(10, partitionResult.partition());
        Assertions.assertEquals(Errors.NONE.code(), partitionResult.errorCode());
        try {
            build.stop();
        } catch (Exception e) {
            Assertions.fail("Failed to stop state manager", e);
        }
    }

    @Test
    public void testWriteStateRequestCoordinatorFoundOnRetry() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        List asList = Arrays.asList(new PersisterStateBatch(0L, 9L, (byte) 0, (short) 1), new PersisterStateBatch(10L, 19L, (byte) 1, (short) 1));
        Node node = new Node(0, HOST, PORT);
        Node node2 = new Node(1, HOST, PORT);
        String asCoordinatorKey = SharePartitionKey.asCoordinatorKey("group1", randomUuid, 10);
        mockClient.prepareResponseFrom(abstractRequest -> {
            return (abstractRequest instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())))), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            return (abstractRequest2 instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest2).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest2).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(PORT).setErrorCode(Errors.NONE.code())))), node);
        mockClient.prepareResponseFrom(abstractRequest3 -> {
            WriteShareGroupStateRequest writeShareGroupStateRequest = (WriteShareGroupStateRequest) abstractRequest3;
            return writeShareGroupStateRequest.data().groupId().equals(str) && ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((WriteShareGroupStateRequestData.PartitionData) ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), node2);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getDefaultCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.WriteStateHandler writeStateHandler = (PersisterStateManager.WriteStateHandler) Mockito.spy(new PersisterStateManager.WriteStateHandler(build, "group1", randomUuid, 10, 0, 0, 0L, asList, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS));
        build.enqueue(writeStateHandler);
        WriteShareGroupStateResponse writeShareGroupStateResponse = null;
        try {
            writeShareGroupStateResponse = (WriteShareGroupStateResponse) writeStateHandler.result().get();
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult) ((WriteShareGroupStateResponseData.WriteStateResult) writeShareGroupStateResponse.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(0))).requestBuilder();
        Assertions.assertEquals(node2, writeStateHandler.getCoordinatorNode());
        Assertions.assertEquals(10, partitionResult.partition());
        Assertions.assertEquals(Errors.NONE.code(), partitionResult.errorCode());
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testWriteStateRequestWithCoordinatorNodeLookup() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        List asList = Arrays.asList(new PersisterStateBatch(0L, 9L, (byte) 0, (short) 1), new PersisterStateBatch(10L, 19L, (byte) 1, (short) 1));
        Node node = new Node(1, HOST, PORT);
        mockClient.prepareResponseFrom(abstractRequest -> {
            WriteShareGroupStateRequest writeShareGroupStateRequest = (WriteShareGroupStateRequest) abstractRequest;
            return writeShareGroupStateRequest.data().groupId().equals(str) && ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((WriteShareGroupStateRequestData.PartitionData) ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), node);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getCoordinatorCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.WriteStateHandler writeStateHandler = (PersisterStateManager.WriteStateHandler) Mockito.spy(new PersisterStateManager.WriteStateHandler(build, "group1", randomUuid, 10, 0, 0, 0L, asList, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS));
        build.enqueue(writeStateHandler);
        WriteShareGroupStateResponse writeShareGroupStateResponse = null;
        try {
            writeShareGroupStateResponse = (WriteShareGroupStateResponse) writeStateHandler.result().get();
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult) ((WriteShareGroupStateResponseData.WriteStateResult) writeShareGroupStateResponse.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(0))).requestBuilder();
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(1))).onComplete((ClientResponse) ArgumentMatchers.any());
        Assertions.assertEquals(node, writeStateHandler.getCoordinatorNode());
        Assertions.assertEquals(10, partitionResult.partition());
        Assertions.assertEquals(Errors.NONE.code(), partitionResult.errorCode());
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testWriteStateRequestWithRetryAndCoordinatorNodeLookup() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        List asList = Arrays.asList(new PersisterStateBatch(0L, 9L, (byte) 0, (short) 1), new PersisterStateBatch(10L, 19L, (byte) 1, (short) 1));
        Node node = new Node(1, HOST, PORT);
        mockClient.prepareResponseFrom(abstractRequest -> {
            WriteShareGroupStateRequest writeShareGroupStateRequest = (WriteShareGroupStateRequest) abstractRequest;
            return writeShareGroupStateRequest.data().groupId().equals(str) && ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((WriteShareGroupStateRequestData.PartitionData) ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            WriteShareGroupStateRequest writeShareGroupStateRequest = (WriteShareGroupStateRequest) abstractRequest2;
            return writeShareGroupStateRequest.data().groupId().equals(str) && ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((WriteShareGroupStateRequestData.PartitionData) ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), node);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getCoordinatorCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.WriteStateHandler writeStateHandler = (PersisterStateManager.WriteStateHandler) Mockito.spy(new PersisterStateManager.WriteStateHandler(build, "group1", randomUuid, 10, 0, 0, 0L, asList, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS));
        build.enqueue(writeStateHandler);
        WriteShareGroupStateResponse writeShareGroupStateResponse = null;
        try {
            writeShareGroupStateResponse = (WriteShareGroupStateResponse) writeStateHandler.result().get();
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult) ((WriteShareGroupStateResponseData.WriteStateResult) writeShareGroupStateResponse.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(0))).requestBuilder();
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(2))).onComplete((ClientResponse) ArgumentMatchers.any());
        Assertions.assertEquals(node, writeStateHandler.getCoordinatorNode());
        Assertions.assertEquals(10, partitionResult.partition());
        Assertions.assertEquals(Errors.NONE.code(), partitionResult.errorCode());
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testWriteStateRequestFailedMaxRetriesExhausted() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        List asList = Arrays.asList(new PersisterStateBatch(0L, 9L, (byte) 0, (short) 1), new PersisterStateBatch(10L, 19L, (byte) 1, (short) 1));
        Node node = new Node(1, HOST, PORT);
        mockClient.prepareResponseFrom(abstractRequest -> {
            WriteShareGroupStateRequest writeShareGroupStateRequest = (WriteShareGroupStateRequest) abstractRequest;
            return writeShareGroupStateRequest.data().groupId().equals(str) && ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((WriteShareGroupStateRequestData.PartitionData) ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            WriteShareGroupStateRequest writeShareGroupStateRequest = (WriteShareGroupStateRequest) abstractRequest2;
            return writeShareGroupStateRequest.data().groupId().equals(str) && ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((WriteShareGroupStateRequestData.PartitionData) ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), node);
        mockClient.prepareResponseFrom(abstractRequest3 -> {
            WriteShareGroupStateRequest writeShareGroupStateRequest = (WriteShareGroupStateRequest) abstractRequest3;
            return writeShareGroupStateRequest.data().groupId().equals(str) && ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((WriteShareGroupStateRequestData.PartitionData) ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), node);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getCoordinatorCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.WriteStateHandler writeStateHandler = (PersisterStateManager.WriteStateHandler) Mockito.spy(new PersisterStateManager.WriteStateHandler(build, "group1", randomUuid, 10, 0, 0, 0L, asList, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, 2));
        build.enqueue(writeStateHandler);
        WriteShareGroupStateResponse writeShareGroupStateResponse = null;
        try {
            writeShareGroupStateResponse = (WriteShareGroupStateResponse) writeStateHandler.result().get();
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult) ((WriteShareGroupStateResponseData.WriteStateResult) writeShareGroupStateResponse.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(0))).requestBuilder();
        ((PersisterStateManager.WriteStateHandler) Mockito.verify(writeStateHandler, Mockito.times(2))).onComplete((ClientResponse) ArgumentMatchers.any());
        Assertions.assertEquals(node, writeStateHandler.getCoordinatorNode());
        Assertions.assertEquals(10, partitionResult.partition());
        Assertions.assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), partitionResult.errorCode());
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testWriteStateRequestBatchingWithCoordinatorNodeLookup() throws ExecutionException, Exception {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        List asList = Arrays.asList(new PersisterStateBatch(0L, 9L, (byte) 0, (short) 1), new PersisterStateBatch(10L, 19L, (byte) 1, (short) 1));
        Node node = new Node(1, HOST, PORT);
        mockClient.prepareResponseFrom(abstractRequest -> {
            WriteShareGroupStateRequest writeShareGroupStateRequest = (WriteShareGroupStateRequest) abstractRequest;
            return writeShareGroupStateRequest.data().groupId().equals(str) && ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((WriteShareGroupStateRequestData.PartitionData) ((WriteShareGroupStateRequestData.WriteStateData) writeShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), node);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getCoordinatorCacheHelper(node)).build();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        build.setGenerateCallback(() -> {
            Map map = (Map) build.nodeRPCMap().get(node);
            if (map == null || !map.containsKey(PersisterStateManager.RPCType.WRITE) || !((Map) map.get(PersisterStateManager.RPCType.WRITE)).containsKey(str) || ((List) ((Map) map.get(PersisterStateManager.RPCType.WRITE)).get(str)).size() <= 2) {
                return;
            }
            atomicBoolean.set(true);
        });
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < MAX_RPC_RETRY_ATTEMPTS; i2++) {
            Objects.requireNonNull(build);
            PersisterStateManager.WriteStateHandler writeStateHandler = (PersisterStateManager.WriteStateHandler) Mockito.spy(new PersisterStateManager.WriteStateHandler(build, "group1", randomUuid, 10, 0, 0, 0L, asList, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS));
            arrayList.add(writeStateHandler);
            build.enqueue(writeStateHandler);
        }
        CompletableFuture.allOf((CompletableFuture[]) arrayList.stream().map((v0) -> {
            return v0.result();
        }).toArray(i3 -> {
            return new CompletableFuture[i3];
        })).get();
        Objects.requireNonNull(atomicBoolean);
        TestUtils.waitForCondition(atomicBoolean::get, 15000L, 10L, () -> {
            return "unable to verify batching";
        });
    }

    @Test
    public void testReadStateRequestCoordinatorFoundSuccessfully() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        Node node = new Node(0, HOST, PORT);
        Node node2 = new Node(1, HOST, PORT);
        String asCoordinatorKey = SharePartitionKey.asCoordinatorKey("group1", randomUuid, 10);
        mockClient.prepareResponseFrom(abstractRequest -> {
            return (abstractRequest instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(PORT).setErrorCode(Errors.NONE.code())))), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            ReadShareGroupStateRequest readShareGroupStateRequest = (ReadShareGroupStateRequest) abstractRequest2;
            return readShareGroupStateRequest.data().groupId().equals(str) && ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((ReadShareGroupStateRequestData.PartitionData) ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), node2);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getDefaultCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.ReadStateHandler readStateHandler = (PersisterStateManager.ReadStateHandler) Mockito.spy(new PersisterStateManager.ReadStateHandler(build, "group1", randomUuid, 10, 0, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS, (Consumer) null));
        build.enqueue(readStateHandler);
        ReadShareGroupStateResponse readShareGroupStateResponse = null;
        try {
            readShareGroupStateResponse = (ReadShareGroupStateResponse) readStateHandler.result().get();
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult) ((ReadShareGroupStateResponseData.ReadStateResult) readShareGroupStateResponse.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(0))).requestBuilder();
        Assertions.assertEquals(node2, readStateHandler.getCoordinatorNode());
        Assertions.assertEquals(10, partitionResult.partition());
        Assertions.assertEquals(Errors.NONE.code(), partitionResult.errorCode());
        Assertions.assertEquals(1, partitionResult.stateEpoch());
        Assertions.assertEquals(0L, partitionResult.startOffset());
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testReadStateRequestIllegalStateCoordinatorFoundSuccessfully() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        Node node = new Node(0, HOST, PORT);
        Node node2 = new Node(1, HOST, PORT);
        String asCoordinatorKey = SharePartitionKey.asCoordinatorKey("group1", randomUuid, 10);
        mockClient.prepareResponseFrom(abstractRequest -> {
            return (abstractRequest instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(PORT).setErrorCode(Errors.NONE.code())))), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            ReadShareGroupStateRequest readShareGroupStateRequest = (ReadShareGroupStateRequest) abstractRequest2;
            return readShareGroupStateRequest.data().groupId().equals(str) && ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((ReadShareGroupStateRequestData.PartitionData) ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(Uuid.randomUuid()).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(500).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), node2);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getDefaultCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.ReadStateHandler readStateHandler = (PersisterStateManager.ReadStateHandler) Mockito.spy(new PersisterStateManager.ReadStateHandler(build, "group1", randomUuid, 10, 0, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS, (Consumer) null));
        build.enqueue(readStateHandler);
        ReadShareGroupStateResponse readShareGroupStateResponse = null;
        try {
            readShareGroupStateResponse = (ReadShareGroupStateResponse) readStateHandler.result().get();
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult) ((ReadShareGroupStateResponseData.ReadStateResult) readShareGroupStateResponse.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(0))).requestBuilder();
        Assertions.assertEquals(node2, readStateHandler.getCoordinatorNode());
        Assertions.assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), partitionResult.errorCode());
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testReadStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws ExecutionException, InterruptedException {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        Node node = new Node(0, HOST, PORT);
        Node node2 = new Node(1, HOST, PORT);
        String asCoordinatorKey = SharePartitionKey.asCoordinatorKey("group1", randomUuid, 10);
        mockClient.prepareResponseFrom(abstractRequest -> {
            return (abstractRequest instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.NOT_COORDINATOR.code())))), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            return (abstractRequest2 instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest2).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest2).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(PORT).setErrorCode(Errors.NONE.code())))), node);
        mockClient.prepareResponseFrom(abstractRequest3 -> {
            ReadShareGroupStateRequest readShareGroupStateRequest = (ReadShareGroupStateRequest) abstractRequest3;
            return readShareGroupStateRequest.data().groupId().equals(str) && ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((ReadShareGroupStateRequestData.PartitionData) ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), node2);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getDefaultCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.ReadStateHandler readStateHandler = (PersisterStateManager.ReadStateHandler) Mockito.spy(new PersisterStateManager.ReadStateHandler(build, "group1", randomUuid, 10, 0, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS, (Consumer) null));
        build.enqueue(readStateHandler);
        CompletableFuture result = readStateHandler.result();
        Objects.requireNonNull(result);
        TestUtils.waitForCondition(result::isDone, 15000L, 10L, () -> {
            return "Failed to get result from future";
        });
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult) ((ReadShareGroupStateResponseData.ReadStateResult) ((ReadShareGroupStateResponse) result.get()).data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(0))).requestBuilder();
        Assertions.assertEquals(node2, readStateHandler.getCoordinatorNode());
        Assertions.assertEquals(10, partitionResult.partition());
        Assertions.assertEquals(Errors.NONE.code(), partitionResult.errorCode());
        try {
            build.stop();
        } catch (Exception e) {
            Assertions.fail("Failed to stop state manager", e);
        }
    }

    @Test
    public void testReadStateRequestCoordinatorFoundOnRetry() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        Node node = new Node(0, HOST, PORT);
        Node node2 = new Node(1, HOST, PORT);
        String asCoordinatorKey = SharePartitionKey.asCoordinatorKey("group1", randomUuid, 10);
        mockClient.prepareResponseFrom(abstractRequest -> {
            return (abstractRequest instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.NOT_COORDINATOR.code())))), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            return (abstractRequest2 instanceof FindCoordinatorRequest) && ((FindCoordinatorRequest) abstractRequest2).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String) ((FindCoordinatorRequest) abstractRequest2).data().coordinatorKeys().get(0)).equals(asCoordinatorKey);
        }, new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(PORT).setErrorCode(Errors.NONE.code())))), node);
        mockClient.prepareResponseFrom(abstractRequest3 -> {
            ReadShareGroupStateRequest readShareGroupStateRequest = (ReadShareGroupStateRequest) abstractRequest3;
            return readShareGroupStateRequest.data().groupId().equals(str) && ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((ReadShareGroupStateRequestData.PartitionData) ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), node2);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getDefaultCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.ReadStateHandler readStateHandler = (PersisterStateManager.ReadStateHandler) Mockito.spy(new PersisterStateManager.ReadStateHandler(build, "group1", randomUuid, 10, 0, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS, (Consumer) null));
        build.enqueue(readStateHandler);
        ReadShareGroupStateResponse readShareGroupStateResponse = null;
        try {
            readShareGroupStateResponse = (ReadShareGroupStateResponse) readStateHandler.result().get();
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult) ((ReadShareGroupStateResponseData.ReadStateResult) readShareGroupStateResponse.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(0))).requestBuilder();
        Assertions.assertEquals(node2, readStateHandler.getCoordinatorNode());
        Assertions.assertEquals(10, partitionResult.partition());
        Assertions.assertEquals(Errors.NONE.code(), partitionResult.errorCode());
        Assertions.assertEquals(1, partitionResult.stateEpoch());
        Assertions.assertEquals(0L, partitionResult.startOffset());
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testReadStateRequestWithCoordinatorNodeLookup() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        Node node = new Node(1, HOST, PORT);
        mockClient.prepareResponseFrom(abstractRequest -> {
            ReadShareGroupStateRequest readShareGroupStateRequest = (ReadShareGroupStateRequest) abstractRequest;
            return readShareGroupStateRequest.data().groupId().equals(str) && ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((ReadShareGroupStateRequestData.PartitionData) ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), node);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getCoordinatorCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.ReadStateHandler readStateHandler = (PersisterStateManager.ReadStateHandler) Mockito.spy(new PersisterStateManager.ReadStateHandler(build, "group1", randomUuid, 10, 0, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS, (Consumer) null));
        build.enqueue(readStateHandler);
        ReadShareGroupStateResponse readShareGroupStateResponse = null;
        try {
            readShareGroupStateResponse = (ReadShareGroupStateResponse) readStateHandler.result().get();
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult) ((ReadShareGroupStateResponseData.ReadStateResult) readShareGroupStateResponse.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(0))).requestBuilder();
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(1))).onComplete((ClientResponse) ArgumentMatchers.any());
        Assertions.assertEquals(node, readStateHandler.getCoordinatorNode());
        Assertions.assertEquals(10, partitionResult.partition());
        Assertions.assertEquals(Errors.NONE.code(), partitionResult.errorCode());
        Assertions.assertEquals(1, partitionResult.stateEpoch());
        Assertions.assertEquals(0L, partitionResult.startOffset());
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testReadStateRequestRetryWithCoordinatorNodeLookup() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        Node node = new Node(1, HOST, PORT);
        mockClient.prepareResponseFrom(abstractRequest -> {
            ReadShareGroupStateRequest readShareGroupStateRequest = (ReadShareGroupStateRequest) abstractRequest;
            return readShareGroupStateRequest.data().groupId().equals(str) && ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((ReadShareGroupStateRequestData.PartitionData) ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            ReadShareGroupStateRequest readShareGroupStateRequest = (ReadShareGroupStateRequest) abstractRequest2;
            return readShareGroupStateRequest.data().groupId().equals(str) && ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((ReadShareGroupStateRequestData.PartitionData) ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), node);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getCoordinatorCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.ReadStateHandler readStateHandler = (PersisterStateManager.ReadStateHandler) Mockito.spy(new PersisterStateManager.ReadStateHandler(build, "group1", randomUuid, 10, 0, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, MAX_RPC_RETRY_ATTEMPTS, (Consumer) null));
        build.enqueue(readStateHandler);
        ReadShareGroupStateResponse readShareGroupStateResponse = null;
        try {
            readShareGroupStateResponse = (ReadShareGroupStateResponse) readStateHandler.result().get();
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult) ((ReadShareGroupStateResponseData.ReadStateResult) readShareGroupStateResponse.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(0))).requestBuilder();
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(2))).onComplete((ClientResponse) ArgumentMatchers.any());
        Assertions.assertEquals(node, readStateHandler.getCoordinatorNode());
        Assertions.assertEquals(10, partitionResult.partition());
        Assertions.assertEquals(Errors.NONE.code(), partitionResult.errorCode());
        Assertions.assertEquals(1, partitionResult.stateEpoch());
        Assertions.assertEquals(0L, partitionResult.startOffset());
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testReadStateRequestFailureMaxRetriesExhausted() {
        KafkaClient mockClient = new MockClient(MOCK_TIME);
        String str = "group1";
        Uuid randomUuid = Uuid.randomUuid();
        int i = 10;
        Node node = new Node(1, HOST, PORT);
        mockClient.prepareResponseFrom(abstractRequest -> {
            ReadShareGroupStateRequest readShareGroupStateRequest = (ReadShareGroupStateRequest) abstractRequest;
            return readShareGroupStateRequest.data().groupId().equals(str) && ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((ReadShareGroupStateRequestData.PartitionData) ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            ReadShareGroupStateRequest readShareGroupStateRequest = (ReadShareGroupStateRequest) abstractRequest2;
            return readShareGroupStateRequest.data().groupId().equals(str) && ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).topicId() == randomUuid && ((ReadShareGroupStateRequestData.PartitionData) ((ReadShareGroupStateRequestData.ReadStateData) readShareGroupStateRequest.data().topics().get(0)).partitions().get(0)).partition() == i;
        }, new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(randomUuid).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(10).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), node);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withKafkaClient(mockClient).withTimer(mockTimer).withCacheHelper(getCoordinatorCacheHelper(node)).build();
        build.start();
        CompletableFuture completableFuture = new CompletableFuture();
        Objects.requireNonNull(build);
        PersisterStateManager.ReadStateHandler readStateHandler = (PersisterStateManager.ReadStateHandler) Mockito.spy(new PersisterStateManager.ReadStateHandler(build, "group1", randomUuid, 10, 0, completableFuture, 100L, REQUEST_BACKOFF_MAX_MS, 2, (Consumer) null));
        build.enqueue(readStateHandler);
        ReadShareGroupStateResponse readShareGroupStateResponse = null;
        try {
            readShareGroupStateResponse = (ReadShareGroupStateResponse) readStateHandler.result().get();
        } catch (Exception e) {
            Assertions.fail("Failed to get result from future", e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult) ((ReadShareGroupStateResponseData.ReadStateResult) readShareGroupStateResponse.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(0))).requestBuilder();
        ((PersisterStateManager.ReadStateHandler) Mockito.verify(readStateHandler, Mockito.times(2))).onComplete((ClientResponse) ArgumentMatchers.any());
        Assertions.assertEquals(node, readStateHandler.getCoordinatorNode());
        Assertions.assertEquals(10, partitionResult.partition());
        Assertions.assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), partitionResult.errorCode());
        try {
            build.stop();
        } catch (Exception e2) {
            Assertions.fail("Failed to stop state manager", e2);
        }
    }

    @Test
    public void testPersisterStateManagerClose() {
        KafkaClient kafkaClient = (KafkaClient) Mockito.mock(KafkaClient.class);
        Timer timer = (Timer) Mockito.mock(Timer.class);
        PersisterStateManager build = PersisterStateManagerBuilder.builder().withTimer(timer).withKafkaClient(kafkaClient).build();
        try {
            ((KafkaClient) Mockito.verify(kafkaClient, Mockito.times(0))).close();
            ((Timer) Mockito.verify(timer, Mockito.times(0))).close();
            build.start();
            build.stop();
            ((KafkaClient) Mockito.verify(kafkaClient, Mockito.times(1))).close();
            ((Timer) Mockito.verify(timer, Mockito.times(1))).close();
        } catch (Exception e) {
            Assertions.fail("unexpected exception", e);
        }
    }
}
