/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnstableOffsetCommitException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

public final class ConsumerCoordinator
extends AbstractCoordinator {
    // Group settings; groupId, groupInstanceId, rebalanceTimeoutMs and retryBackoffMs are read from it in this class.
    private final GroupRebalanceConfig rebalanceConfig;
    private final Logger log;
    // Configured assignors; metadata() advertises one join-group protocol entry per assignor.
    private final List<ConsumerPartitionAssignor> assignors;
    // Cluster metadata handle (fetch(), updateVersion(), requestUpdate(), ...).
    private final ConsumerMetadata metadata;
    // Sensors used to record how long the user's rebalance listener callbacks take.
    private final ConsumerCoordinatorMetrics sensors;
    private final SubscriptionState subscriptions;
    // Used by doCommitOffsetsAsync when the caller passes a null callback.
    private final OffsetCommitCallback defaultOffsetCommitCallback;
    private final boolean autoCommitEnabled;
    private final int autoCommitIntervalMs;
    // May be null: doCommitOffsetsAsync checks for null before calling onCommit.
    private final ConsumerInterceptors<?, ?> interceptors;
    // Counts async commits that are still waiting on a coordinator lookup (see commitOffsetsAsync / close).
    private final AtomicInteger pendingAsyncCommits;
    // Finished async commits whose callbacks are run later by invokeCompletedOffsetCommitCallbacks().
    private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;
    // Set to true in performAssignment and reset to false in onJoinPrepare.
    private boolean isLeader = false;
    // Subscription captured in metadata() when the join request was built; compared in rejoinNeededOrPending().
    private Set<String> joinedSubscription;
    // Snapshot rebuilt by maybeUpdateSubscriptionMetadata() whenever the metadata version advances.
    private MetadataSnapshot metadataSnapshot;
    // Snapshot taken in performAssignment; cleared in onJoinComplete when this member is not the leader.
    private MetadataSnapshot assignmentSnapshot;
    // Only created when autoCommitEnabled is true; otherwise it stays null.
    private Timer nextAutoCommitTimer;
    // When true, invokeCompletedOffsetCommitCallbacks() throws FencedInstanceIdException.
    // NOTE(review): the code that sets this flag is outside the visible part of the file.
    private AtomicBoolean asyncCommitFenced;
    // Replaced in onJoinComplete with the new generation and member id.
    private ConsumerGroupMetadata groupMetadata;
    // In-flight committed-offset fetch that fetchCommittedOffsets() may reuse across calls.
    private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
    // Rebalance protocol selected in the constructor; null when no assignors were configured.
    private final ConsumerPartitionAssignor.RebalanceProtocol protocol;

    /**
     * Creates the coordinator, captures an initial metadata snapshot, selects the rebalance
     * protocol from the configured assignors and requests a metadata update.
     *
     * <p>Protocol selection: only protocols supported by every assignor are considered; if
     * several remain, the one that sorts last in natural order is chosen. With an empty
     * assignor list the protocol is left {@code null}.
     *
     * @param rebalanceConfig      group settings (group id, group instance id, timeouts)
     * @param logContext           source of this class's logger
     * @param client               network client handed to the parent coordinator
     * @param assignors            partition assignors to advertise when joining the group
     * @param metadata             cluster metadata handle
     * @param subscriptions        subscription state shared with the consumer
     * @param metrics              metrics registry
     * @param metricGrpPrefix      prefix for metric group names
     * @param time                 clock used for timers
     * @param autoCommitEnabled    whether periodic auto-commit is active
     * @param autoCommitIntervalMs interval of the auto-commit timer in milliseconds
     * @param interceptors         consumer interceptors
     * @throws IllegalArgumentException if the assignors share no common rebalance protocol
     */
    public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig, LogContext logContext, ConsumerNetworkClient client, List<ConsumerPartitionAssignor> assignors, ConsumerMetadata metadata, SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, Time time, boolean autoCommitEnabled, int autoCommitIntervalMs, ConsumerInterceptors<?, ?> interceptors) {
        super(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time);
        this.rebalanceConfig = rebalanceConfig;
        this.log = logContext.logger(ConsumerCoordinator.class);
        this.metadata = metadata;
        this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion());
        this.subscriptions = subscriptions;
        this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
        this.autoCommitEnabled = autoCommitEnabled;
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.assignors = assignors;
        this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
        this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
        this.interceptors = interceptors;
        this.pendingAsyncCommits = new AtomicInteger();
        this.asyncCommitFenced = new AtomicBoolean(false);
        // Until a join completes the group metadata carries generation -1 and an empty member id.
        this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, -1, "", rebalanceConfig.groupInstanceId);
        if (autoCommitEnabled) {
            this.nextAutoCommitTimer = time.timer(autoCommitIntervalMs);
        }
        if (!assignors.isEmpty()) {
            // Start from the first assignor's protocols and intersect with every assignor's set.
            List<ConsumerPartitionAssignor.RebalanceProtocol> supportedProtocols = new ArrayList<>(assignors.get(0).supportedProtocols());
            for (ConsumerPartitionAssignor assignor : assignors) {
                supportedProtocols.retainAll(assignor.supportedProtocols());
            }
            if (supportedProtocols.isEmpty()) {
                throw new IllegalArgumentException("Specified assignors " + assignors.stream().map(ConsumerPartitionAssignor::name).collect(Collectors.toSet()) + " do not have commonly supported rebalance protocol");
            }
            // Natural ordering; the last element after sorting is the selected protocol.
            Collections.sort(supportedProtocols);
            this.protocol = supportedProtocols.get(supportedProtocols.size() - 1);
        } else {
            this.protocol = null;
        }
        this.metadata.requestUpdate();
    }

    /**
     * Returns the protocol type name this coordinator reports.
     *
     * @return the constant string {@code "consumer"}
     */
    @Override
    public String protocolType() {
        return "consumer";
    }

    /**
     * Builds the protocol collection sent when joining the group: one entry per configured
     * assignor, each carrying the serialized subscription (topics, assignor user data and the
     * currently assigned partitions).
     *
     * <p>Side effect: records the current subscription in {@code joinedSubscription}.
     *
     * @return the join-group protocol collection, one protocol per assignor
     */
    @Override
    protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
        this.log.debug("Joining group with current subscription: {}", this.subscriptions.subscription());
        // Remember what we are joining with; rejoinNeededOrPending() compares against it later.
        this.joinedSubscription = this.subscriptions.subscription();
        final List<String> subscribedTopics = new ArrayList<>(this.joinedSubscription);
        final JoinGroupRequestData.JoinGroupRequestProtocolCollection advertised = new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
        for (ConsumerPartitionAssignor candidate : this.assignors) {
            final ByteBuffer userData = candidate.subscriptionUserData(this.joinedSubscription);
            final ConsumerPartitionAssignor.Subscription memberSubscription = new ConsumerPartitionAssignor.Subscription(subscribedTopics, userData, this.subscriptions.assignedPartitionsList());
            final ByteBuffer serialized = ConsumerProtocol.serializeSubscription(memberSubscription);
            final JoinGroupRequestData.JoinGroupRequestProtocol protocolEntry = new JoinGroupRequestData.JoinGroupRequestProtocol();
            protocolEntry.setName(candidate.name());
            protocolEntry.setMetadata(Utils.toArray(serialized));
            advertised.add(protocolEntry);
        }
        return advertised;
    }

    /**
     * Re-evaluates the subscribed pattern against the topics of the given cluster and
     * subscribes to every matching topic. Requests a metadata update for new topics when
     * {@code subscribeFromPattern} returns {@code true}.
     *
     * @param cluster cluster view whose topic names are matched against the pattern
     */
    public void updatePatternSubscription(Cluster cluster) {
        final Set<String> matchingTopics = new HashSet<>();
        for (String topic : cluster.topics()) {
            if (this.subscriptions.matchesSubscribedPattern(topic)) {
                matchingTopics.add(topic);
            }
        }
        final boolean needsMetadataUpdate = this.subscriptions.subscribeFromPattern(matchingTopics);
        if (needsMetadataUpdate) {
            this.metadata.requestUpdateForNewTopics();
        }
    }

    /**
     * Finds the configured assignor with the given name.
     *
     * @param name assignor name to look for
     * @return the first assignor whose {@code name()} equals {@code name}, or {@code null} if none does
     */
    private ConsumerPartitionAssignor lookupAssignor(String name) {
        return this.assignors.stream()
                .filter(candidate -> candidate.name().equals(name))
                .findFirst()
                .orElse(null);
    }

    /**
     * For pattern subscriptions only: if the assignment contains topics that were not part
     * of the subscription we joined with, adds them to both the current subscription and
     * {@code joinedSubscription}, requesting a metadata update when the subscription call
     * reports {@code true}.
     *
     * @param assignedPartitions partitions received in the latest assignment
     */
    private void maybeUpdateJoinedSubscription(Set<TopicPartition> assignedPartitions) {
        if (!this.subscriptions.hasPatternSubscription()) {
            return;
        }
        // Topics present in the assignment but absent from what we joined with.
        final Set<String> unjoinedTopics = new HashSet<>();
        for (TopicPartition partition : assignedPartitions) {
            final String topic = partition.topic();
            if (!this.joinedSubscription.contains(topic)) {
                unjoinedTopics.add(topic);
            }
        }
        if (unjoinedTopics.isEmpty()) {
            return;
        }
        final Set<String> widenedSubscription = new HashSet<>(this.subscriptions.subscription());
        final Set<String> widenedJoinedSubscription = new HashSet<>(this.joinedSubscription);
        widenedSubscription.addAll(unjoinedTopics);
        widenedJoinedSubscription.addAll(unjoinedTopics);
        if (this.subscriptions.subscribeFromPattern(widenedSubscription)) {
            this.metadata.requestUpdateForNewTopics();
        }
        this.joinedSubscription = widenedJoinedSubscription;
    }

    /**
     * Calls the user's rebalance listener {@code onPartitionsAssigned} callback and records
     * its duration on the assign-callback sensor when it returns normally.
     *
     * @param assignedPartitions partitions passed to the listener
     * @return the exception thrown by the listener, or {@code null} if it completed normally
     * @throws InterruptException if the listener throws it (rethrown as-is)
     * @throws WakeupException    if the listener throws it (rethrown as-is)
     */
    private Exception invokePartitionsAssigned(Set<TopicPartition> assignedPartitions) {
        this.log.info("Adding newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));
        final ConsumerRebalanceListener userListener = this.subscriptions.rebalanceListener();
        Exception callbackFailure = null;
        try {
            final long callbackStartMs = this.time.milliseconds();
            userListener.onPartitionsAssigned(assignedPartitions);
            this.sensors.assignCallbackSensor.record(this.time.milliseconds() - callbackStartMs);
        } catch (InterruptException | WakeupException e) {
            // Interrupts and wakeups are never swallowed.
            throw e;
        } catch (Exception e) {
            this.log.error("User provided listener {} failed on invocation of onPartitionsAssigned for partitions {}", userListener.getClass().getName(), assignedPartitions, e);
            callbackFailure = e;
        }
        return callbackFailure;
    }

    /**
     * Calls the user's rebalance listener {@code onPartitionsRevoked} callback and records
     * its duration on the revoke-callback sensor when it returns normally.
     *
     * @param revokedPartitions partitions passed to the listener
     * @return the exception thrown by the listener, or {@code null} if it completed normally
     * @throws InterruptException if the listener throws it (rethrown as-is)
     * @throws WakeupException    if the listener throws it (rethrown as-is)
     */
    private Exception invokePartitionsRevoked(Set<TopicPartition> revokedPartitions) {
        this.log.info("Revoke previously assigned partitions {}", Utils.join(revokedPartitions, ", "));
        final ConsumerRebalanceListener userListener = this.subscriptions.rebalanceListener();
        Exception callbackFailure = null;
        try {
            final long callbackStartMs = this.time.milliseconds();
            userListener.onPartitionsRevoked(revokedPartitions);
            this.sensors.revokeCallbackSensor.record(this.time.milliseconds() - callbackStartMs);
        } catch (InterruptException | WakeupException e) {
            // Interrupts and wakeups are never swallowed.
            throw e;
        } catch (Exception e) {
            this.log.error("User provided listener {} failed on invocation of onPartitionsRevoked for partitions {}", userListener.getClass().getName(), revokedPartitions, e);
            callbackFailure = e;
        }
        return callbackFailure;
    }

    /**
     * Calls the user's rebalance listener {@code onPartitionsLost} callback and records its
     * duration on the lose-callback sensor when it returns normally.
     *
     * @param lostPartitions partitions passed to the listener
     * @return the exception thrown by the listener, or {@code null} if it completed normally
     * @throws InterruptException if the listener throws it (rethrown as-is)
     * @throws WakeupException    if the listener throws it (rethrown as-is)
     */
    private Exception invokePartitionsLost(Set<TopicPartition> lostPartitions) {
        this.log.info("Lost previously assigned partitions {}", Utils.join(lostPartitions, ", "));
        final ConsumerRebalanceListener userListener = this.subscriptions.rebalanceListener();
        Exception callbackFailure = null;
        try {
            final long callbackStartMs = this.time.milliseconds();
            userListener.onPartitionsLost(lostPartitions);
            this.sensors.loseCallbackSensor.record(this.time.milliseconds() - callbackStartMs);
        } catch (InterruptException | WakeupException e) {
            // Interrupts and wakeups are never swallowed.
            throw e;
        } catch (Exception e) {
            this.log.error("User provided listener {} failed on invocation of onPartitionsLost for partitions {}", userListener.getClass().getName(), lostPartitions, e);
            callbackFailure = e;
        }
        return callbackFailure;
    }

    /**
     * Applies the assignment received after a successful join.
     *
     * <p>Steps, in order: drop the leader's assignment snapshot when this member is not the
     * leader; refresh {@code groupMetadata}; deserialize the assignment; request a rejoin and
     * return early if the assignment does not match the current subscription; under the
     * COOPERATIVE protocol revoke owned partitions that are no longer assigned (and request a
     * rejoin); notify the assignor; reset the auto-commit timer; install the assignment; and
     * finally invoke the user's assigned callback for the newly added partitions.
     *
     * <p>Exceptions from the revoke callback, the assignor's {@code onAssignment} and the
     * assigned callback do not interrupt the sequence; the first one is rethrown at the end.
     *
     * @param generation         generation id of the completed join
     * @param memberId           member id assigned to this consumer
     * @param assignmentStrategy name of the assignor selected for the group
     * @param assignmentBuffer   serialized assignment for this member
     * @throws IllegalStateException if the strategy matches no configured assignor, or the
     *                               buffer holds fewer than 2 bytes
     * @throws KafkaException        wrapping the first exception raised by a callback or the assignor
     */
    @Override
    protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) {
        this.log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);
        // A non-leader has no assignment snapshot to compare metadata against.
        if (!this.isLeader) {
            this.assignmentSnapshot = null;
        }
        ConsumerPartitionAssignor assignor = this.lookupAssignor(assignmentStrategy);
        if (assignor == null) {
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
        }
        this.groupMetadata = new ConsumerGroupMetadata(this.rebalanceConfig.groupId, generation, memberId, this.rebalanceConfig.groupInstanceId);
        // Copy the current ownership before the new assignment is installed below.
        Set<TopicPartition> ownedPartitions = new HashSet<>(this.subscriptions.assignedPartitions());
        if (assignmentBuffer.remaining() < 2) {
            throw new IllegalStateException("There is insufficient bytes available to read assignment from the sync-group response (actual byte size " + assignmentBuffer.remaining() + ") , this is not expected; it is possible that the leader's assign function is buggy and did not return any assignment for this member, or because static member is configured and the protocol is buggy hence did not get the assignment for this member");
        }
        ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
        Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions());
        if (!this.subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
            this.log.warn("We received an assignment {} that doesn't match our current subscription {}; it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription", assignment.partitions(), this.subscriptions.prettyString());
            this.requestRejoin();
            return;
        }
        // Typed as Exception so the value can be passed as the cause of the KafkaException below.
        AtomicReference<Exception> firstException = new AtomicReference<>(null);
        Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
        addedPartitions.removeAll(ownedPartitions);
        if (this.protocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE) {
            Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
            revokedPartitions.removeAll(assignedPartitions);
            this.log.info("Updating assignment with\nnow assigned partitions: {}\ncompare with previously owned partitions: {}\nnewly added partitions: {}\nrevoked partitions: {}\n", Utils.join(assignedPartitions, ", "), Utils.join(ownedPartitions, ", "), Utils.join(addedPartitions, ", "), Utils.join(revokedPartitions, ", "));
            if (!revokedPartitions.isEmpty()) {
                firstException.compareAndSet(null, this.invokePartitionsRevoked(revokedPartitions));
                this.log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions);
                this.requestRejoin();
            }
        }
        this.maybeUpdateJoinedSubscription(assignedPartitions);
        // An assignor failure must not prevent the assignment from being installed.
        try {
            assignor.onAssignment(assignment, this.groupMetadata);
        }
        catch (Exception e) {
            firstException.compareAndSet(null, e);
        }
        if (this.autoCommitEnabled) {
            this.nextAutoCommitTimer.updateAndReset(this.autoCommitIntervalMs);
        }
        this.subscriptions.assignFromSubscribed(assignedPartitions);
        firstException.compareAndSet(null, this.invokePartitionsAssigned(addedPartitions));
        if (firstException.get() != null) {
            throw new KafkaException("User rebalance callback throws an error", firstException.get());
        }
    }

    /**
     * Rebuilds {@code metadataSnapshot} when the metadata version is newer than the one the
     * current snapshot was taken at. For pattern subscriptions the pattern is re-evaluated
     * against the fetched cluster first.
     */
    void maybeUpdateSubscriptionMetadata() {
        final int currentVersion = this.metadata.updateVersion();
        if (currentVersion <= this.metadataSnapshot.version) {
            // Snapshot is already up to date.
            return;
        }
        final Cluster latestCluster = this.metadata.fetch();
        if (this.subscriptions.hasPatternSubscription()) {
            this.updatePatternSubscription(latestCluster);
        }
        this.metadataSnapshot = new MetadataSnapshot(this.subscriptions, latestCluster, currentVersion);
    }

    /**
     * Runs one round of coordinator housekeeping: refreshes the subscription metadata
     * snapshot, runs completed async-commit callbacks, and, for auto-assigned subscriptions,
     * makes sure the coordinator is known and the group is joined. Finishes by triggering an
     * async auto-commit check.
     *
     * @param timer timer passed to the blocking helper calls
     * @return {@code false} if {@code ensureCoordinatorReady}, {@code ensureFreshMetadata} or
     *         {@code ensureActiveGroup} returned {@code false} (presumably because the timer
     *         ran out; confirm against those helpers); {@code true} otherwise
     * @throws IllegalStateException if partitions are auto-assigned but no rebalance protocol
     *                               was selected (no assignors configured)
     */
    public boolean poll(Timer timer) {
        this.maybeUpdateSubscriptionMetadata();
        this.invokeCompletedOffsetCommitCallbacks();
        if (this.subscriptions.hasAutoAssignedPartitions()) {
            // protocol is null only when the constructor was given an empty assignor list.
            if (this.protocol == null) {
                throw new IllegalStateException("User configured partition.assignment.strategy to empty while trying to subscribe for group protocol to auto assign partitions");
            }
            this.pollHeartbeat(timer.currentTimeMs());
            if (this.coordinatorUnknown() && !this.ensureCoordinatorReady(timer)) {
                return false;
            }
            if (this.rejoinNeededOrPending()) {
                // For pattern subscriptions, refresh metadata before joining so the
                // subscription snapshot is recomputed from fresh metadata first.
                if (this.subscriptions.hasPatternSubscription()) {
                    // Only request an update when the metadata layer reports no wait is required.
                    if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0L) {
                        this.metadata.requestUpdate();
                    }
                    if (!this.client.ensureFreshMetadata(timer)) {
                        return false;
                    }
                    this.maybeUpdateSubscriptionMetadata();
                }
                if (!this.ensureActiveGroup(timer)) {
                    return false;
                }
            }
        } else if (this.metadata.updateRequested() && !this.client.hasReadyNodes(timer.currentTimeMs())) {
            // Not group-managed: a metadata update is pending and no node is ready, so wait for it here.
            this.client.awaitMetadataUpdate(timer);
        }
        this.maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
        return true;
    }

    /**
     * Computes how long the caller may wait before polling again.
     *
     * @param now current time in milliseconds, forwarded to {@code timeToNextHeartbeat}
     * @return the time to the next heartbeat, or, when auto-commit is enabled, the smaller of
     *         that and the remaining time on the auto-commit timer
     */
    public long timeToNextPoll(long now) {
        if (this.autoCommitEnabled) {
            return Math.min(this.nextAutoCommitTimer.remainingMs(), this.timeToNextHeartbeat(now));
        }
        return this.timeToNextHeartbeat(now);
    }

    /**
     * Registers the group-wide topic set with the subscription state, waits for fresh
     * metadata using a timer of {@code Long.MAX_VALUE} ms, and then refreshes the metadata snapshot.
     *
     * @param topics all topics subscribed to by the group
     * @throws TimeoutException if {@code ensureFreshMetadata} returns {@code false}
     */
    private void updateGroupSubscription(Set<String> topics) {
        final boolean needsMetadataUpdate = this.subscriptions.groupSubscribe(topics);
        if (needsMetadataUpdate) {
            this.metadata.requestUpdateForNewTopics();
        }
        final Timer unboundedTimer = this.time.timer(Long.MAX_VALUE);
        final boolean metadataIsFresh = this.client.ensureFreshMetadata(unboundedTimer);
        if (!metadataIsFresh) {
            throw new TimeoutException();
        }
        this.maybeUpdateSubscriptionMetadata();
    }

    /**
     * Computes the partition assignment for the whole group; reached only on the group
     * leader (it sets {@code isLeader}).
     *
     * <p>Deserializes each member's subscription, registers the union of subscribed topics,
     * runs the selected assignor, validates the result under the COOPERATIVE protocol, logs
     * topics that were subscribed but not assigned (and vice versa), and serializes the
     * per-member assignments. If the assignor handed out topics nobody subscribed to, the
     * group subscription is widened so their metadata is fetched.
     *
     * @param leaderId           member id of the leader (not used by this implementation)
     * @param assignmentStrategy name of the assignor selected for the group
     * @param allSubscriptions   join-group member entries carrying serialized subscriptions
     * @return serialized assignment keyed by member id
     * @throws IllegalStateException if the strategy matches no configured assignor, or the
     *                               cooperative validation fails
     */
    @Override
    protected Map<String, ByteBuffer> performAssignment(String leaderId, String assignmentStrategy, List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {
        ConsumerPartitionAssignor assignor = this.lookupAssignor(assignmentStrategy);
        if (assignor == null) {
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
        }
        Set<String> allSubscribedTopics = new HashSet<>();
        Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<>();
        // Per-member owned partitions, needed for the cooperative validation below.
        Map<String, List<TopicPartition>> ownedPartitions = new HashMap<>();
        for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));
            subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));
            subscriptions.put(memberSubscription.memberId(), subscription);
            allSubscribedTopics.addAll(subscription.topics());
            ownedPartitions.put(memberSubscription.memberId(), subscription.ownedPartitions());
        }
        // Blocks until metadata covering every subscribed topic is fresh.
        this.updateGroupSubscription(allSubscribedTopics);
        this.isLeader = true;
        this.log.debug("Performing assignment using strategy {} with subscriptions {}", assignor.name(), subscriptions);
        Map<String, ConsumerPartitionAssignor.Assignment> assignments = assignor.assign(this.metadata.fetch(), new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
        if (this.protocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE) {
            this.validateCooperativeAssignment(ownedPartitions, assignments);
        }
        Set<String> assignedTopics = new HashSet<>();
        for (ConsumerPartitionAssignor.Assignment assigned : assignments.values()) {
            for (TopicPartition tp : assigned.partitions()) {
                assignedTopics.add(tp.topic());
            }
        }
        if (!assignedTopics.containsAll(allSubscribedTopics)) {
            Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
            notAssignedTopics.removeAll(assignedTopics);
            this.log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics);
        }
        if (!allSubscribedTopics.containsAll(assignedTopics)) {
            Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
            newlyAddedTopics.removeAll(allSubscribedTopics);
            this.log.info("The following not-subscribed topics are assigned, and their metadata will be fetched from the brokers: {}", newlyAddedTopics);
            allSubscribedTopics.addAll(assignedTopics);
            this.updateGroupSubscription(allSubscribedTopics);
        }
        // Remember the metadata the assignment was based on; rejoinNeededOrPending() compares against it.
        this.assignmentSnapshot = this.metadataSnapshot;
        this.log.info("Finished assignment for group at generation {}: {}", this.generation().generationId, assignments);
        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
        for (Map.Entry<String, ConsumerPartitionAssignor.Assignment> assignmentEntry : assignments.entrySet()) {
            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
            groupAssignment.put(assignmentEntry.getKey(), buffer);
        }
        return groupAssignment;
    }

    /**
     * Checks that an assignment does not, in a single round, give a partition to one member
     * while taking it away from another.
     *
     * <p>For each member the added set (assigned minus owned) and revoked set (owned minus
     * assigned) are computed; the check fails if the union of added sets intersects the
     * union of revoked sets.
     *
     * @param ownedPartitions partitions each member owned before the rebalance, keyed by
     *                        member id; expected to contain an entry for every key of
     *                        {@code assignments}
     * @param assignments     new assignment per member id
     * @throws IllegalStateException if any partition is both added and revoked
     */
    private void validateCooperativeAssignment(Map<String, List<TopicPartition>> ownedPartitions, Map<String, ConsumerPartitionAssignor.Assignment> assignments) {
        Set<TopicPartition> totalRevokedPartitions = new HashSet<>();
        Set<TopicPartition> totalAddedPartitions = new HashSet<>();
        for (Map.Entry<String, ConsumerPartitionAssignor.Assignment> entry : assignments.entrySet()) {
            ConsumerPartitionAssignor.Assignment assignment = entry.getValue();
            List<TopicPartition> previouslyOwned = ownedPartitions.get(entry.getKey());
            Set<TopicPartition> addedPartitions = new HashSet<>(assignment.partitions());
            addedPartitions.removeAll(previouslyOwned);
            Set<TopicPartition> revokedPartitions = new HashSet<>(previouslyOwned);
            revokedPartitions.removeAll(assignment.partitions());
            totalAddedPartitions.addAll(addedPartitions);
            totalRevokedPartitions.addAll(revokedPartitions);
        }
        // What remains are partitions that are added to one member and revoked from another.
        totalAddedPartitions.retainAll(totalRevokedPartitions);
        if (!totalAddedPartitions.isEmpty()) {
            this.log.error("With the COOPERATIVE protocol, owned partitions cannot be reassigned to other members; however the assignor has reassigned partitions {} which are still owned by some members", totalAddedPartitions);
            throw new IllegalStateException("Assignor supporting the COOPERATIVE protocol violates its requirements");
        }
    }

    /**
     * Prepares for a (re)join: synchronously auto-commits offsets, gives up partitions as
     * dictated by the generation state and the rebalance protocol, then clears the leader
     * flag and the group subscription.
     *
     * <ul>
     *   <li>Generation reset (both id and member id equal {@code NO_GENERATION}): all assigned
     *       partitions are reported via the lost callback and the assignment is emptied.</li>
     *   <li>EAGER: all assigned partitions are revoked and the assignment is emptied.</li>
     *   <li>COOPERATIVE: only partitions whose topic is no longer subscribed are revoked; the
     *       rest stay assigned.</li>
     * </ul>
     *
     * @param generation generation id before the join
     * @param memberId   member id before the join
     * @throws KafkaException wrapping an exception returned by the user callback; thrown only
     *                        after the state cleanup has run
     */
    @Override
    protected void onJoinPrepare(int generation, String memberId) {
        this.log.debug("Executing onJoinPrepare with generation {} and memberId {}", (Object)generation, (Object)memberId);
        // Synchronous auto-commit attempt, bounded by the rebalance timeout.
        this.maybeAutoCommitOffsetsSync(this.time.timer(this.rebalanceConfig.rebalanceTimeoutMs));
        Exception exception = null;
        if (generation == AbstractCoordinator.Generation.NO_GENERATION.generationId && memberId.equals(AbstractCoordinator.Generation.NO_GENERATION.memberId)) {
            HashSet<TopicPartition> revokedPartitions = new HashSet<TopicPartition>(this.subscriptions.assignedPartitions());
            if (!revokedPartitions.isEmpty()) {
                this.log.info("Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group");
                exception = this.invokePartitionsLost(revokedPartitions);
                this.subscriptions.assignFromSubscribed(Collections.emptySet());
            }
        } else {
            // NOTE(review): switching on a null protocol would throw NullPointerException;
            // poll() rejects a null protocol before joining - confirm no other path reaches here.
            switch (this.protocol) {
                case EAGER: {
                    // Revoke everything currently assigned.
                    HashSet<TopicPartition> revokedPartitions = new HashSet<TopicPartition>(this.subscriptions.assignedPartitions());
                    exception = this.invokePartitionsRevoked(revokedPartitions);
                    this.subscriptions.assignFromSubscribed(Collections.emptySet());
                    break;
                }
                case COOPERATIVE: {
                    // Revoke only partitions of topics that are no longer in the subscription.
                    HashSet<TopicPartition> ownedPartitions = new HashSet<TopicPartition>(this.subscriptions.assignedPartitions());
                    Set<TopicPartition> revokedPartitions = ownedPartitions.stream().filter(tp -> !this.subscriptions.subscription().contains(tp.topic())).collect(Collectors.toSet());
                    if (revokedPartitions.isEmpty()) break;
                    exception = this.invokePartitionsRevoked(revokedPartitions);
                    ownedPartitions.removeAll(revokedPartitions);
                    this.subscriptions.assignFromSubscribed(ownedPartitions);
                }
            }
        }
        // State cleanup runs even when a callback failed; the failure is surfaced afterwards.
        this.isLeader = false;
        this.subscriptions.resetGroupSubscription();
        if (exception != null) {
            throw new KafkaException("User rebalance callback throws an error", exception);
        }
    }

    /**
     * Gives up all assigned partitions before leaving the group. Does nothing unless
     * partitions are auto-assigned and at least one is currently assigned.
     *
     * <p>The revoked callback is used when the current generation is not
     * {@code NO_GENERATION}; otherwise the lost callback is used. The assignment is emptied
     * before any callback failure is rethrown.
     *
     * @throws KafkaException wrapping an exception returned by the user callback
     */
    @Override
    public void onLeavePrepare() {
        final AbstractCoordinator.Generation generationAtLeave = this.generation();
        final String memberIdAtLeave = generationAtLeave.memberId;
        this.log.debug("Executing onLeavePrepare with generation {} and memberId {}", generationAtLeave, memberIdAtLeave);
        final Set<TopicPartition> droppedPartitions = new HashSet<>(this.subscriptions.assignedPartitions());
        if (!this.subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) {
            return;
        }
        final Exception callbackFailure;
        if (this.generation() != AbstractCoordinator.Generation.NO_GENERATION) {
            callbackFailure = this.invokePartitionsRevoked(droppedPartitions);
        } else {
            callbackFailure = this.invokePartitionsLost(droppedPartitions);
        }
        this.subscriptions.assignFromSubscribed(Collections.emptySet());
        if (callbackFailure != null) {
            throw new KafkaException("User rebalance callback throws an error", callbackFailure);
        }
    }

    /**
     * Decides whether the consumer has to (re)join the group.
     *
     * @return {@code false} when partitions are not auto-assigned; {@code true} (after
     *         requesting a rejoin) when the assignment snapshot no longer matches the current
     *         metadata snapshot or the subscription differs from the one joined with;
     *         otherwise whatever the parent implementation reports
     */
    @Override
    public boolean rejoinNeededOrPending() {
        if (!this.subscriptions.hasAutoAssignedPartitions()) {
            return false;
        }
        // Either the metadata changed since the assignment was made,
        // or the subscription changed since the last join.
        if ((this.assignmentSnapshot != null && !this.assignmentSnapshot.matches(this.metadataSnapshot))
                || (this.joinedSubscription != null && !this.joinedSubscription.equals(this.subscriptions.subscription()))) {
            this.requestRejoin();
            return true;
        }
        return super.rejoinNeededOrPending();
    }

    /**
     * Fetches committed offsets for the partitions reported by
     * {@code initializingPartitions()} and seeks each still-assigned partition to its
     * committed offset. Leader epochs carried by the committed offsets are forwarded to the
     * metadata first.
     *
     * @param timer timer passed to the offset fetch
     * @return {@code false} if the fetch returned {@code null}; {@code true} otherwise
     */
    public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
        final Set<TopicPartition> awaitingPosition = this.subscriptions.initializingPartitions();
        final Map<TopicPartition, OffsetAndMetadata> committed = this.fetchCommittedOffsets(awaitingPosition, timer);
        if (committed == null) {
            return false;
        }
        for (Map.Entry<TopicPartition, OffsetAndMetadata> fetched : committed.entrySet()) {
            final TopicPartition partition = fetched.getKey();
            final OffsetAndMetadata committedOffset = fetched.getValue();
            if (committedOffset == null) {
                // Entries without a value are skipped.
                continue;
            }
            committedOffset.leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(partition, epoch));
            if (!this.subscriptions.isAssigned(partition)) {
                this.log.info("Ignoring the returned {} since its partition {} is no longer assigned", committedOffset, partition);
                continue;
            }
            final Metadata.LeaderAndEpoch currentLeader = this.metadata.currentLeader(partition);
            final SubscriptionState.FetchPosition committedPosition = new SubscriptionState.FetchPosition(committedOffset.offset(), committedOffset.leaderEpoch(), currentLeader);
            this.subscriptions.seekUnvalidated(partition, committedPosition);
            this.log.info("Setting offset for partition {} to the committed offset {}", partition, committedPosition);
        }
        return true;
    }

    /**
     * Fetches the committed offsets for the given partitions, retrying retriable failures
     * with a back-off until the timer expires.
     *
     * <p>An in-flight request is remembered in {@code pendingCommittedOffsetRequest} and
     * reused by a later call as long as {@code sameRequest} accepts the new partitions and
     * generation; otherwise it is discarded and a new request is sent.
     *
     * @param partitions partitions to look up
     * @param timer      bounds coordinator discovery, polling and back-off sleeps
     * @return the committed offsets; an empty map when {@code partitions} is empty; or
     *         {@code null} when the coordinator was not ready, the request had not completed
     *         after polling, or the timer expired during retries
     * @throws RuntimeException the future's exception when the request failed with a
     *                          non-retriable error
     */
    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions, Timer timer) {
        if (partitions.isEmpty()) {
            return Collections.emptyMap();
        }
        AbstractCoordinator.Generation generationForOffsetRequest = this.generationIfStable();
        // Drop a cached request that was made for different partitions or a different generation.
        if (this.pendingCommittedOffsetRequest != null && !this.pendingCommittedOffsetRequest.sameRequest(partitions, generationForOffsetRequest)) {
            this.pendingCommittedOffsetRequest = null;
        }
        do {
            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future;
            if (!this.ensureCoordinatorReady(timer)) {
                return null;
            }
            // Reuse the in-flight request if there is one; otherwise send and remember a new one.
            if (this.pendingCommittedOffsetRequest != null) {
                future = this.pendingCommittedOffsetRequest.response;
            } else {
                future = this.sendOffsetFetchRequest(partitions);
                this.pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
            }
            this.client.poll(future, timer);
            if (future.isDone()) {
                this.pendingCommittedOffsetRequest = null;
                if (future.succeeded()) {
                    return future.value();
                }
                if (!future.isRetriable()) {
                    throw future.exception();
                }
            } else {
                // Not done yet: keep the pending request so a later call can pick it up.
                return null;
            }
            // Retriable failure: back off before the next attempt.
            timer.sleep(this.rebalanceConfig.retryBackoffMs);
        } while (timer.notExpired());
        return null;
    }

    /**
     * Returns the current group metadata object.
     *
     * @return the value of the {@code groupMetadata} field, which is created in the
     *         constructor and replaced in {@code onJoinComplete}
     */
    public ConsumerGroupMetadata groupMetadata() {
        return this.groupMetadata;
    }

    /**
     * Closes the coordinator: disables wakeups on the client, attempts a synchronous
     * auto-commit, then keeps polling while async commits are still pending and the timer
     * has not expired. The parent {@code close} always runs, even if the above throws.
     *
     * @param timer bounds the time spent committing and draining pending commits
     */
    @Override
    public void close(Timer timer) {
        this.client.disableWakeups();
        try {
            this.maybeAutoCommitOffsetsSync(timer);
            for (;;) {
                // Stop once nothing is pending or time has run out.
                if (this.pendingAsyncCommits.get() <= 0 || !timer.notExpired()) {
                    break;
                }
                this.ensureCoordinatorReady(timer);
                this.client.poll(timer);
                this.invokeCompletedOffsetCommitCallbacks();
            }
        } finally {
            super.close(timer);
        }
    }

    /**
     * Runs the callbacks of all async commits queued in {@code completedOffsetCommits},
     * removing each from the queue as it is invoked.
     *
     * @throws FencedInstanceIdException if {@code asyncCommitFenced} is set; no callback is
     *                                   run in that case
     */
    void invokeCompletedOffsetCommitCallbacks() {
        if (this.asyncCommitFenced.get()) {
            throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + this.rebalanceConfig.groupInstanceId.orElse("unset_instance_id") + ", current member.id is " + this.memberId());
        }
        for (OffsetCommitCompletion finished = this.completedOffsetCommits.poll();
                finished != null;
                finished = this.completedOffsetCommits.poll()) {
            finished.invoke();
        }
    }

    /**
     * Commits the given offsets without waiting for the result.
     *
     * <p>Callbacks of already-completed commits are invoked first. If the coordinator is known the commit is
     * sent immediately; otherwise a coordinator lookup is started and the commit is sent from the lookup's
     * success listener. If the lookup fails, the commit is completed with a
     * {@link RetriableCommitFailedException} wrapping the lookup error. In every case the client is polled
     * once (without wakeup) before returning.
     *
     * <p>A {@code null} callback is replaced by the default commit callback on the lookup-failure path,
     * the same substitution {@code doCommitOffsetsAsync} applies to failures of the commit request itself,
     * so a failed lookup is always reported to some callback.
     *
     * @param offsets  offsets to commit, keyed by partition
     * @param callback invoked when the commit completes; may be {@code null}
     * @throws FencedInstanceIdException if a previous asynchronous commit was fenced
     */
    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
        this.invokeCompletedOffsetCommitCallbacks();
        if (!this.coordinatorUnknown()) {
            this.doCommitOffsetsAsync(offsets, callback);
        } else {
            // Resolve the callback used for a failed lookup up front so that a null user callback
            // does not turn the failure into a completion that invokes nothing.
            final OffsetCommitCallback failureCallback = callback == null ? this.defaultOffsetCommitCallback : callback;
            // Counted as pending until the lookup listener fires; close() waits on this counter.
            this.pendingAsyncCommits.incrementAndGet();
            this.lookupCoordinator().addListener(new RequestFutureListener<Void>(){

                @Override
                public void onSuccess(Void value) {
                    ConsumerCoordinator.this.pendingAsyncCommits.decrementAndGet();
                    // doCommitOffsetsAsync applies the default-callback substitution itself.
                    ConsumerCoordinator.this.doCommitOffsetsAsync(offsets, callback);
                    ConsumerCoordinator.this.client.pollNoWakeup();
                }

                @Override
                public void onFailure(RuntimeException e) {
                    ConsumerCoordinator.this.pendingAsyncCommits.decrementAndGet();
                    ConsumerCoordinator.this.completedOffsetCommits.add(new OffsetCommitCompletion(failureCallback, offsets, new RetriableCommitFailedException(e)));
                }
            });
        }
        // Give the request a chance to be transmitted without blocking on its completion.
        this.client.pollNoWakeup();
    }

    /**
     * Sends an offset commit request and registers a listener that queues the outcome for later callback
     * invocation.
     *
     * <p>On success the interceptors (if any) are notified and a successful completion is queued. On failure
     * a {@link RetriableException} is wrapped in a {@link RetriableCommitFailedException} before the
     * completion is queued, and a {@link FencedInstanceIdException} additionally sets the
     * {@code asyncCommitFenced} flag.
     *
     * @param offsets  offsets to commit, keyed by partition
     * @param callback user callback; the default commit callback is used when this is {@code null}
     */
    private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        RequestFuture<Void> commitFuture = sendOffsetCommitRequest(offsets);
        final OffsetCommitCallback effectiveCallback;
        if (callback != null) {
            effectiveCallback = callback;
        } else {
            effectiveCallback = defaultOffsetCommitCallback;
        }
        commitFuture.addListener(new RequestFutureListener<Void>(){

            @Override
            public void onSuccess(Void value) {
                ConsumerInterceptors<?, ?> registered = ConsumerCoordinator.this.interceptors;
                if (registered != null) {
                    registered.onCommit(offsets);
                }
                ConsumerCoordinator.this.completedOffsetCommits.add(new OffsetCommitCompletion(effectiveCallback, offsets, null));
            }

            @Override
            public void onFailure(RuntimeException e) {
                RuntimeException reported = (e instanceof RetriableException) ? new RetriableCommitFailedException(e) : e;
                ConsumerCoordinator.this.completedOffsetCommits.add(new OffsetCommitCompletion(effectiveCallback, offsets, reported));
                // The completion is queued before the fenced flag is raised.
                if (reported instanceof FencedInstanceIdException) {
                    ConsumerCoordinator.this.asyncCommitFenced.set(true);
                }
            }
        });
    }

    /**
     * Commits the given offsets, blocking until the commit succeeds, fails fatally, or the timer expires.
     *
     * <p>Callbacks of already-completed asynchronous commits are invoked before the first attempt and again
     * after every poll. Retriable failures are retried after sleeping for the configured retry backoff.
     *
     * @param offsets offsets to commit, keyed by partition; an empty map succeeds immediately
     * @param timer   bounds the total time spent
     * @return {@code true} if the commit succeeded; {@code false} if the coordinator could not be readied
     *         or the timer expired first
     * @throws RuntimeException the request's exception when it failed with a non-retriable error
     */
    public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
        invokeCompletedOffsetCommitCallbacks();
        if (offsets.isEmpty()) {
            return true;
        }
        while (true) {
            if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
                return false;
            }
            RequestFuture<Void> attempt = sendOffsetCommitRequest(offsets);
            client.poll(attempt, timer);
            invokeCompletedOffsetCommitCallbacks();
            if (attempt.succeeded()) {
                if (interceptors != null) {
                    interceptors.onCommit(offsets);
                }
                return true;
            }
            boolean fatal = attempt.failed() && !attempt.isRetriable();
            if (fatal) {
                throw attempt.exception();
            }
            timer.sleep(rebalanceConfig.retryBackoffMs);
            if (!timer.notExpired()) {
                return false;
            }
        }
    }

    /**
     * Triggers an asynchronous auto-commit when auto-commit is enabled and the auto-commit timer has expired.
     *
     * <p>The timer is updated with {@code now} first; when it has expired it is reset to the auto-commit
     * interval before the commit is started.
     *
     * @param now current time passed to the auto-commit timer
     */
    public void maybeAutoCommitOffsetsAsync(long now) {
        if (!autoCommitEnabled) {
            return;
        }
        nextAutoCommitTimer.update(now);
        if (!nextAutoCommitTimer.isExpired()) {
            return;
        }
        nextAutoCommitTimer.reset(autoCommitIntervalMs);
        doAutoCommitOffsetsAsync();
    }

    /**
     * Asynchronously commits all consumed offsets and logs the outcome.
     *
     * <p>A retriable failure additionally resets the auto-commit timer to the retry backoff.
     */
    private void doAutoCommitOffsetsAsync() {
        Map<TopicPartition, OffsetAndMetadata> consumed = subscriptions.allConsumed();
        log.debug("Sending asynchronous auto-commit of offsets {}", consumed);
        commitOffsetsAsync(consumed, (offsets, exception) -> {
            if (exception == null) {
                log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
                return;
            }
            if (!(exception instanceof RetriableCommitFailedException)) {
                log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
                return;
            }
            log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets, exception);
            nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);
        });
    }

    /**
     * Synchronously commits all consumed offsets when auto-commit is enabled.
     *
     * <p>Interrupts and wakeups are logged and rethrown; any other exception is logged at warn level and
     * suppressed.
     *
     * @param timer bounds the time spent in the synchronous commit
     */
    private void maybeAutoCommitOffsetsSync(Timer timer) {
        if (!autoCommitEnabled) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> consumed = subscriptions.allConsumed();
        try {
            log.debug("Sending synchronous auto-commit of offsets {}", consumed);
            boolean committed = commitOffsetsSync(consumed, timer);
            if (!committed) {
                log.debug("Auto-commit of offsets {} timed out before completion", consumed);
            }
        }
        catch (InterruptException | WakeupException e) {
            log.debug("Auto-commit of offsets {} was interrupted before completion", consumed);
            throw e;
        }
        catch (Exception e) {
            log.warn("Synchronous auto-commit of offsets {} failed: {}", consumed, e.getMessage());
        }
    }

    /**
     * Builds an OffsetCommit request for the given offsets and sends it to the coordinator.
     *
     * <p>The returned future is already completed, without anything being sent, when:
     * <ul>
     *   <li>{@code offsets} is empty (success);</li>
     *   <li>no coordinator is available;</li>
     *   <li>any offset is negative ({@link IllegalArgumentException});</li>
     *   <li>partitions are auto-assigned but there is no stable generation
     *       ({@link RebalanceInProgressException} if a rebalance is in progress, otherwise
     *       {@link CommitFailedException}).</li>
     * </ul>
     *
     * @param offsets offsets to commit, keyed by partition
     * @return a future completed by {@code OffsetCommitResponseHandler}, or one of the pre-completed
     *         futures described above
     */
    private RequestFuture<Void> sendOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.isEmpty()) {
            return RequestFuture.voidSuccess();
        }
        Node coordinator = this.checkAndGetCoordinator();
        if (coordinator == null) {
            return RequestFuture.coordinatorNotAvailable();
        }

        // Group the per-partition entries under one request topic per topic name.
        Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            OffsetAndMetadata offsetAndMetadata = entry.getValue();
            if (offsetAndMetadata.offset() < 0L) {
                return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
            }
            // computeIfAbsent creates the topic entry only the first time a topic name is seen.
            OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap.computeIfAbsent(
                    topicPartition.topic(),
                    name -> new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(name));
            topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition()
                    .setPartitionIndex(topicPartition.partition())
                    .setCommittedOffset(offsetAndMetadata.offset())
                    // -1 is sent when the offset carries no leader epoch.
                    .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(-1))
                    .setCommittedMetadata(offsetAndMetadata.metadata()));
        }

        final AbstractCoordinator.Generation generation;
        if (this.subscriptions.hasAutoAssignedPartitions()) {
            generation = this.generationIfStable();
            if (generation == null) {
                // Without a stable generation the commit cannot be attributed to a group member.
                this.log.info("Failing OffsetCommit request since the consumer is not part of an active group");
                if (this.rebalanceInProgress()) {
                    return RequestFuture.failure(new RebalanceInProgressException("Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation."));
                }
                return RequestFuture.failure(new CommitFailedException("Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group."));
            }
        } else {
            generation = AbstractCoordinator.Generation.NO_GENERATION;
        }

        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(new OffsetCommitRequestData()
                .setGroupId(this.rebalanceConfig.groupId)
                .setGenerationId(generation.generationId)
                .setMemberId(generation.memberId)
                .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                .setTopics(new ArrayList<OffsetCommitRequestData.OffsetCommitRequestTopic>(requestTopicDataMap.values())));
        this.log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, (Object)coordinator);
        return this.client.send(coordinator, builder).compose(new OffsetCommitResponseHandler(offsets));
    }

    /**
     * Sends an OffsetFetch request for the given partitions to the coordinator.
     *
     * @param partitions partitions whose committed offsets are requested
     * @return a future completed by {@code OffsetFetchResponseHandler}, or an already-failed
     *         "coordinator not available" future when no coordinator is known
     */
    private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
        final Node target = checkAndGetCoordinator();
        if (target != null) {
            log.debug("Fetching committed offsets for partitions: {}", partitions);
            ArrayList<TopicPartition> requested = new ArrayList<TopicPartition>(partitions);
            OffsetFetchRequest.Builder fetchBuilder = new OffsetFetchRequest.Builder(rebalanceConfig.groupId, true, requested);
            return client.send(target, fetchBuilder).compose(new OffsetFetchResponseHandler());
        }
        return RequestFuture.coordinatorNotAvailable();
    }

    /**
     * Exposes the rebalance protocol held by this coordinator.
     *
     * @return the current value of the {@code protocol} field
     */
    ConsumerPartitionAssignor.RebalanceProtocol getProtocol() {
        return protocol;
    }

    /**
     * Immutable record of a finished offset commit: the callback to notify, the offsets that were committed,
     * and the failure, if any.
     */
    private static class OffsetCommitCompletion {
        private final OffsetCommitCallback callback;
        private final Map<TopicPartition, OffsetAndMetadata> offsets;
        private final Exception exception;

        /**
         * @param callback  callback to notify; may be {@code null}
         * @param offsets   offsets the commit was issued for
         * @param exception failure cause, or {@code null} on success
         */
        private OffsetCommitCompletion(OffsetCommitCallback callback, Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            this.exception = exception;
            this.offsets = offsets;
            this.callback = callback;
        }

        /**
         * Notifies the callback with the stored offsets and exception; does nothing when no callback was given.
         */
        public void invoke() {
            if (callback == null) {
                return;
            }
            callback.onComplete(offsets, exception);
        }
    }

    /**
     * Snapshot of the partition count of every metadata topic of a subscription, tagged with a version.
     */
    private static class MetadataSnapshot {
        private final int version;
        private final Map<String, Integer> partitionsPerTopic;

        /**
         * Records the partition count of each topic returned by {@code subscription.metadataTopics()};
         * topics for which the cluster reports no partition count are left out.
         *
         * @param subscription source of the topic names
         * @param cluster      source of the partition counts
         * @param version      version tag stored with the snapshot
         */
        private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) {
            Map<String, Integer> counts = new HashMap<>();
            for (String topic : subscription.metadataTopics()) {
                Integer count = cluster.partitionCountForTopic(topic);
                if (count != null) {
                    counts.put(topic, count);
                }
            }
            this.version = version;
            this.partitionsPerTopic = counts;
        }

        /**
         * @param other snapshot to compare against
         * @return {@code true} when the versions are equal or the per-topic partition counts are equal
         */
        boolean matches(MetadataSnapshot other) {
            if (version == other.version) {
                return true;
            }
            return partitionsPerTopic.equals(other.partitionsPerTopic);
        }
    }

    /**
     * Registers and holds the coordinator's sensors: commit latency plus the latency of the
     * partition-revoked, partition-assigned and partition-lost rebalance listener callbacks. Also registers
     * the {@code assigned-partitions} metric.
     */
    private class ConsumerCoordinatorMetrics {
        private final String metricGrpName;
        private final Sensor commitSensor;
        private final Sensor revokeCallbackSensor;
        private final Sensor assignCallbackSensor;
        private final Sensor loseCallbackSensor;

        /**
         * @param metrics         registry the sensors and metrics are added to
         * @param metricGrpPrefix prefix of the metric group name; {@code "-coordinator-metrics"} is appended
         */
        private ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
            final String group = metricGrpPrefix + "-coordinator-metrics";
            this.metricGrpName = group;

            // Commit latency: average, max and a meter.
            Sensor commits = metrics.sensor("commit-latency");
            commits.add(metrics.metricName("commit-latency-avg", group, "The average time taken for a commit request"), new Avg());
            commits.add(metrics.metricName("commit-latency-max", group, "The max time taken for a commit request"), new Max());
            commits.add(ConsumerCoordinator.this.createMeter(metrics, group, "commit", "commit calls"));
            this.commitSensor = commits;

            // Partition-revoked callback latency.
            Sensor revoked = metrics.sensor("partition-revoked-latency");
            revoked.add(metrics.metricName("partition-revoked-latency-avg", group, "The average time taken for a partition-revoked rebalance listener callback"), new Avg());
            revoked.add(metrics.metricName("partition-revoked-latency-max", group, "The max time taken for a partition-revoked rebalance listener callback"), new Max());
            this.revokeCallbackSensor = revoked;

            // Partition-assigned callback latency.
            Sensor assigned = metrics.sensor("partition-assigned-latency");
            assigned.add(metrics.metricName("partition-assigned-latency-avg", group, "The average time taken for a partition-assigned rebalance listener callback"), new Avg());
            assigned.add(metrics.metricName("partition-assigned-latency-max", group, "The max time taken for a partition-assigned rebalance listener callback"), new Max());
            this.assignCallbackSensor = assigned;

            // Partition-lost callback latency.
            Sensor lost = metrics.sensor("partition-lost-latency");
            lost.add(metrics.metricName("partition-lost-latency-avg", group, "The average time taken for a partition-lost rebalance listener callback"), new Avg());
            lost.add(metrics.metricName("partition-lost-latency-max", group, "The max time taken for a partition-lost rebalance listener callback"), new Max());
            this.loseCallbackSensor = lost;

            // Reports the subscription's current number of assigned partitions.
            Measurable assignedPartitionCount = (config, now) -> ConsumerCoordinator.this.subscriptions.numAssignedPartitions();
            metrics.addMetric(metrics.metricName("assigned-partitions", group, "The number of partitions currently assigned to this consumer"), assignedPartitionCount);
        }
    }

    private class OffsetFetchResponseHandler
    extends AbstractCoordinator.CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
        private OffsetFetchResponseHandler() {
        }

        @Override
        public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
            if (response.hasError()) {
                Errors error = response.error();
                ConsumerCoordinator.this.log.debug("Offset fetch failed: {}", (Object)error.message());
                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                    future.raise(error);
                } else if (error == Errors.NOT_COORDINATOR) {
                    ConsumerCoordinator.this.markCoordinatorUnknown();
                    future.raise(error);
                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    future.raise(GroupAuthorizationException.forGroupId(((ConsumerCoordinator)ConsumerCoordinator.this).rebalanceConfig.groupId));
                } else {
                    future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                }
                return;
            }
            HashSet<String> unauthorizedTopics = null;
            HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>(response.responseData().size());
            HashSet<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<TopicPartition>();
            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                TopicPartition tp = entry.getKey();
                OffsetFetchResponse.PartitionData partitionData = entry.getValue();
                if (partitionData.hasError()) {
                    Errors error = partitionData.error;
                    ConsumerCoordinator.this.log.debug("Failed to fetch offset for partition {}: {}", (Object)tp, (Object)error.message());
                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                        future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
                        return;
                    }
                    if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                        if (unauthorizedTopics == null) {
                            unauthorizedTopics = new HashSet<String>();
                        }
                        unauthorizedTopics.add(tp.topic());
                        continue;
                    }
                    if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
                        unstableTxnOffsetTopicPartitions.add(tp);
                        continue;
                    }
                    future.raise(new KafkaException("Unexpected error in fetch offset response for partition " + tp + ": " + error.message()));
                    return;
                }
                if (partitionData.offset >= 0L) {
                    offsets.put(tp, new OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, partitionData.metadata));
                    continue;
                }
                ConsumerCoordinator.this.log.info("Found no committed offset for partition {}", (Object)tp);
                offsets.put(tp, null);
            }
            if (unauthorizedTopics != null) {
                future.raise(new TopicAuthorizationException(unauthorizedTopics));
            } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
                ConsumerCoordinator.this.log.info("The following partitions still have unstable offsets which are not cleared on the broker side: {}, this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log", unstableTxnOffsetTopicPartitions);
                future.raise(new UnstableOffsetCommitException("There are unstable offsets for the requested topic partitions"));
            } else {
                future.complete(offsets);
            }
        }
    }

    private class OffsetCommitResponseHandler
    extends AbstractCoordinator.CoordinatorResponseHandler<OffsetCommitResponse, Void> {
        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        private OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.offsets = offsets;
        }

        @Override
        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
            ConsumerCoordinator.this.sensors.commitSensor.record(this.response.requestLatencyMs());
            HashSet<String> unauthorizedTopics = new HashSet<String>();
            for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
                for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
                    TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
                    OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
                    long offset = offsetAndMetadata.offset();
                    Errors error = Errors.forCode(partition.errorCode());
                    if (error == Errors.NONE) {
                        ConsumerCoordinator.this.log.debug("Committed offset {} for partition {}", (Object)offset, (Object)tp);
                        continue;
                    }
                    if (error.exception() instanceof RetriableException) {
                        ConsumerCoordinator.this.log.warn("Offset commit failed on partition {} at offset {}: {}", new Object[]{tp, offset, error.message()});
                    } else {
                        ConsumerCoordinator.this.log.error("Offset commit failed on partition {} at offset {}: {}", new Object[]{tp, offset, error.message()});
                    }
                    if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                        future.raise(GroupAuthorizationException.forGroupId(((ConsumerCoordinator)ConsumerCoordinator.this).rebalanceConfig.groupId));
                        return;
                    }
                    if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                        unauthorizedTopics.add(tp.topic());
                        continue;
                    }
                    if (error == Errors.OFFSET_METADATA_TOO_LARGE || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                        future.raise(error);
                        return;
                    }
                    if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                        future.raise(error);
                        return;
                    }
                    if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR || error == Errors.REQUEST_TIMED_OUT) {
                        ConsumerCoordinator.this.markCoordinatorUnknown();
                        future.raise(error);
                        return;
                    }
                    if (error == Errors.FENCED_INSTANCE_ID) {
                        ConsumerCoordinator.this.log.error("Received fatal exception: group.instance.id gets fenced");
                        future.raise(error);
                        return;
                    }
                    if (error == Errors.REBALANCE_IN_PROGRESS) {
                        ConsumerCoordinator.this.requestRejoin();
                        future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the consumer group is executing a rebalance at the moment. You can try completing the rebalance by calling poll() and then retry commit again"));
                        return;
                    }
                    if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) {
                        ConsumerCoordinator.this.resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
                        future.raise(new CommitFailedException());
                        return;
                    }
                    future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
                    return;
                }
            }
            if (!unauthorizedTopics.isEmpty()) {
                ConsumerCoordinator.this.log.error("Not authorized to commit to topics {}", unauthorizedTopics);
                future.raise(new TopicAuthorizationException(unauthorizedTopics));
            } else {
                future.complete(null);
            }
        }
    }

    private class DefaultOffsetCommitCallback
    implements OffsetCommitCallback {
        private DefaultOffsetCommitCallback() {
        }

        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                ConsumerCoordinator.this.log.error("Offset commit with offsets {} failed", offsets, (Object)exception);
            }
        }
    }

    /**
     * Holds an in-flight committed-offset fetch: the partitions asked for, the generation at request time,
     * and the future carrying the response.
     */
    private static class PendingCommittedOffsetRequest {
        private final Set<TopicPartition> requestedPartitions;
        private final AbstractCoordinator.Generation requestedGeneration;
        private final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response;

        /**
         * @param requestedPartitions    partitions that were requested; must not be {@code null}
         * @param generationAtRequestTime generation when the request was made; stored as given
         * @param response               future for the response; must not be {@code null}
         * @throws NullPointerException if {@code requestedPartitions} or {@code response} is {@code null}
         */
        private PendingCommittedOffsetRequest(Set<TopicPartition> requestedPartitions, AbstractCoordinator.Generation generationAtRequestTime, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response) {
            Set<TopicPartition> checkedPartitions = Objects.requireNonNull(requestedPartitions);
            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> checkedResponse = Objects.requireNonNull(response);
            this.requestedPartitions = checkedPartitions;
            this.response = checkedResponse;
            this.requestedGeneration = generationAtRequestTime;
        }

        /**
         * @param currentRequest    partitions of the request being considered
         * @param currentGeneration generation of the request being considered
         * @return {@code true} when both the generation and the partition set equal the stored ones
         */
        private boolean sameRequest(Set<TopicPartition> currentRequest, AbstractCoordinator.Generation currentGeneration) {
            if (!Objects.equals(requestedGeneration, currentGeneration)) {
                return false;
            }
            return requestedPartitions.equals(currentRequest);
        }
    }
}

