/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.share.persister;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.DeleteShareGroupStateRequest;
import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitializeShareGroupStateRequest;
import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest;
import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.apache.kafka.server.share.persister.PersisterStateManager;
import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

class PersisterStateManagerTest {
    private static final KafkaClient CLIENT = (KafkaClient)Mockito.mock(KafkaClient.class);
    private static final MockTime MOCK_TIME = new MockTime();
    private static final Timer MOCK_TIMER = new MockTimer(MOCK_TIME);
    private static final ShareCoordinatorMetadataCacheHelper CACHE_HELPER = (ShareCoordinatorMetadataCacheHelper)Mockito.mock(ShareCoordinatorMetadataCacheHelper.class);
    private static final int MAX_RPC_RETRY_ATTEMPTS = 5;
    public static final long REQUEST_BACKOFF_MS = 100L;
    public static final long REQUEST_BACKOFF_MAX_MS = 3000L;
    private static final String HOST = "localhost";
    private static final int PORT = 9092;
    private static Timer mockTimer;

    PersisterStateManagerTest() {
    }

    private ShareCoordinatorMetadataCacheHelper getDefaultCacheHelper(final Node suppliedNode) {
        return new ShareCoordinatorMetadataCacheHelper(){

            public boolean containsTopic(String topic) {
                return false;
            }

            public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) {
                return Node.noNode();
            }

            public List<Node> getClusterNodes() {
                return List.of(suppliedNode);
            }
        };
    }

    private ShareCoordinatorMetadataCacheHelper getCoordinatorCacheHelper(final Node coordinatorNode) {
        return new ShareCoordinatorMetadataCacheHelper(){

            public boolean containsTopic(String topic) {
                return true;
            }

            public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) {
                return coordinatorNode;
            }

            public List<Node> getClusterNodes() {
                return List.of();
            }
        };
    }

    @BeforeEach
    public void setUp() {
        mockTimer = new SystemTimerReaper("persisterStateManagerTestTimer", (Timer)new SystemTimer("persisterStateManagerTestTimer"));
    }

    @AfterEach
    public void tearDown() throws Exception {
        Utils.closeQuietly((AutoCloseable)mockTimer, (String)"persisterStateManagerTestTimer");
    }

    @Test
    public void testFindCoordinatorFatalError() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setKey(coordinatorKey).setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()).setHost(Node.noNode().host()).setNodeId(Node.noNode().id()).setPort(Node.noNode().port())))), suppliedNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        TestStateHandler handler = (TestStateHandler)((Object)Mockito.spy((Object)((Object)new TestStateHandler(stateManager, groupId, topicId, partition, future, 100L, 3000L, 5){

            protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
                return null;
            }
        })));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        TestStateHandler.TestHandlerResponse result = null;
        try {
            result = handler.result().get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        Assertions.assertEquals((short)Errors.UNKNOWN_SERVER_ERROR.code(), (short)((WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0)).errorCode());
        ((TestStateHandler)((Object)Mockito.verify((Object)((Object)handler), (VerificationMode)Mockito.times((int)1)))).findShareCoordinatorBuilder();
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testFindCoordinatorNullResponse() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), null, suppliedNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        TestStateHandler handler = (TestStateHandler)((Object)Mockito.spy((Object)((Object)new TestStateHandler(stateManager, groupId, topicId, partition, future, 100L, 3000L, 5){

            protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
                return null;
            }
        })));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        TestStateHandler.TestHandlerResponse result = null;
        try {
            result = handler.result().get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        Assertions.assertEquals((short)Errors.UNKNOWN_SERVER_ERROR.code(), (short)((WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0)).errorCode());
        ((TestStateHandler)((Object)Mockito.verify((Object)((Object)handler), (VerificationMode)Mockito.times((int)1)))).findShareCoordinatorBuilder();
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testFindCoordinatorDisconnect() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        client.setUnreachable(suppliedNode, CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS + 1L);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        TestStateHandler handler = (TestStateHandler)((Object)Mockito.spy((Object)((Object)new TestStateHandler(stateManager, groupId, topicId, partition, future, 100L, 3000L, 5){

            protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
                return null;
            }
        })));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        TestStateHandler.TestHandlerResponse result = null;
        try {
            result = handler.result().get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        Assertions.assertEquals((short)Errors.NETWORK_EXCEPTION.code(), (short)((WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0)).errorCode());
        ((TestStateHandler)((Object)Mockito.verify((Object)((Object)handler), (VerificationMode)Mockito.times((int)1)))).findShareCoordinatorBuilder();
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testFindCoordinatorAttemptsExhausted() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setKey(coordinatorKey).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setHost(Node.noNode().host()).setNodeId(Node.noNode().id()).setPort(Node.noNode().port())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setKey(coordinatorKey).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setHost(Node.noNode().host()).setNodeId(Node.noNode().id()).setPort(Node.noNode().port())))), suppliedNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        int maxAttempts = 2;
        TestStateHandler handler = (TestStateHandler)((Object)Mockito.spy((Object)((Object)new TestStateHandler(stateManager, groupId, topicId, partition, future, 100L, 3000L, maxAttempts){

            protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
                return null;
            }
        })));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        TestStateHandler.TestHandlerResponse result = null;
        try {
            result = handler.result.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        Assertions.assertEquals((short)Errors.COORDINATOR_NOT_AVAILABLE.code(), (short)((WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0)).errorCode());
        ((TestStateHandler)((Object)Mockito.verify((Object)((Object)handler), (VerificationMode)Mockito.times((int)2)))).findShareCoordinatorBuilder();
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testFindCoordinatorSuccess() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(List.of(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(List.of())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        try {
            resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestCoordinatorFoundSuccessfully() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = List.of(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        WriteShareGroupStateResponse result = null;
        try {
            result = (WriteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws InterruptedException, ExecutionException {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = List.of(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.NOT_COORDINATOR.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        TestUtils.waitForCondition(resultFuture::isDone, (long)15000L, (long)10L, () -> "Failed to get result from future");
        WriteShareGroupStateResponse result = (WriteShareGroupStateResponse)resultFuture.get();
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestCoordinatorFoundOnRetry() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = List.of(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        WriteShareGroupStateResponse result = null;
        try {
            result = (WriteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestWithCoordinatorNodeLookup() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = List.of(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        WriteShareGroupStateResponse result = null;
        try {
            result = (WriteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestWithRetryAndCoordinatorNodeLookup() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = List.of(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        WriteShareGroupStateResponse result = null;
        try {
            result = (WriteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestFailedMaxRetriesExhausted() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = List.of(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 2));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        WriteShareGroupStateResponse result = null;
        try {
            result = (WriteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestBatchingWithCoordinatorNodeLookup() throws ExecutionException, Exception {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = List.of(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        AtomicBoolean isBatchingSuccess = new AtomicBoolean(false);
        stateManager.setGenerateCallback(() -> {
            Map handlersPerType = (Map)stateManager.nodeRPCMap().get(coordinatorNode);
            if (handlersPerType != null && handlersPerType.containsKey(PersisterStateManager.RPCType.WRITE) && ((Map)handlersPerType.get(PersisterStateManager.RPCType.WRITE)).containsKey(groupId) && ((List)((Map)handlersPerType.get(PersisterStateManager.RPCType.WRITE)).get(groupId)).size() > 2) {
                isBatchingSuccess.set(true);
            }
        });
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        ArrayList<PersisterStateManager.WriteStateHandler> handlers = new ArrayList<PersisterStateManager.WriteStateHandler>();
        for (int i = 0; i < 5; ++i) {
            PersisterStateManager persisterStateManager = stateManager;
            Objects.requireNonNull(persisterStateManager);
            PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 5));
            handlers.add(handler);
            stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        }
        CompletableFuture.allOf((CompletableFuture[])handlers.stream().map(PersisterStateManager.WriteStateHandler::result).toArray(CompletableFuture[]::new)).get();
        TestUtils.waitForCondition(isBatchingSuccess::get, (long)15000L, (long)10L, () -> "unable to verify batching");
    }

    @Test
    public void testReadStateRequestCoordinatorFoundSuccessfully() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(List.of(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(List.of())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateResponse result = null;
        try {
            result = (ReadShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        Assertions.assertEquals((int)1, (int)partitionResult.stateEpoch());
        Assertions.assertEquals((long)0L, (long)partitionResult.startOffset());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateRequestIllegalStateCoordinatorFoundSuccessfully() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(List.of(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(Uuid.randomUuid()).setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult().setPartition(500).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(List.of())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateResponse result = null;
        try {
            result = (ReadShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((short)Errors.UNKNOWN_SERVER_ERROR.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws ExecutionException, InterruptedException {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.NOT_COORDINATOR.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(List.of(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(List.of())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        TestUtils.waitForCondition(resultFuture::isDone, (long)15000L, (long)10L, () -> "Failed to get result from future");
        ReadShareGroupStateResponse result = (ReadShareGroupStateResponse)resultFuture.get();
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateRequestCoordinatorFoundOnRetry() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.NOT_COORDINATOR.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(List.of(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(List.of())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateResponse result = null;
        try {
            result = (ReadShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        Assertions.assertEquals((int)1, (int)partitionResult.stateEpoch());
        Assertions.assertEquals((long)0L, (long)partitionResult.startOffset());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateRequestWithCoordinatorNodeLookup() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(List.of(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(List.of())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateResponse result = null;
        try {
            result = (ReadShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        Assertions.assertEquals((int)1, (int)partitionResult.stateEpoch());
        Assertions.assertEquals((long)0L, (long)partitionResult.startOffset());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateRequestRetryWithCoordinatorNodeLookup() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(List.of(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(List.of())))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(List.of(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(List.of())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateResponse result = null;
        try {
            result = (ReadShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        Assertions.assertEquals((int)1, (int)partitionResult.stateEpoch());
        Assertions.assertEquals((long)0L, (long)partitionResult.startOffset());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateRequestFailureMaxRetriesExhausted() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(List.of(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(List.of())))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(List.of(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(List.of())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 2, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateResponse result = null;
        try {
            result = (ReadShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateSummaryRequestCoordinatorFoundSuccessfully() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateSummaryRequestData.PartitionData)((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData().setResults(List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L)))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateSummaryHandler handler = (PersisterStateManager.ReadStateSummaryHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateSummaryHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateSummaryResponse result = null;
        try {
            result = (ReadShareGroupStateSummaryResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = (ReadShareGroupStateSummaryResponseData.PartitionResult)((ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        Assertions.assertEquals((int)1, (int)partitionResult.stateEpoch());
        Assertions.assertEquals((long)0L, (long)partitionResult.startOffset());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateSummaryRequestIllegalStateCoordinatorFoundSuccessfully() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateSummaryRequestData.PartitionData)((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData().setResults(List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult().setTopicId(Uuid.randomUuid()).setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult().setPartition(500).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L)))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateSummaryHandler handler = (PersisterStateManager.ReadStateSummaryHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateSummaryHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateSummaryResponse result = null;
        try {
            result = (ReadShareGroupStateSummaryResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = (ReadShareGroupStateSummaryResponseData.PartitionResult)((ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((short)Errors.UNKNOWN_SERVER_ERROR.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateSummaryRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws ExecutionException, InterruptedException {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.NOT_COORDINATOR.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateSummaryRequestData.PartitionData)((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData().setResults(List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L)))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateSummaryHandler handler = (PersisterStateManager.ReadStateSummaryHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateSummaryHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        TestUtils.waitForCondition(resultFuture::isDone, (long)15000L, (long)10L, () -> "Failed to get result from future");
        ReadShareGroupStateSummaryResponse result = (ReadShareGroupStateSummaryResponse)resultFuture.get();
        ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = (ReadShareGroupStateSummaryResponseData.PartitionResult)((ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateSummaryRequestCoordinatorFoundOnRetry() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.NOT_COORDINATOR.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateSummaryRequestData.PartitionData)((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData().setResults(List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L)))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateSummaryHandler handler = (PersisterStateManager.ReadStateSummaryHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateSummaryHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateSummaryResponse result = null;
        try {
            result = (ReadShareGroupStateSummaryResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = (ReadShareGroupStateSummaryResponseData.PartitionResult)((ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        Assertions.assertEquals((int)1, (int)partitionResult.stateEpoch());
        Assertions.assertEquals((long)0L, (long)partitionResult.startOffset());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateSummaryRequestWithCoordinatorNodeLookup() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateSummaryRequestData.PartitionData)((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData().setResults(List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L)))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateSummaryHandler handler = (PersisterStateManager.ReadStateSummaryHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateSummaryHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateSummaryResponse result = null;
        try {
            result = (ReadShareGroupStateSummaryResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = (ReadShareGroupStateSummaryResponseData.PartitionResult)((ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        Assertions.assertEquals((int)1, (int)partitionResult.stateEpoch());
        Assertions.assertEquals((long)0L, (long)partitionResult.startOffset());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateSummaryRequestRetryWithCoordinatorNodeLookup() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateSummaryRequestData.PartitionData)((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData().setResults(List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L)))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateSummaryRequestData.PartitionData)((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData().setResults(List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L)))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateSummaryHandler handler = (PersisterStateManager.ReadStateSummaryHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateSummaryHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateSummaryResponse result = null;
        try {
            result = (ReadShareGroupStateSummaryResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = (ReadShareGroupStateSummaryResponseData.PartitionResult)((ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        Assertions.assertEquals((int)1, (int)partitionResult.stateEpoch());
        Assertions.assertEquals((long)0L, (long)partitionResult.startOffset());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateSummaryRequestFailureMaxRetriesExhausted() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateSummaryRequestData.PartitionData)((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData().setResults(List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L)))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateSummaryRequest request = (ReadShareGroupStateSummaryRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateSummaryRequestData.PartitionData)((ReadShareGroupStateSummaryRequestData.ReadStateSummaryData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData().setResults(List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult().setTopicId(topicId).setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L)))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateSummaryHandler handler = (PersisterStateManager.ReadStateSummaryHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateSummaryHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 2, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateSummaryResponse result = null;
        try {
            result = (ReadShareGroupStateSummaryResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateSummaryResponseData.PartitionResult partitionResult = (ReadShareGroupStateSummaryResponseData.PartitionResult)((ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.ReadStateSummaryHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testDeleteStateRequestCoordinatorFoundSuccessfully() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((DeleteShareGroupStateRequestData.PartitionData)((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new DeleteShareGroupStateResponse(new DeleteShareGroupStateResponseData().setResults(List.of(new DeleteShareGroupStateResponseData.DeleteStateResult().setTopicId(topicId).setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.DeleteStateHandler handler = (PersisterStateManager.DeleteStateHandler)Mockito.spy((Object)new PersisterStateManager.DeleteStateHandler(persisterStateManager, groupId, topicId, partition, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        DeleteShareGroupStateResponse result = null;
        try {
            result = (DeleteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        DeleteShareGroupStateResponseData.PartitionResult partitionResult = (DeleteShareGroupStateResponseData.PartitionResult)((DeleteShareGroupStateResponseData.DeleteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testDeleteStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws InterruptedException, ExecutionException {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.NOT_COORDINATOR.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((DeleteShareGroupStateRequestData.PartitionData)((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new DeleteShareGroupStateResponse(new DeleteShareGroupStateResponseData().setResults(List.of(new DeleteShareGroupStateResponseData.DeleteStateResult().setTopicId(topicId).setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.DeleteStateHandler handler = (PersisterStateManager.DeleteStateHandler)Mockito.spy((Object)new PersisterStateManager.DeleteStateHandler(persisterStateManager, groupId, topicId, partition, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        TestUtils.waitForCondition(resultFuture::isDone, (long)15000L, (long)10L, () -> "Failed to get result from future");
        DeleteShareGroupStateResponse result = (DeleteShareGroupStateResponse)resultFuture.get();
        DeleteShareGroupStateResponseData.PartitionResult partitionResult = (DeleteShareGroupStateResponseData.PartitionResult)((DeleteShareGroupStateResponseData.DeleteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testDeleteStateRequestCoordinatorFoundOnRetry() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((DeleteShareGroupStateRequestData.PartitionData)((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new DeleteShareGroupStateResponse(new DeleteShareGroupStateResponseData().setResults(List.of(new DeleteShareGroupStateResponseData.DeleteStateResult().setTopicId(topicId).setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.DeleteStateHandler handler = (PersisterStateManager.DeleteStateHandler)Mockito.spy((Object)new PersisterStateManager.DeleteStateHandler(persisterStateManager, groupId, topicId, partition, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        DeleteShareGroupStateResponse result = null;
        try {
            result = (DeleteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        DeleteShareGroupStateResponseData.PartitionResult partitionResult = (DeleteShareGroupStateResponseData.PartitionResult)((DeleteShareGroupStateResponseData.DeleteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testDeleteStateRequestWithCoordinatorNodeLookup() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((DeleteShareGroupStateRequestData.PartitionData)((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new DeleteShareGroupStateResponse(new DeleteShareGroupStateResponseData().setResults(List.of(new DeleteShareGroupStateResponseData.DeleteStateResult().setTopicId(topicId).setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.DeleteStateHandler handler = (PersisterStateManager.DeleteStateHandler)Mockito.spy((Object)new PersisterStateManager.DeleteStateHandler(persisterStateManager, groupId, topicId, partition, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        DeleteShareGroupStateResponse result = null;
        try {
            result = (DeleteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        DeleteShareGroupStateResponseData.PartitionResult partitionResult = (DeleteShareGroupStateResponseData.PartitionResult)((DeleteShareGroupStateResponseData.DeleteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testDeleteStateRequestWithRetryAndCoordinatorNodeLookup() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((DeleteShareGroupStateRequestData.PartitionData)((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new DeleteShareGroupStateResponse(new DeleteShareGroupStateResponseData().setResults(List.of(new DeleteShareGroupStateResponseData.DeleteStateResult().setTopicId(topicId).setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((DeleteShareGroupStateRequestData.PartitionData)((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new DeleteShareGroupStateResponse(new DeleteShareGroupStateResponseData().setResults(List.of(new DeleteShareGroupStateResponseData.DeleteStateResult().setTopicId(topicId).setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.DeleteStateHandler handler = (PersisterStateManager.DeleteStateHandler)Mockito.spy((Object)new PersisterStateManager.DeleteStateHandler(persisterStateManager, groupId, topicId, partition, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        DeleteShareGroupStateResponse result = null;
        try {
            result = (DeleteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        DeleteShareGroupStateResponseData.PartitionResult partitionResult = (DeleteShareGroupStateResponseData.PartitionResult)((DeleteShareGroupStateResponseData.DeleteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testDeleteStateRequestFailedMaxRetriesExhausted() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((DeleteShareGroupStateRequestData.PartitionData)((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new DeleteShareGroupStateResponse(new DeleteShareGroupStateResponseData().setResults(List.of(new DeleteShareGroupStateResponseData.DeleteStateResult().setTopicId(topicId).setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((DeleteShareGroupStateRequestData.PartitionData)((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new DeleteShareGroupStateResponse(new DeleteShareGroupStateResponseData().setResults(List.of(new DeleteShareGroupStateResponseData.DeleteStateResult().setTopicId(topicId).setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((DeleteShareGroupStateRequestData.PartitionData)((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new DeleteShareGroupStateResponse(new DeleteShareGroupStateResponseData().setResults(List.of(new DeleteShareGroupStateResponseData.DeleteStateResult().setTopicId(topicId).setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.DeleteStateHandler handler = (PersisterStateManager.DeleteStateHandler)Mockito.spy((Object)new PersisterStateManager.DeleteStateHandler(persisterStateManager, groupId, topicId, partition, future, 100L, 3000L, 2));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        DeleteShareGroupStateResponse result = null;
        try {
            result = (DeleteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        DeleteShareGroupStateResponseData.PartitionResult partitionResult = (DeleteShareGroupStateResponseData.PartitionResult)((DeleteShareGroupStateResponseData.DeleteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.DeleteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testDeleteStateRequestBatchingWithCoordinatorNodeLookup() throws Exception {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            DeleteShareGroupStateRequest request = (DeleteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((DeleteShareGroupStateRequestData.PartitionData)((DeleteShareGroupStateRequestData.DeleteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new DeleteShareGroupStateResponse(new DeleteShareGroupStateResponseData().setResults(List.of(new DeleteShareGroupStateResponseData.DeleteStateResult().setTopicId(topicId).setPartitions(List.of(new DeleteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        AtomicBoolean isBatchingSuccess = new AtomicBoolean(false);
        stateManager.setGenerateCallback(() -> {
            Map handlersPerType = (Map)stateManager.nodeRPCMap().get(coordinatorNode);
            if (handlersPerType != null && handlersPerType.containsKey(PersisterStateManager.RPCType.DELETE) && ((Map)handlersPerType.get(PersisterStateManager.RPCType.DELETE)).containsKey(groupId) && ((List)((Map)handlersPerType.get(PersisterStateManager.RPCType.DELETE)).get(groupId)).size() > 2) {
                isBatchingSuccess.set(true);
            }
        });
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        ArrayList<PersisterStateManager.DeleteStateHandler> handlers = new ArrayList<PersisterStateManager.DeleteStateHandler>();
        for (int i = 0; i < 5; ++i) {
            PersisterStateManager persisterStateManager = stateManager;
            Objects.requireNonNull(persisterStateManager);
            PersisterStateManager.DeleteStateHandler handler = (PersisterStateManager.DeleteStateHandler)Mockito.spy((Object)new PersisterStateManager.DeleteStateHandler(persisterStateManager, groupId, topicId, partition, future, 100L, 3000L, 5));
            handlers.add(handler);
            stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        }
        CompletableFuture.allOf((CompletableFuture[])handlers.stream().map(PersisterStateManager.DeleteStateHandler::result).toArray(CompletableFuture[]::new)).get();
        TestUtils.waitForCondition(isBatchingSuccess::get, (long)15000L, (long)10L, () -> "unable to verify batching");
    }

    @Test
    public void testInitializeStateRequestCoordinatorFoundSuccessfully() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        int stateEpoch = 1;
        long startOffset = 10L;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((InitializeShareGroupStateRequestData.PartitionData)((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData().setResults(List.of(new InitializeShareGroupStateResponseData.InitializeStateResult().setTopicId(topicId).setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.InitializeStateHandler handler = (PersisterStateManager.InitializeStateHandler)Mockito.spy((Object)new PersisterStateManager.InitializeStateHandler(persisterStateManager, groupId, topicId, partition, stateEpoch, startOffset, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        InitializeShareGroupStateResponse result = null;
        try {
            result = (InitializeShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        InitializeShareGroupStateResponseData.PartitionResult partitionResult = (InitializeShareGroupStateResponseData.PartitionResult)((InitializeShareGroupStateResponseData.InitializeStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testInitializeStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws InterruptedException, ExecutionException {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        int stateEpoch = 5;
        int startOffset = 11;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.NOT_COORDINATOR.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((InitializeShareGroupStateRequestData.PartitionData)((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData().setResults(List.of(new InitializeShareGroupStateResponseData.InitializeStateResult().setTopicId(topicId).setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.InitializeStateHandler handler = (PersisterStateManager.InitializeStateHandler)Mockito.spy((Object)new PersisterStateManager.InitializeStateHandler(persisterStateManager, groupId, topicId, partition, stateEpoch, (long)startOffset, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        TestUtils.waitForCondition(resultFuture::isDone, (long)15000L, (long)10L, () -> "Failed to get result from future");
        InitializeShareGroupStateResponse result = (InitializeShareGroupStateResponse)resultFuture.get();
        InitializeShareGroupStateResponseData.PartitionResult partitionResult = (InitializeShareGroupStateResponseData.PartitionResult)((InitializeShareGroupStateResponseData.InitializeStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testInitializeStateRequestCoordinatorFoundOnRetry() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        int stateEpoch = 5;
        long startOffset = 12L;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(List.of(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((InitializeShareGroupStateRequestData.PartitionData)((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData().setResults(List.of(new InitializeShareGroupStateResponseData.InitializeStateResult().setTopicId(topicId).setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.InitializeStateHandler handler = (PersisterStateManager.InitializeStateHandler)Mockito.spy((Object)new PersisterStateManager.InitializeStateHandler(persisterStateManager, groupId, topicId, partition, stateEpoch, startOffset, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        InitializeShareGroupStateResponse result = null;
        try {
            result = (InitializeShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        InitializeShareGroupStateResponseData.PartitionResult partitionResult = (InitializeShareGroupStateResponseData.PartitionResult)((InitializeShareGroupStateResponseData.InitializeStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testInitializeStateRequestWithCoordinatorNodeLookup() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        int stateEpoch = 5;
        long startOffset = 10L;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((InitializeShareGroupStateRequestData.PartitionData)((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData().setResults(List.of(new InitializeShareGroupStateResponseData.InitializeStateResult().setTopicId(topicId).setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.InitializeStateHandler handler = (PersisterStateManager.InitializeStateHandler)Mockito.spy((Object)new PersisterStateManager.InitializeStateHandler(persisterStateManager, groupId, topicId, partition, stateEpoch, startOffset, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        InitializeShareGroupStateResponse result = null;
        try {
            result = (InitializeShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        InitializeShareGroupStateResponseData.PartitionResult partitionResult = (InitializeShareGroupStateResponseData.PartitionResult)((InitializeShareGroupStateResponseData.InitializeStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testInitializeStateRequestWithRetryAndCoordinatorNodeLookup() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        int stateEpoch = 5;
        long startOffset = 10L;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((InitializeShareGroupStateRequestData.PartitionData)((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData().setResults(List.of(new InitializeShareGroupStateResponseData.InitializeStateResult().setTopicId(topicId).setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((InitializeShareGroupStateRequestData.PartitionData)((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData().setResults(List.of(new InitializeShareGroupStateResponseData.InitializeStateResult().setTopicId(topicId).setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.InitializeStateHandler handler = (PersisterStateManager.InitializeStateHandler)Mockito.spy((Object)new PersisterStateManager.InitializeStateHandler(persisterStateManager, groupId, topicId, partition, stateEpoch, startOffset, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        InitializeShareGroupStateResponse result = null;
        try {
            result = (InitializeShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        InitializeShareGroupStateResponseData.PartitionResult partitionResult = (InitializeShareGroupStateResponseData.PartitionResult)((InitializeShareGroupStateResponseData.InitializeStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testInitializeStateRequestFailedMaxRetriesExhausted() {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        int stateEpoch = 5;
        long startOffset = 10L;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((InitializeShareGroupStateRequestData.PartitionData)((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData().setResults(List.of(new InitializeShareGroupStateResponseData.InitializeStateResult().setTopicId(topicId).setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((InitializeShareGroupStateRequestData.PartitionData)((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData().setResults(List.of(new InitializeShareGroupStateResponseData.InitializeStateResult().setTopicId(topicId).setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((InitializeShareGroupStateRequestData.PartitionData)((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData().setResults(List.of(new InitializeShareGroupStateResponseData.InitializeStateResult().setTopicId(topicId).setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.InitializeStateHandler handler = (PersisterStateManager.InitializeStateHandler)Mockito.spy((Object)new PersisterStateManager.InitializeStateHandler(persisterStateManager, groupId, topicId, partition, stateEpoch, startOffset, future, 100L, 3000L, 2));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        InitializeShareGroupStateResponse result = null;
        try {
            result = (InitializeShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        InitializeShareGroupStateResponseData.PartitionResult partitionResult = (InitializeShareGroupStateResponseData.PartitionResult)((InitializeShareGroupStateResponseData.InitializeStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.InitializeStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testInitializeStateRequestBatchingWithCoordinatorNodeLookup() throws Exception {
        MockClient client = new MockClient((Time)MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        int stateEpoch = 5;
        long startOffset = 10L;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            InitializeShareGroupStateRequest request = (InitializeShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((InitializeShareGroupStateRequestData.PartitionData)((InitializeShareGroupStateRequestData.InitializeStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData().setResults(List.of(new InitializeShareGroupStateResponseData.InitializeStateResult().setTopicId(topicId).setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        AtomicBoolean isBatchingSuccess = new AtomicBoolean(false);
        stateManager.setGenerateCallback(() -> {
            Map handlersPerType = (Map)stateManager.nodeRPCMap().get(coordinatorNode);
            if (handlersPerType != null && handlersPerType.containsKey(PersisterStateManager.RPCType.INITIALIZE) && ((Map)handlersPerType.get(PersisterStateManager.RPCType.INITIALIZE)).containsKey(groupId) && ((List)((Map)handlersPerType.get(PersisterStateManager.RPCType.INITIALIZE)).get(groupId)).size() > 2) {
                isBatchingSuccess.set(true);
            }
        });
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        ArrayList<PersisterStateManager.InitializeStateHandler> handlers = new ArrayList<PersisterStateManager.InitializeStateHandler>();
        for (int i = 0; i < 5; ++i) {
            PersisterStateManager persisterStateManager = stateManager;
            Objects.requireNonNull(persisterStateManager);
            PersisterStateManager.InitializeStateHandler handler = (PersisterStateManager.InitializeStateHandler)Mockito.spy((Object)new PersisterStateManager.InitializeStateHandler(persisterStateManager, groupId, topicId, partition, stateEpoch, startOffset, future, 100L, 3000L, 5));
            handlers.add(handler);
            stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        }
        CompletableFuture.allOf((CompletableFuture[])handlers.stream().map(PersisterStateManager.InitializeStateHandler::result).toArray(CompletableFuture[]::new)).get();
        TestUtils.waitForCondition(isBatchingSuccess::get, (long)15000L, (long)10L, () -> "unable to verify batching");
    }

    @Test
    public void testPersisterStateManagerClose() {
        KafkaClient client = (KafkaClient)Mockito.mock(KafkaClient.class);
        Timer timer = (Timer)Mockito.mock(Timer.class);
        PersisterStateManager psm = PersisterStateManagerBuilder.builder().withTimer(timer).withKafkaClient(client).build();
        try {
            ((KafkaClient)Mockito.verify((Object)client, (VerificationMode)Mockito.times((int)0))).close();
            ((Timer)Mockito.verify((Object)timer, (VerificationMode)Mockito.times((int)0))).close();
            psm.start();
            psm.stop();
            ((KafkaClient)Mockito.verify((Object)client, (VerificationMode)Mockito.times((int)1))).close();
            ((Timer)Mockito.verify((Object)timer, (VerificationMode)Mockito.times((int)1))).close();
        }
        catch (Exception e) {
            Assertions.fail((String)"unexpected exception", (Throwable)e);
        }
    }

    private static Stream<TestHolder> generatorDifferentStates() {
        return Stream.of(new TestHolder(true, false, false, Optional.empty()), new TestHolder(true, true, true, Optional.empty()), new TestHolder(true, false, true, Optional.empty()), new TestHolder(true, true, false, Optional.empty()), new TestHolder(false, true, false, Optional.of(Errors.NETWORK_EXCEPTION)), new TestHolder(false, false, true, Optional.of(Errors.REQUEST_TIMED_OUT)), new TestHolder(false, true, true, Optional.of(Errors.NETWORK_EXCEPTION)), new TestHolder(false, false, false, Optional.of(Errors.UNKNOWN_SERVER_ERROR)));
    }

    @ParameterizedTest
    @MethodSource(value={"generatorDifferentStates"})
    public void testNetworkErrorHandling(TestHolder holder) {
        KafkaClient client = (KafkaClient)Mockito.mock(KafkaClient.class);
        Timer timer = (Timer)Mockito.mock(Timer.class);
        PersisterStateManager psm = PersisterStateManagerBuilder.builder().withTimer(timer).withKafkaClient(client).build();
        SharePartitionKey key = SharePartitionKey.getInstance((String)"group", (Uuid)Uuid.randomUuid(), (int)1);
        CompletableFuture future = new CompletableFuture();
        TestStateHandler handler = (TestStateHandler)((Object)Mockito.spy((Object)((Object)new TestStateHandler(psm, key.groupId(), key.topicId(), key.partition(), future, 100L, 3000L, 5){

            protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
                return null;
            }
        })));
        ClientResponse response = (ClientResponse)Mockito.mock(ClientResponse.class);
        Mockito.when((Object)response.hasResponse()).thenReturn((Object)holder.hasResponse);
        Mockito.when((Object)response.wasDisconnected()).thenReturn((Object)holder.wasDisconnected);
        Mockito.when((Object)response.wasTimedOut()).thenReturn((Object)holder.wasTimedOut);
        Assertions.assertEquals(holder.exp, (Object)handler.checkNetworkError(response, (err, exp) -> {}));
    }

    private static class PersisterStateManagerBuilder {
        private KafkaClient client = CLIENT;
        private Time time = MOCK_TIME;
        private Timer timer = MOCK_TIMER;
        private ShareCoordinatorMetadataCacheHelper cacheHelper = CACHE_HELPER;
        private int requestTimeoutMs = Math.toIntExact(CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS);

        private PersisterStateManagerBuilder() {
        }

        private PersisterStateManagerBuilder withKafkaClient(KafkaClient client) {
            this.client = client;
            return this;
        }

        private PersisterStateManagerBuilder withCacheHelper(ShareCoordinatorMetadataCacheHelper cacheHelper) {
            this.cacheHelper = cacheHelper;
            return this;
        }

        private PersisterStateManagerBuilder withTime(Time time) {
            this.time = time;
            return this;
        }

        private PersisterStateManagerBuilder withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        public static PersisterStateManagerBuilder builder() {
            return new PersisterStateManagerBuilder();
        }

        public PersisterStateManager build() {
            return new PersisterStateManager(this.client, this.cacheHelper, this.time, this.timer);
        }
    }

    private abstract class TestStateHandler
    extends PersisterStateManager.PersisterStateManagerHandler {
        private final CompletableFuture<TestHandlerResponse> result;

        TestStateHandler(PersisterStateManager stateManager, String groupId, Uuid topicId, int partition, CompletableFuture<TestHandlerResponse> result, long backoffMs, long backoffMaxMs, int maxFindCoordAttempts) {
            PersisterStateManager persisterStateManager = stateManager;
            Objects.requireNonNull(persisterStateManager);
            super(persisterStateManager, groupId, topicId, partition, backoffMs, backoffMaxMs, maxFindCoordAttempts);
            this.result = result;
        }

        protected void handleRequestResponse(ClientResponse response) {
            this.result.complete(new TestHandlerResponse(new TestHandlerResponseData().setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult().setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult().setPartition(this.partitionKey().partition()).setErrorMessage(Errors.NONE.message()).setErrorCode(Errors.NONE.code())))))));
        }

        protected boolean isResponseForRequest(ClientResponse response) {
            return true;
        }

        protected void findCoordinatorErrorResponse(Errors error, Exception exception) {
            this.result.complete(new TestHandlerResponse(new TestHandlerResponseData().setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(this.partitionKey().topicId()).setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult().setPartition(this.partitionKey().partition()).setErrorMessage(exception == null ? error.message() : exception.getMessage()).setErrorCode(error.code())))))));
        }

        protected void requestErrorResponse(Errors error, Exception exception) {
            this.result.complete(new TestHandlerResponse(new TestHandlerResponseData().setResults(List.of(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(this.partitionKey().topicId()).setPartitions(List.of(new WriteShareGroupStateResponseData.PartitionResult().setPartition(this.partitionKey().partition()).setErrorMessage(exception == null ? error.message() : exception.getMessage()).setErrorCode(error.code())))))));
        }

        protected String name() {
            return "TestStateHandler";
        }

        protected boolean isBatchable() {
            return false;
        }

        protected PersisterStateManager.RPCType rpcType() {
            return PersisterStateManager.RPCType.UNKNOWN;
        }

        protected CompletableFuture<TestHandlerResponse> result() {
            return this.result;
        }

        private class TestHandlerResponse
        extends WriteShareGroupStateResponse {
            public TestHandlerResponse(WriteShareGroupStateResponseData data) {
                super(data);
            }
        }

        private class TestHandlerResponseData
        extends WriteShareGroupStateResponseData {
            private TestHandlerResponseData() {
            }
        }
    }

    static class TestHolder {
        boolean hasResponse;
        boolean wasDisconnected;
        boolean wasTimedOut;
        Optional<Errors> exp;

        TestHolder(boolean hasResponse, boolean wasDisconnected, boolean wasTimedOut, Optional<Errors> exp) {
            this.hasResponse = hasResponse;
            this.wasDisconnected = wasDisconnected;
            this.wasTimedOut = wasTimedOut;
            this.exp = exp;
        }
    }
}

