package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.vertx.core.Context;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/KafkaRecordBatchStream.class */
public class KafkaRecordBatchStream<K, V> extends AbstractMulti<ConsumerRecords<K, V>> {
    private final ReactiveKafkaConsumer<K, V> client;
    private final KafkaConnectorIncomingConfiguration config;
    private final Context context;
    private final Set<KafkaRecordStreamSubscription<K, V, ConsumerRecords<K, V>>> subscriptions = Collections.newSetFromMap(new ConcurrentHashMap());

    public KafkaRecordBatchStream(ReactiveKafkaConsumer<K, V> reactiveKafkaConsumer, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Context context) {
        this.config = kafkaConnectorIncomingConfiguration;
        this.client = reactiveKafkaConsumer;
        this.context = context;
    }

    public void subscribe(MultiSubscriber<? super ConsumerRecords<K, V>> multiSubscriber) {
        KafkaRecordStreamSubscription<K, V, ConsumerRecords<K, V>> kafkaRecordStreamSubscription = new KafkaRecordStreamSubscription<>(this.client, this.config, multiSubscriber, this.context, 1, (consumerRecords, recordQueue) -> {
            recordQueue.offer(consumerRecords);
        });
        this.subscriptions.add(kafkaRecordStreamSubscription);
        multiSubscriber.onSubscribe(kafkaRecordStreamSubscription);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeFromQueueRecordsFromTopicPartitions(Collection<TopicPartition> collection) {
        if (collection.isEmpty()) {
            return;
        }
        this.subscriptions.forEach(kafkaRecordStreamSubscription -> {
            removeFromQueue(kafkaRecordStreamSubscription, collection);
        });
    }

    private void removeFromQueue(KafkaRecordStreamSubscription<K, V, ConsumerRecords<K, V>> kafkaRecordStreamSubscription, Collection<TopicPartition> collection) {
        kafkaRecordStreamSubscription.rewriteQueue(consumerRecords -> {
            HashMap hashMap = new HashMap();
            consumerRecords.partitions().stream().filter(topicPartition -> {
                return !collection.contains(topicPartition);
            }).forEach(topicPartition2 -> {
                hashMap.put(topicPartition2, consumerRecords.records(topicPartition2));
            });
            if (hashMap.isEmpty()) {
                return null;
            }
            return new ConsumerRecords(hashMap);
        });
    }
}
