/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.persistence.mongodb.storage;

import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.kie.kogito.persistence.mongodb.model.MongoEntityMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that expose MongoDB change streams as Mutiny {@link Multi} streams.
 *
 * <p>Each public method opens a change-stream cursor on the given collection, filtered by the
 * supplied match expression, and emits one item per change event.
 */
public final class StorageUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(StorageUtils.class);

    private StorageUtils() {
    }

    /**
     * Watches a collection and emits the model object mapped from each change event's full document.
     *
     * <p>NOTE(review): when a change event carries no full document the mapping function yields
     * {@code null}, which is handed to the Mutiny {@code transform} stage as-is; confirm how the
     * Mutiny version in use treats {@code null} mapper results.
     *
     * @param collection the collection to watch
     * @param operationType filter passed to {@code Aggregates.match} to select change events
     * @param mapper converts a stored entity into its model representation
     * @param <V> model type emitted by the returned stream
     * @param <E> entity type stored in the collection
     * @return a stream of mapped model objects, one per matching change event
     */
    public static <V, E> Multi<V> watchCollectionEntries(MongoCollection<E> collection, Bson operationType, MongoEntityMapper<V, E> mapper) {
        return createMulti(collection, operationType, csd -> {
            E document = csd.getFullDocument();
            return document == null ? null : mapper.mapToModel(document);
        });
    }

    /**
     * Watches a collection and emits the string {@code _id} taken from each change event's document key.
     *
     * <p>NOTE(review): a change event without a document key maps to {@code null}; see the note on
     * {@link #watchCollectionEntries} regarding {@code null} mapper results.
     *
     * @param collection the collection to watch
     * @param operationType filter passed to {@code Aggregates.match} to select change events
     * @param <E> entity type stored in the collection
     * @return a stream of {@code _id} values, one per matching change event
     */
    public static <E> Multi<String> watchCollectionKeys(MongoCollection<E> collection, Bson operationType) {
        return createMulti(collection, operationType, csd -> {
            BsonDocument keyDocument = csd.getDocumentKey();
            return keyDocument == null ? null : keyDocument.getString("_id").getValue();
        });
    }

    /**
     * Opens a change-stream cursor with {@code FullDocument.UPDATE_LOOKUP} and wraps it in a
     * {@link Multi} whose items are transformed by {@code mapper}.
     *
     * <p>The cursor is opened when this method is called, not when the returned stream is
     * subscribed to, and it is closed when the stream terminates. Upstream failures are converted
     * into completion.
     *
     * @param collection the collection to watch
     * @param operationType filter passed to {@code Aggregates.match} to select change events
     * @param mapper converts each change event into the emitted item
     * @param <T> item type of the returned stream
     * @param <E> entity type stored in the collection
     * @return a stream of mapped change events
     */
    private static <T, E> Multi<T> createMulti(MongoCollection<E> collection, Bson operationType, Function<ChangeStreamDocument<E>, T> mapper) {
        ChangeStreamIterable<E> changeStream = collection
                .watch(Collections.singletonList(Aggregates.match(operationType)))
                .fullDocument(FullDocument.UPDATE_LOOKUP);
        MongoChangeStreamCursor<ChangeStreamDocument<E>> cursor = changeStream.cursor();

        return Multi.createFrom().<ChangeStreamDocument<E>>emitter(emitter -> {
            try {
                while (cursor.hasNext()) {
                    emitter.emit(cursor.next());
                }
            } catch (IllegalStateException ex) {
                // Presumably raised once the cursor has been closed by the termination hook below.
                LOGGER.warn("MongoDB cursor exception: {}", ex.getMessage());
            }
            // Signal end-of-stream once the cursor loop is over so subscribers are not left waiting.
            emitter.complete();
        })
                .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
                .onTermination().invoke(() -> cursor.close())
                .onFailure().recoverWithCompletion()
                .onItem().transform(mapper);
    }
}

