package org.apache.kafka.clients.consumer;

import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfigTest;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/KafkaConsumerTest.class */
public class KafkaConsumerTest {
    private final String topic = "test";
    private final TopicPartition tp0 = new TopicPartition("test", 0);
    private final TopicPartition tp1 = new TopicPartition("test", 1);
    private final String topic2 = "test2";
    private final TopicPartition t2p0 = new TopicPartition("test2", 0);
    private final String topic3 = "test3";
    private final TopicPartition t3p0 = new TopicPartition("test3", 0);
    private final int sessionTimeoutMs = 10000;
    private final int heartbeatIntervalMs = 1000;
    private final int autoCommitIntervalMs = 500;
    private final String groupId = "mock-group";
    private final String memberId = "memberId";
    private final String leaderId = "leaderId";
    private final Optional<String> groupInstanceId = Optional.of("mock-instance");
    private final String partitionRevoked = "Hit partition revoke ";
    private final String partitionAssigned = "Hit partition assign ";
    private final String partitionLost = "Hit partition lost ";
    private final Collection<TopicPartition> singleTopicPartition = Collections.singleton(new TopicPartition("test", 0));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/KafkaConsumerTest$FetchInfo.class */
    public static class FetchInfo {
        long offset;
        int count;

        FetchInfo(long j, int i) {
            this.offset = j;
            this.count = i;
        }
    }

