package org.springframework.batch.item.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/batch/item/kafka/KafkaItemReader.class */
public class KafkaItemReader<K, V> extends AbstractItemStreamItemReader<V> {
    private static final String TOPIC_PARTITION_OFFSETS = "topic.partition.offsets";
    private static final long DEFAULT_POLL_TIMEOUT = 30;
    private final List<TopicPartition> topicPartitions;
    private Map<TopicPartition, Long> partitionOffsets;
    private KafkaConsumer<K, V> kafkaConsumer;
    private final Properties consumerProperties;
    private Iterator<ConsumerRecord<K, V>> consumerRecords;
    private Duration pollTimeout;
    private boolean saveState;

    public KafkaItemReader(Properties properties, String str, Integer... numArr) {
        this(properties, str, (List<Integer>) Arrays.asList(numArr));
    }

    public KafkaItemReader(Properties properties, String str, List<Integer> list) {
        this.pollTimeout = Duration.ofSeconds(DEFAULT_POLL_TIMEOUT);
        this.saveState = true;
        Assert.notNull(properties, "Consumer properties must not be null");
        Assert.isTrue(properties.containsKey("bootstrap.servers"), "bootstrap.servers property must be provided");
        Assert.isTrue(properties.containsKey("group.id"), "group.id property must be provided");
        Assert.isTrue(properties.containsKey("key.deserializer"), "key.deserializer property must be provided");
        Assert.isTrue(properties.containsKey("value.deserializer"), "value.deserializer property must be provided");
        this.consumerProperties = properties;
        Assert.hasLength(str, "Topic name must not be null or empty");
        Assert.isTrue(!list.isEmpty(), "At least one partition must be provided");
        this.topicPartitions = new ArrayList();
        Iterator<Integer> it = list.iterator();
        while (it.hasNext()) {
            this.topicPartitions.add(new TopicPartition(str, it.next().intValue()));
        }
    }

    public void setPollTimeout(Duration duration) {
        Assert.notNull(duration, "pollTimeout must not be null");
        Assert.isTrue(!duration.isZero(), "pollTimeout must not be zero");
        Assert.isTrue(!duration.isNegative(), "pollTimeout must not be negative");
        this.pollTimeout = duration;
    }

    public void setSaveState(boolean z) {
        this.saveState = z;
    }

    public boolean isSaveState() {
        return this.saveState;
    }

    public void setPartitionOffsets(Map<TopicPartition, Long> map) {
        this.partitionOffsets = map;
    }

    @Override // org.springframework.batch.item.ItemStream
    public void open(ExecutionContext executionContext) {
        this.kafkaConsumer = new KafkaConsumer<>(this.consumerProperties);
        if (this.partitionOffsets == null) {
            this.partitionOffsets = new HashMap();
            Iterator<TopicPartition> it = this.topicPartitions.iterator();
            while (it.hasNext()) {
                this.partitionOffsets.put(it.next(), 0L);
            }
        }
        if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
            for (Map.Entry<K, V> entry : ((Map) executionContext.get(TOPIC_PARTITION_OFFSETS)).entrySet()) {
                this.partitionOffsets.put((TopicPartition) entry.getKey(), Long.valueOf(((Long) entry.getValue()).longValue() == 0 ? 0L : ((Long) entry.getValue()).longValue() + 1));
            }
        }
        this.kafkaConsumer.assign(this.topicPartitions);
        Map<TopicPartition, Long> map = this.partitionOffsets;
        KafkaConsumer<K, V> kafkaConsumer = this.kafkaConsumer;
        Objects.requireNonNull(kafkaConsumer);
        map.forEach((v1, v2) -> {
            r1.seek(v1, v2);
        });
    }

    @Override // org.springframework.batch.item.ItemReader
    @Nullable
    public V read() {
        if (this.consumerRecords == null || !this.consumerRecords.hasNext()) {
            this.consumerRecords = this.kafkaConsumer.poll(this.pollTimeout).iterator();
        }
        if (!this.consumerRecords.hasNext()) {
            return null;
        }
        ConsumerRecord<K, V> next = this.consumerRecords.next();
        this.partitionOffsets.put(new TopicPartition(next.topic(), next.partition()), Long.valueOf(next.offset()));
        return (V) next.value();
    }

    @Override // org.springframework.batch.item.ItemStream
    public void update(ExecutionContext executionContext) {
        if (this.saveState) {
            executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap(this.partitionOffsets));
        }
        this.kafkaConsumer.commitSync();
    }

    @Override // org.springframework.batch.item.ItemStream
    public void close() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
        }
    }
}
