/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.v2;

import io.confluent.kafkarest.ConsumerInstanceId;
import io.confluent.kafkarest.ConsumerRecordAndSize;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.kafkarest.entities.v2.ConsumerAssignmentRequest;
import io.confluent.kafkarest.entities.v2.ConsumerCommittedRequest;
import io.confluent.kafkarest.entities.v2.ConsumerCommittedResponse;
import io.confluent.kafkarest.entities.v2.ConsumerOffsetCommitRequest;
import io.confluent.kafkarest.entities.v2.ConsumerSeekRequest;
import io.confluent.kafkarest.entities.v2.ConsumerSeekToRequest;
import io.confluent.kafkarest.entities.v2.ConsumerSubscriptionRecord;
import io.confluent.kafkarest.entities.v2.TopicPartition;
import io.confluent.kafkarest.entities.v2.TopicPartitionOffsetMetadata;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.Vector;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.ws.rs.InternalServerErrorException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;

public abstract class KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> {
    private ConsumerInstanceId instanceId;
    private Consumer<KafkaKeyT, KafkaValueT> consumer;
    private final Clock clock = Clock.systemUTC();
    private final Duration consumerInstanceTimeout;
    private final ConsumerInstanceConfig consumerInstanceConfig;
    private final Queue<ConsumerRecord<KafkaKeyT, KafkaValueT>> consumerRecords = new ArrayDeque<ConsumerRecord<KafkaKeyT, KafkaValueT>>();
    volatile Instant expiration;

    KafkaConsumerState(KafkaRestConfig config, ConsumerInstanceConfig consumerInstanceConfig, ConsumerInstanceId instanceId, Consumer<KafkaKeyT, KafkaValueT> consumer) {
        this.instanceId = instanceId;
        this.consumer = consumer;
        this.consumerInstanceTimeout = Duration.ofMillis(config.getInt("consumer.instance.timeout.ms").intValue());
        this.expiration = this.clock.instant().plus(this.consumerInstanceTimeout);
        this.consumerInstanceConfig = consumerInstanceConfig;
    }

    public ConsumerInstanceId getId() {
        return this.instanceId;
    }

    public ConsumerInstanceConfig getConsumerInstanceConfig() {
        return this.consumerInstanceConfig;
    }

    public abstract ConsumerRecordAndSize<ClientKeyT, ClientValueT> createConsumerRecord(ConsumerRecord<KafkaKeyT, KafkaValueT> var1);

    public synchronized List<TopicPartitionOffset> commitOffsets(String async, ConsumerOffsetCommitRequest offsetCommitRequest) {
        if (offsetCommitRequest == null) {
            if (async == null) {
                this.consumer.commitSync();
            } else {
                this.consumer.commitAsync();
            }
        } else {
            HashMap<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<org.apache.kafka.common.TopicPartition, OffsetAndMetadata>();
            for (TopicPartitionOffsetMetadata t : offsetCommitRequest.getOffsets()) {
                if (t.getMetadata() == null) {
                    offsetMap.put(new org.apache.kafka.common.TopicPartition(t.getTopic(), t.getPartition().intValue()), new OffsetAndMetadata(t.getOffset() + 1L));
                    continue;
                }
                offsetMap.put(new org.apache.kafka.common.TopicPartition(t.getTopic(), t.getPartition().intValue()), new OffsetAndMetadata(t.getOffset() + 1L, t.getMetadata()));
            }
            this.consumer.commitSync(offsetMap);
        }
        Vector<TopicPartitionOffset> result = new Vector<TopicPartitionOffset>();
        return result;
    }

    public synchronized void seekToBeginning(ConsumerSeekToRequest seekToRequest) {
        if (seekToRequest != null) {
            Vector<org.apache.kafka.common.TopicPartition> topicPartitions = new Vector<org.apache.kafka.common.TopicPartition>();
            for (TopicPartition t : seekToRequest.getPartitions()) {
                topicPartitions.add(new org.apache.kafka.common.TopicPartition(t.getTopic(), t.getPartition().intValue()));
            }
            this.consumer.seekToBeginning(topicPartitions);
        }
    }

    public synchronized void seekToEnd(ConsumerSeekToRequest seekToRequest) {
        if (seekToRequest != null) {
            Vector<org.apache.kafka.common.TopicPartition> topicPartitions = new Vector<org.apache.kafka.common.TopicPartition>();
            for (TopicPartition t : seekToRequest.getPartitions()) {
                topicPartitions.add(new org.apache.kafka.common.TopicPartition(t.getTopic(), t.getPartition().intValue()));
            }
            this.consumer.seekToEnd(topicPartitions);
        }
    }