    @Test
    public void testMetricsReporterAutoGeneratedClientId() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty(AbstractConfigTest.TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, new StringDeserializer(), new StringDeserializer());
        Assert.assertEquals(kafkaConsumer.getClientId(), ((MockMetricsReporter) kafkaConsumer.metrics.reporters().get(0)).clientId);
        kafkaConsumer.close();
    }

    @Test
    public void testConstructorClose() {
        Properties properties = new Properties();
        properties.setProperty("client.id", "testConstructorClose");
        properties.setProperty("bootstrap.servers", "invalid-23-8409-adsfsdj");
        properties.setProperty(AbstractConfigTest.TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
        int i = MockMetricsReporter.INIT_COUNT.get();
        int i2 = MockMetricsReporter.CLOSE_COUNT.get();
        try {
            new KafkaConsumer(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
            Assert.fail("should have caught an exception and returned");
        } catch (KafkaException e) {
            Assert.assertEquals(i + 1, MockMetricsReporter.INIT_COUNT.get());
            Assert.assertEquals(i2 + 1, MockMetricsReporter.CLOSE_COUNT.get());
            Assert.assertEquals("Failed to construct kafka consumer", e.getMessage());
        }
    }

    @Test
    public void testOsDefaultSocketBufferSizes() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("send.buffer.bytes", -1);
        hashMap.put("receive.buffer.bytes", -1);
        new KafkaConsumer(hashMap, new ByteArrayDeserializer(), new ByteArrayDeserializer()).close();
    }

    @Test(expected = KafkaException.class)
    public void testInvalidSocketSendBufferSize() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("send.buffer.bytes", -2);
        new KafkaConsumer(hashMap, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    @Test(expected = KafkaException.class)
    public void testInvalidSocketReceiveBufferSize() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("receive.buffer.bytes", -2);
        new KafkaConsumer(hashMap, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    @Test
    public void shouldIgnoreGroupInstanceIdForEmptyGroupId() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("group.instance.id", "instance_id");
        new KafkaConsumer(hashMap, new ByteArrayDeserializer(), new ByteArrayDeserializer()).close();
    }

    @Test
    public void testSubscription() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer("mock-group");
        newConsumer.subscribe(Collections.singletonList("test"));
        Assert.assertEquals(Collections.singleton("test"), newConsumer.subscription());
        Assert.assertTrue(newConsumer.assignment().isEmpty());
        newConsumer.subscribe(Collections.emptyList());
        Assert.assertTrue(newConsumer.subscription().isEmpty());
        Assert.assertTrue(newConsumer.assignment().isEmpty());
        newConsumer.assign(Collections.singletonList(this.tp0));
        Assert.assertTrue(newConsumer.subscription().isEmpty());
        Assert.assertEquals(Collections.singleton(this.tp0), newConsumer.assignment());
        newConsumer.unsubscribe();
        Assert.assertTrue(newConsumer.subscription().isEmpty());
        Assert.assertTrue(newConsumer.assignment().isEmpty());
        newConsumer.close();
    }

    @Test(expected = IllegalArgumentException.class)
    public void testSubscriptionOnNullTopicCollection() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer("mock-group");
        Throwable th = null;
        try {
            newConsumer.subscribe((List) null);
            if (newConsumer != null) {
                if (0 == 0) {
                    newConsumer.close();
                    return;
                }
                try {
                    newConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void testSubscriptionOnNullTopic() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer("mock-group");
        Throwable th = null;
        try {
            newConsumer.subscribe(Collections.singletonList(null));
            if (newConsumer != null) {
                if (0 == 0) {
                    newConsumer.close();
                    return;
                }
                try {
                    newConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void testSubscriptionOnEmptyTopic() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer("mock-group");
        Throwable th = null;
        try {
            newConsumer.subscribe(Collections.singletonList("  "));
            if (newConsumer != null) {
                if (0 == 0) {
                    newConsumer.close();
                    return;
                }
                try {
                    newConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void testSubscriptionOnNullPattern() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer("mock-group");
        Throwable th = null;
        try {
            newConsumer.subscribe((Pattern) null);
            if (newConsumer != null) {
                if (0 == 0) {
                    newConsumer.close();
                    return;
                }
                try {
                    newConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void testSubscriptionOnEmptyPattern() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer("mock-group");
        Throwable th = null;
        try {
            newConsumer.subscribe(Pattern.compile(""));
            if (newConsumer != null) {
                if (0 == 0) {
                    newConsumer.close();
                    return;
                }
                try {
                    newConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test(expected = IllegalStateException.class)
    public void testSubscriptionWithEmptyPartitionAssignment() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty("partition.assignment.strategy", "");
        properties.setProperty("group.id", "mock-group");
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer(properties);
        Throwable th = null;
        try {
            try {
                newConsumer.subscribe(Collections.singletonList("test"));
                if (newConsumer != null) {
                    if (0 == 0) {
                        newConsumer.close();
                        return;
                    }
                    try {
                        newConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (newConsumer != null) {
                if (th != null) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th4;
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void testSeekNegative() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer((String) null);
        Throwable th = null;
        try {
            newConsumer.assign(Collections.singleton(new TopicPartition("nonExistTopic", 0)));
            newConsumer.seek(new TopicPartition("nonExistTopic", 0), -1L);
            if (newConsumer != null) {
                if (0 == 0) {
                    newConsumer.close();
                    return;
                }
                try {
                    newConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void testAssignOnNullTopicPartition() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer((String) null);
        Throwable th = null;
        try {
            newConsumer.assign((Collection) null);
            if (newConsumer != null) {
                if (0 == 0) {
                    newConsumer.close();
                    return;
                }
                try {
                    newConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testAssignOnEmptyTopicPartition() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer("mock-group");
        Throwable th = null;
        try {
            newConsumer.assign(Collections.emptyList());
            Assert.assertTrue(newConsumer.subscription().isEmpty());
            Assert.assertTrue(newConsumer.assignment().isEmpty());
            if (newConsumer != null) {
                if (0 == 0) {
                    newConsumer.close();
                    return;
                }
                try {
                    newConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void testAssignOnNullTopicInPartition() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer((String) null);
        Throwable th = null;
        try {
            try {
                newConsumer.assign(Collections.singleton(new TopicPartition((String) null, 0)));
                if (newConsumer != null) {
                    if (0 == 0) {
                        newConsumer.close();
                        return;
                    }
                    try {
                        newConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (newConsumer != null) {
                if (th != null) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th4;
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void testAssignOnEmptyTopicInPartition() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer((String) null);
        Throwable th = null;
        try {
            newConsumer.assign(Collections.singleton(new TopicPartition("  ", 0)));
            if (newConsumer != null) {
                if (0 == 0) {
                    newConsumer.close();
                    return;
                }
                try {
                    newConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testInterceptorConstructorClose() {
        try {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9999");
            properties.setProperty("interceptor.classes", MockConsumerInterceptor.class.getName());
            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, new StringDeserializer(), new StringDeserializer());
            Assert.assertEquals(1L, MockConsumerInterceptor.INIT_COUNT.get());
            Assert.assertEquals(0L, MockConsumerInterceptor.CLOSE_COUNT.get());
            kafkaConsumer.close();
            Assert.assertEquals(1L, MockConsumerInterceptor.INIT_COUNT.get());
            Assert.assertEquals(1L, MockConsumerInterceptor.CLOSE_COUNT.get());
            Assert.assertNull(MockConsumerInterceptor.CLUSTER_META.get());
        } finally {
            MockConsumerInterceptor.resetCounters();
        }
    }

    @Test
    public void testPause() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer("mock-group");
        newConsumer.assign(Collections.singletonList(this.tp0));
        Assert.assertEquals(Collections.singleton(this.tp0), newConsumer.assignment());
        Assert.assertTrue(newConsumer.paused().isEmpty());
        newConsumer.pause(Collections.singleton(this.tp0));
        Assert.assertEquals(Collections.singleton(this.tp0), newConsumer.paused());
        newConsumer.resume(Collections.singleton(this.tp0));
        Assert.assertTrue(newConsumer.paused().isEmpty());
        newConsumer.unsubscribe();
        Assert.assertTrue(newConsumer.paused().isEmpty());
        newConsumer.close();
    }

    @Test
    public void testConsumerJmxPrefix() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("send.buffer.bytes", -1);
        hashMap.put("receive.buffer.bytes", -1);
        hashMap.put("client.id", "client-1");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(hashMap, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        kafkaConsumer.metrics.addMetric(kafkaConsumer.metrics.metricName("test-metric", "grp1", "test metric"), new Avg());
        Assert.assertNotNull(platformMBeanServer.getObjectInstance(new ObjectName("kafka.consumer:type=grp1,client-id=client-1")));
        kafkaConsumer.close();
    }

    private KafkaConsumer<byte[], byte[]> newConsumer(String str) {
        return newConsumer(str, Optional.empty());
    }

    private KafkaConsumer<byte[], byte[]> newConsumer(String str, Optional<Boolean> optional) {
        Properties properties = new Properties();
        properties.setProperty("client.id", "my.consumer");
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty(AbstractConfigTest.TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
        if (str != null) {
            properties.setProperty("group.id", str);
        }
        optional.ifPresent(bool -> {
            properties.setProperty("enable.auto.commit", bool.toString());
        });
        return newConsumer(properties);
    }

    private KafkaConsumer<byte[], byte[]> newConsumer(Properties properties) {
        return new KafkaConsumer<>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    @Test
    public void verifyHeartbeatSent() throws Exception {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(newConsumer));
        Node prepareRebalance = prepareRebalance(mockClient, node, roundRobinAssignor, Collections.singletonList(this.tp0), null);
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 0L, 0), node);
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.singleton(this.tp0), newConsumer.assignment());
        AtomicBoolean prepareHeartbeatResponse = prepareHeartbeatResponse(mockClient, prepareRebalance, Errors.NONE);
        mockTime.sleep(1000L);
        Thread.sleep(1000L);
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        Assert.assertTrue(prepareHeartbeatResponse.get());
        newConsumer.close(Duration.ofMillis(0L));
    }

    @Test
    public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(newConsumer));
        Node prepareRebalance = prepareRebalance(mockClient, node, roundRobinAssignor, Collections.singletonList(this.tp0), null);
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        newConsumer.poll(Duration.ZERO);
        mockClient.respondFrom(fetchResponse(this.tp0, 0L, 5), node);
        mockClient.poll(0L, mockTime.milliseconds());
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 5L, 0), node);
        AtomicBoolean prepareHeartbeatResponse = prepareHeartbeatResponse(mockClient, prepareRebalance, Errors.NONE);
        mockTime.sleep(1000L);
        Thread.sleep(1000L);
        newConsumer.poll(Duration.ZERO);
        Assert.assertTrue(prepareHeartbeatResponse.get());
        newConsumer.close(Duration.ofMillis(0L));
    }

    @Test
    public void verifyPollTimesOutDuringMetadataUpdate() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(newConsumer));
        prepareRebalance(mockClient, node, roundRobinAssignor, Collections.singletonList(this.tp0), null);
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(0L, mockClient.requests().size());
    }

    @Test
    public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(newConsumer));
        prepareRebalance(mockClient, node, roundRobinAssignor, Collections.singletonList(this.tp0), null);
        newConsumer.poll(0L);
        Queue<ClientRequest> requests = mockClient.requests();
        Assert.assertEquals(1L, requests.size());
        Assert.assertEquals(FetchRequest.Builder.class, requests.peek().requestBuilder().getClass());
    }

    @Test
    public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RoundRobinAssignor(), true, this.groupInstanceId);
        newConsumer.assign(Collections.singleton(this.tp0));
        newConsumer.seekToBeginning(Collections.singleton(this.tp0));
        mockClient.prepareResponse(listOffsetsResponse(Collections.singletonMap(this.tp0, 50L)));
        mockClient.prepareResponse(fetchResponse(this.tp0, 50L, 5));
        Assert.assertEquals(5L, newConsumer.poll(Duration.ofMillis(1L)).count());
        Assert.assertEquals(55L, newConsumer.position(this.tp0));
        newConsumer.close(Duration.ofMillis(0L));
    }

    @Test
    public void testFetchProgressWithMissingPartitionPosition() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 2));
        KafkaConsumer<String, String> newConsumerNoAutoCommit = newConsumerNoAutoCommit(mockTime, mockClient, subscriptionState, createMetadata);
        newConsumerNoAutoCommit.assign(Arrays.asList(this.tp0, this.tp1));
        newConsumerNoAutoCommit.seekToEnd(Collections.singleton(this.tp0));
        newConsumerNoAutoCommit.seekToBeginning(Collections.singleton(this.tp1));
        mockClient.prepareResponse(abstractRequest -> {
            Map partitionTimestamps = ((ListOffsetRequest) abstractRequest).partitionTimestamps();
            return ((ListOffsetRequest.PartitionData) partitionTimestamps.get(this.tp0)).timestamp == -1 && ((ListOffsetRequest.PartitionData) partitionTimestamps.get(this.tp1)).timestamp == -2;
        }, (AbstractResponse) listOffsetsResponse(Collections.singletonMap(this.tp0, 50L), Collections.singletonMap(this.tp1, Errors.NOT_LEADER_OR_FOLLOWER)));
        mockClient.prepareResponse(abstractRequest2 -> {
            FetchRequest fetchRequest = (FetchRequest) abstractRequest2;
            return fetchRequest.fetchData().keySet().equals(Collections.singleton(this.tp0)) && ((FetchRequest.PartitionData) fetchRequest.fetchData().get(this.tp0)).fetchOffset == 50;
        }, (AbstractResponse) fetchResponse(this.tp0, 50L, 5));
        ConsumerRecords poll = newConsumerNoAutoCommit.poll(Duration.ofMillis(1L));
        Assert.assertEquals(5L, poll.count());
        Assert.assertEquals(Collections.singleton(this.tp0), poll.partitions());
    }

    private void initMetadata(MockClient mockClient, Map<String, Integer> map) {
        mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, map));
    }

    @Test(expected = NoOffsetForPartitionException.class)
    public void testMissingOffsetNoResetPolicy() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RoundRobinAssignor(), true, "mock-group", this.groupInstanceId, false);
        newConsumer.assign(Collections.singletonList(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, -1L), Errors.NONE), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        newConsumer.poll(Duration.ZERO);
    }

    @Test
    public void testResetToCommittedOffset() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RoundRobinAssignor(), true, "mock-group", this.groupInstanceId, false);
        newConsumer.assign(Collections.singletonList(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, 539L), Errors.NONE), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(539L, newConsumer.position(this.tp0));
    }

    @Test
    public void testResetUsingAutoResetPolicy() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RoundRobinAssignor(), true, "mock-group", this.groupInstanceId, false);
        newConsumer.assign(Collections.singletonList(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, -1L), Errors.NONE), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        mockClient.prepareResponse(listOffsetsResponse(Collections.singletonMap(this.tp0, 50L)));
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(50L, newConsumer.position(this.tp0));
    }

    @Test
    public void testOffsetIsValidAfterSeek() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RoundRobinAssignor(), true, "mock-group", Optional.empty(), false);
        newConsumer.assign(Collections.singletonList(this.tp0));
        newConsumer.seek(this.tp0, 20L);
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(subscriptionState.validPosition(this.tp0).offset, 20L);
    }

    @Test
    public void testCommitsFetchedDuringAssign() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 2));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RoundRobinAssignor(), true, this.groupInstanceId);
        newConsumer.assign(Collections.singletonList(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
        Node node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, 10000L), Errors.NONE), node2);
        Assert.assertEquals(10000L, ((OffsetAndMetadata) newConsumer.committed(Collections.singleton(this.tp0)).get(this.tp0)).offset());
        newConsumer.assign(Arrays.asList(this.tp0, this.tp1));
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, 10000L);
        mockClient.prepareResponseFrom(offsetResponse(hashMap, Errors.NONE), node2);
        Assert.assertEquals(10000L, ((OffsetAndMetadata) newConsumer.committed(Collections.singleton(this.tp0)).get(this.tp0)).offset());
        hashMap.remove(this.tp0);
        hashMap.put(this.tp1, 20000L);
        mockClient.prepareResponseFrom(offsetResponse(hashMap, Errors.NONE), node2);
        Assert.assertEquals(20000L, ((OffsetAndMetadata) newConsumer.committed(Collections.singleton(this.tp1)).get(this.tp1)).offset());
        newConsumer.close(Duration.ofMillis(0L));
    }

    @Test
    public void testFetchStableOffsetThrowInCommitted() {
        Assert.assertThrows(UnsupportedVersionException.class, () -> {
            setupThrowableConsumer().committed(Collections.singleton(this.tp0));
        });
    }

    @Test
    public void testFetchStableOffsetThrowInPoll() {
        Assert.assertThrows(UnsupportedVersionException.class, () -> {
            setupThrowableConsumer().poll(Duration.ZERO);
        });
    }

    @Test
    public void testFetchStableOffsetThrowInPosition() {
        Assert.assertThrows(UnsupportedVersionException.class, () -> {
            setupThrowableConsumer().position(this.tp0);
        });
    }

    private KafkaConsumer<String, String> setupThrowableConsumer() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 2));
        mockClient.setNodeApiVersions(NodeApiVersions.create(ApiKeys.OFFSET_FETCH.id, (short) 0, (short) 6));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RoundRobinAssignor(), true, "mock-group", this.groupInstanceId, true);
        newConsumer.assign(Collections.singletonList(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, 10000L), Errors.NONE), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        return newConsumer;
    }

    @Test
    public void testNoCommittedOffsets() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 2));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RoundRobinAssignor(), true, this.groupInstanceId);
        newConsumer.assign(Arrays.asList(this.tp0, this.tp1));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
        mockClient.prepareResponseFrom(offsetResponse(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.tp0, 10000L), Utils.mkEntry(this.tp1, -1L)}), Errors.NONE), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        Map committed = newConsumer.committed(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        Assert.assertEquals(2L, committed.size());
        Assert.assertEquals(10000L, ((OffsetAndMetadata) committed.get(this.tp0)).offset());
        Assert.assertNull(committed.get(this.tp1));
        newConsumer.close(Duration.ofMillis(0L));
    }

    @Test
    public void testAutoCommitSentBeforePositionUpdate() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(newConsumer));
        Node prepareRebalance = prepareRebalance(mockClient, node, roundRobinAssignor, Collections.singletonList(this.tp0), null);
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        newConsumer.poll(Duration.ZERO);
        mockClient.respondFrom(fetchResponse(this.tp0, 0L, 5), node);
        mockClient.poll(0L, mockTime.milliseconds());
        mockTime.sleep(500L);
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 5L, 0), node);
        AtomicBoolean prepareOffsetCommitResponse = prepareOffsetCommitResponse(mockClient, prepareRebalance, this.tp0, 0L);
        newConsumer.poll(Duration.ZERO);
        Assert.assertTrue(prepareOffsetCommitResponse.get());
        newConsumer.close(Duration.ofMillis(0L));
    }

    @Test
    public void testRegexSubscription() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        HashMap hashMap = new HashMap();
        hashMap.put("test", 1);
        hashMap.put("unmatched", 1);
        initMetadata(mockClient, hashMap);
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, true, this.groupInstanceId);
        prepareRebalance(mockClient, node, Collections.singleton("test"), roundRobinAssignor, Collections.singletonList(this.tp0), null);
        newConsumer.subscribe(Pattern.compile("test"), getConsumerRebalanceListener(newConsumer));
        mockClient.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, hashMap));
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.singleton("test"), newConsumer.subscription());
        Assert.assertEquals(Collections.singleton(this.tp0), newConsumer.assignment());
        newConsumer.close(Duration.ofMillis(0L));
    }

    @Test
    public void testChangingRegexSubscription() {
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        TopicPartition topicPartition = new TopicPartition("other", 0);
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        HashMap hashMap = new HashMap();
        hashMap.put("test", 1);
        hashMap.put("other", 1);
        initMetadata(mockClient, hashMap);
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, false, this.groupInstanceId);
        Node prepareRebalance = prepareRebalance(mockClient, node, Collections.singleton("test"), roundRobinAssignor, Collections.singletonList(this.tp0), null);
        newConsumer.subscribe(Pattern.compile("test"), getConsumerRebalanceListener(newConsumer));
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(Collections.singleton("test"), newConsumer.subscription());
        newConsumer.subscribe(Pattern.compile("other"), getConsumerRebalanceListener(newConsumer));
        mockClient.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, hashMap));
        prepareRebalance(mockClient, node, Collections.singleton("other"), roundRobinAssignor, Collections.singletonList(topicPartition), prepareRebalance);
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(Collections.singleton("other"), newConsumer.subscription());
        newConsumer.close(Duration.ofMillis(0L));
    }

    @Test
    public void testWakeupWithFetchDataAvailable() throws Exception {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(newConsumer));
        prepareRebalance(mockClient, node, roundRobinAssignor, Collections.singletonList(this.tp0), null);
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        newConsumer.poll(Duration.ZERO);
        mockClient.respondFrom(fetchResponse(this.tp0, 0L, 5), node);
        mockClient.poll(0L, mockTime.milliseconds());
        newConsumer.wakeup();
        Assert.assertThrows(WakeupException.class, () -> {
            newConsumer.poll(Duration.ZERO);
        });
        Assert.assertEquals(0L, newConsumer.position(this.tp0));
        Assert.assertEquals(5L, newConsumer.poll(Duration.ZERO).count());
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        newSingleThreadScheduledExecutor.scheduleAtFixedRate(() -> {
            mockTime.sleep(10000L);
        }, 0L, 10L, TimeUnit.MILLISECONDS);
        newConsumer.close();
        newSingleThreadScheduledExecutor.shutdownNow();
        newSingleThreadScheduledExecutor.awaitTermination(5L, TimeUnit.SECONDS);
    }

    @Test
    public void testPollThrowsInterruptExceptionIfInterrupted() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, false, this.groupInstanceId);
        newConsumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(newConsumer));
        prepareRebalance(mockClient, node, roundRobinAssignor, Collections.singletonList(this.tp0), null);
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        newConsumer.poll(Duration.ZERO);
        try {
            Thread.currentThread().interrupt();
            Assert.assertThrows(InterruptException.class, () -> {
                newConsumer.poll(Duration.ZERO);
            });
            Thread.interrupted();
            newConsumer.close(Duration.ofMillis(0L));
        } catch (Throwable th) {
            Thread.interrupted();
            newConsumer.close(Duration.ofMillis(0L));
            throw th;
        }
    }

    @Test
    public void fetchResponseWithUnexpectedPartitionIsIgnored() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RangeAssignor rangeAssignor = new RangeAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, rangeAssignor, true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singletonList("test"), getConsumerRebalanceListener(newConsumer));
        prepareRebalance(mockClient, node, rangeAssignor, Collections.singletonList(this.tp0), null);
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, new FetchInfo(0L, 1));
        hashMap.put(this.t2p0, new FetchInfo(0L, 10));
        mockClient.prepareResponseFrom(fetchResponse(hashMap), node);
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        Assert.assertEquals(0L, newConsumer.poll(Duration.ZERO).count());
        newConsumer.close(Duration.ofMillis(0L));
    }

    @Test
    public void testSubscriptionChangesWithAutoCommitEnabled() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        HashMap hashMap = new HashMap();
        hashMap.put("test", 1);
        hashMap.put("test2", 1);
        hashMap.put("test3", 1);
        initMetadata(mockClient, hashMap);
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RangeAssignor rangeAssignor = new RangeAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, rangeAssignor, true, this.groupInstanceId);
        newConsumer.subscribe(Arrays.asList("test", "test2"), getConsumerRebalanceListener(newConsumer));
        Assert.assertEquals(2L, newConsumer.subscription().size());
        Assert.assertTrue(newConsumer.subscription().contains("test") && newConsumer.subscription().contains("test2"));
        Assert.assertTrue(newConsumer.assignment().isEmpty());
        Node prepareRebalance = prepareRebalance(mockClient, node, rangeAssignor, Arrays.asList(this.tp0, this.t2p0), null);
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(2L, newConsumer.subscription().size());
        Assert.assertTrue(newConsumer.subscription().contains("test") && newConsumer.subscription().contains("test2"));
        Assert.assertEquals(2L, newConsumer.assignment().size());
        Assert.assertTrue(newConsumer.assignment().contains(this.tp0) && newConsumer.assignment().contains(this.t2p0));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp0, new FetchInfo(0L, 1));
        hashMap2.put(this.t2p0, new FetchInfo(0L, 10));
        mockClient.respondFrom(fetchResponse(hashMap2), node);
        mockClient.poll(0L, mockTime.milliseconds());
        ConsumerRecords poll = newConsumer.poll(Duration.ofMillis(1L));
        hashMap2.put(this.tp0, new FetchInfo(1L, 0));
        hashMap2.put(this.t2p0, new FetchInfo(10L, 0));
        mockClient.respondFrom(fetchResponse(hashMap2), node);
        mockClient.poll(0L, mockTime.milliseconds());
        Assert.assertEquals(11L, poll.count());
        Assert.assertEquals(1L, newConsumer.position(this.tp0));
        Assert.assertEquals(10L, newConsumer.position(this.t2p0));
        newConsumer.subscribe(Arrays.asList("test", "test3"), getConsumerRebalanceListener(newConsumer));
        Assert.assertEquals(2L, newConsumer.subscription().size());
        Assert.assertTrue(newConsumer.subscription().contains("test") && newConsumer.subscription().contains("test3"));
        Assert.assertEquals(2L, newConsumer.assignment().size());
        Assert.assertTrue(newConsumer.assignment().contains(this.tp0) && newConsumer.assignment().contains(this.t2p0));
        HashMap hashMap3 = new HashMap();
        hashMap3.put(this.tp0, 1L);
        hashMap3.put(this.t2p0, 10L);
        AtomicBoolean prepareOffsetCommitResponse = prepareOffsetCommitResponse(mockClient, prepareRebalance, hashMap3);
        prepareRebalance(mockClient, node, rangeAssignor, Arrays.asList(this.tp0, this.t3p0), prepareRebalance);
        HashMap hashMap4 = new HashMap();
        hashMap4.put(this.tp0, new FetchInfo(1L, 1));
        hashMap4.put(this.t3p0, new FetchInfo(0L, 100));
        mockClient.prepareResponse(fetchResponse(hashMap4));
        Assert.assertEquals(101L, newConsumer.poll(Duration.ofMillis(1L)).count());
        Assert.assertEquals(2L, newConsumer.position(this.tp0));
        Assert.assertEquals(100L, newConsumer.position(this.t3p0));
        Assert.assertTrue(prepareOffsetCommitResponse.get());
        Assert.assertEquals(2L, newConsumer.subscription().size());
        Assert.assertTrue(newConsumer.subscription().contains("test") && newConsumer.subscription().contains("test3"));
        Assert.assertEquals(2L, newConsumer.assignment().size());
        Assert.assertTrue(newConsumer.assignment().contains(this.tp0) && newConsumer.assignment().contains(this.t3p0));
        newConsumer.unsubscribe();
        Assert.assertTrue(newConsumer.subscription().isEmpty());
        Assert.assertTrue(newConsumer.assignment().isEmpty());
        mockClient.requests().clear();
        newConsumer.close();
    }

    @Test
    public void testSubscriptionChangesWithAutoCommitDisabled() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        HashMap hashMap = new HashMap();
        hashMap.put("test", 1);
        hashMap.put("test2", 1);
        initMetadata(mockClient, hashMap);
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RangeAssignor rangeAssignor = new RangeAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, rangeAssignor, false, this.groupInstanceId);
        initializeSubscriptionWithSingleTopic(newConsumer, getConsumerRebalanceListener(newConsumer));
        prepareRebalance(mockClient, node, rangeAssignor, Collections.singletonList(this.tp0), null);
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(Collections.singleton("test"), newConsumer.subscription());
        Assert.assertEquals(Collections.singleton(this.tp0), newConsumer.assignment());
        newConsumer.poll(Duration.ZERO);
        newConsumer.subscribe(Collections.singleton("test2"), getConsumerRebalanceListener(newConsumer));
        Assert.assertEquals(Collections.singleton("test2"), newConsumer.subscription());
        Assert.assertEquals(Collections.singleton(this.tp0), newConsumer.assignment());
        Iterator<ClientRequest> it = mockClient.requests().iterator();
        while (it.hasNext()) {
            Assert.assertNotSame(ApiKeys.OFFSET_COMMIT, it.next().requestBuilder().apiKey());
        }
        newConsumer.unsubscribe();
        Assert.assertEquals(Collections.emptySet(), newConsumer.subscription());
        Assert.assertEquals(Collections.emptySet(), newConsumer.assignment());
        Iterator<ClientRequest> it2 = mockClient.requests().iterator();
        while (it2.hasNext()) {
            Assert.assertNotSame(ApiKeys.OFFSET_COMMIT, it2.next().requestBuilder().apiKey());
        }
        mockClient.requests().clear();
        newConsumer.close();
    }

    @Test
    public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        CooperativeStickyAssignor cooperativeStickyAssignor = new CooperativeStickyAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, cooperativeStickyAssignor, false, this.groupInstanceId);
        initializeSubscriptionWithSingleTopic(newConsumer, getExceptionConsumerRebalanceListener());
        prepareRebalance(mockClient, node, cooperativeStickyAssignor, Collections.singletonList(this.tp0), null);
        Assert.assertEquals("Hit partition assign " + this.singleTopicPartition, ((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        })).getCause().getMessage());
        newConsumer.getClass();
        Assert.assertEquals("Hit partition revoke " + this.singleTopicPartition, ((RuntimeException) Assert.assertThrows(RuntimeException.class, newConsumer::unsubscribe)).getCause().getMessage());
    }

    @Test
    public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration() throws Exception {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        CooperativeStickyAssignor cooperativeStickyAssignor = new CooperativeStickyAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, cooperativeStickyAssignor, false, this.groupInstanceId);
        initializeSubscriptionWithSingleTopic(newConsumer, getExceptionConsumerRebalanceListener());
        Node prepareRebalance = prepareRebalance(mockClient, node, cooperativeStickyAssignor, Collections.singletonList(this.tp0), null);
        Assert.assertEquals("Hit partition assign " + this.singleTopicPartition, ((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        })).getCause().getMessage());
        AtomicBoolean prepareHeartbeatResponse = prepareHeartbeatResponse(mockClient, prepareRebalance, Errors.UNKNOWN_MEMBER_ID);
        mockTime.sleep(1000L);
        prepareHeartbeatResponse.getClass();
        TestUtils.waitForCondition(prepareHeartbeatResponse::get, "Heartbeat response did not occur within timeout.");
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        Assert.assertTrue(prepareHeartbeatResponse.get());
        newConsumer.getClass();
        Assert.assertEquals("Hit partition lost " + this.singleTopicPartition, ((RuntimeException) Assert.assertThrows(RuntimeException.class, newConsumer::unsubscribe)).getCause().getMessage());
    }

    private void initializeSubscriptionWithSingleTopic(KafkaConsumer<String, String> kafkaConsumer, ConsumerRebalanceListener consumerRebalanceListener) {
        kafkaConsumer.subscribe(Collections.singleton("test"), consumerRebalanceListener);
        Assert.assertEquals(Collections.singleton("test"), kafkaConsumer.subscription());
        Assert.assertEquals(Collections.emptySet(), kafkaConsumer.assignment());
    }

    @Test
    public void testManualAssignmentChangeWithAutoCommitEnabled() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        HashMap hashMap = new HashMap();
        hashMap.put("test", 1);
        hashMap.put("test2", 1);
        initMetadata(mockClient, hashMap);
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RangeAssignor(), true, this.groupInstanceId);
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
        Node node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        newConsumer.assign(Collections.singleton(this.tp0));
        newConsumer.seekToBeginning(Collections.singleton(this.tp0));
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, 0L), Errors.NONE), node2);
        Assert.assertEquals(0L, ((OffsetAndMetadata) newConsumer.committed(Collections.singleton(this.tp0)).get(this.tp0)).offset());
        Assert.assertEquals(newConsumer.assignment(), Collections.singleton(this.tp0));
        mockClient.prepareResponse(listOffsetsResponse(Collections.singletonMap(this.tp0, 10L)));
        mockClient.prepareResponse(fetchResponse(this.tp0, 10L, 1));
        Assert.assertEquals(1L, newConsumer.poll(Duration.ofMillis(1L)).count());
        Assert.assertEquals(11L, newConsumer.position(this.tp0));
        AtomicBoolean prepareOffsetCommitResponse = prepareOffsetCommitResponse(mockClient, node2, this.tp0, 11L);
        newConsumer.assign(Collections.singleton(this.t2p0));
        Assert.assertEquals(newConsumer.assignment(), Collections.singleton(this.t2p0));
        Assert.assertTrue(prepareOffsetCommitResponse.get());
        mockClient.requests().clear();
        newConsumer.close();
    }

    @Test
    public void testManualAssignmentChangeWithAutoCommitDisabled() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        HashMap hashMap = new HashMap();
        hashMap.put("test", 1);
        hashMap.put("test2", 1);
        initMetadata(mockClient, hashMap);
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RangeAssignor(), false, this.groupInstanceId);
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
        Node node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        newConsumer.assign(Collections.singleton(this.tp0));
        newConsumer.seekToBeginning(Collections.singleton(this.tp0));
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, 0L), Errors.NONE), node2);
        Assert.assertEquals(0L, ((OffsetAndMetadata) newConsumer.committed(Collections.singleton(this.tp0)).get(this.tp0)).offset());
        Assert.assertEquals(newConsumer.assignment(), Collections.singleton(this.tp0));
        mockClient.prepareResponse(listOffsetsResponse(Collections.singletonMap(this.tp0, 10L)));
        mockClient.prepareResponse(fetchResponse(this.tp0, 10L, 1));
        Assert.assertEquals(1L, newConsumer.poll(Duration.ofMillis(1L)).count());
        Assert.assertEquals(11L, newConsumer.position(this.tp0));
        newConsumer.assign(Collections.singleton(this.t2p0));
        Assert.assertEquals(newConsumer.assignment(), Collections.singleton(this.t2p0));
        Iterator<ClientRequest> it = mockClient.requests().iterator();
        while (it.hasNext()) {
            Assert.assertNotSame(it.next().requestBuilder().apiKey(), ApiKeys.OFFSET_COMMIT);
        }
        mockClient.requests().clear();
        newConsumer.close();
    }

    @Test
    public void testOffsetOfPausedPartitions() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 2));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RangeAssignor(), true, this.groupInstanceId);
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
        Node node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        Set mkSet = Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1});
        newConsumer.assign(mkSet);
        Assert.assertEquals(mkSet, newConsumer.assignment());
        newConsumer.pause(mkSet);
        newConsumer.seekToEnd(mkSet);
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, 0L);
        hashMap.put(this.tp1, 0L);
        mockClient.prepareResponseFrom(offsetResponse(hashMap, Errors.NONE), node2);
        Assert.assertEquals(0L, ((OffsetAndMetadata) newConsumer.committed(Collections.singleton(this.tp0)).get(this.tp0)).offset());
        hashMap.remove(this.tp0);
        hashMap.put(this.tp1, 0L);
        mockClient.prepareResponseFrom(offsetResponse(hashMap, Errors.NONE), node2);
        Assert.assertEquals(0L, ((OffsetAndMetadata) newConsumer.committed(Collections.singleton(this.tp1)).get(this.tp1)).offset());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp0, 3L);
        hashMap2.put(this.tp1, 3L);
        mockClient.prepareResponse(listOffsetsResponse(hashMap2));
        Assert.assertEquals(3L, newConsumer.position(this.tp0));
        Assert.assertEquals(3L, newConsumer.position(this.tp1));
        mockClient.requests().clear();
        newConsumer.unsubscribe();
        newConsumer.close();
    }

    @Test(expected = IllegalStateException.class)
    public void testPollWithNoSubscription() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer((String) null);
        Throwable th = null;
        try {
            newConsumer.poll(Duration.ZERO);
            if (newConsumer != null) {
                if (0 == 0) {
                    newConsumer.close();
                    return;
                }
                try {
                    newConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test(expected = IllegalStateException.class)
    public void testPollWithEmptySubscription() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer("mock-group");
        Throwable th = null;
        try {
            newConsumer.subscribe(Collections.emptyList());
            newConsumer.poll(Duration.ZERO);
            if (newConsumer != null) {
                if (0 == 0) {
                    newConsumer.close();
                    return;
                }
                try {
                    newConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test(expected = IllegalStateException.class)
    public void testPollWithEmptyUserAssignment() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer("mock-group");
        Throwable th = null;
        try {
            newConsumer.assign(Collections.emptySet());
            newConsumer.poll(Duration.ZERO);
            if (newConsumer != null) {
                if (0 == 0) {
                    newConsumer.close();
                    return;
                }
                try {
                    newConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testGracefulClose() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, Errors.NONE);
        consumerCloseTest(5000L, Arrays.asList(offsetCommitResponse(hashMap), new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()))), 0L, false);
    }

    @Test
    public void testCloseTimeout() throws Exception {
        consumerCloseTest(5000L, Collections.emptyList(), 5000L, false);
    }

    @Test
    public void testLeaveGroupTimeout() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, Errors.NONE);
        consumerCloseTest(5000L, Collections.singletonList(offsetCommitResponse(hashMap)), 5000L, false);
    }

    @Test
    public void testCloseNoWait() throws Exception {
        consumerCloseTest(0L, Collections.emptyList(), 0L, false);
    }

    @Test
    public void testCloseInterrupt() throws Exception {
        consumerCloseTest(Long.MAX_VALUE, Collections.emptyList(), 0L, true);
    }

    @Test
    public void testCloseShouldBeIdempotent() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer((String) null);
        newConsumer.close();
        newConsumer.close();
        newConsumer.close();
    }

    @Test
    public void testOperationsBySubscribingConsumerWithDefaultGroupId() {
        try {
            newConsumer(null, Optional.of(Boolean.TRUE));
            Assert.fail("Expected an InvalidConfigurationException");
        } catch (KafkaException e) {
            Assert.assertEquals(InvalidConfigurationException.class, e.getCause().getClass());
        }
        try {
            newConsumer((String) null).subscribe(Collections.singleton("test"));
            Assert.fail("Expected an InvalidGroupIdException");
        } catch (InvalidGroupIdException e2) {
        }
        try {
            newConsumer((String) null).committed(Collections.singleton(this.tp0)).get(this.tp0);
            Assert.fail("Expected an InvalidGroupIdException");
        } catch (InvalidGroupIdException e3) {
        }
        try {
            newConsumer((String) null).commitAsync();
            Assert.fail("Expected an InvalidGroupIdException");
        } catch (InvalidGroupIdException e4) {
        }
        try {
            newConsumer((String) null).commitSync();
            Assert.fail("Expected an InvalidGroupIdException");
        } catch (InvalidGroupIdException e5) {
        }
    }

    @Test
    public void testOperationsByAssigningConsumerWithDefaultGroupId() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer((String) null);
        newConsumer.assign(Collections.singleton(this.tp0));
        try {
            newConsumer.committed(Collections.singleton(this.tp0)).get(this.tp0);
            Assert.fail("Expected an InvalidGroupIdException");
        } catch (InvalidGroupIdException e) {
        }
        try {
            newConsumer.commitAsync();
            Assert.fail("Expected an InvalidGroupIdException");
        } catch (InvalidGroupIdException e2) {
        }
        try {
            newConsumer.commitSync();
            Assert.fail("Expected an InvalidGroupIdException");
        } catch (InvalidGroupIdException e3) {
        }
    }

    @Test
    public void testMetricConfigRecordingLevel() {
        Throwable th;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9000");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        Throwable th2 = null;
        try {
            try {
                Assert.assertEquals(Sensor.RecordingLevel.INFO, kafkaConsumer.metrics.config().recordLevel());
                if (kafkaConsumer != null) {
                    if (0 != 0) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                properties.put("metrics.recording.level", "DEBUG");
                kafkaConsumer = new KafkaConsumer(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
                th = null;
            } catch (Throwable th4) {
                th2 = th4;
                throw th4;
            }
            try {
                try {
                    Assert.assertEquals(Sensor.RecordingLevel.DEBUG, kafkaConsumer.metrics.config().recordLevel());
                    if (kafkaConsumer != null) {
                        if (0 == 0) {
                            kafkaConsumer.close();
                            return;
                        }
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th = th6;
                    throw th6;
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testShouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, false, this.groupInstanceId);
        newConsumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(newConsumer));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
        Node node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        mockClient.prepareResponseFrom(joinGroupFollowerResponse(roundRobinAssignor, 1, "memberId", "leaderId", Errors.NONE), node2);
        mockClient.prepareResponseFrom(syncGroupResponse(Collections.singletonList(this.tp0), Errors.NONE), node2);
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 0L, 1), node);
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 1L, 0), node);
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        newConsumer.poll(Duration.ZERO);
        mockClient.prepareResponseFrom(abstractRequest -> {
            return true;
        }, (AbstractResponse) new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())), node2);
        mockClient.prepareResponseFrom(new JoinGroupResponse(new JoinGroupResponseData().setErrorCode(Errors.NONE.code()).setGenerationId(1).setProtocolName(roundRobinAssignor.name()).setLeader("memberId").setMemberId("memberId").setMembers(Collections.singletonList(new JoinGroupResponseData.JoinGroupResponseMember().setMemberId("memberId").setMetadata(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(Collections.singletonList("test"))).array())))), node2);
        mockClient.prepareResponseFrom((AbstractResponse) syncGroupResponse(Collections.singletonList(this.tp0), Errors.NONE), node2, true);
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
        mockClient.prepareResponseFrom(joinGroupFollowerResponse(roundRobinAssignor, 1, "memberId", "leaderId", Errors.NONE), node2);
        mockClient.prepareResponseFrom(syncGroupResponse(Collections.singletonList(this.tp0), Errors.NONE), node2);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            return (abstractRequest2 instanceof FetchRequest) && ((FetchRequest) abstractRequest2).fetchData().containsKey(this.tp0);
        }, (AbstractResponse) fetchResponse(this.tp0, 1L, 1), node);
        mockTime.sleep(1000L);
        Thread.sleep(1000L);
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        Assert.assertFalse(newConsumer.poll(Duration.ZERO).isEmpty());
        newConsumer.close(Duration.ofMillis(0L));
    }

    private void consumerCloseTest(long j, List<? extends AbstractResponse> list, long j2, boolean z) throws Exception {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, false, Optional.empty());
        newConsumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(newConsumer));
        Node prepareRebalance = prepareRebalance(mockClient, node, roundRobinAssignor, Collections.singletonList(this.tp0), null);
        mockClient.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 1)));
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 0L, 1), node);
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 1L, 0), node);
        newConsumer.poll(Duration.ZERO);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        AtomicReference atomicReference = new AtomicReference();
        try {
            Future<?> submit = newSingleThreadExecutor.submit(() -> {
                newConsumer.commitAsync();
                try {
                    newConsumer.close(Duration.ofMillis(j));
                } catch (Exception e) {
                    atomicReference.set(e);
                }
            });
            try {
                submit.get(100L, TimeUnit.MILLISECONDS);
                if (j != 0) {
                    Assert.fail("Close completed without waiting for commit or leave response");
                }
            } catch (TimeoutException e) {
            }
            mockClient.waitForRequests(2, 1000L);
            for (int i = 0; i < list.size(); i++) {
                mockClient.waitForRequests(1, 1000L);
                mockClient.respondFrom(list.get(i), prepareRebalance);
                if (i != list.size() - 1) {
                    try {
                        submit.get(100L, TimeUnit.MILLISECONDS);
                        Assert.fail("Close completed without waiting for response");
                    } catch (TimeoutException e2) {
                    }
                }
            }
            if (j2 > 0) {
                mockTime.sleep(j2);
            }
            if (z) {
                Assert.assertTrue("Close terminated prematurely", submit.cancel(true));
                TestUtils.waitForCondition(() -> {
                    return atomicReference.get() != null;
                }, "InterruptException did not occur within timeout.");
                Assert.assertTrue("Expected exception not thrown " + atomicReference, atomicReference.get() instanceof InterruptException);
            } else {
                submit.get(500L, TimeUnit.MILLISECONDS);
                Assert.assertNull("Unexpected exception during close", atomicReference.get());
            }
        } finally {
            newSingleThreadExecutor.shutdownNow();
        }
    }

    @Test(expected = AuthenticationException.class)
    public void testPartitionsForAuthenticationFailure() {
        consumerWithPendingAuthenticationError().partitionsFor("some other topic");
    }

    @Test(expected = AuthenticationException.class)
    public void testBeginningOffsetsAuthenticationFailure() {
        consumerWithPendingAuthenticationError().beginningOffsets(Collections.singleton(this.tp0));
    }

    @Test(expected = AuthenticationException.class)
    public void testEndOffsetsAuthenticationFailure() {
        consumerWithPendingAuthenticationError().endOffsets(Collections.singleton(this.tp0));
    }

    @Test(expected = AuthenticationException.class)
    public void testPollAuthenticationFailure() {
        KafkaConsumer<String, String> consumerWithPendingAuthenticationError = consumerWithPendingAuthenticationError();
        consumerWithPendingAuthenticationError.subscribe(Collections.singleton("test"));
        consumerWithPendingAuthenticationError.poll(Duration.ZERO);
    }

    @Test(expected = AuthenticationException.class)
    public void testOffsetsForTimesAuthenticationFailure() {
        consumerWithPendingAuthenticationError().offsetsForTimes(Collections.singletonMap(this.tp0, 0L));
    }

    @Test(expected = AuthenticationException.class)
    public void testCommitSyncAuthenticationFailure() {
        KafkaConsumer<String, String> consumerWithPendingAuthenticationError = consumerWithPendingAuthenticationError();
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, new OffsetAndMetadata(10L));
        consumerWithPendingAuthenticationError.commitSync(hashMap);
    }

    @Test(expected = AuthenticationException.class)
    public void testCommittedAuthenticationFailure() {
        consumerWithPendingAuthenticationError().committed(Collections.singleton(this.tp0)).get(this.tp0);
    }

    @Test
    public void testRebalanceException() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singleton("test"), getExceptionConsumerRebalanceListener());
        Node node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
        mockClient.prepareResponseFrom(joinGroupFollowerResponse(roundRobinAssignor, 1, "memberId", "leaderId", Errors.NONE), node2);
        mockClient.prepareResponseFrom(syncGroupResponse(Collections.singletonList(this.tp0), Errors.NONE), node2);
        try {
            newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
            Assert.fail("Should throw exception");
        } catch (Throwable th) {
            Assert.assertEquals("Hit partition assign " + this.singleTopicPartition, th.getCause().getMessage());
        }
        Assert.assertEquals(Collections.singleton(this.tp0), subscriptionState.assignedPartitions());
        try {
            newConsumer.close(Duration.ofMillis(0L));
            Assert.fail("Should throw exception");
        } catch (Throwable th2) {
            Assert.assertEquals("Hit partition revoke " + this.singleTopicPartition, th2.getCause().getCause().getMessage());
        }
        newConsumer.close(Duration.ofMillis(0L));
        Assert.assertTrue(subscriptionState.assignedPartitions().isEmpty());
    }

    @Test
    public void testReturnRecordsDuringRebalance() {
        Time mockTime = new MockTime(1L);
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient(mockTime, (Metadata) createMetadata);
        ConsumerPartitionAssignor cooperativeStickyAssignor = new CooperativeStickyAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, cooperativeStickyAssignor, true, this.groupInstanceId);
        initMetadata(mockClient, Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test", 1), Utils.mkEntry("test2", 1), Utils.mkEntry("test3", 1)}));
        newConsumer.subscribe(Arrays.asList("test", "test2"), getConsumerRebalanceListener(newConsumer));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        Node prepareRebalance = prepareRebalance(mockClient, node, cooperativeStickyAssignor, Arrays.asList(this.tp0, this.t2p0), null);
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(Utils.mkSet(new String[]{"test", "test2"}), newConsumer.subscription());
        Assert.assertEquals(Collections.emptySet(), newConsumer.assignment());
        newConsumer.poll(Duration.ofMillis(100L));
        Assert.assertEquals(Utils.mkSet(new TopicPartition[]{this.tp0, this.t2p0}), newConsumer.assignment());
        Map<TopicPartition, FetchInfo> hashMap = new HashMap<>();
        hashMap.put(this.tp0, new FetchInfo(0L, 1));
        hashMap.put(this.t2p0, new FetchInfo(0L, 10));
        mockClient.respondFrom(fetchResponse(hashMap), node);
        Assert.assertEquals(11L, newConsumer.poll(Duration.ZERO).count());
        Assert.assertEquals(1L, newConsumer.position(this.tp0));
        Assert.assertEquals(10L, newConsumer.position(this.t2p0));
        hashMap.clear();
        hashMap.put(this.tp0, new FetchInfo(1L, 1));
        hashMap.put(this.t2p0, new FetchInfo(10L, 20));
        mockClient.respondFrom(fetchResponse(hashMap), node);
        newConsumer.subscribe(Arrays.asList("test", "test3"), getConsumerRebalanceListener(newConsumer));
        Assert.assertEquals(Utils.mkSet(new String[]{"test", "test3"}), newConsumer.subscription());
        Assert.assertEquals(Utils.mkSet(new TopicPartition[]{this.tp0, this.t2p0}), newConsumer.assignment());
        Map<TopicPartition, Long> hashMap2 = new HashMap<>();
        hashMap2.put(this.t2p0, 10L);
        AtomicBoolean prepareOffsetCommitResponse = prepareOffsetCommitResponse(mockClient, prepareRebalance, hashMap2);
        ConsumerRecords poll = newConsumer.poll(Duration.ZERO);
        hashMap.clear();
        hashMap.put(this.tp0, new FetchInfo(2L, 1));
        mockClient.respondFrom(fetchResponse(hashMap), node);
        Assert.assertEquals(Utils.mkSet(new String[]{"test", "test3"}), newConsumer.subscription());
        Assert.assertEquals(Collections.singleton(this.tp0), newConsumer.assignment());
        Assert.assertEquals(1L, poll.count());
        Assert.assertEquals(2L, newConsumer.position(this.tp0));
        Assert.assertTrue(prepareOffsetCommitResponse.get());
        mockClient.respondFrom(joinGroupFollowerResponse(cooperativeStickyAssignor, 2, "memberId", "leaderId", Errors.NONE), prepareRebalance);
        mockClient.prepareResponseFrom(syncGroupResponse(Arrays.asList(this.tp0, this.t3p0), Errors.NONE), prepareRebalance);
        ConsumerRecords poll2 = newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(Utils.mkSet(new String[]{"test", "test3"}), newConsumer.subscription());
        Assert.assertEquals(Collections.singleton(this.tp0), newConsumer.assignment());
        Assert.assertEquals(1L, poll2.count());
        Assert.assertEquals(3L, newConsumer.position(this.tp0));
        hashMap.clear();
        hashMap.put(this.tp0, new FetchInfo(3L, 1));
        mockClient.respondFrom(fetchResponse(hashMap), node);
        ConsumerRecords poll3 = newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(Utils.mkSet(new String[]{"test", "test3"}), newConsumer.subscription());
        Assert.assertEquals(Utils.mkSet(new TopicPartition[]{this.tp0, this.t3p0}), newConsumer.assignment());
        Assert.assertEquals(1L, poll3.count());
        Assert.assertEquals(4L, newConsumer.position(this.tp0));
        Assert.assertEquals(0L, newConsumer.position(this.t3p0));
        hashMap.clear();
        hashMap.put(this.tp0, new FetchInfo(4L, 1));
        hashMap.put(this.t3p0, new FetchInfo(0L, 100));
        mockClient.respondFrom(fetchResponse(hashMap), node);
        Assert.assertEquals(101L, newConsumer.poll(Duration.ZERO).count());
        Assert.assertEquals(5L, newConsumer.position(this.tp0));
        Assert.assertEquals(100L, newConsumer.position(this.t3p0));
        mockClient.requests().clear();
        newConsumer.unsubscribe();
        newConsumer.close();
    }

    @Test
    public void testGetGroupMetadata() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, true, this.groupInstanceId);
        ConsumerGroupMetadata groupMetadata = newConsumer.groupMetadata();
        Assert.assertEquals("mock-group", groupMetadata.groupId());
        Assert.assertEquals("", groupMetadata.memberId());
        Assert.assertEquals(-1L, groupMetadata.generationId());
        Assert.assertEquals(this.groupInstanceId, groupMetadata.groupInstanceId());
        newConsumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(newConsumer));
        prepareRebalance(mockClient, node, roundRobinAssignor, Collections.singletonList(this.tp0), null);
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 0L, 0), node);
        newConsumer.updateAssignmentMetadataIfNeeded(mockTime.timer(Long.MAX_VALUE));
        ConsumerGroupMetadata groupMetadata2 = newConsumer.groupMetadata();
        Assert.assertEquals("mock-group", groupMetadata2.groupId());
        Assert.assertEquals("memberId", groupMetadata2.memberId());
        Assert.assertEquals(1L, groupMetadata2.generationId());
        Assert.assertEquals(this.groupInstanceId, groupMetadata2.groupInstanceId());
    }

    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RangeAssignor rangeAssignor = new RangeAssignor();
        mockClient.createPendingAuthenticationError(node, 0L);
        return newConsumer(mockTime, mockClient, subscriptionState, createMetadata, rangeAssignor, false, this.groupInstanceId);
    }

    private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> kafkaConsumer) {
        return new ConsumerRebalanceListener() { // from class: org.apache.kafka.clients.consumer.KafkaConsumerTest.1
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                Iterator<TopicPartition> it = collection.iterator();
                while (it.hasNext()) {
                    kafkaConsumer.seek(it.next(), 0L);
                }
            }
        };
    }

    private ConsumerRebalanceListener getExceptionConsumerRebalanceListener() {
        return new ConsumerRebalanceListener() { // from class: org.apache.kafka.clients.consumer.KafkaConsumerTest.2
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                throw new RuntimeException("Hit partition revoke " + collection);
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                throw new RuntimeException("Hit partition assign " + collection);
            }

            public void onPartitionsLost(Collection<TopicPartition> collection) {
                throw new RuntimeException("Hit partition lost " + collection);
            }
        };
    }

    private ConsumerMetadata createMetadata(SubscriptionState subscriptionState) {
        return new ConsumerMetadata(0L, Long.MAX_VALUE, false, false, subscriptionState, new LogContext(), new ClusterResourceListeners());
    }

    private Node prepareRebalance(MockClient mockClient, Node node, Set<String> set, ConsumerPartitionAssignor consumerPartitionAssignor, List<TopicPartition> list, Node node2) {
        if (node2 == null) {
            mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
            node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        }
        mockClient.prepareResponseFrom(abstractRequest -> {
            Iterator it = ((JoinGroupRequest) abstractRequest).data().protocols().iterator();
            Assert.assertTrue(it.hasNext());
            return set.equals(new HashSet(ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(((JoinGroupRequestData.JoinGroupRequestProtocol) it.next()).metadata())).topics()));
        }, (AbstractResponse) joinGroupFollowerResponse(consumerPartitionAssignor, 1, "memberId", "leaderId", Errors.NONE), node2);
        mockClient.prepareResponseFrom(syncGroupResponse(list, Errors.NONE), node2);
        return node2;
    }

    private Node prepareRebalance(MockClient mockClient, Node node, ConsumerPartitionAssignor consumerPartitionAssignor, List<TopicPartition> list, Node node2) {
        if (node2 == null) {
            mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
            node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        }
        mockClient.prepareResponseFrom(joinGroupFollowerResponse(consumerPartitionAssignor, 1, "memberId", "leaderId", Errors.NONE), node2);
        mockClient.prepareResponseFrom(syncGroupResponse(list, Errors.NONE), node2);
        return node2;
    }

    private AtomicBoolean prepareHeartbeatResponse(MockClient mockClient, Node node, Errors errors) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        mockClient.prepareResponseFrom(abstractRequest -> {
            atomicBoolean.set(true);
            return true;
        }, (AbstractResponse) new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(errors.code())), node);
        return atomicBoolean;
    }

    private AtomicBoolean prepareOffsetCommitResponse(MockClient mockClient, Node node, Map<TopicPartition, Long> map) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        HashMap hashMap = new HashMap();
        Iterator<TopicPartition> it = map.keySet().iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), Errors.NONE);
        }
        mockClient.prepareResponseFrom(abstractRequest -> {
            Map offsets = ((OffsetCommitRequest) abstractRequest).offsets();
            for (Map.Entry entry : map.entrySet()) {
                if (!((Long) offsets.get(entry.getKey())).equals(entry.getValue())) {
                    atomicBoolean.set(false);
                    return false;
                }
            }
            return true;
        }, (AbstractResponse) offsetCommitResponse(hashMap), node);
        return atomicBoolean;
    }

    private AtomicBoolean prepareOffsetCommitResponse(MockClient mockClient, Node node, TopicPartition topicPartition, long j) {
        return prepareOffsetCommitResponse(mockClient, node, Collections.singletonMap(topicPartition, Long.valueOf(j)));
    }

    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Errors> map) {
        return new OffsetCommitResponse(map);
    }

    private JoinGroupResponse joinGroupFollowerResponse(ConsumerPartitionAssignor consumerPartitionAssignor, int i, String str, String str2, Errors errors) {
        return new JoinGroupResponse(new JoinGroupResponseData().setErrorCode(errors.code()).setGenerationId(i).setProtocolName(consumerPartitionAssignor.name()).setLeader(str2).setMemberId(str).setMembers(Collections.emptyList()));
    }

    private SyncGroupResponse syncGroupResponse(List<TopicPartition> list, Errors errors) {
        return new SyncGroupResponse(new SyncGroupResponseData().setErrorCode(errors.code()).setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(list)))));
    }

    private OffsetFetchResponse offsetResponse(Map<TopicPartition, Long> map, Errors errors) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue().longValue(), Optional.empty(), "", errors));
        }
        return new OffsetFetchResponse(Errors.NONE, hashMap);
    }

    private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> map) {
        return listOffsetsResponse(map, Collections.emptyMap());
    }

    private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> map, Map<TopicPartition, Errors> map2) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), new ListOffsetResponse.PartitionData(Errors.NONE, -1L, entry.getValue().longValue(), Optional.empty()));
        }
        for (Map.Entry<TopicPartition, Errors> entry2 : map2.entrySet()) {
            hashMap.put(entry2.getKey(), new ListOffsetResponse.PartitionData(entry2.getValue(), -1L, -1L, Optional.empty()));
        }
        return new ListOffsetResponse(hashMap);
    }

    private FetchResponse<MemoryRecords> fetchResponse(Map<TopicPartition, FetchInfo> map) {
        MemoryRecords build;
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (Map.Entry<TopicPartition, FetchInfo> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            long j = entry.getValue().offset;
            int i = entry.getValue().count;
            if (i == 0) {
                build = MemoryRecords.EMPTY;
            } else {
                MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, j);
                for (int i2 = 0; i2 < i; i2++) {
                    builder.append(0L, ("key-" + i2).getBytes(), ("value-" + i2).getBytes());
                }
                build = builder.build();
            }
            linkedHashMap.put(key, new FetchResponse.PartitionData(Errors.NONE, 0L, -1L, 0L, (List) null, build));
        }
        return new FetchResponse<>(Errors.NONE, linkedHashMap, 0, 0);
    }

    private FetchResponse fetchResponse(TopicPartition topicPartition, long j, int i) {
        return fetchResponse(Collections.singletonMap(topicPartition, new FetchInfo(j, i)));
    }

    private KafkaConsumer<String, String> newConsumer(Time time, KafkaClient kafkaClient, SubscriptionState subscriptionState, ConsumerMetadata consumerMetadata, ConsumerPartitionAssignor consumerPartitionAssignor, boolean z, Optional<String> optional) {
        return newConsumer(time, kafkaClient, subscriptionState, consumerMetadata, consumerPartitionAssignor, z, "mock-group", optional, false);
    }

    private KafkaConsumer<String, String> newConsumerNoAutoCommit(Time time, KafkaClient kafkaClient, SubscriptionState subscriptionState, ConsumerMetadata consumerMetadata) {
        return newConsumer(time, kafkaClient, subscriptionState, consumerMetadata, new RangeAssignor(), false, "mock-group", this.groupInstanceId, false);
    }

    private KafkaConsumer<String, String> newConsumer(Time time, KafkaClient kafkaClient, SubscriptionState subscriptionState, ConsumerMetadata consumerMetadata, ConsumerPartitionAssignor consumerPartitionAssignor, boolean z, String str, Optional<String> optional, boolean z2) {
        StringDeserializer stringDeserializer = new StringDeserializer();
        StringDeserializer stringDeserializer2 = new StringDeserializer();
        List singletonList = Collections.singletonList(consumerPartitionAssignor);
        ConsumerInterceptors consumerInterceptors = new ConsumerInterceptors(Collections.emptyList());
        Metrics metrics = new Metrics(time);
        ConsumerMetrics consumerMetrics = new ConsumerMetrics("consumer");
        LogContext logContext = new LogContext();
        ConsumerNetworkClient consumerNetworkClient = new ConsumerNetworkClient(logContext, kafkaClient, consumerMetadata, time, 100L, 30000, 1000);
        return new KafkaConsumer<>(logContext, "mock-consumer", new ConsumerCoordinator(new GroupRebalanceConfig(10000, 60000, 1000, str, optional, 100L, true), logContext, consumerNetworkClient, singletonList, consumerMetadata, subscriptionState, metrics, "consumer", time, z, 500, consumerInterceptors, z2), stringDeserializer, stringDeserializer2, new Fetcher(logContext, consumerNetworkClient, 1, Integer.MAX_VALUE, 500, 1048576, Integer.MAX_VALUE, true, "", stringDeserializer, stringDeserializer2, consumerMetadata, subscriptionState, metrics, consumerMetrics.fetcherMetrics, time, 100L, 30000, IsolationLevel.READ_UNCOMMITTED, new ApiVersions()), consumerInterceptors, time, consumerNetworkClient, metrics, subscriptionState, consumerMetadata, 100L, 30000, 30000, singletonList, str);
    }

    @Test
    public void testCloseWithTimeUnit() {
        KafkaConsumer kafkaConsumer = (KafkaConsumer) Mockito.mock(KafkaConsumer.class);
        ((KafkaConsumer) Mockito.doCallRealMethod().when(kafkaConsumer)).close(ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any());
        kafkaConsumer.close(1L, TimeUnit.SECONDS);
        ((KafkaConsumer) Mockito.verify(kafkaConsumer)).close(Duration.ofSeconds(1L));
    }

    @Test(expected = InvalidTopicException.class)
    public void testSubscriptionOnInvalidTopic() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Cluster fetch = createMetadata.fetch();
        RoundRobinAssignor roundRobinAssignor = new RoundRobinAssignor();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, "topic abc", false, Collections.emptyList()));
        mockClient.prepareMetadataUpdate(MetadataResponse.prepareResponse(fetch.nodes(), fetch.clusterResource().clusterId(), fetch.controller().id(), arrayList));
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singleton("topic abc"), getConsumerRebalanceListener(newConsumer));
        newConsumer.poll(Duration.ZERO);
    }

    @Test
    public void testPollTimeMetrics() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RoundRobinAssignor(), true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singletonList("test"));
        Metrics metrics = newConsumer.metrics;
        MetricName metricName = metrics.metricName("last-poll-seconds-ago", "consumer-metrics");
        MetricName metricName2 = metrics.metricName("time-between-poll-avg", "consumer-metrics");
        MetricName metricName3 = metrics.metricName("time-between-poll-max", "consumer-metrics");
        Assert.assertEquals(Double.valueOf(-1.0d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        Assert.assertEquals(Double.valueOf(Double.NaN), ((Metric) newConsumer.metrics().get(metricName2)).metricValue());
        Assert.assertEquals(Double.valueOf(Double.NaN), ((Metric) newConsumer.metrics().get(metricName3)).metricValue());
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(Double.valueOf(0.0d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        Assert.assertEquals(Double.valueOf(0.0d), ((Metric) newConsumer.metrics().get(metricName2)).metricValue());
        Assert.assertEquals(Double.valueOf(0.0d), ((Metric) newConsumer.metrics().get(metricName3)).metricValue());
        mockTime.sleep(5000L);
        Assert.assertEquals(Double.valueOf(5.0d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(Double.valueOf(2500.0d), ((Metric) newConsumer.metrics().get(metricName2)).metricValue());
        Assert.assertEquals(Double.valueOf(5000.0d), ((Metric) newConsumer.metrics().get(metricName3)).metricValue());
        mockTime.sleep(10000L);
        Assert.assertEquals(Double.valueOf(10.0d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(Double.valueOf(5000.0d), ((Metric) newConsumer.metrics().get(metricName2)).metricValue());
        Assert.assertEquals(Double.valueOf(10000.0d), ((Metric) newConsumer.metrics().get(metricName3)).metricValue());
        mockTime.sleep(5000L);
        Assert.assertEquals(Double.valueOf(5.0d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(Double.valueOf(5000.0d), ((Metric) newConsumer.metrics().get(metricName2)).metricValue());
        Assert.assertEquals(Double.valueOf(10000.0d), ((Metric) newConsumer.metrics().get(metricName3)).metricValue());
    }

    @Test
    public void testPollIdleRatio() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RoundRobinAssignor(), true, this.groupInstanceId);
        MetricName metricName = newConsumer.metrics.metricName("poll-idle-ratio-avg", "consumer-metrics");
        Assert.assertEquals(Double.valueOf(Double.NaN), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        newConsumer.kafkaConsumerMetrics.recordPollStart(mockTime.milliseconds());
        mockTime.sleep(50L);
        newConsumer.kafkaConsumerMetrics.recordPollEnd(mockTime.milliseconds());
        Assert.assertEquals(Double.valueOf(1.0d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        mockTime.sleep(50L);
        newConsumer.kafkaConsumerMetrics.recordPollStart(mockTime.milliseconds());
        newConsumer.kafkaConsumerMetrics.recordPollEnd(mockTime.milliseconds());
        Assert.assertEquals(Double.valueOf(0.5d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        mockTime.sleep(25L);
        newConsumer.kafkaConsumerMetrics.recordPollStart(mockTime.milliseconds());
        mockTime.sleep(25L);
        newConsumer.kafkaConsumerMetrics.recordPollEnd(mockTime.milliseconds());
        Assert.assertEquals(Double.valueOf(0.5d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
    }

    private static boolean consumerMetricPresent(KafkaConsumer kafkaConsumer, String str) {
        return kafkaConsumer.metrics.metrics().containsKey(new MetricName(str, "consumer-metrics", "", Collections.emptyMap()));
    }

    @Test
    public void testClosingConsumerUnregistersConsumerMetrics() {
        MockTime mockTime = new MockTime();
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, new RoundRobinAssignor(), true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singletonList("test"));
        Assert.assertTrue(consumerMetricPresent(newConsumer, "last-poll-seconds-ago"));
        Assert.assertTrue(consumerMetricPresent(newConsumer, "time-between-poll-avg"));
        Assert.assertTrue(consumerMetricPresent(newConsumer, "time-between-poll-max"));
        newConsumer.close();
        Assert.assertFalse(consumerMetricPresent(newConsumer, "last-poll-seconds-ago"));
        Assert.assertFalse(consumerMetricPresent(newConsumer, "time-between-poll-avg"));
        Assert.assertFalse(consumerMetricPresent(newConsumer, "time-between-poll-max"));
    }

    @Test(expected = IllegalStateException.class)
    public void testEnforceRebalanceWithManualAssignment() {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer((String) null);
        Throwable th = null;
        try {
            newConsumer.assign(Collections.singleton(new TopicPartition("topic", 0)));
            newConsumer.enforceRebalance();
            if (newConsumer != null) {
                if (0 == 0) {
                    newConsumer.close();
                    return;
                }
                try {
                    newConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                if (0 != 0) {
                    try {
                        newConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testEnforceRebalanceTriggersRebalanceOnNextPoll() {
        Time mockTime = new MockTime(1L);
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient(mockTime, (Metadata) createMetadata);
        ConsumerPartitionAssignor roundRobinAssignor = new RoundRobinAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(mockTime, mockClient, subscriptionState, createMetadata, roundRobinAssignor, true, this.groupInstanceId);
        MockRebalanceListener mockRebalanceListener = new MockRebalanceListener();
        initMetadata(mockClient, Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test", 1), Utils.mkEntry("test2", 1), Utils.mkEntry("test3", 1)}));
        newConsumer.subscribe(Arrays.asList("test", "test2"), mockRebalanceListener);
        prepareRebalance(mockClient, (Node) createMetadata.fetch().nodes().get(0), roundRobinAssignor, Arrays.asList(this.tp0, this.t2p0), null);
        newConsumer.poll(Duration.ZERO);
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(mockRebalanceListener.revokedCount, 0L);
        Assert.assertEquals(mockRebalanceListener.assignedCount, 1L);
        newConsumer.enforceRebalance();
        newConsumer.poll(Duration.ZERO);
        Assert.assertEquals(mockRebalanceListener.revokedCount, 1L);
    }
}
