package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.vertx.mutiny.core.Vertx;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.jboss.logging.Logger;

@Experimental("Experimental API")
/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/KafkaCheckpointCommit.class */
public abstract class KafkaCheckpointCommit extends ContextHolder implements KafkaCommitHandler {
    protected KafkaLogging log;
    protected final Map<TopicPartition, ProcessingState<?>> processingStateMap;
    protected final KafkaConnectorIncomingConfiguration config;
    protected final KafkaConsumer<?, ?> consumer;
    protected final BiConsumer<Throwable, Boolean> reportFailure;

    public KafkaCheckpointCommit(Vertx vertx, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, KafkaConsumer<?, ?> kafkaConsumer, BiConsumer<Throwable, Boolean> biConsumer, int i) {
        super(vertx, i);
        this.log = (KafkaLogging) Logger.getMessageLogger(KafkaLogging.class, "io.smallrye.reactive.messaging.kafka");
        this.processingStateMap = new HashMap();
        this.config = kafkaConnectorIncomingConfiguration;
        this.consumer = kafkaConsumer;
        this.reportFailure = biConsumer;
    }

    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public <K, V> Uni<IncomingKafkaRecord<K, V>> received(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        return Uni.createFrom().item(incomingKafkaRecord).emitOn(this::runOnContext).onItem().transform(incomingKafkaRecord2 -> {
            TopicPartition topicPartition = new TopicPartition(incomingKafkaRecord.getTopic(), incomingKafkaRecord.getPartition());
            incomingKafkaRecord2.injectMetadata(new StateStore(topicPartition, incomingKafkaRecord.getOffset(), () -> {
                return this.processingStateMap.get(topicPartition);
            }));
            return incomingKafkaRecord2;
        });
    }

    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> incomingKafkaRecord) {
        TopicPartition topicPartition = new TopicPartition(incomingKafkaRecord.getTopic(), incomingKafkaRecord.getPartition());
        ProcessingState processingState = StateStore.getProcessingState(incomingKafkaRecord);
        boolean isPersist = StateStore.isPersist(incomingKafkaRecord);
        if (processingState == null) {
            return Uni.createFrom().voidItem();
        }
        Uni emitOn = Uni.createFrom().item(processingState).emitOn(this::runOnContext).onItem().invoke(processingState2 -> {
            this.processingStateMap.put(topicPartition, processingState2);
        }).chain(processingState3 -> {
            return isPersist ? persistProcessingState(topicPartition, processingState) : Uni.createFrom().voidItem();
        }).emitOn(this::runOnContext);
        Objects.requireNonNull(incomingKafkaRecord);
        return emitOn.emitOn(incomingKafkaRecord::runOnMessageContext).replaceWithVoid();
    }

    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public void terminate(boolean z) {
        Uni emitOn = this.consumer.getAssignments().chain((v1) -> {
            return persistStateFor(v1);
        }).emitOn(this::runOnContext);
        Map<TopicPartition, ProcessingState<?>> map = this.processingStateMap;
        Objects.requireNonNull(map);
        emitOn.invoke(map::clear).await().atMost(Duration.ofMillis(getTimeoutInMillis()));
    }

    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public void partitionsAssigned(Collection<TopicPartition> collection) {
        List<Tuple2> list = (List) Multi.createFrom().iterable(collection).emitOn(this::runOnContext).onItem().transformToUniAndConcatenate(topicPartition -> {
            return fetchProcessingState(topicPartition).map(processingState -> {
                return Tuple2.of(topicPartition, processingState);
            });
        }).emitOn(this::runOnContext).onItem().invoke(tuple2 -> {
            this.processingStateMap.put((TopicPartition) tuple2.getItem1(), (ProcessingState) tuple2.getItem2());
        }).collect().asList().await().atMost(Duration.ofMillis(getTimeoutInMillis()));
        Consumer<?, ?> unwrap = this.consumer.unwrap();
        for (Tuple2 tuple22 : list) {
            ProcessingState processingState = (ProcessingState) tuple22.getItem2();
            unwrap.seek((TopicPartition) tuple22.getItem1(), processingState != null ? processingState.getOffset().longValue() : 0L);
        }
    }

    @Override // io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler
    public void partitionsRevoked(Collection<TopicPartition> collection) {
        persistStateFor(collection).await().atMost(Duration.ofMillis(getTimeoutInMillis()));
    }

    private Uni<List<Void>> persistStateFor(Collection<TopicPartition> collection) {
        return Multi.createFrom().iterable(collection).emitOn(this::runOnContext).onItem().transform(topicPartition -> {
            return Tuple2.of(topicPartition, this.processingStateMap.get(topicPartition));
        }).skip().where(tuple2 -> {
            return tuple2.getItem2() == null;
        }).onItem().transformToUniAndConcatenate(tuple22 -> {
            return persistProcessingState((TopicPartition) tuple22.getItem1(), (ProcessingState) tuple22.getItem2());
        }).collect().asList();
    }

    protected abstract Uni<ProcessingState<?>> fetchProcessingState(TopicPartition topicPartition);

    protected abstract Uni<Void> persistProcessingState(TopicPartition topicPartition, ProcessingState<?> processingState);
}
