package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.admin.NewTopicTest;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.class */
public class CommitRequestManagerTest {
    /** Metric group name; presumably the group queried by the getMetric helper (TODO confirm). */
    private static final String CONSUMER_COORDINATOR_METRICS = "consumer-coordinator-metrics";
    /** Subscription state, recreated in setup() with the EARLIEST offset reset strategy. */
    private SubscriptionState subscriptionState;
    /** Log context, recreated in setup(). */
    private LogContext logContext;
    /** Mock clock, recreated in setup(); tests advance it explicitly with sleep(). */
    private MockTime time;
    /** Mockito mock, recreated in setup(); several tests stub coordinator() to return mockedNode. */
    private CoordinatorRequestManager coordinatorRequestManager;
    /** Mockito mock, recreated in setup(); used to verify interceptor invocations. */
    private OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
    /** Consumer properties populated in setup(). */
    private Properties props;
    /** Retry backoff, in milliseconds. */
    private final long retryBackoffMs = 100;
    /** Upper bound for the retry backoff, in milliseconds. */
    private final long retryBackoffMaxMs = 1000;
    /** Node that tests hand out as the group coordinator. */
    private final Node mockedNode = new Node(1, "host1", 9092);
    /** Metrics registry passed to the CommitRequestManager under test. */
    private final Metrics metrics = new Metrics();
    /** Default API timeout, in milliseconds. */
    private final int defaultApiTimeoutMs = 60000;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.clients.consumer.internals.CommitRequestManagerTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest$1.class */
    /**
     * Decompiler rendering of the compiler-generated switch map for a {@code switch} over
     * {@link Errors}. The array is indexed by {@code Errors.ordinal()} and holds the case
     * number (1-10) assigned to each constant listed below; entries for all other constants
     * keep the default value 0. The switch statement that reads this table is not in the
     * visible part of the file.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$Errors = new int[Errors.values().length];

        // Each entry is filled in its own try block, and a NoSuchFieldError is ignored,
        // so a failure on one constant does not stop the remaining entries from being set.
        static {
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NOT_COORDINATOR.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.COORDINATOR_NOT_AVAILABLE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.REQUEST_TIMED_OUT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.UNKNOWN_TOPIC_OR_PARTITION.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.COORDINATOR_LOAD_IN_PROGRESS.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.GROUP_AUTHORIZATION_FAILED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.TOPIC_AUTHORIZATION_FAILED.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.OFFSET_METADATA_TOO_LARGE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.INVALID_COMMIT_OFFSET_SIZE.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.FENCED_INSTANCE_ID.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
        }
    }

    /**
     * Rebuilds the per-test fixtures: log context, mock clock, subscription state, the two
     * Mockito collaborators, and the base consumer properties.
     */
    @BeforeEach
    public void setup() {
        // Plain objects first.
        this.logContext = new LogContext();
        this.time = new MockTime(0L);
        this.subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);

        // Mocked collaborators; Mockito.mock is generic, so no cast is needed.
        this.coordinatorRequestManager = Mockito.mock(CoordinatorRequestManager.class);
        this.offsetCommitCallbackInvoker = Mockito.mock(OffsetCommitCallbackInvoker.class);

