/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.redis.runtime.datasource;

import io.quarkus.redis.datasource.keys.ReactiveKeyScanCursor;
import io.quarkus.redis.runtime.datasource.AbstractRedisCommands;
import io.quarkus.redis.runtime.datasource.Marshaller;
import io.quarkus.redis.runtime.datasource.RedisCommand;
import io.quarkus.redis.runtime.datasource.RedisCommandExecutor;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.redis.client.Command;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class ScanReactiveCursorImpl<K>
extends AbstractRedisCommands
implements ReactiveKeyScanCursor<K> {
    private final Type typeOfKey;
    private long cursor = 0L;
    private boolean initial = true;
    private final List<String> extra = new ArrayList<String>();

    public ScanReactiveCursorImpl(RedisCommandExecutor redis, Marshaller marshaller, Type typeOfKey, List<String> extra) {
        super(redis, marshaller);
        this.typeOfKey = typeOfKey;
        this.extra.addAll(extra);
    }

    @Override
    public long cursorId() {
        return this.cursor;
    }

    @Override
    public boolean hasNext() {
        return this.initial || this.cursor != 0L;
    }

    @Override
    public Uni<Set<K>> next() {
        this.initial = false;
        return this.execute(RedisCommand.of(Command.SCAN).put(Long.toUnsignedString(this.cursor)).putAll(this.extra)).invoke(response -> {
            this.cursor = Long.parseUnsignedLong(response.get(0).toString());
        }).map(response -> this.marshaller.decodeAsSet(response.get(1), this.typeOfKey));
    }

    @Override
    public Multi<K> toMulti() {
        return Multi.createBy().repeating().uni(this::next).whilst(m -> this.hasNext()).onItem().transformToMultiAndConcatenate(set -> Multi.createFrom().items(set.stream()));
    }
}