    public synchronized void seek(ConsumerSeekRequest request) {
        if (request == null) {
            return;
        }
        for (ConsumerSeekRequest.PartitionOffset partition2 : request.getOffsets()) {
            this.consumer.seek(new org.apache.kafka.common.TopicPartition(partition2.getTopic(), partition2.getPartition()), new OffsetAndMetadata(partition2.getOffset(), partition2.getMetadata().orElse("")));
        }
        Map<org.apache.kafka.common.TopicPartition, Optional> metadata = request.getTimestamps().stream().collect(Collectors.toMap(partition -> new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()), ConsumerSeekRequest.PartitionTimestamp::getMetadata));
        Map offsets = this.consumer.offsetsForTimes(request.getTimestamps().stream().collect(Collectors.toMap(partition -> new org.apache.kafka.common.TopicPartition(partition.getTopic(), partition.getPartition()), partition -> partition.getTimestamp().toEpochMilli())));
        for (Map.Entry offset : offsets.entrySet()) {
            this.consumer.seek((org.apache.kafka.common.TopicPartition)offset.getKey(), new OffsetAndMetadata(((OffsetAndTimestamp)offset.getValue()).offset(), metadata.get(offset.getKey()).orElse("")));
        }
    }

    public synchronized void assign(ConsumerAssignmentRequest assignmentRequest) {
        if (assignmentRequest != null) {
            Vector<org.apache.kafka.common.TopicPartition> topicPartitions = new Vector<org.apache.kafka.common.TopicPartition>();
            for (TopicPartition t : assignmentRequest.getPartitions()) {
                topicPartitions.add(new org.apache.kafka.common.TopicPartition(t.getTopic(), t.getPartition().intValue()));
            }
            this.consumer.assign(topicPartitions);
        }
    }

    public synchronized void close() {
        if (this.consumer != null) {
            this.consumer.close();
        }
        this.consumer = null;
    }

    public synchronized void subscribe(ConsumerSubscriptionRecord subscription) {
        if (subscription == null) {
            return;
        }
        if (this.consumer != null) {
            if (subscription.getTopics() != null) {
                this.consumer.subscribe(subscription.getTopics());
            } else if (subscription.getTopicPattern() != null) {
                Pattern topicPattern = Pattern.compile(subscription.getTopicPattern());
                NoOpOnRebalance noOpOnRebalance = new NoOpOnRebalance();
                this.consumer.subscribe(topicPattern, (ConsumerRebalanceListener)noOpOnRebalance);
            }
        }
    }

    public synchronized void unsubscribe() {
        if (this.consumer != null) {
            this.consumer.unsubscribe();
        }
    }

    public synchronized Set<String> subscription() {
        Set currSubscription = null;
        if (this.consumer != null) {
            currSubscription = this.consumer.subscription();
        }
        return currSubscription;
    }

    public synchronized Set<org.apache.kafka.common.TopicPartition> assignment() {
        Set currAssignment = null;
        if (this.consumer != null) {
            currAssignment = this.consumer.assignment();
        }
        return currAssignment;
    }

    public synchronized ConsumerCommittedResponse committed(ConsumerCommittedRequest request) {
        Vector<TopicPartitionOffsetMetadata> offsets = new Vector<TopicPartitionOffsetMetadata>();
        if (this.consumer != null) {
            for (TopicPartition t : request.getPartitions()) {
                org.apache.kafka.common.TopicPartition partition = new org.apache.kafka.common.TopicPartition(t.getTopic(), t.getPartition().intValue());
                OffsetAndMetadata offsetMetadata = this.consumer.committed(partition);
                if (offsetMetadata == null) continue;
                offsets.add(new TopicPartitionOffsetMetadata(partition.topic(), partition.partition(), offsetMetadata.offset(), offsetMetadata.metadata()));
            }
        }
        return new ConsumerCommittedResponse(offsets);
    }

    synchronized long getBeginningOffset(String topic, int partition) {
        if (this.consumer == null) {
            throw new IllegalStateException("KafkaConsumerState has been closed.");
        }
        Map response = this.consumer.beginningOffsets(Collections.singletonList(new org.apache.kafka.common.TopicPartition(topic, partition)));
        if (response.size() != 1) {
            throw new InternalServerErrorException(String.format("Expected one offset, but got %d instead.", response.size()));
        }
        return (Long)response.values().stream().findAny().get();
    }

    synchronized long getEndOffset(String topic, int partition) {
        if (this.consumer == null) {
            throw new IllegalStateException("KafkaConsumerState has been closed.");
        }
        Map response = this.consumer.endOffsets(Collections.singletonList(new org.apache.kafka.common.TopicPartition(topic, partition)));
        if (response.size() != 1) {
            throw new InternalServerErrorException(String.format("Expected one offset, but got %d instead.", response.size()));
        }
        return (Long)response.values().stream().findAny().get();
    }

    synchronized Optional<Long> getOffsetForTime(String topic, int partition, Instant timestamp) {
        if (this.consumer == null) {
            throw new IllegalStateException("KafkaConsumerState has been closed.");
        }
        Map response = this.consumer.offsetsForTimes(Collections.singletonMap(new org.apache.kafka.common.TopicPartition(topic, partition), timestamp.toEpochMilli()));
        if (response.size() != 1) {
            throw new InternalServerErrorException(String.format("Expected one offset, but got %d instead.", response.size()));
        }
        return response.values().stream().filter(Objects::nonNull).findAny().map(OffsetAndTimestamp::offset);
    }

    public synchronized boolean expired(Instant now) {
        return !this.expiration.isAfter(now);
    }

    public synchronized void updateExpiration() {
        this.expiration = this.clock.instant().plus(this.consumerInstanceTimeout);
    }

    synchronized ConsumerRecord<KafkaKeyT, KafkaValueT> peek() {
        return this.consumerRecords.peek();
    }

    synchronized boolean hasNext() {
        if (this.hasNextCached()) {
            return true;
        }
        this.getOrCreateConsumerRecords();
        return this.hasNextCached();
    }

    synchronized boolean hasNextCached() {
        return !this.consumerRecords.isEmpty();
    }

    synchronized ConsumerRecord<KafkaKeyT, KafkaValueT> next() {
        return this.consumerRecords.poll();
    }

    private synchronized void getOrCreateConsumerRecords() {
        ConsumerRecords polledRecords = this.consumer.poll(0L);
        for (ConsumerRecord consumerRecord : polledRecords) {
            this.consumerRecords.add(consumerRecord);
        }
    }

    private class NoOpOnRebalance
    implements ConsumerRebalanceListener {
        public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions) {
        }

        public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions) {
        }
    }
}

