/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.TimedRequestState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class CommitRequestManagerTest {
    private final long retryBackoffMs = 100L;
    private final long retryBackoffMaxMs = 1000L;
    private static final String CONSUMER_COORDINATOR_METRICS = "consumer-coordinator-metrics";
    private final Node mockedNode = new Node(1, "host1", 9092);
    private SubscriptionState subscriptionState;
    private LogContext logContext;
    private MockTime time;
    private CoordinatorRequestManager coordinatorRequestManager;
    private OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
    private final Metrics metrics = new Metrics();
    private Properties props;
    private final int defaultApiTimeoutMs = 60000;

    @BeforeEach
    public void setup() {
        this.logContext = new LogContext();
        this.time = new MockTime(0L);
        this.subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        this.coordinatorRequestManager = (CoordinatorRequestManager)Mockito.mock(CoordinatorRequestManager.class);
        this.offsetCommitCallbackInvoker = (OffsetCommitCallbackInvoker)Mockito.mock(OffsetCommitCallbackInvoker.class);
        this.props = new Properties();
        this.props.put("auto.commit.interval.ms", (Object)100);
        this.props.put("key.deserializer", StringDeserializer.class);
        this.props.put("value.deserializer", StringDeserializer.class);
    }

    @Test
    public void testOffsetFetchRequestStateToStringBase() {
        ConsumerConfig config = (ConsumerConfig)Mockito.mock(ConsumerConfig.class);
        CommitRequestManager commitRequestManager = new CommitRequestManager((Time)this.time, this.logContext, this.subscriptionState, config, this.coordinatorRequestManager, this.offsetCommitCallbackInvoker, "group-id", Optional.of("group-instance-id"), 100L, 1000L, OptionalDouble.of(0.0), this.metrics);
        commitRequestManager.onMemberEpochUpdated(Optional.of(1), Optional.empty());
        Set<TopicPartition> requestedPartitions = Collections.singleton(new TopicPartition("topic-1", 1));
        CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState = commitRequestManager.createOffsetFetchRequest(requestedPartitions, 0L);
        TimedRequestState timedRequestState = new TimedRequestState(this.logContext, CommitRequestManager.class.getSimpleName(), 100L, 2, 1000L, 0.0, TimedRequestState.deadlineTimer((Time)this.time, (long)0L));
        String target = timedRequestState.toStringBase() + ", " + offsetFetchRequestState.memberInfo + ", requestedPartitions=" + offsetFetchRequestState.requestedPartitions;
        Assertions.assertDoesNotThrow(() -> ((TimedRequestState)timedRequestState).toString());
        Assertions.assertFalse((boolean)target.contains("Optional"));
        Assertions.assertEquals((Object)target, (Object)offsetFetchRequestState.toStringBase());
    }

    @Test
    public void testPollSkipIfCoordinatorUnknown() {
        CommitRequestManager commitRequestManager = this.create(false, 0L);
        this.assertPoll(false, 0, commitRequestManager);
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0L));
        commitRequestManager.commitAsync(offsets);
        this.assertPoll(false, 0, commitRequestManager);
    }

    @Test
    public void testAsyncCommitWhileCoordinatorUnknownIsSentOutWhenCoordinatorDiscovered() {
        CommitRequestManager commitRequestManager = this.create(false, 0L);
        this.assertPoll(false, 0, commitRequestManager);
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0L));
        commitRequestManager.commitAsync(offsets);
        this.assertPoll(false, 0, commitRequestManager);
        this.assertPoll(true, 1, commitRequestManager);
    }

    @Test
    public void testPollEnsureManualCommitSent() {
        CommitRequestManager commitRequestManager = this.create(false, 0L);
        this.assertPoll(0, commitRequestManager);
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0L));
        commitRequestManager.commitAsync(offsets);
        this.assertPoll(1, commitRequestManager);
    }

    @Test
    public void testPollEnsureAutocommitSent() {
        TopicPartition tp = new TopicPartition("t1", 1);
        this.subscriptionState.assignFromUser(Collections.singleton(tp));
        this.subscriptionState.seek(tp, 100L);
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        this.assertPoll(0, commitRequestManager);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        List<NetworkClientDelegate.FutureCompletionHandler> pollResults = this.assertPoll(1, commitRequestManager);
        pollResults.forEach(v -> v.onComplete(this.mockOffsetCommitResponse("t1", 1, (short)1, Errors.NONE)));
        Assertions.assertEquals((double)0.03, (double)((Double)this.getMetric("commit-rate").metricValue()), (double)0.01);
        Assertions.assertEquals((Object)1.0, (Object)this.getMetric("commit-total").metricValue());
    }

    @Test
    public void testPollEnsureCorrectInflightRequestBufferSize() {
        CommitRequestManager commitManager = this.create(false, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        HashMap<TopicPartition, OffsetAndMetadata> offsets1 = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets1.put(new TopicPartition("test", 0), new OffsetAndMetadata(10L));
        offsets1.put(new TopicPartition("test", 1), new OffsetAndMetadata(20L));
        HashMap<TopicPartition, OffsetAndMetadata> offsets2 = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets2.put(new TopicPartition("test", 3), new OffsetAndMetadata(20L));
        offsets2.put(new TopicPartition("test", 4), new OffsetAndMetadata(20L));
        long deadlineMs = this.time.milliseconds() + 60000L;
        commitManager.commitSync(offsets1, deadlineMs);
        commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), deadlineMs);
        commitManager.commitSync(offsets2, deadlineMs);
        commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 1)), deadlineMs);
        NetworkClientDelegate.PollResult result = commitManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)4, (int)result.unsentRequests.size());
        Assertions.assertTrue((boolean)result.unsentRequests.stream().anyMatch(r -> r.requestBuilder() instanceof OffsetCommitRequest.Builder));
        Assertions.assertTrue((boolean)result.unsentRequests.stream().anyMatch(r -> r.requestBuilder() instanceof OffsetFetchRequest.Builder));
        Assertions.assertFalse((boolean)commitManager.pendingRequests.hasUnsentRequests());
        Assertions.assertEquals((int)2, (int)commitManager.pendingRequests.inflightOffsetFetches.size());
        result.unsentRequests.forEach(req -> {
            if (req.requestBuilder() instanceof OffsetFetchRequest.Builder) {
                req.handler().onComplete(this.buildOffsetFetchClientResponse((NetworkClientDelegate.UnsentRequest)req, Collections.emptySet(), Errors.NONE));
            } else {
                req.handler().onComplete(this.buildOffsetCommitClientResponse(new OffsetCommitResponse(0, new HashMap())));
            }
        });
        Assertions.assertEquals((int)0, (int)commitManager.pendingRequests.inflightOffsetFetches.size());
    }

    @Test
    public void testPollEnsureEmptyPendingRequestAfterPoll() {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        commitRequestManager.commitAsync(offsets);
        Assertions.assertEquals((int)1, (int)commitRequestManager.unsentOffsetCommitRequests().size());
        Assertions.assertEquals((int)1, (int)commitRequestManager.poll((long)this.time.milliseconds()).unsentRequests.size());
        Assertions.assertTrue((boolean)commitRequestManager.unsentOffsetCommitRequests().isEmpty());
        CommitRequestManagerTest.assertEmptyPendingRequests(commitRequestManager);
    }

    @Test
    public void testAsyncAutocommitNotRetriedAfterException() {
        long commitInterval = 200L;
        CommitRequestManager commitRequestManager = this.create(true, commitInterval);
        TopicPartition tp = new TopicPartition("topic", 1);
        this.subscriptionState.assignFromUser(Collections.singleton(tp));
        this.subscriptionState.seek(tp, 100L);
        this.time.sleep(commitInterval);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        List<NetworkClientDelegate.FutureCompletionHandler> futures = this.assertPoll(1, commitRequestManager);
        futures.get(0).onComplete(this.mockOffsetCommitResponse("topic", 1, (short)1, Errors.COORDINATOR_LOAD_IN_PROGRESS));
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        this.assertPoll(0, commitRequestManager);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        this.time.sleep(commitInterval);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        futures = this.assertPoll(1, commitRequestManager);
        CommitRequestManagerTest.assertEmptyPendingRequests(commitRequestManager);
        futures.get(0).onComplete(this.mockOffsetCommitResponse("topic", 1, (short)1, Errors.NONE));
    }

    @ParameterizedTest
    @MethodSource(value={"offsetCommitExceptionSupplier"})
    public void testCommitSyncRetriedAfterExpectedRetriableException(Errors error) {
        CommitRequestManager commitRequestManager = this.create(false, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        long deadlineMs = this.time.milliseconds() + 60000L;
        CompletableFuture commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
        this.sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManager, error, commitResult);
        this.assertExceptionHandling(commitRequestManager, error, true);
    }

    @ParameterizedTest
    @MethodSource(value={"commitSyncExpectedExceptions"})
    public void testCommitSyncFailsWithExpectedException(Errors commitError, Class<? extends Exception> expectedException) {
        CommitRequestManager commitRequestManager = this.create(false, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        long deadlineMs = this.time.milliseconds() + 100L;
        CompletableFuture commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
        this.completeOffsetCommitRequestWithError(commitRequestManager, commitError);
        TestUtils.assertFutureThrows(commitResult, expectedException);
    }

    private static Stream<Arguments> commitSyncExpectedExceptions() {
        return Stream.of(Arguments.of((Object[])new Object[]{Errors.FENCED_INSTANCE_ID, CommitFailedException.class}), Arguments.of((Object[])new Object[]{Errors.UNKNOWN_MEMBER_ID, CommitFailedException.class}), Arguments.of((Object[])new Object[]{Errors.OFFSET_METADATA_TOO_LARGE, Errors.OFFSET_METADATA_TOO_LARGE.exception().getClass()}), Arguments.of((Object[])new Object[]{Errors.INVALID_COMMIT_OFFSET_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE.exception().getClass()}), Arguments.of((Object[])new Object[]{Errors.GROUP_AUTHORIZATION_FAILED, Errors.GROUP_AUTHORIZATION_FAILED.exception().getClass()}), Arguments.of((Object[])new Object[]{Errors.CORRUPT_MESSAGE, KafkaException.class}), Arguments.of((Object[])new Object[]{Errors.UNKNOWN_SERVER_ERROR, KafkaException.class}));
    }

    @Test
    public void testCommitSyncFailsWithCommitFailedExceptionIfUnknownMemberId() {
        CommitRequestManager commitRequestManager = this.create(false, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        long deadlineMs = this.time.milliseconds() + 60000L;
        CompletableFuture commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
        this.completeOffsetCommitRequestWithError(commitRequestManager, Errors.UNKNOWN_MEMBER_ID);
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
        Assertions.assertTrue((boolean)commitResult.isDone());
        TestUtils.assertFutureThrows(commitResult, CommitFailedException.class);
    }

    @Test
    public void testCommitSyncFailsWithCommitFailedExceptionOnStaleMemberEpoch() {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        CompletableFuture commitResult = commitRequestManager.commitSync(offsets, this.time.milliseconds() + 60000L);
        this.completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
        Assertions.assertTrue((boolean)commitResult.isDone());
        TestUtils.assertFutureThrows(commitResult, CommitFailedException.class);
    }

    @Test
    public void testAutoCommitAsyncFailsWithStaleMemberEpochContinuesToCommitOnTheInterval() {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        TopicPartition t1p = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(t1p));
        this.subscriptionState.seek(t1p, 10L);
        commitRequestManager.maybeAutoCommitAsync();
        this.completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
        ((CommitRequestManager)Mockito.verify((Object)commitRequestManager)).resetAutoCommitTimer();
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size(), (String)"No request should be generated until the interval expires");
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
    }

    @Test
    public void testCommitAsyncFailsWithRetriableOnCoordinatorDisconnected() {
        CommitRequestManager commitRequestManager = this.create(false, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        CompletableFuture commitResult = commitRequestManager.commitAsync(offsets);
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest req = (NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0);
        ClientResponse response = this.mockOffsetCommitResponseDisconnected("topic", 1, (short)1, req);
        response.onComplete();
        Assertions.assertTrue((boolean)commitResult.isDone());
        TestUtils.assertFutureThrows(commitResult, RetriableCommitFailedException.class);
        this.assertCoordinatorDisconnect();
    }

    @Test
    public void testAutocommitEnsureOnlyOneInflightRequest() {
        TopicPartition t1p = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(t1p));
        this.subscriptionState.seek(t1p, 100L);
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        List<NetworkClientDelegate.FutureCompletionHandler> futures = this.assertPoll(1, commitRequestManager);
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        this.assertPoll(0, commitRequestManager);
        CommitRequestManagerTest.assertEmptyPendingRequests(commitRequestManager);
        futures.get(0).onComplete(this.buildOffsetCommitClientResponse(new OffsetCommitResponse(0, new HashMap())));
        this.assertPoll(1, commitRequestManager);
    }

    @Test
    public void testAutoCommitBeforeRevocationNotBlockedByAutoCommitOnIntervalInflightRequest() {
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        TopicPartition t1p = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(t1p));
        this.subscriptionState.seek(t1p, 100L);
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        NetworkClientDelegate.FutureCompletionHandler autoCommitOnInterval = ((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0)).handler();
        CompletableFuture autoCommitBeforeRevocation = commitRequestManager.maybeAutoCommitSyncBeforeRevocation(200L);
        Assertions.assertEquals((int)1, (int)commitRequestManager.pendingRequests.unsentOffsetCommits.size());
        autoCommitOnInterval.onComplete(this.buildOffsetCommitClientResponse(new OffsetCommitResponse(0, new HashMap())));
        Assertions.assertFalse((boolean)autoCommitBeforeRevocation.isDone(), (String)"Auto-commit before revocation should not complete until it receives a response");
    }

    @Test
    public void testAutocommitInterceptorsInvoked() {
        TopicPartition t1p = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(t1p));
        this.subscriptionState.seek(t1p, 100L);
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        List<NetworkClientDelegate.FutureCompletionHandler> futures = this.assertPoll(1, commitRequestManager);
        futures.get(0).onComplete(this.buildOffsetCommitClientResponse(new OffsetCommitResponse(0, new HashMap())));
        ((OffsetCommitCallbackInvoker)Mockito.verify((Object)this.offsetCommitCallbackInvoker)).enqueueInterceptorInvocation((Map)ArgumentMatchers.eq(Collections.singletonMap(t1p, new OffsetAndMetadata(100L))));
    }

    @Test
    public void testAutocommitInterceptorsNotInvokedOnError() {
        TopicPartition t1p = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(t1p));
        this.subscriptionState.seek(t1p, 100L);
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        List<NetworkClientDelegate.FutureCompletionHandler> futures = this.assertPoll(1, commitRequestManager);
        futures.get(0).onComplete(this.buildOffsetCommitClientResponse(new OffsetCommitResponse(0, Collections.singletonMap(t1p, Errors.NETWORK_EXCEPTION))));
        ((OffsetCommitCallbackInvoker)Mockito.verify((Object)this.offsetCommitCallbackInvoker, (VerificationMode)Mockito.never())).enqueueInterceptorInvocation((Map)ArgumentMatchers.any());
    }

    @Test
    public void testAutoCommitEmptyOffsetsDoesNotGenerateRequest() {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        commitRequestManager.maybeAutoCommitAsync();
        Assertions.assertTrue((boolean)commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
        ((CommitRequestManager)Mockito.verify((Object)commitRequestManager)).resetAutoCommitTimer();
    }

    @Test
    public void testAutoCommitEmptyDoesNotLeaveInflightRequestFlagOn() {
        TopicPartition t1p = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(t1p));
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        commitRequestManager.maybeAutoCommitAsync();
        this.subscriptionState.seek(t1p, 100L);
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        commitRequestManager.maybeAutoCommitAsync();
        Assertions.assertEquals((int)1, (int)commitRequestManager.pendingRequests.unsentOffsetCommits.size());
        ((CommitRequestManager)Mockito.verify((Object)commitRequestManager, (VerificationMode)Mockito.times((int)2))).resetAutoCommitTimer();
    }

    @Test
    public void testAutoCommitOnIntervalSkippedIfPreviousOneInFlight() {
        TopicPartition t1p = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(t1p));
        this.subscriptionState.seek(t1p, 100L);
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        commitRequestManager.maybeAutoCommitAsync();
        List<NetworkClientDelegate.FutureCompletionHandler> futures = this.assertPoll(1, commitRequestManager);
        Assertions.assertEquals((int)1, (int)futures.size());
        NetworkClientDelegate.FutureCompletionHandler inflightCommitResult = futures.get(0);
        ((CommitRequestManager)Mockito.verify((Object)commitRequestManager, (VerificationMode)Mockito.times((int)1))).resetAutoCommitTimer();
        Mockito.clearInvocations((Object[])new CommitRequestManager[]{commitRequestManager});
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        commitRequestManager.maybeAutoCommitAsync();
        this.assertPoll(0, commitRequestManager);
        ((CommitRequestManager)Mockito.verify((Object)commitRequestManager, (VerificationMode)Mockito.never())).resetAutoCommitTimer();
        inflightCommitResult.onComplete(this.mockOffsetCommitResponse(t1p.topic(), t1p.partition(), (short)1, Errors.NONE));
        this.assertPoll(1, commitRequestManager);
    }

    @Test
    public void testOffsetFetchRequestEnsureDuplicatedRequestSucceed() {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(new TopicPartition("t1", 0));
        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = this.sendAndVerifyDuplicatedOffsetFetchRequests(commitRequestManager, partitions, 2, Errors.NONE);
        futures.forEach(f -> {
            Assertions.assertTrue((boolean)f.isDone());
            Assertions.assertFalse((boolean)f.isCompletedExceptionally());
        });
        commitRequestManager.poll(0L);
        CommitRequestManagerTest.assertEmptyPendingRequests(commitRequestManager);
    }

    @ParameterizedTest
    @MethodSource(value={"offsetFetchExceptionSupplier"})
    public void testOffsetFetchRequestErroredRequests(Errors error) {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(new TopicPartition("t1", 0));
        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = this.sendAndVerifyDuplicatedOffsetFetchRequests(commitRequestManager, partitions, 1, error);
        if (this.isRetriableOnOffsetFetch(error)) {
            this.testRetriable(commitRequestManager, futures);
        } else {
            this.testNonRetriable(futures);
            CommitRequestManagerTest.assertEmptyPendingRequests(commitRequestManager);
        }
    }

    @ParameterizedTest
    @MethodSource(value={"offsetFetchExceptionSupplier"})
    public void testOffsetFetchRequestTimeoutRequests(Errors error) {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(new TopicPartition("t1", 0));
        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = this.sendAndVerifyDuplicatedOffsetFetchRequests(commitRequestManager, partitions, 1, error);
        if (this.isRetriableOnOffsetFetch(error)) {
            futures.forEach(f -> Assertions.assertFalse((boolean)f.isDone()));
            this.time.sleep(60000L);
            Assertions.assertFalse((boolean)commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
            commitRequestManager.poll(this.time.milliseconds());
            futures.forEach(f -> TestUtils.assertFutureThrows(f, TimeoutException.class));
            Assertions.assertTrue((boolean)commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
        } else {
            futures.forEach(f -> TestUtils.assertFutureThrows(f, KafkaException.class));
            CommitRequestManagerTest.assertEmptyPendingRequests(commitRequestManager);
        }
    }

    private boolean isRetriableOnOffsetFetch(Errors error) {
        return error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE;
    }

    @Test
    public void testSuccessfulOffsetFetch() {
        CommitRequestManager commitManager = this.create(false, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        long deadlineMs = this.time.milliseconds() + 60000L;
        CompletableFuture fetchResult = commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), deadlineMs);
        NetworkClientDelegate.PollResult result = commitManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)result.unsentRequests.size());
        Assertions.assertEquals((int)1, (int)commitManager.pendingRequests.inflightOffsetFetches.size());
        Assertions.assertFalse((boolean)fetchResult.isDone());
        TopicPartition tp = new TopicPartition("topic1", 0);
        long expectedOffset = 100L;
        NetworkClientDelegate.UnsentRequest req = (NetworkClientDelegate.UnsentRequest)result.unsentRequests.get(0);
        Map<TopicPartition, OffsetFetchResponse.PartitionData> topicPartitionData = Collections.singletonMap(tp, new OffsetFetchResponse.PartitionData(expectedOffset, Optional.of(1), "", Errors.NONE));
        req.handler().onComplete(this.buildOffsetFetchClientResponse(req, topicPartitionData, Errors.NONE, false));
        Assertions.assertTrue((boolean)fetchResult.isDone());
        Assertions.assertFalse((boolean)fetchResult.isCompletedExceptionally());
        Map offsetsAndMetadata = null;
        try {
            offsetsAndMetadata = (Map)fetchResult.get();
        }
        catch (InterruptedException | ExecutionException e) {
            Assertions.fail((Throwable)e);
        }
        Assertions.assertNotNull((Object)offsetsAndMetadata);
        Assertions.assertEquals((int)1, (int)offsetsAndMetadata.size());
        Assertions.assertTrue((boolean)offsetsAndMetadata.containsKey(tp));
        Assertions.assertEquals((long)expectedOffset, (long)((OffsetAndMetadata)offsetsAndMetadata.get(tp)).offset());
        Assertions.assertEquals((int)0, (int)commitManager.pendingRequests.inflightOffsetFetches.size(), (String)"Inflight request should be removed from the queue when a response is received.");
    }

    @ParameterizedTest
    @MethodSource(value={"offsetFetchRetriableCoordinatorErrors"})
    public void testOffsetFetchMarksCoordinatorUnknownOnRetriableCoordinatorErrors(Errors error, boolean shouldRediscoverCoordinator) {
        CommitRequestManager commitRequestManager = this.create(false, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(new TopicPartition("t1", 0));
        long deadlineMs = this.time.milliseconds() + 60000L;
        CompletableFuture result = commitRequestManager.fetchOffsets(partitions, deadlineMs);
        this.completeOffsetFetchRequestWithError(commitRequestManager, partitions, error);
        Assertions.assertFalse((boolean)result.isDone());
        if (shouldRediscoverCoordinator) {
            this.assertCoordinatorDisconnect();
        }
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
        this.time.sleep(100L);
        res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
    }

    @Test
    public void testOffsetFetchMarksCoordinatorUnknownOnCoordinatorDisconnectedAndRetries() {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(new TopicPartition("t1", 0));
        long deadlineMs = this.time.milliseconds() + 60000L;
        CompletableFuture result = commitRequestManager.fetchOffsets(partitions, deadlineMs);
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest request = (NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0);
        ClientResponse response = this.buildOffsetFetchClientResponseDisconnected(request);
        response.onComplete();
        Assertions.assertFalse((boolean)result.isDone());
        this.assertCoordinatorDisconnect();
        this.time.sleep(100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        Assertions.assertEquals((int)1, (int)commitRequestManager.pendingRequests.inflightOffsetFetches.size());
    }

    @ParameterizedTest
    @MethodSource(value={"offsetCommitExceptionSupplier"})
    public void testOffsetCommitRequestErroredRequestsNotRetriedForAsyncCommit(Errors error) {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        CompletableFuture commitResult = commitRequestManager.commitAsync(offsets);
        this.completeOffsetCommitRequestWithError(commitRequestManager, error);
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
        Assertions.assertTrue((boolean)commitResult.isDone());
        Assertions.assertTrue((boolean)commitResult.isCompletedExceptionally());
        if (error.exception() instanceof RetriableException) {
            TestUtils.assertFutureThrows(commitResult, RetriableCommitFailedException.class);
        }
        this.assertExceptionHandling(commitRequestManager, error, false);
    }

    @Test
    public void testOffsetCommitSyncTimeoutNotReturnedOnPollAndFails() {
        CommitRequestManager commitRequestManager = this.create(false, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        long deadlineMs = this.time.milliseconds() + 200L;
        CompletableFuture commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
        this.completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT);
        this.time.sleep(100L);
        this.completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT);
        Assertions.assertFalse((boolean)commitResult.isDone());
        this.time.sleep(100L);
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
        Assertions.assertTrue((boolean)commitResult.isDone());
        Assertions.assertTrue((boolean)commitResult.isCompletedExceptionally());
    }

    @ParameterizedTest
    @MethodSource(value={"offsetCommitExceptionSupplier"})
    public void testOffsetCommitSyncFailedWithRetriableThrowsTimeoutWhenRetryTimeExpires(Errors error) {
        CommitRequestManager commitRequestManager = this.create(false, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        long deadlineMs = this.time.milliseconds() + 200L;
        CompletableFuture commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
        this.completeOffsetCommitRequestWithError(commitRequestManager, error);
        this.time.sleep(deadlineMs);
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
        Assertions.assertTrue((boolean)commitResult.isDone());
        if (error.exception() instanceof RetriableException) {
            TestUtils.assertFutureThrows(commitResult, TimeoutException.class);
        } else {
            TestUtils.assertFutureThrows(commitResult, KafkaException.class);
        }
    }

    @Test
    public void testOffsetCommitAsyncFailedWithRetriableThrowsRetriableCommitException() {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        Errors retriableError = Errors.COORDINATOR_NOT_AVAILABLE;
        CompletableFuture commitResult = commitRequestManager.commitAsync(offsets);
        this.completeOffsetCommitRequestWithError(commitRequestManager, retriableError);
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
        Assertions.assertTrue((boolean)commitResult.isDone());
        Assertions.assertTrue((boolean)commitResult.isCompletedExceptionally());
        this.assertExceptionHandling(commitRequestManager, retriableError, false);
        TestUtils.assertFutureThrows(commitResult, RetriableCommitFailedException.class);
    }

    @Test
    public void testEnsureBackoffRetryOnOffsetCommitRequestTimeout() {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        long deadlineMs = this.time.milliseconds() + 60000L;
        commitRequestManager.commitSync(offsets, deadlineMs);
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        ((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0)).handler().onFailure(this.time.milliseconds(), (RuntimeException)new TimeoutException());
        Assertions.assertTrue((boolean)commitRequestManager.pendingRequests.hasUnsentRequests());
        Assertions.assertEquals((int)1, (int)commitRequestManager.unsentOffsetCommitRequests().size());
        this.assertRetryBackOff(commitRequestManager, 100L);
    }

    private void assertCoordinatorDisconnect() {
        ((CoordinatorRequestManager)Mockito.verify((Object)this.coordinatorRequestManager)).markCoordinatorUnknown((String)ArgumentMatchers.any(), ArgumentMatchers.anyLong());
    }

    private void assertExceptionHandling(CommitRequestManager commitRequestManager, Errors errors, boolean requestShouldBeRetried) {
        long remainBackoffMs = requestShouldBeRetried ? 100L : Long.MAX_VALUE;
        switch (errors) {
            case NOT_COORDINATOR: 
            case COORDINATOR_NOT_AVAILABLE: 
            case REQUEST_TIMED_OUT: {
                ((CoordinatorRequestManager)Mockito.verify((Object)this.coordinatorRequestManager)).markCoordinatorUnknown((String)ArgumentMatchers.any(), ArgumentMatchers.anyLong());
                this.assertPollDoesNotReturn(commitRequestManager, remainBackoffMs);
                break;
            }
            case UNKNOWN_TOPIC_OR_PARTITION: 
            case COORDINATOR_LOAD_IN_PROGRESS: {
                if (!requestShouldBeRetried) break;
                this.assertRetryBackOff(commitRequestManager, remainBackoffMs);
                break;
            }
            case GROUP_AUTHORIZATION_FAILED: {
                break;
            }
            case TOPIC_AUTHORIZATION_FAILED: 
            case OFFSET_METADATA_TOO_LARGE: 
            case INVALID_COMMIT_OFFSET_SIZE: {
                this.assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE);
                break;
            }
            case FENCED_INSTANCE_ID: {
                this.assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE);
                break;
            }
            default: {
                if (errors.exception() instanceof RetriableException && requestShouldBeRetried) {
                    this.assertRetryBackOff(commitRequestManager, remainBackoffMs);
                    break;
                }
                this.assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE);
            }
        }
    }

    private void assertPollDoesNotReturn(CommitRequestManager commitRequestManager, long assertNextPollMs) {
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
        Assertions.assertEquals((long)assertNextPollMs, (long)res.timeUntilNextPollMs);
    }

    private void assertRetryBackOff(CommitRequestManager commitRequestManager, long retryBackoffMs) {
        this.assertPollDoesNotReturn(commitRequestManager, retryBackoffMs);
        this.time.sleep(retryBackoffMs - 1L);
        this.assertPollDoesNotReturn(commitRequestManager, 1L);
        this.time.sleep(1L);
        this.assertPoll(1, commitRequestManager);
    }

    @Test
    public void testSyncOffsetFetchFailsWithStaleEpochAndRetriesWithNewEpoch() {
        CommitRequestManager commitRequestManager = this.create(false, 100L);
        Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("t1", 0));
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        long deadlineMs = this.time.milliseconds() + 60000L;
        commitRequestManager.fetchOffsets(partitions, deadlineMs);
        int newEpoch = 8;
        String memberId = "member1";
        commitRequestManager.onMemberEpochUpdated(Optional.of(newEpoch), Optional.of(memberId));
        this.completeOffsetFetchRequestWithError(commitRequestManager, partitions, Errors.STALE_MEMBER_EPOCH);
        Assertions.assertEquals((int)0, (int)commitRequestManager.pendingRequests.inflightOffsetFetches.size());
        Assertions.assertEquals((int)1, (int)commitRequestManager.pendingRequests.unsentOffsetFetches.size());
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
        this.time.sleep(100L);
        res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        OffsetFetchRequestData reqData = (OffsetFetchRequestData)((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0)).requestBuilder().build().data();
        Assertions.assertEquals((int)1, (int)reqData.groups().size());
        Assertions.assertEquals((int)newEpoch, (int)((OffsetFetchRequestData.OffsetFetchRequestGroup)reqData.groups().get(0)).memberEpoch());
        Assertions.assertEquals((Object)memberId, (Object)((OffsetFetchRequestData.OffsetFetchRequestGroup)reqData.groups().get(0)).memberId());
    }

    @Test
    public void testSyncOffsetFetchFailsWithStaleEpochAndNotRetriedIfMemberNotInGroupAnymore() {
        CommitRequestManager commitRequestManager = this.create(false, 100L);
        Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("t1", 0));
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        long deadlineMs = this.time.milliseconds() + 60000L;
        CompletableFuture requestResult = commitRequestManager.fetchOffsets(partitions, deadlineMs);
        commitRequestManager.onMemberEpochUpdated(Optional.empty(), Optional.empty());
        this.completeOffsetFetchRequestWithError(commitRequestManager, partitions, Errors.STALE_MEMBER_EPOCH);
        Assertions.assertTrue((boolean)requestResult.isDone());
        Assertions.assertTrue((boolean)requestResult.isCompletedExceptionally());
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
    }

    @ParameterizedTest
    @MethodSource(value={"offsetCommitExceptionSupplier"})
    public void testAutoCommitSyncBeforeRevocationRetriesOnRetriableAndStaleEpoch(Errors error) {
        CommitRequestManager commitRequestManager = this.create(true, Integer.MAX_VALUE);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        TopicPartition tp = new TopicPartition("topic", 1);
        this.subscriptionState.assignFromUser(Collections.singleton(tp));
        this.subscriptionState.seek(tp, 5L);
        long deadlineMs = this.time.milliseconds() + 200L;
        commitRequestManager.maybeAutoCommitSyncBeforeRevocation(deadlineMs);
        int newEpoch = 8;
        String memberId = "member1";
        if (error == Errors.STALE_MEMBER_EPOCH) {
            commitRequestManager.onMemberEpochUpdated(Optional.of(newEpoch), Optional.of(memberId));
        }
        this.completeOffsetCommitRequestWithError(commitRequestManager, error);
        if ((error.exception() instanceof RetriableException || error == Errors.STALE_MEMBER_EPOCH) && error != Errors.UNKNOWN_TOPIC_OR_PARTITION) {
            Assertions.assertEquals((int)1, (int)commitRequestManager.pendingRequests.unsentOffsetCommits.size(), (String)"Request to be retried should be added to the outbound queue");
            NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
            Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
            this.time.sleep(100L);
            res = commitRequestManager.poll(this.time.milliseconds());
            Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
            if (error == Errors.STALE_MEMBER_EPOCH) {
                OffsetCommitRequestData reqData = (OffsetCommitRequestData)((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0)).requestBuilder().build().data();
                Assertions.assertEquals((int)newEpoch, (int)reqData.generationIdOrMemberEpoch());
                Assertions.assertEquals((Object)memberId, (Object)reqData.memberId());
            }
        } else {
            Assertions.assertEquals((int)0, (int)commitRequestManager.pendingRequests.unsentOffsetCommits.size(), (String)"Non-retriable failed request should be removed from the outbound queue");
            NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
            Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
            this.time.sleep(100L);
            res = commitRequestManager.poll(this.time.milliseconds());
            Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
        }
    }

    @Test
    public void testLastEpochSentOnCommit() {
        CommitRequestManager commitRequestManager = this.create(true, Integer.MAX_VALUE);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        TopicPartition tp = new TopicPartition("topic", 1);
        this.subscriptionState.assignFromUser(Collections.singleton(tp));
        this.subscriptionState.seek(tp, 100L);
        commitRequestManager.maybeAutoCommitSyncBeforeRevocation(Long.MAX_VALUE);
        int initialEpoch = 1;
        String memberId = "member1";
        commitRequestManager.onMemberEpochUpdated(Optional.of(initialEpoch), Optional.of(memberId));
        this.completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
        Assertions.assertEquals((int)initialEpoch, (Integer)commitRequestManager.lastEpochSentOnCommit().orElse(null));
        commitRequestManager.onMemberEpochUpdated(Optional.of(initialEpoch + 1), Optional.of(memberId));
        Assertions.assertEquals((int)initialEpoch, (Integer)((Integer)commitRequestManager.lastEpochSentOnCommit().get()));
        this.time.sleep(100L);
        this.completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
        Assertions.assertEquals((int)(initialEpoch + 1), (Integer)commitRequestManager.lastEpochSentOnCommit().orElse(null));
        commitRequestManager.onMemberEpochUpdated(Optional.empty(), Optional.empty());
        this.time.sleep(200L);
        this.completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
        Assertions.assertFalse((boolean)commitRequestManager.lastEpochSentOnCommit().isPresent());
    }

    @Test
    public void testEnsureCommitSensorRecordsMetric() {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        this.commitOffsetWithAssertedLatency(commitRequestManager, 100L);
        this.commitOffsetWithAssertedLatency(commitRequestManager, 101L);
        Assertions.assertEquals((Object)100.5, (Object)this.getMetric("commit-latency-avg").metricValue());
        Assertions.assertEquals((Object)101.0, (Object)this.getMetric("commit-latency-max").metricValue());
        Assertions.assertEquals((double)0.066, (double)((Double)this.getMetric("commit-rate").metricValue()), (double)0.001);
        Assertions.assertEquals((Object)2.0, (Object)this.getMetric("commit-total").metricValue());
    }

    private void commitOffsetWithAssertedLatency(CommitRequestManager commitRequestManager, long latencyMs) {
        String topic = "topic";
        boolean partition = true;
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        long commitCreationTimeMs = this.time.milliseconds();
        commitRequestManager.commitAsync(offsets);
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        this.time.sleep(latencyMs);
        long commitReceivedTimeMs = this.time.milliseconds();
        ((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0)).future().complete(this.mockOffsetCommitResponse("topic", 1, (short)1, commitCreationTimeMs, commitReceivedTimeMs, Errors.NONE));
    }

    private void completeOffsetFetchRequestWithError(CommitRequestManager commitRequestManager, Set<TopicPartition> partitions, Errors error) {
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        ((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0)).future().complete(this.buildOffsetFetchClientResponse((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0), partitions, error));
    }

    private void completeOffsetCommitRequestWithError(CommitRequestManager commitRequestManager, Errors error) {
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        ((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0)).future().complete(this.mockOffsetCommitResponse("topic", 1, (short)1, error));
    }

    private void testRetriable(CommitRequestManager commitRequestManager, List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
        futures.forEach(f -> Assertions.assertFalse((boolean)f.isDone()));
        this.time.sleep(100L);
        commitRequestManager.poll(this.time.milliseconds());
        futures.forEach(f -> Assertions.assertFalse((boolean)f.isDone()));
    }

    private void testNonRetriable(List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
        futures.forEach(f -> Assertions.assertTrue((boolean)f.isCompletedExceptionally()));
    }

    private static Stream<Arguments> offsetCommitExceptionSupplier() {
        return Stream.of(Arguments.of((Object[])new Object[]{Errors.NOT_COORDINATOR}), Arguments.of((Object[])new Object[]{Errors.COORDINATOR_LOAD_IN_PROGRESS}), Arguments.of((Object[])new Object[]{Errors.UNKNOWN_SERVER_ERROR}), Arguments.of((Object[])new Object[]{Errors.GROUP_AUTHORIZATION_FAILED}), Arguments.of((Object[])new Object[]{Errors.OFFSET_METADATA_TOO_LARGE}), Arguments.of((Object[])new Object[]{Errors.INVALID_COMMIT_OFFSET_SIZE}), Arguments.of((Object[])new Object[]{Errors.UNKNOWN_TOPIC_OR_PARTITION}), Arguments.of((Object[])new Object[]{Errors.COORDINATOR_NOT_AVAILABLE}), Arguments.of((Object[])new Object[]{Errors.REQUEST_TIMED_OUT}), Arguments.of((Object[])new Object[]{Errors.FENCED_INSTANCE_ID}), Arguments.of((Object[])new Object[]{Errors.TOPIC_AUTHORIZATION_FAILED}), Arguments.of((Object[])new Object[]{Errors.STALE_MEMBER_EPOCH}), Arguments.of((Object[])new Object[]{Errors.UNKNOWN_MEMBER_ID}));
    }

    private static Stream<Arguments> offsetFetchExceptionSupplier() {
        return Stream.of(Arguments.of((Object[])new Object[]{Errors.NOT_COORDINATOR}), Arguments.of((Object[])new Object[]{Errors.COORDINATOR_LOAD_IN_PROGRESS}), Arguments.of((Object[])new Object[]{Errors.UNKNOWN_SERVER_ERROR}), Arguments.of((Object[])new Object[]{Errors.GROUP_AUTHORIZATION_FAILED}), Arguments.of((Object[])new Object[]{Errors.OFFSET_METADATA_TOO_LARGE}), Arguments.of((Object[])new Object[]{Errors.INVALID_COMMIT_OFFSET_SIZE}), Arguments.of((Object[])new Object[]{Errors.UNKNOWN_TOPIC_OR_PARTITION}), Arguments.of((Object[])new Object[]{Errors.COORDINATOR_NOT_AVAILABLE}), Arguments.of((Object[])new Object[]{Errors.REQUEST_TIMED_OUT}), Arguments.of((Object[])new Object[]{Errors.FENCED_INSTANCE_ID}), Arguments.of((Object[])new Object[]{Errors.TOPIC_AUTHORIZATION_FAILED}), Arguments.of((Object[])new Object[]{Errors.UNKNOWN_MEMBER_ID}), Arguments.of((Object[])new Object[]{Errors.STALE_MEMBER_EPOCH}), Arguments.of((Object[])new Object[]{Errors.UNSTABLE_OFFSET_COMMIT}));
    }

    private static Stream<Arguments> offsetFetchRetriableCoordinatorErrors() {
        return Stream.of(Arguments.of((Object[])new Object[]{Errors.NOT_COORDINATOR, true}), Arguments.of((Object[])new Object[]{Errors.COORDINATOR_NOT_AVAILABLE, true}), Arguments.of((Object[])new Object[]{Errors.COORDINATOR_LOAD_IN_PROGRESS, false}));
    }

    @ParameterizedTest
    @MethodSource(value={"partitionDataErrorSupplier"})
    public void testOffsetFetchRequestPartitionDataError(Errors error, boolean isRetriable) {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        TopicPartition tp1 = new TopicPartition("t1", 2);
        TopicPartition tp2 = new TopicPartition("t2", 3);
        partitions.add(tp1);
        partitions.add(tp2);
        long deadlineMs = this.time.milliseconds() + 60000L;
        CompletableFuture future = commitRequestManager.fetchOffsets(partitions, deadlineMs);
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        HashMap<TopicPartition, OffsetFetchResponse.PartitionData> topicPartitionData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
        topicPartitionData.put(tp1, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", error));
        topicPartitionData.put(tp2, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", Errors.NONE));
        ((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0)).handler().onComplete(this.buildOffsetFetchClientResponse((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0), topicPartitionData, Errors.NONE, false));
        if (isRetriable) {
            this.testRetriable(commitRequestManager, Collections.singletonList(future));
        } else {
            this.testNonRetriable(Collections.singletonList(future));
        }
    }

    @Test
    public void testSignalClose() {
        CommitRequestManager commitRequestManager = this.create(true, 100L);
        Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        commitRequestManager.commitAsync(offsets);
        commitRequestManager.signalClose();
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        OffsetCommitRequestData data = (OffsetCommitRequestData)((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0)).requestBuilder().build().data();
        Assertions.assertEquals((Object)"topic", (Object)((OffsetCommitRequestData.OffsetCommitRequestTopic)data.topics().get(0)).name());
    }

    private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) {
        Assertions.assertTrue((boolean)commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty());
        Assertions.assertTrue((boolean)commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
        Assertions.assertTrue((boolean)commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
    }

    private static Stream<Arguments> partitionDataErrorSupplier() {
        return Stream.of(Arguments.of((Object[])new Object[]{Errors.UNSTABLE_OFFSET_COMMIT, true}), Arguments.of((Object[])new Object[]{Errors.UNKNOWN_TOPIC_OR_PARTITION, false}), Arguments.of((Object[])new Object[]{Errors.TOPIC_AUTHORIZATION_FAILED, false}), Arguments.of((Object[])new Object[]{Errors.UNKNOWN_SERVER_ERROR, false}));
    }

    private List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> sendAndVerifyDuplicatedOffsetFetchRequests(CommitRequestManager commitRequestManager, Set<TopicPartition> partitions, int numRequest, Errors error) {
        ArrayList<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = new ArrayList<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>>();
        long deadlineMs = this.time.milliseconds() + 60000L;
        for (int i = 0; i < numRequest; ++i) {
            futures.add(commitRequestManager.fetchOffsets(partitions, deadlineMs));
        }
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        ((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0)).handler().onComplete(this.buildOffsetFetchClientResponse((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0), partitions, error));
        res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
        return futures;
    }

    private void sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(CommitRequestManager commitRequestManager, Errors error, CompletableFuture<Void> commitResult) {
        this.completeOffsetCommitRequestWithError(commitRequestManager, error);
        NetworkClientDelegate.PollResult res = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
        if (error.exception() instanceof RetriableException) {
            Assertions.assertFalse((boolean)commitResult.isDone());
        } else {
            Assertions.assertTrue((boolean)commitResult.isDone());
            Assertions.assertTrue((boolean)commitResult.isCompletedExceptionally());
        }
    }

    private List<NetworkClientDelegate.FutureCompletionHandler> assertPoll(int numRes, CommitRequestManager manager) {
        return this.assertPoll(true, numRes, manager);
    }

    private List<NetworkClientDelegate.FutureCompletionHandler> assertPoll(boolean coordinatorDiscovered, int numRes, CommitRequestManager manager) {
        if (coordinatorDiscovered) {
            Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        } else {
            Mockito.when((Object)this.coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
        }
        NetworkClientDelegate.PollResult res = manager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)numRes, (int)res.unsentRequests.size());
        return res.unsentRequests.stream().map(NetworkClientDelegate.UnsentRequest::handler).collect(Collectors.toList());
    }

    private CommitRequestManager create(boolean autoCommitEnabled, long autoCommitInterval) {
        this.props.setProperty("auto.commit.interval.ms", String.valueOf(autoCommitInterval));
        this.props.setProperty("enable.auto.commit", String.valueOf(autoCommitEnabled));
        if (autoCommitEnabled) {
            this.props.setProperty("group.id", TestUtils.randomString(10));
        }
        return (CommitRequestManager)Mockito.spy((Object)new CommitRequestManager((Time)this.time, this.logContext, this.subscriptionState, new ConsumerConfig(this.props), this.coordinatorRequestManager, this.offsetCommitCallbackInvoker, "group-id", Optional.of("group-instance-id"), 100L, 1000L, OptionalDouble.of(0.0), this.metrics));
    }

    private ClientResponse buildOffsetFetchClientResponse(NetworkClientDelegate.UnsentRequest request, Set<TopicPartition> topicPartitions, Errors error) {
        HashMap<TopicPartition, OffsetFetchResponse.PartitionData> topicPartitionData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
        topicPartitions.forEach(tp -> topicPartitionData.put((TopicPartition)tp, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", Errors.NONE)));
        return this.buildOffsetFetchClientResponse(request, topicPartitionData, error, false);
    }

    private ClientResponse buildOffsetFetchClientResponseDisconnected(NetworkClientDelegate.UnsentRequest request) {
        return this.buildOffsetFetchClientResponse(request, Collections.emptyMap(), Errors.NONE, true);
    }

    private ClientResponse buildOffsetCommitClientResponse(OffsetCommitResponse commitResponse) {
        short apiVersion = 1;
        return new ClientResponse(new RequestHeader(ApiKeys.OFFSET_COMMIT, apiVersion, "", 1), null, "-1", this.time.milliseconds(), this.time.milliseconds(), false, null, null, (AbstractResponse)commitResponse);
    }

    public ClientResponse mockOffsetCommitResponse(String topic, int partition, short apiKeyVersion, Errors error) {
        return this.mockOffsetCommitResponse(topic, partition, apiKeyVersion, this.time.milliseconds(), this.time.milliseconds(), error);
    }

    public ClientResponse mockOffsetCommitResponse(String topic, int partition, short apiKeyVersion, long createdTimeMs, long receivedTimeMs, Errors error) {
        OffsetCommitResponseData responseData = new OffsetCommitResponseData().setTopics(Collections.singletonList(new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(topic).setPartitions(Collections.singletonList(new OffsetCommitResponseData.OffsetCommitResponsePartition().setErrorCode(error.code()).setPartitionIndex(partition)))));
        OffsetCommitResponse response = (OffsetCommitResponse)Mockito.mock(OffsetCommitResponse.class);
        Mockito.when((Object)response.data()).thenReturn((Object)responseData);
        return new ClientResponse(new RequestHeader(ApiKeys.OFFSET_COMMIT, apiKeyVersion, "", 1), null, "-1", createdTimeMs, receivedTimeMs, false, null, null, (AbstractResponse)new OffsetCommitResponse(responseData));
    }

    public ClientResponse mockOffsetCommitResponseDisconnected(String topic, int partition, short apiKeyVersion, NetworkClientDelegate.UnsentRequest unsentRequest) {
        OffsetCommitResponseData responseData = new OffsetCommitResponseData().setTopics(Collections.singletonList(new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(topic).setPartitions(Collections.singletonList(new OffsetCommitResponseData.OffsetCommitResponsePartition().setErrorCode(Errors.NONE.code()).setPartitionIndex(partition)))));
        OffsetCommitResponse response = (OffsetCommitResponse)Mockito.mock(OffsetCommitResponse.class);
        Mockito.when((Object)response.data()).thenReturn((Object)responseData);
        return new ClientResponse(new RequestHeader(ApiKeys.OFFSET_COMMIT, apiKeyVersion, "", 1), (RequestCompletionHandler)unsentRequest.handler(), "-1", this.time.milliseconds(), this.time.milliseconds(), true, null, null, (AbstractResponse)new OffsetCommitResponse(responseData));
    }

    private ClientResponse buildOffsetFetchClientResponse(NetworkClientDelegate.UnsentRequest request, Map<TopicPartition, OffsetFetchResponse.PartitionData> topicPartitionData, Errors error, boolean disconnected) {
        AbstractRequest abstractRequest = request.requestBuilder().build();
        Assertions.assertInstanceOf(OffsetFetchRequest.class, (Object)abstractRequest);
        OffsetFetchRequest offsetFetchRequest = (OffsetFetchRequest)abstractRequest;
        OffsetFetchResponse response = new OffsetFetchResponse(error, topicPartitionData);
        return new ClientResponse(new RequestHeader(ApiKeys.OFFSET_FETCH, offsetFetchRequest.version(), "", 1), (RequestCompletionHandler)request.handler(), "-1", this.time.milliseconds(), this.time.milliseconds(), disconnected, null, null, (AbstractResponse)response);
    }

    private KafkaMetric getMetric(String name) {
        return (KafkaMetric)this.metrics.metrics().get(this.metrics.metricName(name, CONSUMER_COORDINATOR_METRICS));
    }
}

