/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.MockPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.RequestFutureAdapter;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.ThrowOnAssignmentAssignor;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(value=Parameterized.class)
public class ConsumerCoordinatorTest {
    private final String topic1 = "test1";
    private final String topic2 = "test2";
    private final TopicPartition t1p = new TopicPartition("test1", 0);
    private final TopicPartition t2p = new TopicPartition("test2", 0);
    private final String groupId = "test-group";
    private final Optional<String> groupInstanceId = Optional.of("test-instance");
    private final int rebalanceTimeoutMs = 60000;
    private final int sessionTimeoutMs = 10000;
    private final int heartbeatIntervalMs = 5000;
    private final long retryBackoffMs = 100L;
    private final int autoCommitIntervalMs = 2000;
    private final int requestTimeoutMs = 30000;
    private final MockTime time = new MockTime();
    private GroupRebalanceConfig rebalanceConfig;
    private final ConsumerPartitionAssignor.RebalanceProtocol protocol;
    private final MockPartitionAssignor partitionAssignor;
    private final ThrowOnAssignmentAssignor throwOnAssignmentAssignor;
    private final ThrowOnAssignmentAssignor throwFatalErrorOnAssignmentAssignor;
    private final List<ConsumerPartitionAssignor> assignors;
    private final Map<String, MockPartitionAssignor> assignorMap;
    private final String consumerId = "consumer";
    private final String consumerId2 = "consumer2";
    private MockClient client;
    private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, (Map<String, Integer>)new HashMap<String, Integer>(){
        {
            this.put("test1", 1);
            this.put("test2", 1);
        }
    });
    private Node node = (Node)this.metadataResponse.brokers().iterator().next();
    private SubscriptionState subscriptions;
    private ConsumerMetadata metadata;
    private Metrics metrics;
    private ConsumerNetworkClient consumerClient;
    private MockRebalanceListener rebalanceListener;
    private MockCommitCallback mockOffsetCommitCallback;
    private ConsumerCoordinator coordinator;

    public ConsumerCoordinatorTest(ConsumerPartitionAssignor.RebalanceProtocol protocol) {
        this.protocol = protocol;
        this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(protocol));
        this.throwOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol), (RuntimeException)((Object)new KafkaException("Kaboom for assignment!")), "throw-on-assignment-assignor");
        this.throwFatalErrorOnAssignmentAssignor = new ThrowOnAssignmentAssignor(Collections.singletonList(protocol), new IllegalStateException("Illegal state for assignment!"), "throw-fatal-error-on-assignment-assignor");
        this.assignors = Arrays.asList(new ConsumerPartitionAssignor[]{this.partitionAssignor, this.throwOnAssignmentAssignor, this.throwFatalErrorOnAssignmentAssignor});
        this.assignorMap = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partitionAssignor.name(), (Object)((Object)this.partitionAssignor)), Utils.mkEntry((Object)this.throwOnAssignmentAssignor.name(), (Object)((Object)this.throwOnAssignmentAssignor)), Utils.mkEntry((Object)this.throwFatalErrorOnAssignmentAssignor.name(), (Object)((Object)this.throwFatalErrorOnAssignmentAssignor))});
    }

    @Parameterized.Parameters(name="rebalance protocol = {0}")
    public static Collection<Object[]> data() {
        ArrayList<Object[]> values = new ArrayList<Object[]>();
        for (ConsumerPartitionAssignor.RebalanceProtocol protocol : ConsumerPartitionAssignor.RebalanceProtocol.values()) {
            values.add(new Object[]{protocol});
        }
        return values;
    }

    @Before
    public void setup() {
        LogContext logContext = new LogContext();
        this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST);
        this.metadata = new ConsumerMetadata(0L, Long.MAX_VALUE, false, false, this.subscriptions, logContext, new ClusterResourceListeners());
        this.client = new MockClient((Time)this.time, (Metadata)this.metadata);
        this.client.updateMetadata(this.metadataResponse);
        this.consumerClient = new ConsumerNetworkClient(logContext, (KafkaClient)this.client, (Metadata)this.metadata, (Time)this.time, 100L, 30000, Integer.MAX_VALUE);
        this.metrics = new Metrics((Time)this.time);
        this.rebalanceListener = new MockRebalanceListener();
        this.mockOffsetCommitCallback = new MockCommitCallback();
        this.partitionAssignor.clear();
        this.rebalanceConfig = this.buildRebalanceConfig(Optional.empty());
        this.coordinator = this.buildCoordinator(this.rebalanceConfig, this.metrics, this.assignors, false);
    }

    private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstanceId) {
        return new GroupRebalanceConfig(10000, 60000, 5000, "test-group", groupInstanceId, 100L, !groupInstanceId.isPresent());
    }

    @After
    public void teardown() {
        this.metrics.close();
        this.coordinator.close(this.time.timer(0L));
    }

    @Test
    public void testMetrics() {
        Assert.assertNotNull((Object)this.getMetric("commit-latency-avg"));
        Assert.assertNotNull((Object)this.getMetric("commit-latency-max"));
        Assert.assertNotNull((Object)this.getMetric("commit-rate"));
        Assert.assertNotNull((Object)this.getMetric("commit-total"));
        Assert.assertNotNull((Object)this.getMetric("partition-revoked-latency-avg"));
        Assert.assertNotNull((Object)this.getMetric("partition-revoked-latency-max"));
        Assert.assertNotNull((Object)this.getMetric("partition-assigned-latency-avg"));
        Assert.assertNotNull((Object)this.getMetric("partition-assigned-latency-max"));
        Assert.assertNotNull((Object)this.getMetric("partition-lost-latency-avg"));
        Assert.assertNotNull((Object)this.getMetric("partition-lost-latency-max"));
        Assert.assertNotNull((Object)this.getMetric("assigned-partitions"));
        this.metrics.sensor("commit-latency").record(1.0);
        this.metrics.sensor("commit-latency").record(6.0);
        this.metrics.sensor("commit-latency").record(2.0);
        Assert.assertEquals((Object)3.0, (Object)this.getMetric("commit-latency-avg").metricValue());
        Assert.assertEquals((Object)6.0, (Object)this.getMetric("commit-latency-max").metricValue());
        Assert.assertEquals((Object)0.1, (Object)this.getMetric("commit-rate").metricValue());
        Assert.assertEquals((Object)3.0, (Object)this.getMetric("commit-total").metricValue());
        this.metrics.sensor("partition-revoked-latency").record(1.0);
        this.metrics.sensor("partition-revoked-latency").record(2.0);
        this.metrics.sensor("partition-assigned-latency").record(1.0);
        this.metrics.sensor("partition-assigned-latency").record(2.0);
        this.metrics.sensor("partition-lost-latency").record(1.0);
        this.metrics.sensor("partition-lost-latency").record(2.0);
        Assert.assertEquals((Object)1.5, (Object)this.getMetric("partition-revoked-latency-avg").metricValue());
        Assert.assertEquals((Object)2.0, (Object)this.getMetric("partition-revoked-latency-max").metricValue());
        Assert.assertEquals((Object)1.5, (Object)this.getMetric("partition-assigned-latency-avg").metricValue());
        Assert.assertEquals((Object)2.0, (Object)this.getMetric("partition-assigned-latency-max").metricValue());
        Assert.assertEquals((Object)1.5, (Object)this.getMetric("partition-lost-latency-avg").metricValue());
        Assert.assertEquals((Object)2.0, (Object)this.getMetric("partition-lost-latency-max").metricValue());
        Assert.assertEquals((Object)0.0, (Object)this.getMetric("assigned-partitions").metricValue());
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        Assert.assertEquals((Object)1.0, (Object)this.getMetric("assigned-partitions").metricValue());
        this.subscriptions.assignFromUser(Utils.mkSet((Object[])new TopicPartition[]{this.t1p, this.t2p}));
        Assert.assertEquals((Object)2.0, (Object)this.getMetric("assigned-partitions").metricValue());
    }

    private KafkaMetric getMetric(String name) {
        return (KafkaMetric)this.metrics.metrics().get(this.metrics.metricName(name, "consumertest-group-coordinator-metrics"));
    }

    public ByteBuffer subscriptionUserData(int generation) {
        String generationKeyName = "generation";
        Schema cooperativeStickyAssignorUserDataV0 = new Schema(new Field[]{new Field("generation", (Type)Type.INT32)});
        Struct struct = new Struct(cooperativeStickyAssignorUserDataV0);
        struct.set("generation", (Object)generation);
        ByteBuffer buffer = ByteBuffer.allocate(cooperativeStickyAssignorUserDataV0.sizeOf((Object)struct));
        cooperativeStickyAssignorUserDataV0.write(buffer, (Object)struct);
        buffer.flip();
        return buffer;
    }

    private List<JoinGroupResponseData.JoinGroupResponseMember> validateCooperativeAssignmentTestSetup() {
        HashMap<String, List<String>> memberSubscriptions = new HashMap<String, List<String>>();
        List<String> subscribedTopics = Collections.singletonList("test1");
        memberSubscriptions.put("consumer", subscribedTopics);
        memberSubscriptions.put("consumer2", subscribedTopics);
        ConsumerPartitionAssignor.Subscription subscriptionConsumer1 = new ConsumerPartitionAssignor.Subscription(subscribedTopics, this.subscriptionUserData(1), Arrays.asList(this.t1p, this.t2p));
        ConsumerPartitionAssignor.Subscription subscriptionConsumer2 = new ConsumerPartitionAssignor.Subscription(subscribedTopics, this.subscriptionUserData(1), Collections.emptyList());
        ArrayList<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<JoinGroupResponseData.JoinGroupResponseMember>();
        for (Map.Entry subscriptionEntry : memberSubscriptions.entrySet()) {
            ByteBuffer buf = null;
            buf = ((String)subscriptionEntry.getKey()).equals("consumer") ? ConsumerProtocol.serializeSubscription((ConsumerPartitionAssignor.Subscription)subscriptionConsumer1) : ConsumerProtocol.serializeSubscription((ConsumerPartitionAssignor.Subscription)subscriptionConsumer2);
            metadata.add(new JoinGroupResponseData.JoinGroupResponseMember().setMemberId((String)subscriptionEntry.getKey()).setMetadata(buf.array()));
        }
        return metadata;
    }

    @Test
    public void testPerformAssignmentShouldValidateCooperativeAssignment() {
        SubscriptionState mockSubscriptionState = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = this.validateCooperativeAssignmentTestSetup();
        HashMap<String, List<TopicPartition>> assignment = new HashMap<String, List<TopicPartition>>();
        assignment.put("consumer", Arrays.asList(this.t1p));
        assignment.put("consumer2", Arrays.asList(this.t2p));
        this.partitionAssignor.prepare(assignment);
        try (ConsumerCoordinator coordinator = this.buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, false, mockSubscriptionState);){
            if (this.protocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE) {
                Exception e = (Exception)Assert.assertThrows(IllegalStateException.class, () -> coordinator.performAssignment("1", this.partitionAssignor.name(), metadata));
                Assert.assertTrue((boolean)e.getMessage().contains("Assignor supporting the COOPERATIVE protocol violates its requirements"));
            } else {
                coordinator.performAssignment("1", this.partitionAssignor.name(), metadata);
            }
        }
    }

    @Test
    public void testPerformAssignmentShouldSkipValidateCooperativeAssignmentForBuiltInCooperativeStickyAssignor() {
        SubscriptionState mockSubscriptionState = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = this.validateCooperativeAssignmentTestSetup();
        ArrayList<ConsumerPartitionAssignor> assignorsWithCooperativeStickyAssignor = new ArrayList<ConsumerPartitionAssignor>(this.assignors);
        MockPartitionAssignor mockCooperativeStickyAssignor = new MockPartitionAssignor(Collections.singletonList(this.protocol)){

            @Override
            public String name() {
                return "cooperative-sticky";
            }
        };
        assignorsWithCooperativeStickyAssignor.add((ConsumerPartitionAssignor)mockCooperativeStickyAssignor);
        HashMap<String, List<TopicPartition>> assignment = new HashMap<String, List<TopicPartition>>();
        assignment.put("consumer", Arrays.asList(this.t1p));
        assignment.put("consumer2", Arrays.asList(this.t2p));
        mockCooperativeStickyAssignor.prepare(assignment);
        try (ConsumerCoordinator coordinator = this.buildCoordinator(this.rebalanceConfig, new Metrics(), assignorsWithCooperativeStickyAssignor, false, mockSubscriptionState);){
            coordinator.performAssignment("1", mockCooperativeStickyAssignor.name(), metadata);
        }
    }

    @Test
    public void testSelectRebalanceProtcol() {
        ArrayList<ConsumerPartitionAssignor> assignors = new ArrayList<ConsumerPartitionAssignor>();
        assignors.add((ConsumerPartitionAssignor)new MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER)));
        assignors.add((ConsumerPartitionAssignor)new MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
        Assert.assertThrows(IllegalArgumentException.class, () -> this.buildCoordinator(this.rebalanceConfig, new Metrics(), assignors, false));
        assignors.clear();
        assignors.add((ConsumerPartitionAssignor)new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
        assignors.add((ConsumerPartitionAssignor)new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
        try (ConsumerCoordinator coordinator = this.buildCoordinator(this.rebalanceConfig, new Metrics(), assignors, false);){
            Assert.assertEquals((Object)ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE, (Object)coordinator.getProtocol());
        }
    }

    @Test
    public void testNormalHeartbeat() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.time.sleep(10000L);
        RequestFuture future = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertFalse((boolean)future.isDone());
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.succeeded());
    }

    @Test(expected=GroupAuthorizationException.class)
    public void testGroupDescribeUnauthorized() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.GROUP_AUTHORIZATION_FAILED));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
    }

    @Test(expected=GroupAuthorizationException.class)
    public void testGroupReadUnauthorized() {
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(0, "memberId", Collections.emptyMap(), Errors.GROUP_AUTHORIZATION_FAILED));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testCoordinatorNotAvailable() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.time.sleep(10000L);
        RequestFuture future = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertFalse((boolean)future.isDone());
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.COORDINATOR_NOT_AVAILABLE));
        this.time.sleep(10000L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.failed());
        Assert.assertEquals((Object)Errors.COORDINATOR_NOT_AVAILABLE.exception(), (Object)future.exception());
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
    }

    @Test
    public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        int numRequests = 1000;
        TopicPartition tp = new TopicPartition("foo", 0);
        AtomicInteger responses = new AtomicInteger(0);
        for (int i = 0; i < numRequests; ++i) {
            Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata((long)i));
            this.coordinator.commitOffsetsAsync(offsets, (offsets1, exception) -> {
                responses.incrementAndGet();
                Throwable cause = exception.getCause();
                Assert.assertTrue((String)("Unexpected exception cause type: " + (cause == null ? null : cause.getClass())), (boolean)(cause instanceof DisconnectException));
            });
        }
        this.coordinator.markCoordinatorUnknown("test cause");
        this.consumerClient.pollNoWakeup();
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertEquals((long)numRequests, (long)responses.get());
    }

    @Test
    public void testCoordinatorUnknownInUnsentCallbacksAfterCoordinatorDead() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        final AtomicBoolean asyncCallbackInvoked = new AtomicBoolean(false);
        OffsetCommitRequestData offsetCommitRequestData = new OffsetCommitRequestData().setGroupId("test-group").setTopics(Collections.singletonList(new OffsetCommitRequestData.OffsetCommitRequestTopic().setName("foo").setPartitions(Collections.singletonList(new OffsetCommitRequestData.OffsetCommitRequestPartition().setPartitionIndex(0).setCommittedLeaderEpoch(-1).setCommittedMetadata("").setCommittedOffset(13L).setCommitTimestamp(0L)))));
        this.consumerClient.send(this.coordinator.checkAndGetCoordinator(), (AbstractRequest.Builder)new OffsetCommitRequest.Builder(offsetCommitRequestData)).compose((RequestFutureAdapter)new RequestFutureAdapter<ClientResponse, Object>(){

            public void onSuccess(ClientResponse value, RequestFuture<Object> future) {
            }

            public void onFailure(RuntimeException e, RequestFuture<Object> future) {
                Assert.assertTrue((String)("Unexpected exception type: " + e.getClass()), (boolean)(e instanceof DisconnectException));
                Assert.assertTrue((boolean)ConsumerCoordinatorTest.this.coordinator.coordinatorUnknown());
                asyncCallbackInvoked.set(true);
            }
        });
        this.coordinator.markCoordinatorUnknown("test cause");
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)asyncCallbackInvoked.get());
    }

    @Test
    public void testNotCoordinator() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.time.sleep(10000L);
        RequestFuture future = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertFalse((boolean)future.isDone());
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NOT_COORDINATOR));
        this.time.sleep(10000L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.failed());
        Assert.assertEquals((Object)Errors.NOT_COORDINATOR.exception(), (Object)future.exception());
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
    }

    @Test
    public void testIllegalGeneration() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.subscriptions.assignFromSubscribed(Collections.singletonList(this.t1p));
        this.time.sleep(10000L);
        RequestFuture future = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertFalse((boolean)future.isDone());
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.ILLEGAL_GENERATION));
        this.time.sleep(10000L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.failed());
        Assert.assertEquals((Object)Errors.ILLEGAL_GENERATION.exception(), (Object)future.exception());
        Assert.assertTrue((boolean)this.coordinator.rejoinNeededOrPending());
        this.coordinator.poll(this.time.timer(0L));
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.lostCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.lost);
    }

    @Test
    public void testUnsubscribeWithValidGeneration() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        ByteBuffer buffer = ConsumerProtocol.serializeAssignment((ConsumerPartitionAssignor.Assignment)new ConsumerPartitionAssignor.Assignment(Collections.singletonList(this.t1p), ByteBuffer.wrap(new byte[0])));
        this.coordinator.onJoinComplete(1, "memberId", this.partitionAssignor.name(), buffer);
        this.coordinator.onLeavePrepare();
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.lostCount);
        Assert.assertEquals((long)0L, (long)this.rebalanceListener.revokedCount);
    }

    @Test
    public void testRevokeExceptionThrownFirstNonBlockingSubCallbacks() {
        MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener(){

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                super.onPartitionsRevoked(partitions);
                throw new KafkaException("Kaboom on revoke!");
            }
        };
        if (this.protocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE) {
            this.verifyOnCallbackExceptions(throwOnRevokeListener, this.throwOnAssignmentAssignor.name(), "Kaboom on revoke!", null);
        } else {
            this.verifyOnCallbackExceptions(throwOnRevokeListener, this.throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null);
        }
    }

    @Test
    public void testOnAssignmentExceptionThrownFirstNonBlockingSubCallbacks() {
        MockRebalanceListener throwOnAssignListener = new MockRebalanceListener(){

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                super.onPartitionsAssigned(partitions);
                throw new KafkaException("Kaboom on partition assign!");
            }
        };
        this.verifyOnCallbackExceptions(throwOnAssignListener, this.throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null);
    }

    @Test
    public void testOnPartitionsAssignExceptionThrownWhenNoPreviousThrownCallbacks() {
        MockRebalanceListener throwOnAssignListener = new MockRebalanceListener(){

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                super.onPartitionsAssigned(partitions);
                throw new KafkaException("Kaboom on partition assign!");
            }
        };
        this.verifyOnCallbackExceptions(throwOnAssignListener, this.partitionAssignor.name(), "Kaboom on partition assign!", null);
    }

    @Test
    public void testOnRevokeExceptionShouldBeRenderedIfNotKafkaException() {
        MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener(){

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                super.onPartitionsRevoked(partitions);
                throw new IllegalStateException("Illegal state on partition revoke!");
            }
        };
        if (this.protocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE) {
            this.verifyOnCallbackExceptions(throwOnRevokeListener, this.throwOnAssignmentAssignor.name(), "User rebalance callback throws an error", "Illegal state on partition revoke!");
        } else {
            this.verifyOnCallbackExceptions(throwOnRevokeListener, this.throwOnAssignmentAssignor.name(), "Kaboom for assignment!", null);
        }
    }

    @Test
    public void testOnAssignmentExceptionShouldBeRenderedIfNotKafkaException() {
        MockRebalanceListener throwOnAssignListener = new MockRebalanceListener(){

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                super.onPartitionsAssigned(partitions);
                throw new KafkaException("Kaboom on partition assign!");
            }
        };
        this.verifyOnCallbackExceptions(throwOnAssignListener, this.throwFatalErrorOnAssignmentAssignor.name(), "User rebalance callback throws an error", "Illegal state for assignment!");
    }

    @Test
    public void testOnPartitionsAssignExceptionShouldBeRenderedIfNotKafkaException() {
        MockRebalanceListener throwOnAssignListener = new MockRebalanceListener(){

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                super.onPartitionsAssigned(partitions);
                throw new IllegalStateException("Illegal state on partition assign!");
            }
        };
        this.verifyOnCallbackExceptions(throwOnAssignListener, this.partitionAssignor.name(), "User rebalance callback throws an error", "Illegal state on partition assign!");
    }

    private void verifyOnCallbackExceptions(MockRebalanceListener rebalanceListener, String assignorName, String exceptionMessage, String causeMessage) {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)rebalanceListener);
        ByteBuffer buffer = ConsumerProtocol.serializeAssignment((ConsumerPartitionAssignor.Assignment)new ConsumerPartitionAssignor.Assignment(Collections.singletonList(this.t1p), ByteBuffer.wrap(new byte[0])));
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.t2p));
        if (exceptionMessage != null) {
            Exception exception = (Exception)Assert.assertThrows(KafkaException.class, () -> this.coordinator.onJoinComplete(1, "memberId", assignorName, buffer));
            Assert.assertEquals((Object)exceptionMessage, (Object)exception.getMessage());
            if (causeMessage != null) {
                Assert.assertEquals((Object)causeMessage, (Object)exception.getCause().getMessage());
            }
        }
        Assert.assertEquals((long)(this.protocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE ? 1L : 0L), (long)rebalanceListener.revokedCount);
        Assert.assertEquals((long)0L, (long)rebalanceListener.lostCount);
        Assert.assertEquals((long)1L, (long)rebalanceListener.assignedCount);
        Assert.assertTrue((String)("Unknown assignor name: " + assignorName), (boolean)this.assignorMap.containsKey(assignorName));
        Assert.assertEquals((long)1L, (long)this.assignorMap.get(assignorName).numAssignment());
    }

    @Test
    public void testUnsubscribeWithInvalidGeneration() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.subscriptions.assignFromSubscribed(Collections.singletonList(this.t1p));
        this.coordinator.onLeavePrepare();
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.lostCount);
        Assert.assertEquals((long)0L, (long)this.rebalanceListener.revokedCount);
    }

    @Test
    public void testUnknownMemberId() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.subscriptions.assignFromSubscribed(Collections.singletonList(this.t1p));
        this.time.sleep(10000L);
        RequestFuture future = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertFalse((boolean)future.isDone());
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.UNKNOWN_MEMBER_ID));
        this.time.sleep(10000L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.failed());
        Assert.assertEquals((Object)Errors.UNKNOWN_MEMBER_ID.exception(), (Object)future.exception());
        Assert.assertTrue((boolean)this.coordinator.rejoinNeededOrPending());
        this.coordinator.poll(this.time.timer(0L));
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.lostCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.lost);
    }

    @Test
    public void testCoordinatorDisconnect() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.time.sleep(10000L);
        RequestFuture future = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertFalse((boolean)future.isDone());
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NONE), true);
        this.time.sleep(10000L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.failed());
        Assert.assertTrue((boolean)(future.exception() instanceof DisconnectException));
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
    }

    @Test(expected=ApiException.class)
    public void testJoinGroupInvalidGroupId() {
        String consumerId = "leader";
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.updateMetadata(this.metadataResponse);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(0, "leader", Collections.emptyMap(), Errors.INVALID_GROUP_ID));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testNormalJoinGroupLeader() {
        String consumerId = "leader";
        Set<String> subscription = Collections.singleton("test1");
        List<TopicPartition> owned = Collections.emptyList();
        List<TopicPartition> assigned = Arrays.asList(this.t1p);
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.updateMetadata(this.metadataResponse);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("leader", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", assigned));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse(body -> {
            SyncGroupRequest sync = (SyncGroupRequest)body;
            return sync.data.memberId().equals("leader") && sync.data.generationId() == 1 && sync.groupAssignments().containsKey("leader");
        }, (AbstractResponse)this.syncGroupResponse(assigned, Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(TestUtils.toSet(assigned), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals(subscription, (Object)this.subscriptions.metadataTopics());
        Assert.assertEquals((long)0L, (long)this.rebalanceListener.revokedCount);
        Assert.assertNull(this.rebalanceListener.revoked);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(this.getAdded(owned, assigned), this.rebalanceListener.assigned);
    }

    @Test
    public void testOutdatedCoordinatorAssignment() {
        String consumerId = "outdated_assignment";
        List<TopicPartition> owned = Collections.emptyList();
        List<String> oldSubscription = Collections.singletonList("test2");
        List<TopicPartition> oldAssignment = Arrays.asList(this.t2p);
        List<String> newSubscription = Collections.singletonList("test1");
        List<TopicPartition> newAssignment = Arrays.asList(this.t1p);
        this.subscriptions.subscribe(TestUtils.toSet(oldSubscription), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.partitionAssignor.prepare(Collections.singletonMap("outdated_assignment", newAssignment));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "outdated_assignment", Collections.singletonMap("outdated_assignment", oldSubscription), Errors.NONE));
        this.client.prepareResponse(body -> {
            SyncGroupRequest sync = (SyncGroupRequest)body;
            return sync.data.memberId().equals("outdated_assignment") && sync.data.generationId() == 1 && sync.groupAssignments().containsKey("outdated_assignment");
        }, (AbstractResponse)this.syncGroupResponse(oldAssignment, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "outdated_assignment", Collections.singletonMap("outdated_assignment", newSubscription), Errors.NONE));
        this.client.prepareResponse(body -> {
            SyncGroupRequest sync = (SyncGroupRequest)body;
            return sync.data.memberId().equals("outdated_assignment") && sync.data.generationId() == 1 && sync.groupAssignments().containsKey("outdated_assignment");
        }, (AbstractResponse)this.syncGroupResponse(newAssignment, Errors.NONE));
        this.coordinator.poll(this.time.timer(0L));
        this.subscriptions.subscribe(TestUtils.toSet(newSubscription), (ConsumerRebalanceListener)this.rebalanceListener);
        this.coordinator.poll(this.time.timer(0L));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Collection<TopicPartition> assigned = this.getAdded(owned, newAssignment);
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(TestUtils.toSet(newAssignment), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals(TestUtils.toSet(newSubscription), (Object)this.subscriptions.metadataTopics());
        Assert.assertEquals((long)(this.protocol == ConsumerPartitionAssignor.RebalanceProtocol.EAGER ? 1L : 0L), (long)this.rebalanceListener.revokedCount);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(assigned, this.rebalanceListener.assigned);
    }

    @Test
    public void testMetadataTopicsDuringSubscriptionChange() {
        String consumerId = "subscription_change";
        List<String> oldSubscription = Collections.singletonList("test1");
        List<TopicPartition> oldAssignment = Collections.singletonList(this.t1p);
        List<String> newSubscription = Collections.singletonList("test2");
        List<TopicPartition> newAssignment = Collections.singletonList(this.t2p);
        this.subscriptions.subscribe(TestUtils.toSet(oldSubscription), (ConsumerRebalanceListener)this.rebalanceListener);
        Assert.assertEquals(TestUtils.toSet(oldSubscription), (Object)this.subscriptions.metadataTopics());
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareJoinAndSyncResponse("subscription_change", 1, oldSubscription, oldAssignment);
        this.coordinator.poll(this.time.timer(0L));
        Assert.assertEquals(TestUtils.toSet(oldSubscription), (Object)this.subscriptions.metadataTopics());
        this.subscriptions.subscribe(TestUtils.toSet(newSubscription), (ConsumerRebalanceListener)this.rebalanceListener);
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"test1", "test2"}), (Object)this.subscriptions.metadataTopics());
        this.prepareJoinAndSyncResponse("subscription_change", 2, newSubscription, newAssignment);
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(TestUtils.toSet(newAssignment), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals(TestUtils.toSet(newSubscription), (Object)this.subscriptions.metadataTopics());
    }

    @Test
    public void testPatternJoinGroupLeader() {
        String consumerId = "leader";
        List<TopicPartition> assigned = Arrays.asList(this.t1p, this.t2p);
        List<TopicPartition> owned = Collections.emptyList();
        this.subscriptions.subscribe(Pattern.compile("test.*"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("leader", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", assigned));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse(body -> {
            SyncGroupRequest sync = (SyncGroupRequest)body;
            return sync.data.memberId().equals("leader") && sync.data.generationId() == 1 && sync.groupAssignments().containsKey("leader");
        }, (AbstractResponse)this.syncGroupResponse(assigned, Errors.NONE));
        this.client.prepareMetadataUpdate(this.metadataResponse);
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals((long)2L, (long)this.subscriptions.numAssignedPartitions());
        Assert.assertEquals((long)2L, (long)this.subscriptions.metadataTopics().size());
        Assert.assertEquals((long)2L, (long)this.subscriptions.subscription().size());
        Assert.assertEquals((long)0L, (long)this.rebalanceListener.revokedCount);
        Assert.assertNull(this.rebalanceListener.revoked);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(this.getAdded(owned, assigned), this.rebalanceListener.assigned);
    }

    @Test
    public void testMetadataRefreshDuringRebalance() {
        String consumerId = "leader";
        List<TopicPartition> owned = Collections.emptyList();
        List<TopicPartition> oldAssigned = Collections.singletonList(this.t1p);
        this.subscriptions.subscribe(Pattern.compile(".*"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        this.coordinator.maybeUpdateSubscriptionMetadata();
        Assert.assertEquals(Collections.singleton("test1"), (Object)this.subscriptions.subscription());
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> initialSubscription = Collections.singletonMap("leader", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", oldAssigned));
        List<String> updatedSubscription = Arrays.asList("test1", "test2");
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "leader", initialSubscription, Errors.NONE));
        this.client.prepareResponse(body -> {
            HashMap<String, Integer> updatedPartitions = new HashMap<String, Integer>();
            for (String topic : updatedSubscription) {
                updatedPartitions.put(topic, 1);
            }
            this.client.updateMetadata(TestUtils.metadataUpdateWith(1, updatedPartitions));
            return true;
        }, (AbstractResponse)this.syncGroupResponse(oldAssigned, Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton("test1"), (Object)this.subscriptions.subscription());
        Assert.assertEquals(TestUtils.toSet(oldAssigned), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals((long)0L, (long)this.rebalanceListener.revokedCount);
        Assert.assertNull(this.rebalanceListener.revoked);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(this.getAdded(owned, oldAssigned), this.rebalanceListener.assigned);
        List<TopicPartition> newAssigned = Arrays.asList(this.t1p, this.t2p);
        Map<String, List<String>> updatedSubscriptions = Collections.singletonMap("leader", Arrays.asList("test1", "test2"));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", newAssigned));
        this.client.prepareResponse(body -> {
            JoinGroupRequest join = (JoinGroupRequest)body;
            Iterator protocolIterator = join.data().protocols().iterator();
            Assert.assertTrue((boolean)protocolIterator.hasNext());
            JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = (JoinGroupRequestData.JoinGroupRequestProtocol)protocolIterator.next();
            ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription((ByteBuffer)metadata);
            metadata.rewind();
            return subscription.topics().containsAll(updatedSubscription);
        }, (AbstractResponse)this.joinGroupLeaderResponse(2, "leader", updatedSubscriptions, Errors.NONE));
        this.client.prepareResponse(body -> {
            this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
            return true;
        }, (AbstractResponse)this.syncGroupResponse(newAssigned, Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Collection<TopicPartition> revoked = this.getRevoked(oldAssigned, newAssigned);
        int revokedCount = revoked.isEmpty() ? 0 : 1;
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(TestUtils.toSet(updatedSubscription), (Object)this.subscriptions.subscription());
        Assert.assertEquals(TestUtils.toSet(newAssigned), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals((long)revokedCount, (long)this.rebalanceListener.revokedCount);
        Assert.assertEquals(revoked.isEmpty() ? null : revoked, this.rebalanceListener.revoked);
        Assert.assertEquals((long)2L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(this.getAdded(oldAssigned, newAssigned), this.rebalanceListener.assigned);
        this.partitionAssignor.prepare(Collections.singletonMap("leader", oldAssigned));
        this.client.prepareResponse(body -> {
            JoinGroupRequest join = (JoinGroupRequest)body;
            Iterator protocolIterator = join.data().protocols().iterator();
            Assert.assertTrue((boolean)protocolIterator.hasNext());
            JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = (JoinGroupRequestData.JoinGroupRequestProtocol)protocolIterator.next();
            ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription((ByteBuffer)metadata);
            metadata.rewind();
            return subscription.topics().contains("test1");
        }, (AbstractResponse)this.joinGroupLeaderResponse(3, "leader", initialSubscription, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(oldAssigned, Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        revoked = this.getRevoked(newAssigned, oldAssigned);
        Assert.assertFalse((boolean)revoked.isEmpty());
        Collection<TopicPartition> added = this.getAdded(newAssigned, oldAssigned);
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton("test1"), (Object)this.subscriptions.subscription());
        Assert.assertEquals(TestUtils.toSet(oldAssigned), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals((long)(++revokedCount), (long)this.rebalanceListener.revokedCount);
        Assert.assertEquals(revoked.isEmpty() ? null : revoked, this.rebalanceListener.revoked);
        Assert.assertEquals((long)3L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(added, this.rebalanceListener.assigned);
        Assert.assertEquals((long)0L, (long)this.rebalanceListener.lostCount);
    }

    @Test
    public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
        this.subscriptions.subscribe(Pattern.compile(".*"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        this.coordinator.maybeUpdateSubscriptionMetadata();
        Assert.assertEquals(Collections.singleton("test1"), (Object)this.subscriptions.subscription());
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareMetadataUpdate(this.metadataResponse);
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(body -> {
            SyncGroupRequest sync = (SyncGroupRequest)body;
            return sync.data.memberId().equals("consumer") && sync.data.generationId() == 1 && sync.groupAssignments().isEmpty();
        }, (AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.partitionAssignor.prepare(Collections.singletonMap("consumer", Collections.singletonList(this.t1p)));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        HashSet<String> updatedSubscriptionSet = new HashSet<String>(Arrays.asList("test1", "test2"));
        Assert.assertEquals(updatedSubscriptionSet, (Object)this.subscriptions.subscription());
        this.metadata.requestUpdate();
        this.consumerClient.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
    }

    @Test
    public void testRebalanceWithMetadataChange() {
        String consumerId = "leader";
        List<String> topics = Arrays.asList("test1", "test2");
        List<TopicPartition> partitions = Arrays.asList(this.t1p, this.t2p);
        this.subscriptions.subscribe(TestUtils.toSet(topics), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"test1", (Object)1), Utils.mkEntry((Object)"test2", (Object)1)})));
        this.coordinator.maybeUpdateSubscriptionMetadata();
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> initialSubscription = Collections.singletonMap("leader", topics);
        this.partitionAssignor.prepare(Collections.singletonMap("leader", partitions));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "leader", initialSubscription, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(partitions, Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(TestUtils.toSet(topics), (Object)this.subscriptions.subscription());
        Assert.assertEquals(TestUtils.toSet(partitions), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals((long)0L, (long)this.rebalanceListener.revokedCount);
        Assert.assertNull(this.rebalanceListener.revoked);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        this.coordinator.poll(this.time.timer(0L));
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"test1", (Object)1), Utils.mkEntry((Object)"test2", (Object)1)})));
        this.client.respond((AbstractResponse)this.joinGroupFollowerResponse(1, "leader", "leader", Errors.NOT_COORDINATOR));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)this.coordinator.rejoinNeededOrPending());
        this.client.respond(request -> {
            if (!(request instanceof JoinGroupRequest)) {
                return false;
            }
            JoinGroupRequest joinRequest = (JoinGroupRequest)request;
            return "leader".equals(joinRequest.data().memberId());
        }, (AbstractResponse)this.joinGroupLeaderResponse(2, "leader", initialSubscription, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(partitions, Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Collection<TopicPartition> revoked = this.getRevoked(partitions, partitions);
        Assert.assertEquals((long)(revoked.isEmpty() ? 0L : 1L), (long)this.rebalanceListener.revokedCount);
        Assert.assertEquals(revoked.isEmpty() ? null : revoked, this.rebalanceListener.revoked);
        Assert.assertEquals((long)0L, (long)this.rebalanceListener.lostCount);
        Assert.assertNull(this.rebalanceListener.lost);
        Collection<TopicPartition> added = this.getAdded(partitions, partitions);
        Assert.assertEquals((long)2L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(added.isEmpty() ? Collections.emptySet() : TestUtils.toSet(partitions), this.rebalanceListener.assigned);
        Assert.assertEquals(TestUtils.toSet(partitions), (Object)this.subscriptions.assignedPartitions());
    }

    @Test
    public void testWakeupDuringJoin() {
        String consumerId = "leader";
        List<TopicPartition> owned = Collections.emptyList();
        List<TopicPartition> assigned = Collections.singletonList(this.t1p);
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.updateMetadata(this.metadataResponse);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("leader", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", assigned));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE));
        this.consumerClient.wakeup();
        try {
            this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        }
        catch (WakeupException wakeupException) {
            // empty catch block
        }
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(assigned, Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(TestUtils.toSet(assigned), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals((long)0L, (long)this.rebalanceListener.revokedCount);
        Assert.assertNull(this.rebalanceListener.revoked);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(this.getAdded(owned, assigned), this.rebalanceListener.assigned);
    }

    @Test
    public void testNormalJoinGroupFollower() {
        Set<String> subscription = Collections.singleton("test1");
        List<TopicPartition> owned = Collections.emptyList();
        List<TopicPartition> assigned = Collections.singletonList(this.t1p);
        this.subscriptions.subscribe(subscription, (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(body -> {
            SyncGroupRequest sync = (SyncGroupRequest)body;
            return sync.data.memberId().equals("consumer") && sync.data.generationId() == 1 && sync.groupAssignments().isEmpty();
        }, (AbstractResponse)this.syncGroupResponse(assigned, Errors.NONE));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(TestUtils.toSet(assigned), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals(subscription, (Object)this.subscriptions.metadataTopics());
        Assert.assertEquals((long)0L, (long)this.rebalanceListener.revokedCount);
        Assert.assertNull(this.rebalanceListener.revoked);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(this.getAdded(owned, assigned), this.rebalanceListener.assigned);
    }

    @Test
    public void testUpdateLastHeartbeatPollWhenCoordinatorUnknown() throws Exception {
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NOT_COORDINATOR));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        this.time.sleep(5000L);
        TestUtils.waitForCondition(() -> !this.client.hasPendingResponses(), "Failed to observe expected heartbeat from background thread");
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
        Assert.assertFalse((boolean)this.coordinator.poll(this.time.timer(0L)));
        Assert.assertEquals((long)this.time.milliseconds(), (long)this.coordinator.heartbeat().lastPollTime());
        this.time.sleep(59999L);
        Assert.assertFalse((boolean)this.coordinator.heartbeat().pollTimeoutExpired(this.time.milliseconds()));
    }

    @Test
    public void testPatternJoinGroupFollower() {
        Set subscription = Utils.mkSet((Object[])new String[]{"test1", "test2"});
        List<TopicPartition> owned = Collections.emptyList();
        List<TopicPartition> assigned = Arrays.asList(this.t1p, this.t2p);
        this.subscriptions.subscribe(Pattern.compile("test.*"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(body -> {
            SyncGroupRequest sync = (SyncGroupRequest)body;
            return sync.data.memberId().equals("consumer") && sync.data.generationId() == 1 && sync.groupAssignments().isEmpty();
        }, (AbstractResponse)this.syncGroupResponse(assigned, Errors.NONE));
        this.client.prepareMetadataUpdate(this.metadataResponse);
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals((long)assigned.size(), (long)this.subscriptions.numAssignedPartitions());
        Assert.assertEquals((Object)subscription, (Object)this.subscriptions.subscription());
        Assert.assertEquals((long)0L, (long)this.rebalanceListener.revokedCount);
        Assert.assertNull(this.rebalanceListener.revoked);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(this.getAdded(owned, assigned), this.rebalanceListener.assigned);
    }

    @Test
    public void testLeaveGroupOnClose() {
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.joinAsFollowerAndReceiveAssignment(this.coordinator, Collections.singletonList(this.t1p));
        AtomicBoolean received = new AtomicBoolean(false);
        this.client.prepareResponse(body -> {
            received.set(true);
            LeaveGroupRequest leaveRequest = (LeaveGroupRequest)body;
            return this.validateLeaveGroup("test-group", "consumer", leaveRequest);
        }, (AbstractResponse)new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
        this.coordinator.close(this.time.timer(0L));
        Assert.assertTrue((boolean)received.get());
    }

    @Test
    public void testMaybeLeaveGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.joinAsFollowerAndReceiveAssignment(this.coordinator, Collections.singletonList(this.t1p));
        AtomicBoolean received = new AtomicBoolean(false);
        this.client.prepareResponse(body -> {
            received.set(true);
            LeaveGroupRequest leaveRequest = (LeaveGroupRequest)body;
            return this.validateLeaveGroup("test-group", "consumer", leaveRequest);
        }, (AbstractResponse)new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
        this.coordinator.maybeLeaveGroup("test maybe leave group");
        Assert.assertTrue((boolean)received.get());
        AbstractCoordinator.Generation generation = this.coordinator.generationIfStable();
        Assert.assertNull((Object)generation);
    }

    private boolean validateLeaveGroup(String groupId, String consumerId, LeaveGroupRequest leaveRequest) {
        List<LeaveGroupRequestData.MemberIdentity> members = leaveRequest.data().members();
        return leaveRequest.data().groupId().equals(groupId) && members.size() == 1 && members.get(0).memberId().equals(consumerId);
    }

    @Test
    public void testPendingMemberShouldLeaveGroup() {
        String consumerId = "consumer-id";
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(-1, "consumer-id", "leader-id", Errors.MEMBER_ID_REQUIRED));
        this.coordinator.joinGroupIfNeeded(this.time.timer(0L));
        AtomicBoolean received = new AtomicBoolean(false);
        this.client.prepareResponse(body -> {
            received.set(true);
            LeaveGroupRequest leaveRequest = (LeaveGroupRequest)body;
            return this.validateLeaveGroup("test-group", "consumer-id", leaveRequest);
        }, (AbstractResponse)new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
        this.coordinator.maybeLeaveGroup("pending member leaves");
        Assert.assertTrue((boolean)received.get());
    }

    @Test(expected=KafkaException.class)
    public void testUnexpectedErrorOnSyncGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_SERVER_ERROR));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testUnknownMemberIdOnSyncGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_MEMBER_ID));
        this.client.prepareResponse(body -> {
            JoinGroupRequest joinRequest = (JoinGroupRequest)body;
            return joinRequest.data().memberId().equals("");
        }, (AbstractResponse)this.joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.assignedPartitions());
    }

    @Test
    public void testRebalanceInProgressOnSyncGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.emptyList(), Errors.REBALANCE_IN_PROGRESS));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.assignedPartitions());
    }

    @Test
    public void testIllegalGenerationOnSyncGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.emptyList(), Errors.ILLEGAL_GENERATION));
        this.client.prepareResponse(body -> {
            JoinGroupRequest joinRequest = (JoinGroupRequest)body;
            return joinRequest.data().memberId().equals("");
        }, (AbstractResponse)this.joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.assignedPartitions());
    }

    @Test
    public void testMetadataChangeTriggersRebalance() {
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.updateMetadata(this.metadataResponse);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("consumer", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.singletonMap("consumer", Collections.singletonList(this.t1p)));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "consumer", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 2)), false, this.time.milliseconds());
        this.coordinator.maybeUpdateSubscriptionMetadata();
        Assert.assertTrue((boolean)this.coordinator.rejoinNeededOrPending());
    }

    @Test
    public void testUpdateMetadataDuringRebalance() {
        String topic1 = "topic1";
        String topic2 = "topic2";
        TopicPartition tp1 = new TopicPartition("topic1", 0);
        TopicPartition tp2 = new TopicPartition("topic2", 0);
        String consumerId = "leader";
        List<String> topics = Arrays.asList("topic1", "topic2");
        this.subscriptions.subscribe(new HashSet<String>(topics), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic1", 1)));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("leader", topics);
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Arrays.asList(tp1)));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse(body -> {
            SyncGroupRequest sync = (SyncGroupRequest)body;
            if (sync.data.memberId().equals("leader") && sync.data.generationId() == 1 && sync.groupAssignments().containsKey("leader")) {
                HashMap<String, Integer> topicPartitionCounts = new HashMap<String, Integer>();
                topicPartitionCounts.put("topic1", 1);
                topicPartitionCounts.put("topic2", 1);
                this.client.updateMetadata(TestUtils.metadataUpdateWith(1, topicPartitionCounts));
                return true;
            }
            return false;
        }, (AbstractResponse)this.syncGroupResponse(Collections.singletonList(tp1), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(2, "leader", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(new HashSet<TopicPartition>(Arrays.asList(tp1, tp2)), (Object)this.subscriptions.assignedPartitions());
    }

    @Test
    public void testSubscriptionChangeWithAuthorizationFailure() {
        this.subscriptions.subscribe(Utils.mkSet((Object[])new String[]{"test1", "test2"}), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.singletonMap("test2", Errors.TOPIC_AUTHORIZATION_FAILED), Collections.singletonMap("test1", 1)));
        Assert.assertThrows(TopicAuthorizationException.class, () -> this.coordinator.poll(this.time.timer(Long.MAX_VALUE)));
        this.client.respond((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(0, "consumer", Collections.emptyMap(), Errors.GROUP_AUTHORIZATION_FAILED));
        Assert.assertThrows(GroupAuthorizationException.class, () -> this.coordinator.poll(this.time.timer(Long.MAX_VALUE)));
        this.subscriptions.subscribe(Utils.mkSet((Object[])new String[]{"test1"}), (ConsumerRebalanceListener)this.rebalanceListener);
        Assert.assertEquals(Collections.singleton("test1"), (Object)this.subscriptions.metadataTopics());
        this.client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.emptyMap(), Collections.singletonMap("test1", 1)));
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("consumer", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.singletonMap("consumer", Collections.singletonList(this.t1p)));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "consumer", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.singleton("test1"), (Object)this.subscriptions.subscription());
        Assert.assertEquals(Collections.singleton("test1"), (Object)this.subscriptions.metadataTopics());
    }

    @Test
    public void testWakeupFromAssignmentCallback() {
        String topic = "topic1";
        TopicPartition partition = new TopicPartition("topic1", 0);
        String consumerId = "follower";
        Set<String> topics = Collections.singleton("topic1");
        MockRebalanceListener rebalanceListener = new MockRebalanceListener(){

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                boolean raiseWakeup = this.assignedCount == 0;
                super.onPartitionsAssigned(partitions);
                if (raiseWakeup) {
                    throw new WakeupException();
                }
            }
        };
        this.subscriptions.subscribe(topics, (ConsumerRebalanceListener)rebalanceListener);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.partitionAssignor.prepare(Collections.singletonMap("follower", Collections.singletonList(partition)));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "follower", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(partition), Errors.NONE));
        try {
            this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.fail((String)"Expected exception thrown from assignment callback");
        }
        catch (WakeupException wakeupException) {
            // empty catch block
        }
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals((long)0L, (long)rebalanceListener.revokedCount);
        Assert.assertEquals((long)2L, (long)rebalanceListener.assignedCount);
    }

    @Test
    public void testRebalanceAfterTopicUnavailableWithSubscribe() {
        this.unavailableTopicTest(false, Collections.emptySet());
    }

    @Test
    public void testRebalanceAfterTopicUnavailableWithPatternSubscribe() {
        this.unavailableTopicTest(true, Collections.emptySet());
    }

    @Test
    public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSubscribe() {
        this.unavailableTopicTest(true, Collections.singleton("notmatching"));
    }

    private void unavailableTopicTest(boolean patternSubscribe, Set<String> unavailableTopicsInLastMetadata) {
        if (patternSubscribe) {
            this.subscriptions.subscribe(Pattern.compile("test.*"), (ConsumerRebalanceListener)this.rebalanceListener);
        } else {
            this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        }
        this.client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.singletonMap("test1", Errors.UNKNOWN_TOPIC_OR_PARTITION), Collections.emptyMap()));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("consumer", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.emptyMap());
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "consumer", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.emptyList(), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.assigned);
        Assert.assertTrue((String)"Metadata refresh not requested for unavailable partitions", (boolean)this.metadata.updateRequested());
        HashMap<String, Errors> topicErrors = new HashMap<String, Errors>();
        for (String topic : unavailableTopicsInLastMetadata) {
            topicErrors.put(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION);
        }
        this.client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1, topicErrors, Collections.singletonMap("test1", 1)));
        this.consumerClient.poll(this.time.timer(0L));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(2, "consumer", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((String)"Metadata refresh requested unnecessarily", (boolean)this.metadata.updateRequested());
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testExcludeInternalTopicsConfigOption() {
        this.testInternalTopicInclusion(false);
    }

    @Test
    public void testIncludeInternalTopicsConfigOption() {
        this.testInternalTopicInclusion(true);
    }

    private void testInternalTopicInclusion(boolean includeInternalTopics) {
        this.metadata = new ConsumerMetadata(0L, Long.MAX_VALUE, includeInternalTopics, false, this.subscriptions, new LogContext(), new ClusterResourceListeners());
        this.client = new MockClient((Time)this.time, (Metadata)this.metadata);
        try (ConsumerCoordinator coordinator = this.buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, false);){
            this.subscriptions.subscribe(Pattern.compile(".*"), (ConsumerRebalanceListener)this.rebalanceListener);
            Node node = new Node(0, "localhost", 9999);
            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, new TopicPartition("__consumer_offsets", 0), Optional.of(node.id()), Optional.empty(), Collections.singletonList(node.id()), Collections.singletonList(node.id()), Collections.singletonList(node.id()));
            MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, "__consumer_offsets", true, Collections.singletonList(partitionMetadata));
            this.client.updateMetadata(MetadataResponse.prepareResponse(Collections.singletonList(node), (String)"clusterId", (int)node.id(), Collections.singletonList(topicMetadata)));
            coordinator.maybeUpdateSubscriptionMetadata();
            Assert.assertEquals((Object)includeInternalTopics, (Object)this.subscriptions.subscription().contains("__consumer_offsets"));
        }
    }

    @Test
    public void testRejoinGroup() {
        String otherTopic = "otherTopic";
        List<TopicPartition> owned = Collections.emptyList();
        List<TopicPartition> assigned = Arrays.asList(this.t1p);
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.joinAsFollowerAndReceiveAssignment(this.coordinator, assigned);
        Assert.assertEquals((long)0L, (long)this.rebalanceListener.revokedCount);
        Assert.assertNull(this.rebalanceListener.revoked);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(this.getAdded(owned, assigned), this.rebalanceListener.assigned);
        this.rebalanceListener.revoked = null;
        this.rebalanceListener.assigned = null;
        this.subscriptions.subscribe(new HashSet<String>(Arrays.asList("test1", otherTopic)), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(assigned, Errors.NONE));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Collection<TopicPartition> revoked = this.getRevoked(assigned, assigned);
        Collection<TopicPartition> added = this.getAdded(assigned, assigned);
        Assert.assertEquals((long)(revoked.isEmpty() ? 0L : 1L), (long)this.rebalanceListener.revokedCount);
        Assert.assertEquals(revoked.isEmpty() ? null : revoked, this.rebalanceListener.revoked);
        Assert.assertEquals((long)2L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(added, this.rebalanceListener.assigned);
    }

    @Test
    public void testDisconnectInJoin() {
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        List<TopicPartition> owned = Collections.emptyList();
        List<TopicPartition> assigned = Arrays.asList(this.t1p);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE), true);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(assigned, Errors.NONE));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(TestUtils.toSet(assigned), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals((long)0L, (long)this.rebalanceListener.revokedCount);
        Assert.assertNull(this.rebalanceListener.revoked);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(this.getAdded(owned, assigned), this.rebalanceListener.assigned);
    }

    @Test(expected=ApiException.class)
    public void testInvalidSessionTimeout() {
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testCommitOffsetOnly() {
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        AtomicBoolean success = new AtomicBoolean(false);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.callback(success));
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue((boolean)success.get());
    }

    @Test
    public void testCoordinatorDisconnectAfterNotCoordinatorError() {
        this.testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors.NOT_COORDINATOR);
    }

    @Test
    public void testCoordinatorDisconnectAfterCoordinatorNotAvailableError() {
        this.testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors.COORDINATOR_NOT_AVAILABLE);
    }

    private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error) {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        MockCommitCallback firstCommitCallback = new MockCommitCallback();
        MockCommitCallback secondCommitCallback = new MockCommitCallback();
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)firstCommitCallback);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)secondCommitCallback);
        this.respondToOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), error);
        this.consumerClient.pollNoWakeup();
        this.consumerClient.pollNoWakeup();
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
        Assert.assertTrue((boolean)(firstCommitCallback.exception instanceof RetriableCommitFailedException));
        Assert.assertTrue((boolean)(secondCommitCallback.exception instanceof RetriableCommitFailedException));
    }

    @Test
    public void testAutoCommitDynamicAssignment() {
        try (ConsumerCoordinator coordinator = this.buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, true);){
            this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
            this.joinAsFollowerAndReceiveAssignment(coordinator, Collections.singletonList(this.t1p));
            this.subscriptions.seek(this.t1p, 100L);
            this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
            this.time.sleep(2000L);
            coordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.assertFalse((boolean)this.client.hasPendingResponses());
        }
    }

    @Test
    public void testAutoCommitRetryBackoff() {
        try (ConsumerCoordinator coordinator = this.buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, true);){
            this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
            this.joinAsFollowerAndReceiveAssignment(coordinator, Collections.singletonList(this.t1p));
            this.subscriptions.seek(this.t1p, 100L);
            this.time.sleep(2000L);
            this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NOT_COORDINATOR);
            coordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.assertTrue((boolean)coordinator.coordinatorUnknown());
            this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
            coordinator.poll(this.time.timer(Long.MAX_VALUE));
            this.subscriptions.seek(this.t1p, 200L);
            this.time.sleep(50L);
            coordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.assertEquals((long)0L, (long)this.client.inFlightRequestCount());
            this.time.sleep(50L);
            coordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
            this.respondToOffsetCommitRequest(Collections.singletonMap(this.t1p, 200L), Errors.NONE);
        }
    }

    @Test
    public void testAutoCommitAwaitsInterval() {
        try (ConsumerCoordinator coordinator = this.buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, true);){
            this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
            this.joinAsFollowerAndReceiveAssignment(coordinator, Collections.singletonList(this.t1p));
            this.subscriptions.seek(this.t1p, 100L);
            this.time.sleep(2000L);
            coordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
            this.time.sleep(1000L);
            coordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
            this.respondToOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
            coordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.assertEquals((long)0L, (long)this.client.inFlightRequestCount());
            this.subscriptions.seek(this.t1p, 200L);
            coordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.assertEquals((long)0L, (long)this.client.inFlightRequestCount());
            this.time.sleep(1000L);
            coordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
            this.respondToOffsetCommitRequest(Collections.singletonMap(this.t1p, 200L), Errors.NONE);
        }
    }

    @Test
    public void testAutoCommitDynamicAssignmentRebalance() {
        try (ConsumerCoordinator coordinator = this.buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, true);){
            this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
            this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
            coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
            this.time.sleep(2000L);
            this.consumerClient.poll(this.time.timer(0L));
            this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
            this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
            coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
            this.subscriptions.seek(this.t1p, 100L);
            this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
            this.time.sleep(2000L);
            coordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.assertFalse((boolean)this.client.hasPendingResponses());
        }
    }

    @Test
    public void testAutoCommitManualAssignment() {
        try (ConsumerCoordinator coordinator = this.buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, true);){
            this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
            this.subscriptions.seek(this.t1p, 100L);
            this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
            coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
            this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
            this.time.sleep(2000L);
            coordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.assertFalse((boolean)this.client.hasPendingResponses());
        }
    }

    @Test
    public void testAutoCommitManualAssignmentCoordinatorUnknown() {
        try (ConsumerCoordinator coordinator = this.buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, true);){
            this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
            this.subscriptions.seek(this.t1p, 100L);
            this.consumerClient.poll(this.time.timer(0L));
            this.time.sleep(2000L);
            this.consumerClient.poll(this.time.timer(0L));
            this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
            coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
            this.time.sleep(100L);
            this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
            coordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.assertFalse((boolean)this.client.hasPendingResponses());
        }
    }

    @Test
    public void testCommitOffsetMetadata() {
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        AtomicBoolean success = new AtomicBoolean(false);
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "hello"));
        this.coordinator.commitOffsetsAsync(offsets, this.callback(offsets, success));
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue((boolean)success.get());
    }

    @Test
    public void testCommitOffsetAsyncWithDefaultCallback() {
        int invokedBeforeTest = this.mockOffsetCommitCallback.invoked;
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)this.mockOffsetCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertEquals((long)(invokedBeforeTest + 1), (long)this.mockOffsetCommitCallback.invoked);
        Assert.assertNull((Object)this.mockOffsetCommitCallback.exception);
    }

    @Test
    public void testCommitAfterLeaveGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.joinAsFollowerAndReceiveAssignment(this.coordinator, Collections.singletonList(this.t1p));
        this.client.prepareResponse((AbstractResponse)new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
        this.subscriptions.unsubscribe();
        this.coordinator.maybeLeaveGroup("test commit after leave");
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(body -> {
            OffsetCommitRequest commitRequest = (OffsetCommitRequest)body;
            return commitRequest.data().memberId().equals("") && commitRequest.data().generationId() == -1;
        }, (AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        AtomicBoolean success = new AtomicBoolean(false);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.callback(success));
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue((boolean)success.get());
    }

    @Test
    public void testCommitOffsetAsyncFailedWithDefaultCallback() {
        int invokedBeforeTest = this.mockOffsetCommitCallback.invoked;
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)this.mockOffsetCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertEquals((long)(invokedBeforeTest + 1), (long)this.mockOffsetCommitCallback.invoked);
        Assert.assertTrue((boolean)(this.mockOffsetCommitCallback.exception instanceof RetriableCommitFailedException));
    }

    @Test
    public void testCommitOffsetAsyncCoordinatorNotAvailable() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        MockCommitCallback cb = new MockCommitCallback();
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)cb);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
        Assert.assertEquals((long)1L, (long)cb.invoked);
        Assert.assertTrue((boolean)(cb.exception instanceof RetriableCommitFailedException));
    }

    @Test
    public void testCommitOffsetAsyncNotCoordinator() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        MockCommitCallback cb = new MockCommitCallback();
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)cb);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
        Assert.assertEquals((long)1L, (long)cb.invoked);
        Assert.assertTrue((boolean)(cb.exception instanceof RetriableCommitFailedException));
    }

    @Test
    public void testCommitOffsetAsyncDisconnected() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        MockCommitCallback cb = new MockCommitCallback();
        this.prepareOffsetCommitRequestDisconnect(Collections.singletonMap(this.t1p, 100L));
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)cb);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
        Assert.assertEquals((long)1L, (long)cb.invoked);
        Assert.assertTrue((boolean)(cb.exception instanceof RetriableCommitFailedException));
    }

    @Test
    public void testCommitOffsetSyncNotCoordinator() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NOT_COORDINATOR);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testCommitOffsetSyncCoordinatorNotAvailable() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testCommitOffsetSyncCoordinatorDisconnected() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareOffsetCommitRequestDisconnect(Collections.singletonMap(this.t1p, 100L));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testAsyncCommitCallbacksInvokedPriorToSyncCommitCompletion() throws Exception {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        final List committedOffsets = Collections.synchronizedList(new ArrayList());
        final OffsetAndMetadata firstOffset = new OffsetAndMetadata(0L);
        final OffsetAndMetadata secondOffset = new OffsetAndMetadata(1L);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, firstOffset), new OffsetCommitCallback(){

            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                committedOffsets.add(firstOffset);
            }
        });
        Thread thread = new Thread(){

            @Override
            public void run() {
                ConsumerCoordinatorTest.this.coordinator.commitOffsetsSync(Collections.singletonMap(ConsumerCoordinatorTest.this.t1p, secondOffset), ConsumerCoordinatorTest.this.time.timer(10000L));
                committedOffsets.add(secondOffset);
            }
        };
        thread.start();
        this.client.waitForRequests(2, 5000L);
        this.respondToOffsetCommitRequest(Collections.singletonMap(this.t1p, firstOffset.offset()), Errors.NONE);
        this.respondToOffsetCommitRequest(Collections.singletonMap(this.t1p, secondOffset.offset()), Errors.NONE);
        thread.join();
        Assert.assertEquals(Arrays.asList(firstOffset, secondOffset), committedOffsets);
    }

    @Test
    public void testRetryCommitUnknownTopicOrPartition() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        Assert.assertTrue((boolean)this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), this.time.timer(10000L)));
    }

    @Test(expected=OffsetMetadataTooLarge.class)
    public void testCommitOffsetMetadataTooLarge() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.OFFSET_METADATA_TOO_LARGE);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), this.time.timer(Long.MAX_VALUE));
    }

    @Test(expected=CommitFailedException.class)
    public void testCommitOffsetIllegalGeneration() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.ILLEGAL_GENERATION);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), this.time.timer(Long.MAX_VALUE));
    }

    @Test(expected=CommitFailedException.class)
    public void testCommitOffsetUnknownMemberId() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testCommitOffsetIllegalGenerationWithNewGeneration() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        AbstractCoordinator.Generation currGen = new AbstractCoordinator.Generation(1, "memberId", null);
        this.coordinator.setNewGeneration(currGen);
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.ILLEGAL_GENERATION);
        RequestFuture future = this.coordinator.sendOffsetCommitRequest(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")));
        AbstractCoordinator.Generation newGen = new AbstractCoordinator.Generation(2, "memberId-new", null);
        this.coordinator.setNewGeneration(newGen);
        this.coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
        Assert.assertTrue((boolean)this.consumerClient.poll(future, this.time.timer(30000L)));
        Assert.assertTrue((boolean)future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
        Assert.assertEquals((Object)newGen, (Object)this.coordinator.generation());
    }

    @Test
    public void testCommitOffsetIllegalGenerationWithResetGenearion() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        AbstractCoordinator.Generation currGen = new AbstractCoordinator.Generation(1, "memberId", null);
        this.coordinator.setNewGeneration(currGen);
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.ILLEGAL_GENERATION);
        RequestFuture future = this.coordinator.sendOffsetCommitRequest(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")));
        this.coordinator.setNewGeneration(AbstractCoordinator.Generation.NO_GENERATION);
        Assert.assertTrue((boolean)this.consumerClient.poll(future, this.time.timer(30000L)));
        Assert.assertTrue((boolean)future.exception().getClass().isInstance(new CommitFailedException()));
        Assert.assertEquals((Object)AbstractCoordinator.Generation.NO_GENERATION, (Object)this.coordinator.generation());
    }

    @Test
    public void testCommitOffsetUnknownMemberWithNewGenearion() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        AbstractCoordinator.Generation currGen = new AbstractCoordinator.Generation(1, "memberId", null);
        this.coordinator.setNewGeneration(currGen);
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
        RequestFuture future = this.coordinator.sendOffsetCommitRequest(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")));
        AbstractCoordinator.Generation newGen = new AbstractCoordinator.Generation(2, "memberId-new", null);
        this.coordinator.setNewGeneration(newGen);
        this.coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
        Assert.assertTrue((boolean)this.consumerClient.poll(future, this.time.timer(30000L)));
        Assert.assertTrue((boolean)future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
        Assert.assertEquals((Object)newGen, (Object)this.coordinator.generation());
    }

    @Test
    public void testCommitOffsetUnknownMemberWithResetGenearion() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        AbstractCoordinator.Generation currGen = new AbstractCoordinator.Generation(1, "memberId", null);
        this.coordinator.setNewGeneration(currGen);
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
        RequestFuture future = this.coordinator.sendOffsetCommitRequest(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")));
        this.coordinator.setNewGeneration(AbstractCoordinator.Generation.NO_GENERATION);
        Assert.assertTrue((boolean)this.consumerClient.poll(future, this.time.timer(30000L)));
        Assert.assertTrue((boolean)future.exception().getClass().isInstance(new CommitFailedException()));
        Assert.assertEquals((Object)AbstractCoordinator.Generation.NO_GENERATION, (Object)this.coordinator.generation());
    }

    @Test
    public void testCommitOffsetFencedInstanceWithRebalancingGenearion() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        AbstractCoordinator.Generation currGen = new AbstractCoordinator.Generation(1, "memberId", null);
        this.coordinator.setNewGeneration(currGen);
        this.coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.FENCED_INSTANCE_ID);
        RequestFuture future = this.coordinator.sendOffsetCommitRequest(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")));
        AbstractCoordinator.Generation newGen = new AbstractCoordinator.Generation(2, "memberId-new", null);
        this.coordinator.setNewGeneration(newGen);
        Assert.assertTrue((boolean)this.consumerClient.poll(future, this.time.timer(30000L)));
        Assert.assertTrue((boolean)future.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
        Assert.assertEquals((Object)newGen, (Object)this.coordinator.generation());
    }

    @Test
    public void testCommitOffsetFencedInstanceWithNewGenearion() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        AbstractCoordinator.Generation currGen = new AbstractCoordinator.Generation(1, "memberId", null);
        this.coordinator.setNewGeneration(currGen);
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.FENCED_INSTANCE_ID);
        RequestFuture future = this.coordinator.sendOffsetCommitRequest(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")));
        AbstractCoordinator.Generation newGen = new AbstractCoordinator.Generation(2, "memberId-new", null);
        this.coordinator.setNewGeneration(newGen);
        Assert.assertTrue((boolean)this.consumerClient.poll(future, this.time.timer(30000L)));
        Assert.assertTrue((boolean)future.exception().getClass().isInstance(new CommitFailedException()));
        Assert.assertEquals((Object)newGen, (Object)this.coordinator.generation());
    }

    @Test
    public void testCommitOffsetRebalanceInProgress() {
        String consumerId = "leader";
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.updateMetadata(this.metadataResponse);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("leader", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(this.t1p)));
        this.coordinator.ensureActiveGroup(this.time.timer(0L));
        Assert.assertTrue((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertNull((Object)this.coordinator.generationIfStable());
        Assert.assertThrows(RebalanceInProgressException.class, () -> this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), this.time.timer(Long.MAX_VALUE)));
        Node coordinatorNode = new Node(Integer.MAX_VALUE - this.node.id(), this.node.host(), this.node.port());
        this.client.respondFrom((AbstractResponse)this.joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE), coordinatorNode);
        this.client.prepareResponse(body -> {
            SyncGroupRequest sync = (SyncGroupRequest)body;
            return sync.data.memberId().equals("leader") && sync.data.generationId() == 1 && sync.groupAssignments().containsKey("leader");
        }, (AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        AbstractCoordinator.Generation expectedGeneration = new AbstractCoordinator.Generation(1, "leader", this.partitionAssignor.name());
        Assert.assertFalse((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals((Object)expectedGeneration, (Object)this.coordinator.generationIfStable());
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
        Assert.assertThrows(RebalanceInProgressException.class, () -> this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), this.time.timer(Long.MAX_VALUE)));
        Assert.assertTrue((boolean)this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals((Object)expectedGeneration, (Object)this.coordinator.generationIfStable());
    }

    @Test(expected=KafkaException.class)
    public void testCommitOffsetSyncCallbackWithNonRetriableException() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.UNKNOWN_SERVER_ERROR);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testCommitOffsetSyncWithoutFutureGetsCompleted() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse((boolean)this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(0L)));
    }

    @Test
    public void testRefreshOffset() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(this.t1p, Errors.NONE, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.emptySet(), (Object)this.subscriptions.initializingPartitions());
        Assert.assertTrue((boolean)this.subscriptions.hasAllFetchPositions());
        Assert.assertEquals((long)100L, (long)this.subscriptions.position((TopicPartition)this.t1p).offset);
    }

    @Test
    public void testRefreshOffsetWithValidation() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.emptyMap(), Collections.singletonMap("test1", 1), tp -> 4);
        this.client.updateMetadata(metadataResponse);
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(this.t1p, Errors.NONE, "", 100L, Optional.of(3)));
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.emptySet(), (Object)this.subscriptions.initializingPartitions());
        Assert.assertFalse((boolean)this.subscriptions.hasAllFetchPositions());
        Assert.assertTrue((boolean)this.subscriptions.awaitingValidation(this.t1p));
        Assert.assertEquals((long)this.subscriptions.position((TopicPartition)this.t1p).offset, (long)100L);
        Assert.assertNull((Object)this.subscriptions.validPosition(this.t1p));
    }

    @Test
    public void testFetchCommittedOffsets() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        long offset = 500L;
        String metadata = "blahblah";
        Optional<Integer> leaderEpoch = Optional.of(15);
        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch, metadata, Errors.NONE);
        this.client.prepareResponse((AbstractResponse)new OffsetFetchResponse(Errors.NONE, Collections.singletonMap(this.t1p, data)));
        Map fetchedOffsets = this.coordinator.fetchCommittedOffsets(Collections.singleton(this.t1p), this.time.timer(Long.MAX_VALUE));
        Assert.assertNotNull((Object)fetchedOffsets);
        Assert.assertEquals((Object)new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(this.t1p));
    }

    @Test
    public void testTopicAuthorizationFailedInOffsetFetch() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(-1L, Optional.empty(), "", Errors.TOPIC_AUTHORIZATION_FAILED);
        this.client.prepareResponse((AbstractResponse)new OffsetFetchResponse(Errors.NONE, Collections.singletonMap(this.t1p, data)));
        TopicAuthorizationException exception = (TopicAuthorizationException)Assert.assertThrows(TopicAuthorizationException.class, () -> this.coordinator.fetchCommittedOffsets(Collections.singleton(this.t1p), this.time.timer(Long.MAX_VALUE)));
        Assert.assertEquals(Collections.singleton("test1"), (Object)exception.unauthorizedTopics());
    }

    @Test
    public void testRefreshOffsetLoadInProgress() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(this.t1p, Errors.NONE, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.emptySet(), (Object)this.subscriptions.initializingPartitions());
        Assert.assertTrue((boolean)this.subscriptions.hasAllFetchPositions());
        Assert.assertEquals((long)100L, (long)this.subscriptions.position((TopicPartition)this.t1p).offset);
    }

    @Test
    public void testRefreshOffsetsGroupNotAuthorized() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED));
        try {
            this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
            Assert.fail((String)"Expected group authorization error");
        }
        catch (GroupAuthorizationException e) {
            Assert.assertEquals((Object)"test-group", (Object)e.groupId());
        }
    }

    @Test
    public void testRefreshOffsetWithPendingTransactions() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(this.t1p, Errors.UNSTABLE_OFFSET_COMMIT, "", -1L));
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(this.t1p, Errors.NONE, "", 100L));
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.initializingPartitions());
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(0L));
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.initializingPartitions());
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(0L));
        Assert.assertEquals(Collections.emptySet(), (Object)this.subscriptions.initializingPartitions());
        Assert.assertTrue((boolean)this.subscriptions.hasAllFetchPositions());
        Assert.assertEquals((long)100L, (long)this.subscriptions.position((TopicPartition)this.t1p).offset);
    }

    @Test(expected=KafkaException.class)
    public void testRefreshOffsetUnknownTopicOrPartition() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(this.t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testRefreshOffsetNotCoordinatorForConsumer() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(Errors.NOT_COORDINATOR));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(this.t1p, Errors.NONE, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.emptySet(), (Object)this.subscriptions.initializingPartitions());
        Assert.assertTrue((boolean)this.subscriptions.hasAllFetchPositions());
        Assert.assertEquals((long)100L, (long)this.subscriptions.position((TopicPartition)this.t1p).offset);
    }

    @Test
    public void testRefreshOffsetWithNoFetchableOffsets() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(this.t1p, Errors.NONE, "", -1L));
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.initializingPartitions());
        Assert.assertEquals(Collections.emptySet(), (Object)this.subscriptions.partitionsNeedingReset(this.time.milliseconds()));
        Assert.assertFalse((boolean)this.subscriptions.hasAllFetchPositions());
        Assert.assertNull((Object)this.subscriptions.position(this.t1p));
    }

    @Test
    public void testNoCoordinatorDiscoveryIfPositionsKnown() {
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.seek(this.t1p, 500L);
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.emptySet(), (Object)this.subscriptions.initializingPartitions());
        Assert.assertTrue((boolean)this.subscriptions.hasAllFetchPositions());
        Assert.assertEquals((long)500L, (long)this.subscriptions.position((TopicPartition)this.t1p).offset);
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
    }

    @Test
    public void testNoCoordinatorDiscoveryIfPartitionAwaitingReset() {
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.requestOffsetReset(this.t1p, OffsetResetStrategy.EARLIEST);
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.emptySet(), (Object)this.subscriptions.initializingPartitions());
        Assert.assertFalse((boolean)this.subscriptions.hasAllFetchPositions());
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.partitionsNeedingReset(this.time.milliseconds()));
        Assert.assertEquals((Object)OffsetResetStrategy.EARLIEST, (Object)this.subscriptions.resetStrategy(this.t1p));
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
    }

    @Test
    public void testAuthenticationFailureInEnsureActiveGroup() {
        this.client.createPendingAuthenticationError(this.node, 300L);
        try {
            this.coordinator.ensureActiveGroup();
            Assert.fail((String)"Expected an authentication error.");
        }
        catch (AuthenticationException authenticationException) {
            // empty catch block
        }
    }

    @Test
    public void testThreadSafeAssignedPartitionsMetric() throws Exception {
        KafkaMetric metric = this.metrics.metric(new MetricName("assigned-partitions", "consumertest-group-coordinator-metrics", "", Collections.emptyMap()));
        final AtomicBoolean doStop = new AtomicBoolean();
        AtomicReference exceptionHolder = new AtomicReference();
        AtomicInteger observedSize = new AtomicInteger();
        Thread poller = new Thread((Metric)metric, observedSize, exceptionHolder){
            final /* synthetic */ Metric val$metric;
            final /* synthetic */ AtomicInteger val$observedSize;
            final /* synthetic */ AtomicReference val$exceptionHolder;
            {
                this.val$metric = metric;
                this.val$observedSize = atomicInteger;
                this.val$exceptionHolder = atomicReference;
            }

            @Override
            public void run() {
                while (!doStop.get()) {
                    try {
                        int size = ((Double)this.val$metric.metricValue()).intValue();
                        this.val$observedSize.set(size);
                    }
                    catch (Exception e) {
                        this.val$exceptionHolder.set(e);
                        return;
                    }
                }
            }
        };
        poller.start();
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        int totalPartitions = 10;
        for (int partition = 0; partition < totalPartitions; ++partition) {
            partitions.add(new TopicPartition("test1", partition));
            this.subscriptions.assignFromUser(partitions);
        }
        TestUtils.waitForCondition(() -> observedSize.get() == totalPartitions || exceptionHolder.get() != null, "Failed to observe expected assignment change");
        doStop.set(true);
        poller.join();
        Assert.assertNull((String)"Failed fetching the metric at least once", exceptionHolder.get());
    }

    @Test
    public void testCloseDynamicAssignment() {
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, Optional.empty());){
            this.gracefulCloseTest(coordinator, true);
        }
    }

    @Test
    public void testCloseManualAssignment() {
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(false, true, Optional.empty());){
            this.gracefulCloseTest(coordinator, false);
        }
    }

    @Test
    public void testCloseCoordinatorNotKnownManualAssignment() throws Exception {
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(false, true, Optional.empty());){
            this.makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
            this.time.sleep(2000L);
            this.closeVerifyTimeout(coordinator, 1000L, 1000L, 1000L);
        }
    }

    @Test
    public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, false, Optional.empty());){
            this.makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
            this.closeVerifyTimeout(coordinator, 1000L, 0L, 0L);
        }
    }

    @Test
    public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, Optional.empty());){
            this.makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
            this.time.sleep(2000L);
            this.closeVerifyTimeout(coordinator, 1000L, 1000L, 1000L);
        }
    }

    @Test
    public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, false, Optional.empty());){
            this.makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
            this.closeVerifyTimeout(coordinator, 1000L, 0L, 0L);
        }
    }

    @Test
    public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception {
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, this.groupInstanceId);){
            this.makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
            this.time.sleep(2000L);
            this.closeVerifyTimeout(coordinator, 1000L, 1000L, 1000L);
        }
    }

    @Test
    public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception {
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, this.groupInstanceId);){
            this.makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
            this.time.sleep(2000L);
            this.closeVerifyTimeout(coordinator, Long.MAX_VALUE, 30000L, 30000L);
        }
    }

    @Test
    public void testCloseNoResponseForCommit() throws Exception {
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, this.groupInstanceId);){
            this.time.sleep(2000L);
            this.closeVerifyTimeout(coordinator, Long.MAX_VALUE, 30000L, 30000L);
        }
    }

    @Test
    public void testCloseNoResponseForLeaveGroup() throws Exception {
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, false, Optional.empty());){
            this.closeVerifyTimeout(coordinator, Long.MAX_VALUE, 30000L, 30000L);
        }
    }

    @Test
    public void testCloseNoWait() throws Exception {
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, this.groupInstanceId);){
            this.time.sleep(2000L);
            this.closeVerifyTimeout(coordinator, 0L, 0L, 0L);
        }
    }

    @Test
    public void testHeartbeatThreadClose() throws Exception {
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, this.groupInstanceId);){
            coordinator.ensureActiveGroup();
            this.time.sleep(5100L);
            Thread.yield();
            this.closeVerifyTimeout(coordinator, Long.MAX_VALUE, 30000L, 30000L);
            Thread[] threads = new Thread[Thread.activeCount()];
            int threadCount = Thread.enumerate(threads);
            for (int i = 0; i < threadCount; ++i) {
                Assert.assertFalse((String)"Heartbeat thread active after close", (boolean)threads[i].getName().contains("test-group"));
            }
        }
    }

    @Test
    public void testAutoCommitAfterCoordinatorBackToService() {
        try (ConsumerCoordinator coordinator = this.buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, true);){
            this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
            this.subscriptions.seek(this.t1p, 100L);
            coordinator.markCoordinatorUnknown("test cause");
            Assert.assertTrue((boolean)coordinator.coordinatorUnknown());
            this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
            this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
            this.time.sleep(2000L);
            coordinator.maybeAutoCommitOffsetsAsync(this.time.milliseconds());
            Assert.assertFalse((boolean)coordinator.coordinatorUnknown());
            Assert.assertEquals((long)100L, (long)this.subscriptions.position((TopicPartition)this.t1p).offset);
        }
    }

    @Test(expected=FencedInstanceIdException.class)
    public void testCommitOffsetRequestSyncWithFencedInstanceIdException() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.FENCED_INSTANCE_ID);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
    }

    @Test(expected=FencedInstanceIdException.class)
    public void testCommitOffsetRequestAsyncWithFencedInstanceIdException() {
        this.receiveFencedInstanceIdException();
    }

    @Test
    public void testCommitOffsetRequestAsyncAlwaysReceiveFencedException() {
        Assert.assertThrows(FencedInstanceIdException.class, this::receiveFencedInstanceIdException);
        Assert.assertThrows(FencedInstanceIdException.class, () -> this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)new MockCommitCallback()));
        Assert.assertThrows(FencedInstanceIdException.class, () -> this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE)));
    }

    @Test
    public void testGetGroupMetadata() {
        ConsumerGroupMetadata groupMetadata = this.coordinator.groupMetadata();
        Assert.assertNotNull((Object)groupMetadata);
        Assert.assertEquals((Object)"test-group", (Object)groupMetadata.groupId());
        Assert.assertEquals((long)-1L, (long)groupMetadata.generationId());
        Assert.assertEquals((Object)"", (Object)groupMetadata.memberId());
        Assert.assertFalse((boolean)groupMetadata.groupInstanceId().isPresent());
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, this.groupInstanceId);){
            coordinator.ensureActiveGroup();
            ConsumerGroupMetadata joinedGroupMetadata = coordinator.groupMetadata();
            Assert.assertNotNull((Object)joinedGroupMetadata);
            Assert.assertEquals((Object)"test-group", (Object)joinedGroupMetadata.groupId());
            Assert.assertEquals((long)1L, (long)joinedGroupMetadata.generationId());
            Assert.assertEquals((Object)"consumer", (Object)joinedGroupMetadata.memberId());
            Assert.assertEquals(this.groupInstanceId, (Object)joinedGroupMetadata.groupInstanceId());
        }
    }

    @Test
    public void shouldUpdateConsumerGroupMetadataBeforeCallbacks() {
        MockRebalanceListener rebalanceListener = new MockRebalanceListener(){

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                Assert.assertEquals((long)2L, (long)ConsumerCoordinatorTest.this.coordinator.groupMetadata().generationId());
            }
        };
        this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)rebalanceListener);
        ByteBuffer buffer = ConsumerProtocol.serializeAssignment((ConsumerPartitionAssignor.Assignment)new ConsumerPartitionAssignor.Assignment(Collections.singletonList(this.t1p), ByteBuffer.wrap(new byte[0])));
        this.coordinator.onJoinComplete(1, "memberId", this.partitionAssignor.name(), buffer);
        buffer = ConsumerProtocol.serializeAssignment((ConsumerPartitionAssignor.Assignment)new ConsumerPartitionAssignor.Assignment(Collections.emptyList(), ByteBuffer.wrap(new byte[0])));
        this.coordinator.onJoinComplete(2, "memberId", this.partitionAssignor.name(), buffer);
    }

    @Test
    public void testConsumerPrepareJoinAndRejoinAfterFailedRebalance() {
        List<TopicPartition> partitions = Collections.singletonList(this.t1p);
        try (ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"));){
            coordinator.ensureActiveGroup();
            this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
            Assert.assertThrows(RebalanceInProgressException.class, () -> coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE)));
            Assert.assertFalse((boolean)this.client.hasPendingResponses());
            Assert.assertFalse((boolean)this.client.hasInFlightRequests());
            int generationId = 42;
            String memberId = "consumer-42";
            this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE));
            MockTime time = new MockTime(1L);
            boolean res = coordinator.joinGroupIfNeeded(time.timer(2L));
            Assert.assertFalse((boolean)res);
            Assert.assertFalse((boolean)this.client.hasPendingResponses());
            Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
            Assert.assertEquals((long)generationId, (long)coordinator.generation().generationId);
            Assert.assertEquals((Object)memberId, (Object)coordinator.generation().memberId);
            coordinator.maybeLeaveGroup("Clear generation data.");
            Assert.assertEquals((Object)AbstractCoordinator.Generation.NO_GENERATION, (Object)coordinator.generation());
            this.client.respond((AbstractResponse)this.syncGroupResponse(partitions, Errors.NONE));
            res = coordinator.joinGroupIfNeeded(time.timer(1L));
            Assert.assertFalse((boolean)res);
            Assert.assertFalse((boolean)this.client.hasPendingResponses());
            Assert.assertEquals((long)1L, (long)this.client.inFlightRequestCount());
            System.out.println(this.client.requests());
            this.client.respond((AbstractResponse)this.joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE));
            this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(partitions, Errors.NONE));
            res = coordinator.joinGroupIfNeeded(time.timer(3000L));
            Assert.assertTrue((boolean)res);
            Assert.assertFalse((boolean)this.client.hasPendingResponses());
            Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        }
        Collection<TopicPartition> lost = this.getLost(partitions);
        Assert.assertEquals((long)(lost.isEmpty() ? 0L : 1L), (long)this.rebalanceListener.lostCount);
        Assert.assertEquals(lost.isEmpty() ? null : lost, this.rebalanceListener.lost);
    }

    @Test
    public void testThrowOnUnsupportedStableFlag() {
        this.supportStableFlag((short)6, true);
    }

    @Test
    public void testNoThrowWhenStableFlagIsSupported() {
        this.supportStableFlag((short)7, false);
    }

    private void supportStableFlag(short upperVersion, boolean expectThrows) {
        ConsumerCoordinator coordinator = new ConsumerCoordinator(this.rebalanceConfig, new LogContext(), this.consumerClient, this.assignors, this.metadata, this.subscriptions, new Metrics((Time)this.time), "consumertest-group", (Time)this.time, false, 2000, null, true);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.client.setNodeApiVersions(NodeApiVersions.create((short)ApiKeys.OFFSET_FETCH.id, (short)0, (short)upperVersion));
        long offset = 500L;
        String metadata = "blahblah";
        Optional<Integer> leaderEpoch = Optional.of(15);
        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch, metadata, Errors.NONE);
        this.client.prepareResponse((AbstractResponse)new OffsetFetchResponse(Errors.NONE, Collections.singletonMap(this.t1p, data)));
        if (expectThrows) {
            Assert.assertThrows(UnsupportedVersionException.class, () -> coordinator.fetchCommittedOffsets(Collections.singleton(this.t1p), this.time.timer(Long.MAX_VALUE)));
        } else {
            Map fetchedOffsets = coordinator.fetchCommittedOffsets(Collections.singleton(this.t1p), this.time.timer(Long.MAX_VALUE));
            Assert.assertNotNull((Object)fetchedOffsets);
            Assert.assertEquals((Object)new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(this.t1p));
        }
    }

    private void receiveFencedInstanceIdException() {
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.FENCED_INSTANCE_ID);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)new MockCommitCallback());
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
    }

    private ConsumerCoordinator prepareCoordinatorForCloseTest(boolean useGroupManagement, boolean autoCommit, Optional<String> groupInstanceId) {
        this.rebalanceConfig = this.buildRebalanceConfig(groupInstanceId);
        ConsumerCoordinator coordinator = this.buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, autoCommit);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        if (useGroupManagement) {
            this.subscriptions.subscribe(Collections.singleton("test1"), (ConsumerRebalanceListener)this.rebalanceListener);
            this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
            this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
            coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        } else {
            this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        }
        this.subscriptions.seek(this.t1p, 100L);
        coordinator.poll(this.time.timer(Long.MAX_VALUE));
        return coordinator;
    }

    private void makeCoordinatorUnknown(ConsumerCoordinator coordinator, Errors error) {
        this.time.sleep(10000L);
        coordinator.sendHeartbeatRequest();
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(error));
        this.time.sleep(10000L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)coordinator.coordinatorUnknown());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeVerifyTimeout(ConsumerCoordinator coordinator, long closeTimeoutMs, long expectedMinTimeMs, long expectedMaxTimeMs) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            boolean coordinatorUnknown = coordinator.coordinatorUnknown();
            Future<?> future = executor.submit(() -> coordinator.close(this.time.timer(Math.min(closeTimeoutMs, 30000L))));
            if (!coordinatorUnknown) {
                this.client.waitForRequests(1, 1000L);
            } else {
                Thread.sleep(200L);
            }
            if (expectedMinTimeMs > 0L) {
                this.time.sleep(expectedMinTimeMs - 1L);
                try {
                    future.get(500L, TimeUnit.MILLISECONDS);
                    Assert.fail((String)"Close completed ungracefully without waiting for timeout");
                }
                catch (TimeoutException timeoutException) {
                    // empty catch block
                }
            }
            if (expectedMaxTimeMs >= 0L) {
                this.time.sleep(expectedMaxTimeMs - expectedMinTimeMs + 2L);
            }
            future.get(2000L, TimeUnit.MILLISECONDS);
        }
        finally {
            executor.shutdownNow();
        }
    }

    private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean shouldLeaveGroup) {
        AtomicBoolean commitRequested = new AtomicBoolean();
        AtomicBoolean leaveGroupRequested = new AtomicBoolean();
        this.client.prepareResponse(body -> {
            commitRequested.set(true);
            OffsetCommitRequest commitRequest = (OffsetCommitRequest)body;
            return commitRequest.data().groupId().equals("test-group");
        }, (AbstractResponse)new OffsetCommitResponse(new OffsetCommitResponseData()));
        this.client.prepareResponse(body -> {
            leaveGroupRequested.set(true);
            LeaveGroupRequest leaveRequest = (LeaveGroupRequest)body;
            return leaveRequest.data().groupId().equals("test-group");
        }, (AbstractResponse)new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
        coordinator.close();
        Assert.assertTrue((String)"Commit not requested", (boolean)commitRequested.get());
        Assert.assertEquals((String)("leaveGroupRequested should be " + shouldLeaveGroup), (Object)shouldLeaveGroup, (Object)leaveGroupRequested.get());
        if (shouldLeaveGroup) {
            Assert.assertEquals((long)1L, (long)this.rebalanceListener.revokedCount);
            Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.revoked);
        }
    }

    private ConsumerCoordinator buildCoordinator(GroupRebalanceConfig rebalanceConfig, Metrics metrics, List<ConsumerPartitionAssignor> assignors, boolean autoCommitEnabled, SubscriptionState subscriptions) {
        return new ConsumerCoordinator(rebalanceConfig, new LogContext(), this.consumerClient, assignors, this.metadata, subscriptions, metrics, "consumertest-group", (Time)this.time, autoCommitEnabled, 2000, null, false);
    }

    private ConsumerCoordinator buildCoordinator(GroupRebalanceConfig rebalanceConfig, Metrics metrics, List<ConsumerPartitionAssignor> assignors, boolean autoCommitEnabled) {
        return this.buildCoordinator(rebalanceConfig, metrics, assignors, autoCommitEnabled, this.subscriptions);
    }

    private Collection<TopicPartition> getRevoked(List<TopicPartition> owned, List<TopicPartition> assigned) {
        switch (this.protocol) {
            case EAGER: {
                return TestUtils.toSet(owned);
            }
            case COOPERATIVE: {
                ArrayList<TopicPartition> revoked = new ArrayList<TopicPartition>(owned);
                revoked.removeAll(assigned);
                return TestUtils.toSet(revoked);
            }
        }
        throw new IllegalStateException("This should not happen");
    }

    private Collection<TopicPartition> getLost(List<TopicPartition> owned) {
        switch (this.protocol) {
            case EAGER: {
                return Collections.emptySet();
            }
            case COOPERATIVE: {
                return TestUtils.toSet(owned);
            }
        }
        throw new IllegalStateException("This should not happen");
    }

    private Collection<TopicPartition> getAdded(List<TopicPartition> owned, List<TopicPartition> assigned) {
        switch (this.protocol) {
            case EAGER: {
                return TestUtils.toSet(assigned);
            }
            case COOPERATIVE: {
                ArrayList<TopicPartition> added = new ArrayList<TopicPartition>(assigned);
                added.removeAll(owned);
                return TestUtils.toSet(added);
            }
        }
        throw new IllegalStateException("This should not happen");
    }

    private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
        return FindCoordinatorResponse.prepareResponse((Errors)error, (Node)node);
    }

    private HeartbeatResponse heartbeatResponse(Errors error) {
        return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code()));
    }

    private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId, Map<String, List<String>> subscriptions, Errors error) {
        ArrayList<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<JoinGroupResponseData.JoinGroupResponseMember>();
        for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
            ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue());
            ByteBuffer buf = ConsumerProtocol.serializeSubscription((ConsumerPartitionAssignor.Subscription)subscription);
            metadata.add(new JoinGroupResponseData.JoinGroupResponseMember().setMemberId(subscriptionEntry.getKey()).setMetadata(buf.array()));
        }
        return new JoinGroupResponse(new JoinGroupResponseData().setErrorCode(error.code()).setGenerationId(generationId).setProtocolName(this.partitionAssignor.name()).setLeader(memberId).setMemberId(memberId).setMembers(metadata));
    }

    private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
        return new JoinGroupResponse(new JoinGroupResponseData().setErrorCode(error.code()).setGenerationId(generationId).setProtocolName(this.partitionAssignor.name()).setLeader(leaderId).setMemberId(memberId).setMembers(Collections.emptyList()));
    }

    private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
        ByteBuffer buf = ConsumerProtocol.serializeAssignment((ConsumerPartitionAssignor.Assignment)new ConsumerPartitionAssignor.Assignment(partitions));
        return new SyncGroupResponse(new SyncGroupResponseData().setErrorCode(error.code()).setAssignment(Utils.toArray((ByteBuffer)buf)));
    }

    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Errors> responseData) {
        return new OffsetCommitResponse(responseData);
    }

    private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) {
        return new OffsetFetchResponse(topLevelError, Collections.emptyMap());
    }

    private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset) {
        return this.offsetFetchResponse(tp, partitionLevelError, metadata, offset, Optional.empty());
    }

    private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset, Optional<Integer> epoch) {
        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, epoch, metadata, partitionLevelError);
        return new OffsetFetchResponse(Errors.NONE, Collections.singletonMap(tp, data));
    }

    private OffsetCommitCallback callback(AtomicBoolean success) {
        return (offsets, exception) -> {
            if (exception == null) {
                success.set(true);
            }
        };
    }

    private void joinAsFollowerAndReceiveAssignment(ConsumerCoordinator coordinator, List<TopicPartition> assignment) {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(assignment, Errors.NONE));
        coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
    }

    private void prepareOffsetCommitRequest(Map<TopicPartition, Long> expectedOffsets, Errors error) {
        this.prepareOffsetCommitRequest(expectedOffsets, error, false);
    }

    private void prepareOffsetCommitRequestDisconnect(Map<TopicPartition, Long> expectedOffsets) {
        this.prepareOffsetCommitRequest(expectedOffsets, Errors.NONE, true);
    }

    private void prepareOffsetCommitRequest(Map<TopicPartition, Long> expectedOffsets, Errors error, boolean disconnected) {
        Map<TopicPartition, Errors> errors = this.partitionErrors(expectedOffsets.keySet(), error);
        this.client.prepareResponse(this.offsetCommitRequestMatcher(expectedOffsets), (AbstractResponse)this.offsetCommitResponse(errors), disconnected);
    }

    private void prepareJoinAndSyncResponse(String consumerId, int generation, List<String> subscription, List<TopicPartition> assignment) {
        this.partitionAssignor.prepare(Collections.singletonMap(consumerId, assignment));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(generation, consumerId, Collections.singletonMap(consumerId, subscription), Errors.NONE));
        this.client.prepareResponse(body -> {
            SyncGroupRequest sync = (SyncGroupRequest)body;
            return sync.data.memberId().equals(consumerId) && sync.data.generationId() == generation && sync.groupAssignments().containsKey(consumerId);
        }, (AbstractResponse)this.syncGroupResponse(assignment, Errors.NONE));
    }

    private Map<TopicPartition, Errors> partitionErrors(Collection<TopicPartition> partitions, Errors error) {
        HashMap<TopicPartition, Errors> errors = new HashMap<TopicPartition, Errors>();
        for (TopicPartition partition : partitions) {
            errors.put(partition, error);
        }
        return errors;
    }

    private void respondToOffsetCommitRequest(Map<TopicPartition, Long> expectedOffsets, Errors error) {
        Map<TopicPartition, Errors> errors = this.partitionErrors(expectedOffsets.keySet(), error);
        this.client.respond(this.offsetCommitRequestMatcher(expectedOffsets), (AbstractResponse)this.offsetCommitResponse(errors));
    }

    private MockClient.RequestMatcher offsetCommitRequestMatcher(Map<TopicPartition, Long> expectedOffsets) {
        return body -> {
            OffsetCommitRequest req = (OffsetCommitRequest)body;
            Map offsets = req.offsets();
            if (offsets.size() != expectedOffsets.size()) {
                return false;
            }
            for (Map.Entry expectedOffset : expectedOffsets.entrySet()) {
                if (!offsets.containsKey(expectedOffset.getKey())) {
                    return false;
                }
                Long actualOffset = (Long)offsets.get(expectedOffset.getKey());
                if (actualOffset.equals(expectedOffset.getValue())) continue;
                return false;
            }
            return true;
        };
    }

    private OffsetCommitCallback callback(Map<TopicPartition, OffsetAndMetadata> expectedOffsets, AtomicBoolean success) {
        return (offsets, exception) -> {
            if (expectedOffsets.equals(offsets) && exception == null) {
                success.set(true);
            }
        };
    }

    private static class MockCommitCallback
    implements OffsetCommitCallback {
        public int invoked = 0;
        public Exception exception = null;

        private MockCommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            ++this.invoked;
            this.exception = exception;
        }
    }
}

