/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.admin.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.internals.AdminApiFuture;
import org.apache.kafka.clients.admin.internals.AdminApiHandler;
import org.apache.kafka.clients.admin.internals.AdminApiLookupStrategy;
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.admin.internals.CoordinatorStrategy;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestUtils;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class ListConsumerGroupOffsetsHandler
implements AdminApiHandler<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> {
    private final boolean requireStable;
    private final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs;
    private final Logger log;
    private final CoordinatorStrategy lookupStrategy;

    public ListConsumerGroupOffsetsHandler(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, boolean requireStable, LogContext logContext) {
        this.log = logContext.logger(ListConsumerGroupOffsetsHandler.class);
        this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
        this.groupSpecs = groupSpecs;
        this.requireStable = requireStable;
    }

    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> newFuture(List<String> groupIds) {
        return AdminApiFuture.forKeys(groupIds.stream().map(CoordinatorKey::byGroupId).collect(Collectors.toSet()));
    }

    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> newFuture(Collection<String> groupIds) {
        return AdminApiFuture.forKeys(ListConsumerGroupOffsetsHandler.coordinatorKeys(groupIds));
    }

    @Override
    public String apiName() {
        return "offsetFetch";
    }

    @Override
    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
        return this.lookupStrategy;
    }

    private void validateKeys(Set<CoordinatorKey> groupIds) {
        Set<CoordinatorKey> keys = ListConsumerGroupOffsetsHandler.coordinatorKeys(this.groupSpecs.keySet());
        if (!keys.containsAll(groupIds)) {
            throw new IllegalArgumentException("Received unexpected group ids " + String.valueOf(groupIds) + " (expected one of " + String.valueOf(keys) + ")");
        }
    }

    private static Set<CoordinatorKey> coordinatorKeys(Collection<String> groupIds) {
        return groupIds.stream().map(CoordinatorKey::byGroupId).collect(Collectors.toSet());
    }

    public OffsetFetchRequest.Builder buildBatchedRequest(Set<CoordinatorKey> groupIds) {
        return OffsetFetchRequest.Builder.forTopicNames(new OffsetFetchRequestData().setRequireStable(this.requireStable).setGroups(groupIds.stream().map(groupId -> {
            ListConsumerGroupOffsetsSpec spec = this.groupSpecs.get(groupId.idValue);
            List topics = null;
            if (spec.topicPartitions() != null) {
                topics = spec.topicPartitions().stream().collect(Collectors.groupingBy(TopicPartition::topic)).entrySet().stream().map(entry -> new OffsetFetchRequestData.OffsetFetchRequestTopics().setName((String)entry.getKey()).setPartitionIndexes(((List)entry.getValue()).stream().map(TopicPartition::partition).collect(Collectors.toList()))).collect(Collectors.toList());
            }
            return new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(groupId.idValue).setTopics(topics);
        }).collect(Collectors.toList())), false);
    }

    @Override
    public Collection<AdminApiHandler.RequestAndKeys<CoordinatorKey>> buildRequest(int brokerId, Set<CoordinatorKey> groupIds) {
        this.validateKeys(groupIds);
        if (this.lookupStrategy.batch()) {
            return Collections.singletonList(new AdminApiHandler.RequestAndKeys<CoordinatorKey>(this.buildBatchedRequest(groupIds), groupIds));
        }
        return groupIds.stream().map(groupId -> {
            Set<CoordinatorKey> keys = Collections.singleton(groupId);
            return new AdminApiHandler.RequestAndKeys<CoordinatorKey>(this.buildBatchedRequest(keys), keys);
        }).collect(Collectors.toList());
    }

    @Override
    public AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleResponse(Node coordinator, Set<CoordinatorKey> groupIds, AbstractResponse abstractResponse) {
        this.validateKeys(groupIds);
        OffsetFetchResponse response = (OffsetFetchResponse)abstractResponse;
        HashMap completed = new HashMap();
        HashMap<CoordinatorKey, Throwable> failed = new HashMap<CoordinatorKey, Throwable>();
        ArrayList<CoordinatorKey> unmapped = new ArrayList<CoordinatorKey>();
        for (CoordinatorKey coordinatorKey : groupIds) {
            String groupId = coordinatorKey.idValue;
            OffsetFetchResponseData.OffsetFetchResponseGroup group = response.group(groupId);
            Errors error = Errors.forCode(group.errorCode());
            if (error != Errors.NONE) {
                this.handleGroupError(coordinatorKey, error, failed, unmapped);
                continue;
            }
            HashMap offsets = new HashMap();
            group.topics().forEach(topic -> topic.partitions().forEach(partition -> {
                TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
                Errors partitionError = Errors.forCode(partition.errorCode());
                if (partitionError == Errors.NONE) {
                    if (partition.committedOffset() < 0L) {
                        offsets.put(tp, null);
                    } else {
                        offsets.put(tp, new OffsetAndMetadata(partition.committedOffset(), RequestUtils.getLeaderEpoch(partition.committedLeaderEpoch()), partition.metadata()));
                    }
                } else {
                    this.log.warn("Skipping return offset for {} due to error {}.", (Object)tp, (Object)partitionError);
                }
            }));
            completed.put(coordinatorKey, offsets);
        }
        return new AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>>(completed, failed, unmapped);
    }

    private void handleGroupError(CoordinatorKey groupId, Errors error, Map<CoordinatorKey, Throwable> failed, List<CoordinatorKey> groupsToUnmap) {
        switch (error) {
            case GROUP_AUTHORIZATION_FAILED: 
            case UNKNOWN_MEMBER_ID: 
            case STALE_MEMBER_EPOCH: {
                this.log.debug("`OffsetFetch` request for group id {} failed due to error {}", (Object)groupId.idValue, (Object)error);
                failed.put(groupId, error.exception());
                break;
            }
            case COORDINATOR_LOAD_IN_PROGRESS: {
                this.log.debug("`OffsetFetch` request for group id {} failed because the coordinator is still in the process of loading state. Will retry", (Object)groupId.idValue);
                break;
            }
            case COORDINATOR_NOT_AVAILABLE: 
            case NOT_COORDINATOR: {
                this.log.debug("`OffsetFetch` request for group id {} returned error {}. Will attempt to find the coordinator again and retry", (Object)groupId.idValue, (Object)error);
                groupsToUnmap.add(groupId);
                break;
            }
            default: {
                this.log.error("`OffsetFetch` request for group id {} failed due to unexpected error {}", (Object)groupId.idValue, (Object)error);
                failed.put(groupId, error.exception());
            }
        }
    }
}

