package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.vertx.core.json.Json;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import jakarta.enterprise.context.ApplicationScoped;
import java.io.File;
import java.nio.file.FileAlreadyExistsException;
import java.util.Optional;
import java.util.function.BiConsumer;
import org.apache.kafka.common.TopicPartition;

/**
 * Checkpoint commit handler that keeps the processing state of each topic-partition in its own
 * JSON-encoded file under a configured state directory.
 *
 * <p>All file access goes through the Vert.x (Mutiny) file system API, so every operation is
 * expressed as a lazy {@link Uni}. The surrounding checkpoint life cycle is driven by
 * {@link KafkaCheckpointCommit}; this class only supplies how state is fetched and persisted.
 */
@Experimental("Experimental API")
public class KafkaFileCheckpointCommit extends KafkaCheckpointCommit {

    /** Identifier under which {@link Factory} is registered. */
    public static final String FILE_CHECKPOINT_NAME = "checkpoint-file";

    /** Vert.x instance used for all file system access. */
    private final Vertx mutinyVertx;

    /** Directory that holds one state file per topic-partition. */
    private final File stateDir;

    /**
     * Builds {@link KafkaFileCheckpointCommit} instances from the incoming channel configuration.
     */
    @ApplicationScoped
    @Identifier(FILE_CHECKPOINT_NAME)
    public static class Factory implements KafkaCommitHandler.Factory {

        /**
         * Creates the handler, reading {@code default.api.timeout.ms} (falling back to
         * {@code 60000} when absent) and the mandatory {@code checkpoint-file.stateDir} value
         * from the channel configuration.
         */
        @Override
        public KafkaCommitHandler create(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration,
                Vertx vertx, KafkaConsumer<?, ?> kafkaConsumer, BiConsumer<Throwable, Boolean> biConsumer) {
            int defaultApiTimeoutMs = kafkaConnectorIncomingConfiguration.config()
                    .getOptionalValue("default.api.timeout.ms", Integer.class)
                    .orElse(60000);
            String stateDirectory = kafkaConnectorIncomingConfiguration.config()
                    .getValue("checkpoint-file.stateDir", String.class);
            return new KafkaFileCheckpointCommit(kafkaConnectorIncomingConfiguration, vertx, kafkaConsumer,
                    biConsumer, defaultApiTimeoutMs, new File(stateDirectory));
        }
    }

    /**
     * @param kafkaConnectorIncomingConfiguration incoming channel configuration, forwarded to the superclass
     * @param vertx Vert.x instance, forwarded to the superclass and used here for file access
     * @param kafkaConsumer consumer forwarded to the superclass
     * @param biConsumer failure callback forwarded to the superclass
     * @param i timeout value forwarded to the superclass (the factory passes the default API timeout in ms)
     * @param file directory in which the per-partition state files live
     */
    public KafkaFileCheckpointCommit(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Vertx vertx, KafkaConsumer<?, ?> kafkaConsumer, BiConsumer<Throwable, Boolean> biConsumer, int i, File file) {
        super(vertx, kafkaConnectorIncomingConfiguration, kafkaConsumer, biConsumer, i);
        this.mutinyVertx = vertx;
        this.stateDir = file;
    }

    /**
     * Returns the path of the state file for a partition: {@code <stateDir>/<topic>-<partition>}.
     */
    private String getStatePath(TopicPartition topicPartition) {
        return this.stateDir.toPath()
                .resolve(topicPartition.topic() + "-" + topicPartition.partition())
                .toString();
    }

    /**
     * Loads the stored processing state of a partition.
     *
     * <p>Emits {@code null} when no state file exists. When the file exists it is read and decoded;
     * a read or decode failure is logged and then propagated to the subscriber.
     */
    @Override
    protected Uni<ProcessingState<?>> fetchProcessingState(TopicPartition topicPartition) {
        String statePath = getStatePath(topicPartition);
        return this.mutinyVertx.fileSystem().exists(statePath).chain(fileExists -> {
            if (!fileExists) {
                return Uni.createFrom().item(() -> null);
            }
            return this.mutinyVertx.fileSystem().readFile(statePath)
                    .map(this::deserializeState)
                    .onFailure().invoke(failure -> this.log.errorf(failure,
                            "Error fetching processing state for partition %s", topicPartition))
                    .onItem().invoke(fetched -> this.log.debugf(
                            "Fetched state for partition %s : %s", topicPartition, fetched));
        });
    }

    /**
     * Decodes a JSON buffer into a {@link ProcessingState}.
     */
    // Unchecked: the JSON decoder only knows the raw ProcessingState class.
    @SuppressWarnings("unchecked")
    private <T> ProcessingState<T> deserializeState(Buffer buffer) {
        return Json.decodeValue(buffer.getDelegate(), ProcessingState.class);
    }

    /**
     * Writes the given processing state to the partition's state file.
     *
     * <p>A {@code null} state is a no-op. Otherwise the currently stored state is looked up first:
     * if the file exists it is read (a failed read counts as "nothing stored"), and if it does not
     * exist it is created. The new state is then written unless the stored offset is strictly
     * higher, in which case a warning is logged and the write is skipped. Any failure in the
     * pipeline is logged and propagated.
     */
    @Override
    protected Uni<Void> persistProcessingState(TopicPartition topicPartition, ProcessingState<?> processingState) {
        // Resolved before the null check on purpose, to keep the original evaluation order.
        String statePath = getStatePath(topicPartition);
        if (processingState == null) {
            return Uni.createFrom().voidItem();
        }

        Uni<ProcessingState<?>> storedState = this.mutinyVertx.fileSystem().exists(statePath)
                .chain(fileExists -> fileExists
                        ? fetchProcessingState(topicPartition).onFailure().recoverWithNull()
                        : createStateFile(statePath));

        return storedState
                .chain(stored -> {
                    boolean storeIsAhead = stored != null
                            && stored.getOffset().longValue() > processingState.getOffset().longValue();
                    if (storeIsAhead) {
                        this.log.warnf("Skipping persist operation : higher offset found on store %d > %d",
                                stored.getOffset(), processingState.getOffset());
                        return Uni.createFrom().voidItem();
                    }
                    return this.mutinyVertx.fileSystem().writeFile(statePath, serializeState(processingState));
                })
                .onFailure().invoke(failure -> this.log.errorf(failure,
                        "Error persisting processing state `%s` for partition %s", processingState, topicPartition))
                .onItem().invoke(() -> this.log.debugf(
                        "Persisted state for partition %s : %s", topicPartition, processingState));
    }

    /**
     * Creates an empty state file and emits {@code null} (there is no stored state yet).
     * A failure caused by the file already existing is treated the same way.
     */
    private Uni<ProcessingState<?>> createStateFile(String statePath) {
        return this.mutinyVertx.fileSystem().createFile(statePath)
                .onItem().<ProcessingState<?>> transform(created -> null)
                .onFailure(KafkaFileCheckpointCommit::isCausedByExistingFile).recoverWithNull();
    }

    /**
     * Tells whether the direct cause of {@code failure} is exactly a {@link FileAlreadyExistsException}.
     */
    private static boolean isCausedByExistingFile(Throwable failure) {
        Throwable cause = failure.getCause();
        return cause != null && cause.getClass() == FileAlreadyExistsException.class;
    }

    /**
     * Encodes a processing state as a JSON buffer.
     */
    private Buffer serializeState(ProcessingState<?> processingState) {
        return Buffer.newInstance(Json.encodeToBuffer(processingState));
    }
}