        // Base consumer configuration.
        Properties consumerProps = new Properties();
        consumerProps.put("auto.commit.interval.ms", 100);
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", StringDeserializer.class);
        this.props = consumerProps;
    }

    /**
     * Checks that {@code OffsetFetchRequestState.toStringBase()} equals the
     * {@code TimedRequestState.toStringBase()} text followed by the member info and the
     * requested partitions, and that the expected text contains no {@code "Optional"}.
     */
    @Test
    public void testOffsetFetchRequestStateToStringBase() {
        ConsumerConfig config = Mockito.mock(ConsumerConfig.class);
        CommitRequestManager commitRequestManager = new CommitRequestManager(
                this.time,
                this.logContext,
                this.subscriptionState,
                config,
                this.coordinatorRequestManager,
                this.offsetCommitCallbackInvoker,
                "group-id",
                Optional.of("group-instance-id"),
                100L,
                1000L,
                OptionalDouble.of(0.0d),
                this.metrics);
        commitRequestManager.onMemberEpochUpdated(Optional.of(1), Optional.empty());

        CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState =
                commitRequestManager.createOffsetFetchRequest(Collections.singleton(new TopicPartition("topic-1", 1)), 0L);

        // Stand-alone request state whose toStringBase() supplies the expected prefix.
        TimedRequestState timedRequestState = new TimedRequestState(
                this.logContext,
                CommitRequestManager.class.getSimpleName(),
                100L,
                2,
                1000L,
                0.0d,
                TimedRequestState.deadlineTimer(this.time, 0L));

        String expected = timedRequestState.toStringBase()
                + ", " + offsetFetchRequestState.memberInfo
                + ", requestedPartitions=" + offsetFetchRequestState.requestedPartitions;

        Assertions.assertDoesNotThrow(timedRequestState::toString);
        Assertions.assertFalse(expected.contains("Optional"));
        Assertions.assertEquals(expected, offsetFetchRequestState.toStringBase());
    }

    /**
     * Queues an async commit on a manager built with {@code create(false, 0L)} and checks,
     * via {@code assertPoll(false, 0, ...)} before and after the commit, that polling yields
     * zero requests (per the test name: while the coordinator is unknown).
     */
    @Test
    public void testPollSkipIfCoordinatorUnknown() {
        CommitRequestManager commitRequestManager = create(false, 0L);
        assertPoll(false, 0, commitRequestManager);

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0L));
        commitRequestManager.commitAsync(offsets);

        assertPoll(false, 0, commitRequestManager);
    }

    /**
     * Queues an async commit, checks that {@code assertPoll(false, 0, ...)} still yields no
     * request, and then that {@code assertPoll(true, 1, ...)} yields exactly one (per the
     * test name: the queued commit goes out once the coordinator is discovered).
     */
    @Test
    public void testAsyncCommitWhileCoordinatorUnknownIsSentOutWhenCoordinatorDiscovered() {
        CommitRequestManager commitRequestManager = create(false, 0L);
        assertPoll(false, 0, commitRequestManager);

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0L));
        commitRequestManager.commitAsync(offsets);

        assertPoll(false, 0, commitRequestManager);
        assertPoll(true, 1, commitRequestManager);
    }

    /**
     * Checks that polling yields no request before a manual async commit is queued and
     * exactly one request afterwards.
     */
    @Test
    public void testPollEnsureManualCommitSent() {
        CommitRequestManager commitRequestManager = create(false, 0L);
        assertPoll(0, commitRequestManager);

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0L));
        commitRequestManager.commitAsync(offsets);

        assertPoll(1, commitRequestManager);
    }

    /**
     * With one assigned partition positioned at offset 100, advances the mock clock by
     * 100 ms, expects a single request from the next poll, completes it successfully and
     * then checks the {@code commit-rate} and {@code commit-total} metrics.
     */
    @Test
    public void testPollEnsureAutocommitSent() {
        TopicPartition tp = new TopicPartition("t1", 1);
        this.subscriptionState.assignFromUser(Collections.singleton(tp));
        this.subscriptionState.seek(tp, 100L);

        CommitRequestManager commitRequestManager = create(true, 100L);
        assertPoll(0, commitRequestManager);

        // Update the timer, let 100 ms elapse on the mock clock, then update it again.
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());
        this.time.sleep(100L);
        commitRequestManager.updateAutoCommitTimer(this.time.milliseconds());

        // Exactly one request is expected; complete it without error.
        assertPoll(1, commitRequestManager)
                .forEach(handler -> handler.onComplete(mockOffsetCommitResponse("t1", 1, (short) 1, Errors.NONE)));

        Assertions.assertEquals(0.03d, ((Double) getMetric("commit-rate").metricValue()).doubleValue(), 0.01d);
        Assertions.assertEquals(Double.valueOf(1.0d), getMetric("commit-total").metricValue());
    }

    /**
     * Queues two sync commits and two offset fetches, then checks that a single poll hands
     * out all four requests, leaves nothing unsent, tracks the two fetches as in-flight, and
     * that the in-flight fetch buffer is empty once every request has been completed.
     */
    @Test
    public void testPollEnsureCorrectInflightRequestBufferSize() {
        CommitRequestManager commitManager = create(false, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Map<TopicPartition, OffsetAndMetadata> offsets1 = new HashMap<>();
        offsets1.put(new TopicPartition("test", 0), new OffsetAndMetadata(10L));
        offsets1.put(new TopicPartition("test", 1), new OffsetAndMetadata(20L));

        Map<TopicPartition, OffsetAndMetadata> offsets2 = new HashMap<>();
        offsets2.put(new TopicPartition("test", 3), new OffsetAndMetadata(20L));
        offsets2.put(new TopicPartition("test", 4), new OffsetAndMetadata(20L));

        // Two commits and two fetches, all sharing the same deadline.
        long deadlineMs = this.time.milliseconds() + 60000;
        commitManager.commitSync(offsets1, deadlineMs);
        commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), deadlineMs);
        commitManager.commitSync(offsets2, deadlineMs);
        commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 1)), deadlineMs);

        NetworkClientDelegate.PollResult result = commitManager.poll(this.time.milliseconds());
        Assertions.assertEquals(4, result.unsentRequests.size());
        Assertions.assertTrue(result.unsentRequests.stream()
                .anyMatch(request -> request.requestBuilder() instanceof OffsetCommitRequest.Builder));
        Assertions.assertTrue(result.unsentRequests.stream()
                .anyMatch(request -> request.requestBuilder() instanceof OffsetFetchRequest.Builder));
        Assertions.assertFalse(commitManager.pendingRequests.hasUnsentRequests());
        Assertions.assertEquals(2, commitManager.pendingRequests.inflightOffsetFetches.size());

        // Complete every request with a response matching its type.
        result.unsentRequests.forEach(request -> {
            if (request.requestBuilder() instanceof OffsetFetchRequest.Builder) {
                request.handler().onComplete(buildOffsetFetchClientResponse(request, Collections.emptySet(), Errors.NONE));
            } else {
                request.handler().onComplete(buildOffsetCommitClientResponse(new OffsetCommitResponse(0, new HashMap<>())));
            }
        });

        Assertions.assertEquals(0, commitManager.pendingRequests.inflightOffsetFetches.size());
    }

    /**
     * Checks that an async commit is queued as one unsent request, that a poll hands that
     * request out, and that nothing is left pending afterwards.
     */
    @Test
    public void testPollEnsureEmptyPendingRequestAfterPoll() {
        CommitRequestManager commitManager = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Map<TopicPartition, OffsetAndMetadata> offsets =
                Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        commitManager.commitAsync(offsets);
        Assertions.assertEquals(1, commitManager.unsentOffsetCommitRequests().size());

        int handedOut = commitManager.poll(this.time.milliseconds()).unsentRequests.size();
        Assertions.assertEquals(1, handedOut);

        Assertions.assertTrue(commitManager.unsentOffsetCommitRequests().isEmpty());
        assertEmptyPendingRequests(commitManager);
    }

    /**
     * Fails the first auto-commit request with COORDINATOR_LOAD_IN_PROGRESS and checks that
     * no request is produced 100 ms later, while a new one is produced after a further
     * 200 ms have elapsed.
     */
    @Test
    public void testAsyncAutocommitNotRetriedAfterException() {
        final long commitIntervalMs = 200L;
        final long shortWaitMs = 100L;

        CommitRequestManager commitManager = create(true, commitIntervalMs);
        TopicPartition tp = new TopicPartition("topic", 1);
        this.subscriptionState.assignFromUser(Collections.singleton(tp));
        this.subscriptionState.seek(tp, 100L);

        // First request: fail it with a coordinator-loading error.
        this.time.sleep(commitIntervalMs);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        List<NetworkClientDelegate.FutureCompletionHandler> firstAttempt = assertPoll(1, commitManager);
        firstAttempt.get(0).onComplete(
                mockOffsetCommitResponse("topic", 1, (short) 1, Errors.COORDINATOR_LOAD_IN_PROGRESS));

        // After only the short wait, polling must not produce a request.
        this.time.sleep(shortWaitMs);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        assertPoll(0, commitManager);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());

        // After another full wait, a new request is expected.
        this.time.sleep(commitIntervalMs);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        List<NetworkClientDelegate.FutureCompletionHandler> secondAttempt = assertPoll(1, commitManager);
        assertEmptyPendingRequests(commitManager);
        secondAttempt.get(0).onComplete(mockOffsetCommitResponse("topic", 1, (short) 1, Errors.NONE));
    }

    /**
     * For each error from {@code offsetCommitExceptionSupplier}: issues a sync commit with a
     * 60 s deadline, delegates the failure/retry checks to
     * {@code sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried}, then checks the
     * exception handling through {@code assertExceptionHandling}.
     *
     * @param errors the error to complete the commit request with
     */
    @MethodSource({"offsetCommitExceptionSupplier"})
    @ParameterizedTest
    public void testCommitSyncRetriedAfterExpectedRetriableException(Errors errors) {
        CommitRequestManager commitManager = create(false, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Map<TopicPartition, OffsetAndMetadata> offsets =
                Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        long deadlineMs = this.time.milliseconds() + 60000;

        sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(
                commitManager, errors, commitManager.commitSync(offsets, deadlineMs));
        assertExceptionHandling(commitManager, errors, true);
    }

    /**
     * For each (error, exception class) pair from {@code commitSyncExpectedExceptions}:
     * issues a sync commit with a 100 ms deadline, completes the request with the error and
     * checks that the returned future fails with the expected exception class.
     *
     * @param errors the error to complete the commit request with
     * @param cls    the exception class the commit future is expected to fail with
     */
    @MethodSource({"commitSyncExpectedExceptions"})
    @ParameterizedTest
    public void testCommitSyncFailsWithExpectedException(Errors errors, Class<? extends Exception> cls) {
        CommitRequestManager commitManager = create(false, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Map<TopicPartition, OffsetAndMetadata> offsets =
                Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        CompletableFuture<?> commitResult = commitManager.commitSync(offsets, this.time.milliseconds() + 100);

        completeOffsetCommitRequestWithError(commitManager, errors);
        TestUtils.assertFutureThrows(commitResult, cls);
    }

    /**
     * Argument source for {@code testCommitSyncFailsWithExpectedException}.
     *
     * @return pairs of (error returned in the commit response, exception class the commit
     *         future is expected to fail with)
     */
    private static Stream<Arguments> commitSyncExpectedExceptions() {
        // Varargs calls keep the element type as Arguments, so the result is a Stream<Arguments>.
        return Stream.of(
                Arguments.of(Errors.FENCED_INSTANCE_ID, CommitFailedException.class),
                Arguments.of(Errors.UNKNOWN_MEMBER_ID, CommitFailedException.class),
                Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, Errors.OFFSET_METADATA_TOO_LARGE.exception().getClass()),
                Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE.exception().getClass()),
                Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, Errors.GROUP_AUTHORIZATION_FAILED.exception().getClass()),
                Arguments.of(Errors.CORRUPT_MESSAGE, KafkaException.class),
                Arguments.of(Errors.UNKNOWN_SERVER_ERROR, KafkaException.class));
    }

    /**
     * Completes a sync commit with UNKNOWN_MEMBER_ID and checks that the next poll produces
     * no request and that the future is done and fails with {@link CommitFailedException}.
     */
    @Test
    public void testCommitSyncFailsWithCommitFailedExceptionIfUnknownMemberId() {
        CommitRequestManager commitManager = create(false, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Map<TopicPartition, OffsetAndMetadata> offsets =
                Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        CompletableFuture<?> commitResult = commitManager.commitSync(offsets, this.time.milliseconds() + 60000);

        completeOffsetCommitRequestWithError(commitManager, Errors.UNKNOWN_MEMBER_ID);

        Assertions.assertEquals(0, commitManager.poll(this.time.milliseconds()).unsentRequests.size());
        Assertions.assertTrue(commitResult.isDone());
        TestUtils.assertFutureThrows(commitResult, CommitFailedException.class);
    }

    /**
     * Completes a sync commit with STALE_MEMBER_EPOCH and checks that the next poll produces
     * no request and that the future is done and fails with {@link CommitFailedException}.
     */
    @Test
    public void testCommitSyncFailsWithCommitFailedExceptionOnStaleMemberEpoch() {
        CommitRequestManager commitManager = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Map<TopicPartition, OffsetAndMetadata> offsets =
                Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        CompletableFuture<?> commitResult = commitManager.commitSync(offsets, this.time.milliseconds() + 60000);

        completeOffsetCommitRequestWithError(commitManager, Errors.STALE_MEMBER_EPOCH);

        Assertions.assertEquals(0, commitManager.poll(this.time.milliseconds()).unsentRequests.size());
        Assertions.assertTrue(commitResult.isDone());
        TestUtils.assertFutureThrows(commitResult, CommitFailedException.class);
    }

    /**
     * Fails an async auto-commit with STALE_MEMBER_EPOCH and checks that the auto-commit
     * timer was reset, that no request is produced right away, and that one request is
     * produced after another 100 ms have elapsed on the mock clock.
     */
    @Test
    public void testAutoCommitAsyncFailsWithStaleMemberEpochContinuesToCommitOnTheInterval() {
        CommitRequestManager commitManager = create(true, 100L);
        this.time.sleep(100L);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        TopicPartition tp = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(tp));
        this.subscriptionState.seek(tp, 10L);

        // Trigger the auto-commit and fail it with a stale member epoch.
        commitManager.maybeAutoCommitAsync();
        completeOffsetCommitRequestWithError(commitManager, Errors.STALE_MEMBER_EPOCH);
        Mockito.verify(commitManager).resetAutoCommitTimer();

        int requestsBeforeInterval = commitManager.poll(this.time.milliseconds()).unsentRequests.size();
        Assertions.assertEquals(0, requestsBeforeInterval, "No request should be generated until the interval expires");

        this.time.sleep(100L);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        int requestsAfterInterval = commitManager.poll(this.time.milliseconds()).unsentRequests.size();
        Assertions.assertEquals(1, requestsAfterInterval);
    }

    /**
     * Completes an async commit request with a disconnected response and checks that the
     * commit future fails with {@link RetriableCommitFailedException} and that
     * {@code assertCoordinatorDisconnect()} passes.
     */
    @Test
    public void testCommitAsyncFailsWithRetriableOnCoordinatorDisconnected() {
        CommitRequestManager commitManager = create(false, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Map<TopicPartition, OffsetAndMetadata> offsets =
                Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        CompletableFuture<?> commitResult = commitManager.commitAsync(offsets);

        NetworkClientDelegate.PollResult result = commitManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());

        // Complete the only request with a disconnected response.
        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
        mockOffsetCommitResponseDisconnected("topic", 1, (short) 1, request).onComplete();

        Assertions.assertTrue(commitResult.isDone());
        TestUtils.assertFutureThrows(commitResult, RetriableCommitFailedException.class);
        assertCoordinatorDisconnect();
    }

    /**
     * Checks that while the first auto-commit request is still unanswered a later poll
     * produces no request, and that a new request is produced once the first one has been
     * completed.
     */
    @Test
    public void testAutocommitEnsureOnlyOneInflightRequest() {
        TopicPartition tp = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(tp));
        this.subscriptionState.seek(tp, 100L);
        CommitRequestManager commitManager = create(true, 100L);

        // First request goes out after 100 ms.
        this.time.sleep(100L);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        List<NetworkClientDelegate.FutureCompletionHandler> inflight = assertPoll(1, commitManager);

        // Another 100 ms later the first request is still unanswered: nothing new is expected.
        this.time.sleep(100L);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        assertPoll(0, commitManager);
        assertEmptyPendingRequests(commitManager);

        // Completing the first request lets the next poll produce a request.
        NetworkClientDelegate.FutureCompletionHandler firstHandler = inflight.get(0);
        firstHandler.onComplete(buildOffsetCommitClientResponse(new OffsetCommitResponse(0, new HashMap())));
        assertPoll(1, commitManager);
    }

    /**
     * While an auto-commit request is unanswered, calls
     * {@code maybeAutoCommitSyncBeforeRevocation(200L)} and checks that it queues one unsent
     * commit, and that its future stays incomplete when only the earlier request receives
     * a response.
     */
    @Test
    public void testAutoCommitBeforeRevocationNotBlockedByAutoCommitOnIntervalInflightRequest() {
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        TopicPartition tp = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(tp));
        this.subscriptionState.seek(tp, 100L);
        CommitRequestManager commitManager = create(true, 100L);

        // Obtain the first request and keep its handler; it is completed only later.
        this.time.sleep(100L);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        NetworkClientDelegate.PollResult result = commitManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());
        NetworkClientDelegate.FutureCompletionHandler firstRequestHandler = result.unsentRequests.get(0).handler();

        CompletableFuture<?> beforeRevocation = commitManager.maybeAutoCommitSyncBeforeRevocation(200L);
        Assertions.assertEquals(1, commitManager.pendingRequests.unsentOffsetCommits.size());

        // A response to the first request must not complete the before-revocation commit.
        firstRequestHandler.onComplete(buildOffsetCommitClientResponse(new OffsetCommitResponse(0, new HashMap<>())));
        Assertions.assertFalse(beforeRevocation.isDone(), "Auto-commit before revocation should not complete until it receives a response");
    }

    /**
     * Completes an auto-commit request successfully and verifies that
     * {@code enqueueInterceptorInvocation} was called on the callback invoker with the
     * partition mapped to offset 100.
     */
    @Test
    public void testAutocommitInterceptorsInvoked() {
        TopicPartition tp = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(tp));
        this.subscriptionState.seek(tp, 100L);
        CommitRequestManager commitManager = create(true, 100L);

        this.time.sleep(100L);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        assertPoll(1, commitManager).get(0)
                .onComplete(buildOffsetCommitClientResponse(new OffsetCommitResponse(0, new HashMap<>())));

        Mockito.verify(this.offsetCommitCallbackInvoker)
                .enqueueInterceptorInvocation(ArgumentMatchers.eq(Collections.singletonMap(tp, new OffsetAndMetadata(100L))));
    }

    /**
     * Completes an auto-commit request with a NETWORK_EXCEPTION for the partition and
     * verifies that {@code enqueueInterceptorInvocation} was never called.
     */
    @Test
    public void testAutocommitInterceptorsNotInvokedOnError() {
        TopicPartition tp = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(tp));
        this.subscriptionState.seek(tp, 100L);
        CommitRequestManager commitManager = create(true, 100L);

        this.time.sleep(100L);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        assertPoll(1, commitManager).get(0).onComplete(buildOffsetCommitClientResponse(
                new OffsetCommitResponse(0, Collections.singletonMap(tp, Errors.NETWORK_EXCEPTION))));

        Mockito.verify(this.offsetCommitCallbackInvoker, Mockito.never())
                .enqueueInterceptorInvocation(ArgumentMatchers.any());
    }

    /**
     * With no partitions assigned, checks that {@code maybeAutoCommitAsync()} leaves the
     * unsent commit queue empty and that {@code resetAutoCommitTimer()} was still called.
     */
    @Test
    public void testAutoCommitEmptyOffsetsDoesNotGenerateRequest() {
        CommitRequestManager commitManager = create(true, 100L);

        this.time.sleep(100L);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        commitManager.maybeAutoCommitAsync();

        boolean nothingQueued = commitManager.pendingRequests.unsentOffsetCommits.isEmpty();
        Assertions.assertTrue(nothingQueued);
        Mockito.verify(commitManager).resetAutoCommitTimer();
    }

    /**
     * Calls {@code maybeAutoCommitAsync()} once before the assigned partition has a position
     * and once after seeking it to offset 100, then checks that one commit is queued and
     * that the auto-commit timer was reset twice.
     */
    @Test
    public void testAutoCommitEmptyDoesNotLeaveInflightRequestFlagOn() {
        TopicPartition tp = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(tp));
        CommitRequestManager commitManager = create(true, 100L);

        // First attempt: the partition is assigned but has not been positioned yet.
        this.time.sleep(100L);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        commitManager.maybeAutoCommitAsync();

        // Second attempt: the partition now has a position.
        this.subscriptionState.seek(tp, 100L);
        this.time.sleep(100L);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        commitManager.maybeAutoCommitAsync();

        int queuedCommits = commitManager.pendingRequests.unsentOffsetCommits.size();
        Assertions.assertEquals(1, queuedCommits);
        Mockito.verify(commitManager, Mockito.times(2)).resetAutoCommitTimer();
    }

    /**
     * Checks that a second auto-commit attempt made while the first request is unanswered
     * produces no request and does not reset the timer, and that a request is produced
     * again once the first one has completed successfully.
     */
    @Test
    public void testAutoCommitOnIntervalSkippedIfPreviousOneInFlight() {
        TopicPartition tp = new TopicPartition("topic1", 0);
        this.subscriptionState.assignFromUser(Collections.singleton(tp));
        this.subscriptionState.seek(tp, 100L);
        CommitRequestManager commitManager = create(true, 100L);

        // First attempt: one request goes out and the timer is reset exactly once.
        this.time.sleep(100L);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        commitManager.maybeAutoCommitAsync();
        List<NetworkClientDelegate.FutureCompletionHandler> handlers = assertPoll(1, commitManager);
        Assertions.assertEquals(1, handlers.size());
        NetworkClientDelegate.FutureCompletionHandler inflightHandler = handlers.get(0);
        Mockito.verify(commitManager, Mockito.times(1)).resetAutoCommitTimer();
        Mockito.clearInvocations(commitManager);

        // Second attempt while the first is unanswered: no request, no timer reset.
        this.time.sleep(100L);
        commitManager.updateAutoCommitTimer(this.time.milliseconds());
        commitManager.maybeAutoCommitAsync();
        assertPoll(0, commitManager);
        Mockito.verify(commitManager, Mockito.never()).resetAutoCommitTimer();

        // Complete the first request; the next poll produces a request again.
        inflightHandler.onComplete(mockOffsetCommitResponse(tp.topic(), tp.partition(), (short) 1, Errors.NONE));
        assertPoll(1, commitManager);
    }

    /**
     * Runs {@code sendAndVerifyDuplicatedOffsetFetchRequests} for one partition with a
     * count of 2 and no error, then checks that every returned future completed normally
     * and that nothing is pending after a final poll.
     */
    @Test
    public void testOffsetFetchRequestEnsureDuplicatedRequestSucceed() {
        CommitRequestManager commitManager = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Set<TopicPartition> partitions = new HashSet<>();
        partitions.add(new TopicPartition("t1", 0));

        sendAndVerifyDuplicatedOffsetFetchRequests(commitManager, partitions, 2, Errors.NONE).forEach(future -> {
            Assertions.assertTrue(future.isDone());
            Assertions.assertFalse(future.isCompletedExceptionally());
        });

        commitManager.poll(0L);
        assertEmptyPendingRequests(commitManager);
    }

    /**
     * For each error from {@code offsetFetchExceptionSupplier}: sends an offset fetch that
     * fails with the error, then runs the retriable checks when
     * {@link #isRetriableOnOffsetFetch(Errors)} is true, or the non-retriable checks plus an
     * empty-pending-requests check otherwise.
     *
     * @param errors the error to fail the offset fetch with
     */
    @MethodSource({"offsetFetchExceptionSupplier"})
    @ParameterizedTest
    public void testOffsetFetchRequestErroredRequests(Errors errors) {
        CommitRequestManager commitManager = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Set<TopicPartition> partitions = new HashSet<>();
        partitions.add(new TopicPartition("t1", 0));

        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures =
                sendAndVerifyDuplicatedOffsetFetchRequests(commitManager, partitions, 1, errors);

        if (isRetriableOnOffsetFetch(errors)) {
            testRetriable(commitManager, futures);
        } else {
            testNonRetriable(futures);
            assertEmptyPendingRequests(commitManager);
        }
    }

    /**
     * For each error from {@code offsetFetchExceptionSupplier}: sends an offset fetch that
     * fails with the error. Non-retriable errors must fail the futures with
     * {@link KafkaException} and leave nothing pending. Retriable errors must leave the
     * futures incomplete until 60 s have elapsed on the mock clock and the manager is polled
     * again, at which point they fail with {@link TimeoutException}.
     *
     * @param errors the error to fail the offset fetch with
     */
    @MethodSource({"offsetFetchExceptionSupplier"})
    @ParameterizedTest
    public void testOffsetFetchRequestTimeoutRequests(Errors errors) {
        CommitRequestManager commitManager = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Set<TopicPartition> partitions = new HashSet<>();
        partitions.add(new TopicPartition("t1", 0));

        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures =
                sendAndVerifyDuplicatedOffsetFetchRequests(commitManager, partitions, 1, errors);

        if (isRetriableOnOffsetFetch(errors)) {
            futures.forEach(future -> Assertions.assertFalse(future.isDone()));

            // Let 60 s elapse, then poll so the queued fetches are evaluated again.
            this.time.sleep(60000L);
            Assertions.assertFalse(commitManager.pendingRequests.unsentOffsetFetches.isEmpty());
            commitManager.poll(this.time.milliseconds());

            futures.forEach(future -> TestUtils.assertFutureThrows(future, TimeoutException.class));
            Assertions.assertTrue(commitManager.pendingRequests.unsentOffsetFetches.isEmpty());
        } else {
            futures.forEach(future -> TestUtils.assertFutureThrows(future, KafkaException.class));
            assertEmptyPendingRequests(commitManager);
        }
    }

    /**
     * Tells whether the offset-fetch tests treat the given error as retriable.
     *
     * @param errors the error to classify
     * @return {@code true} for NOT_COORDINATOR, COORDINATOR_LOAD_IN_PROGRESS and
     *         COORDINATOR_NOT_AVAILABLE; {@code false} for anything else
     */
    private boolean isRetriableOnOffsetFetch(Errors errors) {
        if (errors == Errors.NOT_COORDINATOR) {
            return true;
        }
        if (errors == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
            return true;
        }
        return errors == Errors.COORDINATOR_NOT_AVAILABLE;
    }

    /**
     * Sends an offset fetch, completes it with a response carrying offset 100 for one
     * partition, and checks that the future completes normally with exactly that offset and
     * that the request is no longer tracked as in-flight.
     */
    @Test
    public void testSuccessfulOffsetFetch() {
        CommitRequestManager commitManager = create(false, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = commitManager.fetchOffsets(
                Collections.singleton(new TopicPartition("test", 0)), this.time.milliseconds() + 60000);

        // The request has gone out but no response has arrived yet.
        NetworkClientDelegate.PollResult result = commitManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());
        Assertions.assertEquals(1, commitManager.pendingRequests.inflightOffsetFetches.size());
        Assertions.assertFalse(fetchResult.isDone());

        // Complete the request with a successful response for one partition.
        TopicPartition tp = new TopicPartition("topic1", 0);
        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
        request.handler().onComplete(buildOffsetFetchClientResponse(
                request,
                Collections.singletonMap(tp, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "", Errors.NONE)),
                Errors.NONE,
                false));

        Assertions.assertTrue(fetchResult.isDone());
        Assertions.assertFalse(fetchResult.isCompletedExceptionally());

        Map<TopicPartition, OffsetAndMetadata> offsets = null;
        try {
            offsets = fetchResult.get();
        } catch (InterruptedException | ExecutionException e) {
            Assertions.fail(e);
        }

        Assertions.assertNotNull(offsets);
        Assertions.assertEquals(1, offsets.size());
        Assertions.assertTrue(offsets.containsKey(tp));
        Assertions.assertEquals(100L, offsets.get(tp).offset());
        Assertions.assertEquals(0, commitManager.pendingRequests.inflightOffsetFetches.size(), "Inflight request should be removed from the queue when a response is received.");
    }

    /**
     * For each case from {@code offsetFetchRetriableCoordinatorErrors}: fails an offset
     * fetch with the error and checks that the future stays incomplete, that
     * {@code assertCoordinatorDisconnect()} passes when {@code z} is set, and that a request
     * is produced again only after 100 ms have elapsed on the mock clock.
     *
     * @param errors the error to fail the offset fetch with
     * @param z      whether {@code assertCoordinatorDisconnect()} is checked for this error
     */
    @MethodSource({"offsetFetchRetriableCoordinatorErrors"})
    @ParameterizedTest
    public void testOffsetFetchMarksCoordinatorUnknownOnRetriableCoordinatorErrors(Errors errors, boolean z) {
        CommitRequestManager commitManager = create(false, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Set<TopicPartition> partitions = new HashSet<>();
        partitions.add(new TopicPartition("t1", 0));

        CompletableFuture<?> fetchResult = commitManager.fetchOffsets(partitions, this.time.milliseconds() + 60000);
        completeOffsetFetchRequestWithError(commitManager, partitions, errors);

        Assertions.assertFalse(fetchResult.isDone());
        if (z) {
            assertCoordinatorDisconnect();
        }

        // Nothing is produced right away; a request appears once 100 ms have elapsed.
        Assertions.assertEquals(0, commitManager.poll(this.time.milliseconds()).unsentRequests.size());
        this.time.sleep(100L);
        Assertions.assertEquals(1, commitManager.poll(this.time.milliseconds()).unsentRequests.size());
    }

    /**
     * Completes an offset fetch with a "disconnected" client response and verifies that the
     * future is still pending, the coordinator was marked unknown, and after 100 ms the fetch
     * is handed out again and tracked as in-flight.
     */
    @Test
    public void testOffsetFetchMarksCoordinatorUnknownOnCoordinatorDisconnectedAndRetries() {
        CommitRequestManager manager = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Set<TopicPartition> partitions = new HashSet<>();
        partitions.add(new TopicPartition("t1", 0));
        CompletableFuture<?> fetchResult = manager.fetchOffsets(partitions, this.time.milliseconds() + 60000);

        NetworkClientDelegate.PollResult firstPoll = manager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, firstPoll.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest fetchRequest = (NetworkClientDelegate.UnsentRequest) firstPoll.unsentRequests.get(0);

        // Deliver the disconnect through the response's own completion hook.
        ClientResponse disconnected = buildOffsetFetchClientResponseDisconnected(fetchRequest);
        disconnected.onComplete();

        Assertions.assertFalse(fetchResult.isDone());
        assertCoordinatorDisconnect();

        this.time.sleep(100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        NetworkClientDelegate.PollResult retryPoll = manager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, retryPoll.unsentRequests.size());
        Assertions.assertEquals(1, manager.pendingRequests.inflightOffsetFetches.size());
    }

    /**
     * Fails an async commit with the given error and verifies that it is not retried: the next
     * poll returns no requests and the future is completed exceptionally. When the error's
     * exception is a {@link RetriableException}, the future must fail with
     * {@link RetriableCommitFailedException}.
     *
     * @param errors error the offset commit response is completed with
     */
    @MethodSource({"offsetCommitExceptionSupplier"})
    @ParameterizedTest
    public void testOffsetCommitRequestErroredRequestsNotRetriedForAsyncCommit(Errors errors) {
        CommitRequestManager manager = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        CompletableFuture<?> commitResult = manager.commitAsync(offsets);
        completeOffsetCommitRequestWithError(manager, errors);

        NetworkClientDelegate.PollResult afterFailure = manager.poll(this.time.milliseconds());
        Assertions.assertEquals(0, afterFailure.unsentRequests.size());
        Assertions.assertTrue(commitResult.isDone());
        Assertions.assertTrue(commitResult.isCompletedExceptionally());

        boolean retriable = errors.exception() instanceof RetriableException;
        if (retriable) {
            TestUtils.assertFutureThrows(commitResult, RetriableCommitFailedException.class);
        }
        assertExceptionHandling(manager, errors, false);
    }

    /**
     * Issues a sync commit whose deadline is 200 ms ahead, fails it twice with
     * REQUEST_TIMED_OUT (100 ms apart) while the future is still pending, then advances the
     * clock another 100 ms and verifies that poll returns nothing and the future has completed
     * exceptionally.
     */
    @Test
    public void testOffsetCommitSyncTimeoutNotReturnedOnPollAndFails() {
        CommitRequestManager manager = create(false, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        long deadlineMs = this.time.milliseconds() + 200;
        CompletableFuture<?> commitResult = manager.commitSync(offsets, deadlineMs);

        // First attempt and one retry both time out.
        completeOffsetCommitRequestWithError(manager, Errors.REQUEST_TIMED_OUT);
        this.time.sleep(100L);
        completeOffsetCommitRequestWithError(manager, Errors.REQUEST_TIMED_OUT);
        Assertions.assertFalse(commitResult.isDone());

        // The deadline has been reached: no further attempt, future failed.
        this.time.sleep(100L);
        NetworkClientDelegate.PollResult finalPoll = manager.poll(this.time.milliseconds());
        Assertions.assertEquals(0, finalPoll.unsentRequests.size());
        Assertions.assertTrue(commitResult.isDone());
        Assertions.assertTrue(commitResult.isCompletedExceptionally());
    }

    /**
     * Fails a sync commit once, lets its deadline pass, and verifies how the future ends:
     * with {@link TimeoutException} when the error's exception is a {@link RetriableException},
     * with {@link KafkaException} otherwise.
     *
     * @param errors error the offset commit response is completed with
     */
    @MethodSource({"offsetCommitExceptionSupplier"})
    @ParameterizedTest
    public void testOffsetCommitSyncFailedWithRetriableThrowsTimeoutWhenRetryTimeExpires(Errors errors) {
        CommitRequestManager manager = create(false, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        long expirationTimeMs = this.time.milliseconds() + 200;
        CompletableFuture<?> commitResult = manager.commitSync(offsets, expirationTimeMs);
        completeOffsetCommitRequestWithError(manager, errors);

        // Note: the clock is advanced by the absolute deadline value, not by the 200 ms offset,
        // which moves the current time past the deadline.
        this.time.sleep(expirationTimeMs);
        NetworkClientDelegate.PollResult afterExpiry = manager.poll(this.time.milliseconds());
        Assertions.assertEquals(0, afterExpiry.unsentRequests.size());
        Assertions.assertTrue(commitResult.isDone());

        boolean retriable = errors.exception() instanceof RetriableException;
        Class<? extends Throwable> expectedFailure = retriable ? TimeoutException.class : KafkaException.class;
        TestUtils.assertFutureThrows(commitResult, expectedFailure);
    }

    /**
     * Fails an async commit with COORDINATOR_NOT_AVAILABLE and verifies that it is not retried
     * and that the future fails with {@link RetriableCommitFailedException}.
     */
    @Test
    public void testOffsetCommitAsyncFailedWithRetriableThrowsRetriableCommitException() {
        CommitRequestManager manager = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Errors retriableError = Errors.COORDINATOR_NOT_AVAILABLE;
        Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        CompletableFuture<?> commitResult = manager.commitAsync(offsets);
        completeOffsetCommitRequestWithError(manager, retriableError);

        // No retry is scheduled for the async commit.
        NetworkClientDelegate.PollResult afterFailure = manager.poll(this.time.milliseconds());
        Assertions.assertEquals(0, afterFailure.unsentRequests.size());
        Assertions.assertTrue(commitResult.isDone());
        Assertions.assertTrue(commitResult.isCompletedExceptionally());

        assertExceptionHandling(manager, retriableError, false);
        TestUtils.assertFutureThrows(commitResult, RetriableCommitFailedException.class);
    }

    /**
     * Fails a sync commit request through its handler's {@code onFailure} with a
     * {@link TimeoutException} and verifies the request is queued again as unsent and only
     * handed out after a 100 ms back-off.
     */
    @Test
    public void testEnsureBackoffRetryOnOffsetCommitRequestTimeout() {
        CommitRequestManager manager = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        manager.commitSync(offsets, this.time.milliseconds() + 60000);

        NetworkClientDelegate.PollResult firstPoll = manager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, firstPoll.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest commitRequest = (NetworkClientDelegate.UnsentRequest) firstPoll.unsentRequests.get(0);
        commitRequest.handler().onFailure(this.time.milliseconds(), new TimeoutException());

        Assertions.assertTrue(manager.pendingRequests.hasUnsentRequests());
        Assertions.assertEquals(1, manager.unsentOffsetCommitRequests().size());
        assertRetryBackOff(manager, 100L);
    }

    /**
     * Verifies that {@code markCoordinatorUnknown} was invoked on the coordinator request
     * manager mock, with any cause string and any timestamp.
     */
    private void assertCoordinatorDisconnect() {
        CoordinatorRequestManager verifiedCoordinator =
            (CoordinatorRequestManager) Mockito.verify(this.coordinatorRequestManager);
        verifiedCoordinator.markCoordinatorUnknown((String) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
    }

    /**
     * Asserts how the manager behaves on the next poll(s) after a request failed with
     * {@code errors}.
     *
     * <p>The switch is driven by the compiler-generated switch map for {@link Errors}; which
     * enum constant each numeric label stands for is defined by that map, which is not visible
     * here (NOTE(review): confirm against the generated map before editing the labels).
     *
     * @param commitRequestManager manager under test
     * @param errors               error the request was failed with
     * @param z                    whether the failed request is expected to be retried
     */
    private void assertExceptionHandling(CommitRequestManager commitRequestManager, Errors errors, boolean z) {
        // Expected time until the next poll: the 100 ms back-off when retrying, otherwise "never".
        long expectedPollDelayMs = z ? 100L : Long.MAX_VALUE;
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$Errors[errors.ordinal()]) {
            case 1:
            case 2:
            case 3: // plain literal: the label must not depend on an unrelated test constant
                // Coordinator must have been marked unknown.
                ((CoordinatorRequestManager) Mockito.verify(this.coordinatorRequestManager)).markCoordinatorUnknown((String) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
                assertPollDoesNotReturn(commitRequestManager, expectedPollDelayMs);
                break;
            case 4:
            case 5:
                if (z) {
                    assertRetryBackOff(commitRequestManager, expectedPollDelayMs);
                }
                break;
            case 6:
                // Nothing to assert for this error.
                break;
            case 7:
            case 8:
            case 9:
            case 10:
                // Request is dropped: nothing to send and no poll deadline.
                assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE);
                break;
            default:
                if ((errors.exception() instanceof RetriableException) && z) {
                    assertRetryBackOff(commitRequestManager, expectedPollDelayMs);
                } else {
                    assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE);
                }
                break;
        }
    }

    /**
     * Polls the manager at the current time and asserts that no request is handed out and that
     * the reported time until the next poll equals {@code j}.
     *
     * @param commitRequestManager manager under test
     * @param j                    expected {@code timeUntilNextPollMs}
     */
    private void assertPollDoesNotReturn(CommitRequestManager commitRequestManager, long j) {
        long nowMs = this.time.milliseconds();
        NetworkClientDelegate.PollResult result = commitRequestManager.poll(nowMs);
        int handedOut = result.unsentRequests.size();
        Assertions.assertEquals(0, handedOut);
        Assertions.assertEquals(j, result.timeUntilNextPollMs);
    }

    /**
     * Asserts that a retry is held back for exactly {@code j} ms: nothing is returned now
     * (next poll due in {@code j} ms), nothing 1 ms before the back-off ends (next poll due in
     * 1 ms), and exactly one request once the full back-off has elapsed.
     *
     * @param commitRequestManager manager under test
     * @param j                    expected back-off in milliseconds
     */
    private void assertRetryBackOff(CommitRequestManager commitRequestManager, long j) {
        final long lastMillisecond = 1L;
        final long untilLastMillisecond = j - lastMillisecond;

        assertPollDoesNotReturn(commitRequestManager, j);

        this.time.sleep(untilLastMillisecond);
        assertPollDoesNotReturn(commitRequestManager, lastMillisecond);

        this.time.sleep(lastMillisecond);
        assertPoll(1, commitRequestManager);
    }

    /**
     * Fails an offset fetch with STALE_MEMBER_EPOCH after the member epoch was updated to 8 /
     * "member1" and verifies that the fetch moves back to the unsent queue and, after 100 ms,
     * is re-sent carrying the updated epoch and member id.
     */
    @Test
    public void testSyncOffsetFetchFailsWithStaleEpochAndRetriesWithNewEpoch() {
        CommitRequestManager manager = create(false, 100L);
        Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("t1", 0));
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        long deadlineMs = this.time.milliseconds() + 60000;
        manager.fetchOffsets(partitions, deadlineMs);
        manager.onMemberEpochUpdated(Optional.of(8), Optional.of("member1"));
        completeOffsetFetchRequestWithError(manager, partitions, Errors.STALE_MEMBER_EPOCH);

        // The failed fetch is no longer in flight but waits to be sent again.
        Assertions.assertEquals(0, manager.pendingRequests.inflightOffsetFetches.size());
        Assertions.assertEquals(1, manager.pendingRequests.unsentOffsetFetches.size());
        NetworkClientDelegate.PollResult beforeBackoff = manager.poll(this.time.milliseconds());
        Assertions.assertEquals(0, beforeBackoff.unsentRequests.size());

        this.time.sleep(100L);
        NetworkClientDelegate.PollResult afterBackoff = manager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, afterBackoff.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest retried = (NetworkClientDelegate.UnsentRequest) afterBackoff.unsentRequests.get(0);
        OffsetFetchRequestData requestData = (OffsetFetchRequestData) retried.requestBuilder().build().data();
        Assertions.assertEquals(1, requestData.groups().size());
        OffsetFetchRequestData.OffsetFetchRequestGroup group =
            (OffsetFetchRequestData.OffsetFetchRequestGroup) requestData.groups().get(0);
        Assertions.assertEquals(8, group.memberEpoch());
        Assertions.assertEquals("member1", group.memberId());
    }

    /**
     * Fails an offset fetch with STALE_MEMBER_EPOCH after the member epoch and id were cleared
     * and verifies that the future completes exceptionally and nothing is handed out on the
     * next poll.
     */
    @Test
    public void testSyncOffsetFetchFailsWithStaleEpochAndNotRetriedIfMemberNotInGroupAnymore() {
        CommitRequestManager manager = create(false, 100L);
        Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("t1", 0));
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        long deadlineMs = this.time.milliseconds() + 60000;
        CompletableFuture<?> fetchResult = manager.fetchOffsets(partitions, deadlineMs);

        // Epoch and member id are both cleared before the stale-epoch failure arrives.
        manager.onMemberEpochUpdated(Optional.empty(), Optional.empty());
        completeOffsetFetchRequestWithError(manager, partitions, Errors.STALE_MEMBER_EPOCH);

        Assertions.assertTrue(fetchResult.isDone());
        Assertions.assertTrue(fetchResult.isCompletedExceptionally());
        NetworkClientDelegate.PollResult afterFailure = manager.poll(this.time.milliseconds());
        Assertions.assertEquals(0, afterFailure.unsentRequests.size());
    }

    /**
     * Triggers the auto-commit-before-revocation path, fails the resulting commit with the
     * given error and checks the retry decision. A retry is expected when the error's exception
     * is a {@link RetriableException} or the error is STALE_MEMBER_EPOCH, except for
     * UNKNOWN_TOPIC_OR_PARTITION; in the stale-epoch case the retried request must carry the
     * updated epoch (8) and member id ("member1").
     *
     * @param errors error the offset commit response is completed with
     */
    @MethodSource({"offsetCommitExceptionSupplier"})
    @ParameterizedTest
    public void testAutoCommitSyncBeforeRevocationRetriesOnRetriableAndStaleEpoch(Errors errors) {
        CommitRequestManager manager = create(true, 2147483647L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        TopicPartition partition = new TopicPartition("topic", 1);
        this.subscriptionState.assignFromUser(Collections.singleton(partition));
        this.subscriptionState.seek(partition, 5L);
        manager.maybeAutoCommitSyncBeforeRevocation(this.time.milliseconds() + 200);

        boolean staleEpoch = errors == Errors.STALE_MEMBER_EPOCH;
        if (staleEpoch) {
            manager.onMemberEpochUpdated(Optional.of(8), Optional.of("member1"));
        }
        completeOffsetCommitRequestWithError(manager, errors);

        boolean retriable = errors.exception() instanceof RetriableException;
        boolean retryExpected = (retriable || staleEpoch) && errors != Errors.UNKNOWN_TOPIC_OR_PARTITION;

        if (retryExpected) {
            Assertions.assertEquals(1, manager.pendingRequests.unsentOffsetCommits.size(), "Request to be retried should be added to the outbound queue");
            NetworkClientDelegate.PollResult beforeBackoff = manager.poll(this.time.milliseconds());
            Assertions.assertEquals(0, beforeBackoff.unsentRequests.size());

            this.time.sleep(100L);
            NetworkClientDelegate.PollResult afterBackoff = manager.poll(this.time.milliseconds());
            Assertions.assertEquals(1, afterBackoff.unsentRequests.size());
            if (staleEpoch) {
                NetworkClientDelegate.UnsentRequest retried = (NetworkClientDelegate.UnsentRequest) afterBackoff.unsentRequests.get(0);
                OffsetCommitRequestData requestData = (OffsetCommitRequestData) retried.requestBuilder().build().data();
                Assertions.assertEquals(8, requestData.generationIdOrMemberEpoch());
                Assertions.assertEquals("member1", requestData.memberId());
            }
        } else {
            Assertions.assertEquals(0, manager.pendingRequests.unsentOffsetCommits.size(), "Non-retriable failed request should be removed from the outbound queue");
            NetworkClientDelegate.PollResult beforeBackoff = manager.poll(this.time.milliseconds());
            Assertions.assertEquals(0, beforeBackoff.unsentRequests.size());

            this.time.sleep(100L);
            NetworkClientDelegate.PollResult afterBackoff = manager.poll(this.time.milliseconds());
            Assertions.assertEquals(0, afterBackoff.unsentRequests.size());
        }
    }

    /**
     * Tracks {@code lastEpochSentOnCommit()} across three commit attempts that each fail with
     * STALE_MEMBER_EPOCH: it reports the epoch of the request that was actually sent (1, then 2
     * after the retry) and is empty once a request went out after the epoch was cleared.
     * Updating the member epoch alone, without sending, must not change the reported value.
     */
    @Test
    public void testLastEpochSentOnCommit() {
        final int initialEpoch = 1;
        final int updatedEpoch = initialEpoch + 1;

        CommitRequestManager manager = create(true, 2147483647L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        TopicPartition partition = new TopicPartition("topic", 1);
        this.subscriptionState.assignFromUser(Collections.singleton(partition));
        this.subscriptionState.seek(partition, 100L);
        manager.maybeAutoCommitSyncBeforeRevocation(Long.MAX_VALUE);

        // First attempt is sent with the initial epoch.
        manager.onMemberEpochUpdated(Optional.of(initialEpoch), Optional.of("member1"));
        completeOffsetCommitRequestWithError(manager, Errors.STALE_MEMBER_EPOCH);
        Assertions.assertEquals(initialEpoch, (Integer) manager.lastEpochSentOnCommit().orElse(null));

        // Epoch changes, but no request has been sent with it yet.
        manager.onMemberEpochUpdated(Optional.of(updatedEpoch), Optional.of("member1"));
        Assertions.assertEquals(initialEpoch, (Integer) manager.lastEpochSentOnCommit().get());

        // The retry goes out with the updated epoch.
        this.time.sleep(100L);
        completeOffsetCommitRequestWithError(manager, Errors.STALE_MEMBER_EPOCH);
        Assertions.assertEquals(updatedEpoch, (Integer) manager.lastEpochSentOnCommit().orElse(null));

        // Epoch cleared: the next request is sent without one.
        manager.onMemberEpochUpdated(Optional.empty(), Optional.empty());
        this.time.sleep(200L);
        completeOffsetCommitRequestWithError(manager, Errors.STALE_MEMBER_EPOCH);
        Assertions.assertFalse(manager.lastEpochSentOnCommit().isPresent());
    }

    /**
     * Performs two successful commits with latencies of 100 ms and 101 ms and checks the
     * resulting commit metrics: average 100.5, max 101.0, rate about 0.066 and total 2.0.
     */
    @Test
    public void testEnsureCommitSensorRecordsMetric() {
        CommitRequestManager manager = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        commitOffsetWithAssertedLatency(manager, 100L);
        commitOffsetWithAssertedLatency(manager, 101L);

        Object latencyAvg = getMetric("commit-latency-avg").metricValue();
        Assertions.assertEquals(Double.valueOf(100.5d), latencyAvg);

        Object latencyMax = getMetric("commit-latency-max").metricValue();
        Assertions.assertEquals(Double.valueOf(101.0d), latencyMax);

        // The rate is only compared within a tolerance of 0.001.
        double commitRate = ((Double) getMetric("commit-rate").metricValue()).doubleValue();
        Assertions.assertEquals(0.066d, commitRate, 0.001d);

        Object commitTotal = getMetric("commit-total").metricValue();
        Assertions.assertEquals(Double.valueOf(2.0d), commitTotal);
    }

    /**
     * Issues one async commit, advances the clock by {@code j} ms and completes the request
     * successfully with a response created at the commit time and received at the advanced
     * time.
     *
     * @param commitRequestManager manager under test
     * @param j                    latency to simulate, in milliseconds
     */
    private void commitOffsetWithAssertedLatency(CommitRequestManager commitRequestManager, long j) {
        Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        long createdMs = this.time.milliseconds();
        commitRequestManager.commitAsync(offsets);

        NetworkClientDelegate.PollResult result = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest commitRequest = (NetworkClientDelegate.UnsentRequest) result.unsentRequests.get(0);

        this.time.sleep(j);
        long receivedMs = this.time.milliseconds();
        commitRequest.future().complete(mockOffsetCommitResponse("topic", 1, (short) 1, createdMs, receivedMs, Errors.NONE));
    }

    /**
     * Polls the manager, expects exactly one outgoing request and completes that request's
     * future with an offset fetch response built for {@code set} and {@code errors}.
     *
     * @param commitRequestManager manager under test
     * @param set                  partitions the response is built for
     * @param errors               error to put into the response
     */
    private void completeOffsetFetchRequestWithError(CommitRequestManager commitRequestManager, Set<TopicPartition> set, Errors errors) {
        NetworkClientDelegate.PollResult result = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest fetchRequest = (NetworkClientDelegate.UnsentRequest) result.unsentRequests.get(0);
        fetchRequest.future().complete(buildOffsetFetchClientResponse(fetchRequest, set, errors));
    }

    /**
     * Polls the manager, expects exactly one outgoing request and completes that request's
     * future with an offset commit response for partition 1 of "topic" carrying
     * {@code errors}.
     *
     * @param commitRequestManager manager under test
     * @param errors               error to put into the response
     */
    private void completeOffsetCommitRequestWithError(CommitRequestManager commitRequestManager, Errors errors) {
        NetworkClientDelegate.PollResult result = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest commitRequest = (NetworkClientDelegate.UnsentRequest) result.unsentRequests.get(0);
        commitRequest.future().complete(mockOffsetCommitResponse("topic", 1, (short) 1, errors));
    }

    /**
     * Asserts that none of the futures is done, advances the clock by 100 ms, polls the manager
     * once and asserts again that none of the futures is done.
     *
     * @param commitRequestManager manager under test
     * @param list                 futures that are expected to remain pending
     */
    private void testRetriable(CommitRequestManager commitRequestManager, List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> list) {
        for (CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> pending : list) {
            Assertions.assertFalse(pending.isDone());
        }

        this.time.sleep(100L);
        commitRequestManager.poll(this.time.milliseconds());

        for (CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> stillPending : list) {
            Assertions.assertFalse(stillPending.isDone());
        }
    }

    /**
     * Asserts that every future in the list has completed exceptionally.
     *
     * @param list futures expected to have failed
     */
    private void testNonRetriable(List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> list) {
        for (CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> failed : list) {
            Assertions.assertTrue(failed.isCompletedExceptionally());
        }
    }

    /**
     * Supplies the errors used by the parameterized offset-commit tests, one {@link Errors}
     * value per invocation.
     *
     * @return stream of single-argument {@link Arguments}, in the order listed below
     */
    private static Stream<Arguments> offsetCommitExceptionSupplier() {
        // Built as a typed stream; no Object[] cast, so the element type stays Arguments.
        return Stream.of(
            Errors.NOT_COORDINATOR,
            Errors.COORDINATOR_LOAD_IN_PROGRESS,
            Errors.UNKNOWN_SERVER_ERROR,
            Errors.GROUP_AUTHORIZATION_FAILED,
            Errors.OFFSET_METADATA_TOO_LARGE,
            Errors.INVALID_COMMIT_OFFSET_SIZE,
            Errors.UNKNOWN_TOPIC_OR_PARTITION,
            Errors.COORDINATOR_NOT_AVAILABLE,
            Errors.REQUEST_TIMED_OUT,
            Errors.FENCED_INSTANCE_ID,
            Errors.TOPIC_AUTHORIZATION_FAILED,
            Errors.STALE_MEMBER_EPOCH,
            Errors.UNKNOWN_MEMBER_ID
        ).map(error -> Arguments.of(error));
    }

    /**
     * Supplies the errors used by the parameterized offset-fetch tests, one {@link Errors}
     * value per invocation.
     *
     * @return stream of single-argument {@link Arguments}, in the order listed below
     */
    private static Stream<Arguments> offsetFetchExceptionSupplier() {
        // Built as a typed stream; no Object[] cast, so the element type stays Arguments.
        return Stream.of(
            Errors.NOT_COORDINATOR,
            Errors.COORDINATOR_LOAD_IN_PROGRESS,
            Errors.UNKNOWN_SERVER_ERROR,
            Errors.GROUP_AUTHORIZATION_FAILED,
            Errors.OFFSET_METADATA_TOO_LARGE,
            Errors.INVALID_COMMIT_OFFSET_SIZE,
            Errors.UNKNOWN_TOPIC_OR_PARTITION,
            Errors.COORDINATOR_NOT_AVAILABLE,
            Errors.REQUEST_TIMED_OUT,
            Errors.FENCED_INSTANCE_ID,
            Errors.TOPIC_AUTHORIZATION_FAILED,
            Errors.UNKNOWN_MEMBER_ID,
            Errors.STALE_MEMBER_EPOCH,
            Errors.UNSTABLE_OFFSET_COMMIT
        ).map(error -> Arguments.of(error));
    }

    /**
     * Supplies (error, flag) pairs for
     * {@code testOffsetFetchMarksCoordinatorUnknownOnRetriableCoordinatorErrors}; the flag is
     * what that test uses to decide whether to assert that the coordinator was marked unknown.
     *
     * @return stream of two-argument {@link Arguments}
     */
    private static Stream<Arguments> offsetFetchRetriableCoordinatorErrors() {
        // Passed as varargs directly; no Object[] cast, so the element type stays Arguments.
        return Stream.of(
            Arguments.of(Errors.NOT_COORDINATOR, true),
            Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, true),
            Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, false)
        );
    }

    /**
     * Completes an offset fetch for two partitions where one partition carries the given
     * partition-level error and the other succeeds, then checks whether the fetch future stays
     * pending (retriable) or has failed (non-retriable).
     *
     * @param errors partition-level error reported for partition t1-2
     * @param z      whether the fetch is expected to be retried
     */
    @MethodSource({"partitionDataErrorSupplier"})
    @ParameterizedTest
    public void testOffsetFetchRequestPartitionDataError(Errors errors, boolean z) {
        CommitRequestManager manager = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        TopicPartition failingPartition = new TopicPartition("t1", 2);
        TopicPartition healthyPartition = new TopicPartition("t2", 3);
        Set<TopicPartition> partitions = new HashSet<>();
        partitions.add(failingPartition);
        partitions.add(healthyPartition);

        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult =
            manager.fetchOffsets(partitions, this.time.milliseconds() + 60000);
        NetworkClientDelegate.PollResult result = manager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest fetchRequest = (NetworkClientDelegate.UnsentRequest) result.unsentRequests.get(0);

        // Top-level error is NONE; only one partition entry carries the error under test.
        Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
        responseData.put(failingPartition, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", errors));
        responseData.put(healthyPartition, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", Errors.NONE));
        fetchRequest.handler().onComplete(buildOffsetFetchClientResponse(fetchRequest, responseData, Errors.NONE, false));

        if (!z) {
            testNonRetriable(Collections.singletonList(fetchResult));
        } else {
            testRetriable(manager, Collections.singletonList(fetchResult));
        }
    }

    /**
     * Queues an async commit, signals close and verifies that the next poll still hands out
     * exactly one request whose first topic is "topic".
     */
    @Test
    public void testSignalClose() {
        CommitRequestManager manager = create(true, 100L);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));

        Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(new TopicPartition("topic", 1), new OffsetAndMetadata(0L));
        manager.commitAsync(offsets);
        manager.signalClose();

        NetworkClientDelegate.PollResult result = manager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest commitRequest = (NetworkClientDelegate.UnsentRequest) result.unsentRequests.get(0);
        OffsetCommitRequestData requestData = (OffsetCommitRequestData) commitRequest.requestBuilder().build().data();
        OffsetCommitRequestData.OffsetCommitRequestTopic firstTopic =
            (OffsetCommitRequestData.OffsetCommitRequestTopic) requestData.topics().get(0);
        Assertions.assertEquals("topic", firstTopic.name());
    }

    /**
     * Asserts that the manager has no in-flight offset fetches, no unsent offset fetches and no
     * unsent offset commits.
     *
     * @param commitRequestManager manager whose pending-request queues are inspected
     */
    private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) {
        boolean noInflightFetches = commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty();
        Assertions.assertTrue(noInflightFetches);

        boolean noUnsentFetches = commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty();
        Assertions.assertTrue(noUnsentFetches);

        boolean noUnsentCommits = commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty();
        Assertions.assertTrue(noUnsentCommits);
    }

    /**
     * Supplies (partition error, retriable) pairs for
     * {@code testOffsetFetchRequestPartitionDataError}; the flag selects whether that test
     * expects the fetch to stay pending or to fail.
     *
     * @return stream of two-argument {@link Arguments}
     */
    private static Stream<Arguments> partitionDataErrorSupplier() {
        // Passed as varargs directly; no Object[] cast, so the element type stays Arguments.
        return Stream.of(
            Arguments.of(Errors.UNSTABLE_OFFSET_COMMIT, true),
            Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false),
            Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false),
            Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false)
        );
    }

    /**
     * Issues {@code i} identical offset fetches, verifies that a single request is handed out
     * for all of them, completes it with a response carrying {@code errors} and verifies that
     * the immediate next poll hands out nothing.
     *
     * @param commitRequestManager manager under test
     * @param set                  partitions to fetch offsets for
     * @param i                    number of duplicate fetch calls to issue
     * @param errors               error to put into the response
     * @return the futures returned by the {@code i} fetch calls, in call order
     */
    private List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> sendAndVerifyDuplicatedOffsetFetchRequests(CommitRequestManager commitRequestManager, Set<TopicPartition> set, int i, Errors errors) {
        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = new ArrayList<>();
        long deadlineMs = this.time.milliseconds() + 60000;
        int issued = 0;
        while (issued < i) {
            futures.add(commitRequestManager.fetchOffsets(set, deadlineMs));
            issued++;
        }

        NetworkClientDelegate.PollResult firstPoll = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, firstPoll.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest fetchRequest = (NetworkClientDelegate.UnsentRequest) firstPoll.unsentRequests.get(0);
        fetchRequest.handler().onComplete(buildOffsetFetchClientResponse(fetchRequest, set, errors));

        NetworkClientDelegate.PollResult secondPoll = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(0, secondPoll.unsentRequests.size());
        return futures;
    }

    /**
     * Fails the pending offset commit with {@code errors}, verifies that nothing is handed out
     * on the immediate next poll, and checks the future: still pending when the error's
     * exception is a {@link RetriableException}, completed exceptionally otherwise.
     *
     * @param commitRequestManager manager under test
     * @param errors               error to fail the commit with
     * @param completableFuture    future of the commit being failed
     */
    private void sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(CommitRequestManager commitRequestManager, Errors errors, CompletableFuture<Void> completableFuture) {
        completeOffsetCommitRequestWithError(commitRequestManager, errors);
        NetworkClientDelegate.PollResult afterFailure = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(0, afterFailure.unsentRequests.size());

        boolean retriable = errors.exception() instanceof RetriableException;
        if (!retriable) {
            Assertions.assertTrue(completableFuture.isDone());
            Assertions.assertTrue(completableFuture.isCompletedExceptionally());
            return;
        }
        Assertions.assertFalse(completableFuture.isDone());
    }

    /**
     * Polls with the coordinator stubbed as available and asserts the number of requests
     * handed out.
     *
     * @param i                    expected number of unsent requests
     * @param commitRequestManager manager under test
     * @return the handlers of the requests handed out by the poll
     */
    private List<NetworkClientDelegate.FutureCompletionHandler> assertPoll(int i, CommitRequestManager commitRequestManager) {
        final boolean coordinatorDiscovered = true;
        return assertPoll(coordinatorDiscovered, i, commitRequestManager);
    }

    /**
     * Stubs the coordinator lookup (present when {@code z} is true, empty otherwise), polls the
     * manager at the current time and asserts the number of requests handed out.
     *
     * @param z                    whether the coordinator should be reported as known
     * @param i                    expected number of unsent requests
     * @param commitRequestManager manager under test
     * @return the handlers of the requests handed out by the poll, in poll order
     */
    private List<NetworkClientDelegate.FutureCompletionHandler> assertPoll(boolean z, int i, CommitRequestManager commitRequestManager) {
        if (!z) {
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
        } else {
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.mockedNode));
        }

        NetworkClientDelegate.PollResult result = commitRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(i, result.unsentRequests.size());
        return (List) result.unsentRequests.stream()
            .map(request -> request.handler())
            .collect(Collectors.toList());
    }

    /**
     * Builds a Mockito spy around a new {@link CommitRequestManager} configured from the shared
     * test properties.
     *
     * @param z whether auto-commit is enabled; when true a random 10-character group id is
     *          also written to the properties
     * @param j value written to {@code auto.commit.interval.ms}
     * @return spy wrapping the newly created manager
     */
    private CommitRequestManager create(boolean z, long j) {
        String intervalValue = String.valueOf(j);
        this.props.setProperty("auto.commit.interval.ms", intervalValue);
        String autoCommitValue = String.valueOf(z);
        this.props.setProperty("enable.auto.commit", autoCommitValue);
        if (z) {
            this.props.setProperty("group.id", TestUtils.randomString(10));
        }

        CommitRequestManager manager = new CommitRequestManager(
            this.time,
            this.logContext,
            this.subscriptionState,
            new ConsumerConfig(this.props),
            this.coordinatorRequestManager,
            this.offsetCommitCallbackInvoker,
            "group-id",
            Optional.of("group-instance-id"),
            100L,
            1000L,
            OptionalDouble.of(0.0d),
            this.metrics);
        return (CommitRequestManager) Mockito.spy(manager);
    }

    /**
     * Builds a (non-disconnected) offset fetch response for the given request in which every
     * requested partition has an entry with offset 100, leader epoch 1, metadata "metadata" and
     * no partition-level error; {@code errors} is used as the top-level error.
     *
     * <p>The partition map is now actually populated; previously the per-partition loop had an
     * empty body, so the response carried no partition data at all.
     *
     * @param unsentRequest request the response answers
     * @param set           partitions to include in the response
     * @param errors        top-level error of the response
     * @return client response wrapping the offset fetch response
     */
    private ClientResponse buildOffsetFetchClientResponse(NetworkClientDelegate.UnsentRequest unsentRequest, Set<TopicPartition> set, Errors errors) {
        Map<TopicPartition, OffsetFetchResponse.PartitionData> partitionData = new HashMap<>();
        for (TopicPartition topicPartition : set) {
            partitionData.put(topicPartition, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", Errors.NONE));
        }
        return buildOffsetFetchClientResponse(unsentRequest, partitionData, errors, false);
    }

    /**
     * Builds an offset fetch response for the given request that is flagged as disconnected,
     * with no partition data and no top-level error.
     *
     * @param unsentRequest request the response answers
     * @return client response with the disconnected flag set
     */
    private ClientResponse buildOffsetFetchClientResponseDisconnected(NetworkClientDelegate.UnsentRequest unsentRequest) {
        final boolean disconnected = true;
        Map<TopicPartition, OffsetFetchResponse.PartitionData> noPartitionData = Collections.emptyMap();
        return buildOffsetFetchClientResponse(unsentRequest, noPartitionData, Errors.NONE, disconnected);
    }

    /**
     * Wraps the given offset commit response in a {@link ClientResponse} with an OFFSET_COMMIT
     * header (version 1), no callback, destination "-1", the current time as both created and
     * received timestamp, and the disconnected flag unset.
     *
     * @param offsetCommitResponse response body to wrap
     * @return client response carrying {@code offsetCommitResponse}
     */
    private ClientResponse buildOffsetCommitClientResponse(OffsetCommitResponse offsetCommitResponse) {
        RequestHeader header = new RequestHeader(ApiKeys.OFFSET_COMMIT, (short) 1, "", 1);
        long createdMs = this.time.milliseconds();
        long receivedMs = this.time.milliseconds();
        return new ClientResponse(
            header,
            (RequestCompletionHandler) null,
            "-1",
            createdMs,
            receivedMs,
            false,
            (UnsupportedVersionException) null,
            (AuthenticationException) null,
            offsetCommitResponse);
    }

    /**
     * Builds an offset commit response whose created and received timestamps are both the
     * current time.
     *
     * @param str    topic name
     * @param i      partition index
     * @param s      API version put into the request header
     * @param errors partition-level error
     * @return client response carrying the offset commit response
     */
    public ClientResponse mockOffsetCommitResponse(String str, int i, short s, Errors errors) {
        long createdMs = this.time.milliseconds();
        long receivedMs = this.time.milliseconds();
        return mockOffsetCommitResponse(str, i, s, createdMs, receivedMs, errors);
    }

    /**
     * Builds a (non-disconnected) offset commit response containing a single topic with a
     * single partition that carries the given error.
     *
     * <p>The response is a real {@link OffsetCommitResponse}; the Mockito mock that used to be
     * created and stubbed here was never referenced afterwards and has been removed.
     *
     * @param str    topic name
     * @param i      partition index
     * @param s      API version put into the request header
     * @param j      created timestamp of the client response, in milliseconds
     * @param j2     received timestamp of the client response, in milliseconds
     * @param errors partition-level error
     * @return client response carrying the offset commit response
     */
    public ClientResponse mockOffsetCommitResponse(String str, int i, short s, long j, long j2, Errors errors) {
        OffsetCommitResponseData.OffsetCommitResponsePartition partition = new OffsetCommitResponseData.OffsetCommitResponsePartition()
            .setErrorCode(errors.code())
            .setPartitionIndex(i);
        OffsetCommitResponseData.OffsetCommitResponseTopic topic = new OffsetCommitResponseData.OffsetCommitResponseTopic()
            .setName(str)
            .setPartitions(Collections.singletonList(partition));
        OffsetCommitResponseData responseData = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(topic));
        return new ClientResponse(new RequestHeader(ApiKeys.OFFSET_COMMIT, s, "", 1), (RequestCompletionHandler) null, "-1", j, j2, false, (UnsupportedVersionException) null, (AuthenticationException) null, new OffsetCommitResponse(responseData));
    }

    /**
     * Builds an offset commit response flagged as disconnected, containing a single topic with
     * a single error-free partition, and wired to the given request's handler as callback.
     *
     * <p>The response is a real {@link OffsetCommitResponse}; the Mockito mock that used to be
     * created and stubbed here was never referenced afterwards and has been removed.
     *
     * @param str           topic name
     * @param i             partition index
     * @param s             API version put into the request header
     * @param unsentRequest request whose handler becomes the response callback
     * @return client response with the disconnected flag set
     */
    public ClientResponse mockOffsetCommitResponseDisconnected(String str, int i, short s, NetworkClientDelegate.UnsentRequest unsentRequest) {
        OffsetCommitResponseData.OffsetCommitResponsePartition partition = new OffsetCommitResponseData.OffsetCommitResponsePartition()
            .setErrorCode(Errors.NONE.code())
            .setPartitionIndex(i);
        OffsetCommitResponseData.OffsetCommitResponseTopic topic = new OffsetCommitResponseData.OffsetCommitResponseTopic()
            .setName(str)
            .setPartitions(Collections.singletonList(partition));
        OffsetCommitResponseData responseData = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(topic));
        return new ClientResponse(new RequestHeader(ApiKeys.OFFSET_COMMIT, s, "", 1), unsentRequest.handler(), "-1", this.time.milliseconds(), this.time.milliseconds(), true, (UnsupportedVersionException) null, (AuthenticationException) null, new OffsetCommitResponse(responseData));
    }

    /**
     * Builds the client response for an offset fetch request: asserts that the request built
     * from {@code unsentRequest} is an {@link OffsetFetchRequest}, then wraps a new
     * {@link OffsetFetchResponse} (top-level {@code errors}, per-partition {@code map}) in a
     * {@link ClientResponse} that uses the request's version, the request's handler as
     * callback, and the current time as both created and received timestamp.
     *
     * @param unsentRequest request the response answers
     * @param map           per-partition response data
     * @param errors        top-level error of the response
     * @param z             whether the response is flagged as disconnected
     * @return client response carrying the offset fetch response
     */
    private ClientResponse buildOffsetFetchClientResponse(NetworkClientDelegate.UnsentRequest unsentRequest, Map<TopicPartition, OffsetFetchResponse.PartitionData> map, Errors errors, boolean z) {
        // assertInstanceOf returns the value cast to the asserted type, so no unchecked
        // assignment or extra alias is needed.
        OffsetFetchRequest offsetFetchRequest = Assertions.assertInstanceOf(OffsetFetchRequest.class, unsentRequest.requestBuilder().build());
        return new ClientResponse(new RequestHeader(ApiKeys.OFFSET_FETCH, offsetFetchRequest.version(), "", 1), unsentRequest.handler(), "-1", this.time.milliseconds(), this.time.milliseconds(), z, (UnsupportedVersionException) null, (AuthenticationException) null, new OffsetFetchResponse(errors, map));
    }

    /**
     * Looks up a metric by name in the {@code CONSUMER_COORDINATOR_METRICS} group of the test's
     * metrics registry.
     *
     * @param str metric name
     * @return the registered metric, or {@code null} if the registry has no such entry
     */
    private KafkaMetric getMetric(String str) {
        Map<?, ?> registered = this.metrics.metrics();
        Object metric = registered.get(this.metrics.metricName(str, CONSUMER_COORDINATOR_METRICS));
        return (KafkaMetric) metric;
    }
}
