/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.Range;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.XClaimArgs;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XPendingArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.async.RedisStreamAsyncCommands;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.NullUnmarked;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.connection.lettuce.LettuceConnection;
import org.springframework.data.redis.connection.lettuce.LettuceConverters;
import org.springframework.data.redis.connection.lettuce.RangeConverter;
import org.springframework.data.redis.connection.lettuce.StreamConverters;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.util.Assert;

/**
 * {@link RedisStreamCommands} implementation that forwards each stream command to the Lettuce
 * {@link RedisStreamAsyncCommands} API through the invoker of the owning {@link LettuceConnection}.
 * <p>
 * Blocking {@code xRead}/{@code xReadGroup} calls are issued on the connection returned by
 * {@link #getAsyncDedicatedConnection()}; every other command uses {@code connection.invoke()}.
 */
@NullUnmarked
class LettuceStreamCommands implements RedisStreamCommands {

    private final LettuceConnection connection;

    /**
     * Creates the command object on top of the given connection.
     *
     * @param connection connection used to invoke all commands.
     */
    LettuceStreamCommands(@NonNull LettuceConnection connection) {
        this.connection = connection;
    }

    /**
     * Invokes {@code xack} for the given record ids.
     *
     * @param key stream key, must not be {@literal null}.
     * @param group consumer group name, must contain text.
     * @param recordIds ids to acknowledge, must not be {@literal null}.
     * @return the value reported by Lettuce for {@code xack}.
     */
    @Override
    public Long xAck(byte @NonNull [] key, @NonNull String group, RecordId ... recordIds) {
        Assert.notNull(key, "Key must not be null");
        Assert.hasText(group, "Group name must not be null or empty");
        Assert.notNull(recordIds, "recordIds must not be null");
        String[] ids = entryIdsToString(recordIds);
        return this.connection.invoke().just(RedisStreamAsyncCommands::xack, key, LettuceConverters.toBytes(group), ids);
    }

    /**
     * Invokes {@code xadd} with the record's stream, id and body, applying the trimming related
     * settings carried by {@code options}.
     *
     * @param record record to append; the record and its stream must not be {@literal null}.
     * @param options additional {@code xadd} settings (maxlen, minId, nomkstream, approximate trimming).
     * @return the {@link RecordId} built from the id returned by Lettuce.
     */
    @Override
    public RecordId xAdd(@NonNull MapRecord<byte @NonNull [], byte @NonNull [], byte @NonNull []> record, @NonNull RedisStreamCommands.XAddOptions options) {
        // The record itself has to be checked before it is dereferenced, otherwise a null record
        // surfaces as a bare NullPointerException instead of the assertion message.
        Assert.notNull(record, "Record must not be null");
        Assert.notNull(record.getStream(), "Stream must not be null");
        XAddArgs args = new XAddArgs();
        args.id(record.getId().getValue());
        if (options.hasMaxlen()) {
            args.maxlen(options.getMaxlen().longValue());
        }
        if (options.hasMinId()) {
            args.minId(options.getMinId().toString());
        }
        args.nomkstream(options.isNoMkStream());
        args.approximateTrimming(options.isApproximateTrimming());
        return this.connection.invoke().from(RedisStreamAsyncCommands::xadd, record.getStream(), args, record.getValue()).get(RecordId::of);
    }

    /**
     * Invokes {@code xclaim} with the {@code justid} flag set and maps each returned message to its id.
     *
     * @param key stream key.
     * @param group consumer group name.
     * @param newOwner name of the consumer claiming the messages.
     * @param options claim options, also providing the ids to claim.
     * @return ids of the messages returned by Lettuce.
     */
    @Override
    public List<@NonNull RecordId> xClaimJustId(byte @NonNull [] key, @NonNull String group, @NonNull String newOwner, @NonNull RedisStreamCommands.XClaimOptions options) {
        String[] ids = options.getIdsAsStringArray();
        io.lettuce.core.Consumer<byte[]> from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group), LettuceConverters.toBytes(newOwner));
        XClaimArgs args = StreamConverters.toXClaimArgs(options).justid();
        return this.connection.invoke().fromMany(RedisStreamAsyncCommands::xclaim, key, from, args, ids).toList(it -> RecordId.of(it.getId()));
    }

    /**
     * Invokes {@code xclaim} and converts the returned messages to {@link ByteRecord}s.
     *
     * @param key stream key.
     * @param group consumer group name.
     * @param newOwner name of the consumer claiming the messages.
     * @param options claim options, also providing the ids to claim.
     * @return the claimed messages as {@link ByteRecord}s.
     */
    @Override
    public List<@NonNull ByteRecord> xClaim(byte @NonNull [] key, @NonNull String group, @NonNull String newOwner, @NonNull RedisStreamCommands.XClaimOptions options) {
        String[] ids = options.getIdsAsStringArray();
        io.lettuce.core.Consumer<byte[]> from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group), LettuceConverters.toBytes(newOwner));
        XClaimArgs args = StreamConverters.toXClaimArgs(options);
        return this.connection.invoke().fromMany(RedisStreamAsyncCommands::xclaim, key, from, args, ids).toList(StreamConverters.byteRecordConverter());
    }

    /**
     * Invokes {@code xdel} for the given record ids.
     *
     * @param key stream key, must not be {@literal null}.
     * @param recordIds ids to delete, must not be {@literal null}.
     * @return the value reported by Lettuce for {@code xdel}.
     */
    @Override
    public Long xDel(byte @NonNull [] key, RecordId ... recordIds) {
        Assert.notNull(key, "Key must not be null");
        Assert.notNull(recordIds, "recordIds must not be null");
        return this.connection.invoke().just(RedisStreamAsyncCommands::xdel, key, entryIdsToString(recordIds));
    }

    /**
     * Delegates to {@link #xGroupCreate(byte[], String, ReadOffset, boolean)} with {@code mkSteam}
     * set to {@literal false}.
     */
    @Override
    public String xGroupCreate(byte @NonNull [] key, @NonNull String groupName, @NonNull ReadOffset readOffset) {
        return this.xGroupCreate(key, groupName, readOffset, false);
    }

    /**
     * Invokes {@code xgroupCreate} for the given stream offset and group.
     *
     * @param key stream key, must not be {@literal null}.
     * @param groupName group name, must contain text.
     * @param readOffset offset the group starts from, must not be {@literal null}.
     * @param mkSteam value handed to {@code XGroupCreateArgs.Builder.mkstream}.
     * @return the reply reported by Lettuce.
     */
    @Override
    public String xGroupCreate(byte @NonNull [] key, @NonNull String groupName, @NonNull ReadOffset readOffset, boolean mkSteam) {
        Assert.notNull(key, "Key must not be null");
        Assert.hasText(groupName, "Group name must not be null or empty");
        Assert.notNull(readOffset, "ReadOffset must not be null");
        XReadArgs.StreamOffset<byte[]> streamOffset = XReadArgs.StreamOffset.from(key, readOffset.getOffset());
        return this.connection.invoke().just(RedisStreamAsyncCommands::xgroupCreate, streamOffset, LettuceConverters.toBytes(groupName), XGroupCreateArgs.Builder.mkstream(mkSteam));
    }

    /**
     * Invokes {@code xgroupDelconsumer} for the given consumer.
     *
     * @param key stream key, must not be {@literal null}.
     * @param consumer consumer (group and name) to remove, must not be {@literal null}.
     * @return {@literal true} when the reply is non-null (the reply value itself is not inspected).
     */
    @Override
    public Boolean xGroupDelConsumer(byte @NonNull [] key, @NonNull Consumer consumer) {
        Assert.notNull(key, "Key must not be null");
        Assert.notNull(consumer, "Consumer must not be null");
        io.lettuce.core.Consumer<byte[]> lettuceConsumer = toConsumer(consumer);
        return this.connection.invoke().from(RedisStreamAsyncCommands::xgroupDelconsumer, key, lettuceConsumer).get(Objects::nonNull);
    }

    /**
     * Invokes {@code xgroupDestroy} for the given group.
     *
     * @param key stream key, must not be {@literal null}.
     * @param groupName group name, must contain text.
     * @return the value reported by Lettuce for {@code xgroupDestroy}.
     */
    @Override
    public Boolean xGroupDestroy(byte @NonNull [] key, @NonNull String groupName) {
        Assert.notNull(key, "Key must not be null");
        Assert.hasText(groupName, "Group name must not be null or empty");
        return this.connection.invoke().just(RedisStreamAsyncCommands::xgroupDestroy, key, LettuceConverters.toBytes(groupName));
    }

    /**
     * Invokes {@code xinfoStream} and converts the reply with {@code StreamInfo.XInfoStream.fromList}.
     *
     * @param key stream key, must not be {@literal null}.
     */
    @Override
    public StreamInfo.XInfoStream xInfo(byte @NonNull [] key) {
        Assert.notNull(key, "Key must not be null");
        return this.connection.invoke().from(RedisStreamAsyncCommands::xinfoStream, key).get(StreamInfo.XInfoStream::fromList);
    }

    /**
     * Invokes {@code xinfoGroups} and converts the reply with {@code StreamInfo.XInfoGroups.fromList}.
     *
     * @param key stream key, must not be {@literal null}.
     */
    @Override
    public StreamInfo.XInfoGroups xInfoGroups(byte @NonNull [] key) {
        Assert.notNull(key, "Key must not be null");
        return this.connection.invoke().from(RedisStreamAsyncCommands::xinfoGroups, key).get(StreamInfo.XInfoGroups::fromList);
    }

    /**
     * Invokes {@code xinfoConsumers} and converts the reply with
     * {@code StreamInfo.XInfoConsumers.fromList}, passing the group name along.
     *
     * @param key stream key, must not be {@literal null}.
     * @param groupName group name, must not be {@literal null}.
     */
    @Override
    public StreamInfo.XInfoConsumers xInfoConsumers(byte @NonNull [] key, @NonNull String groupName) {
        Assert.notNull(key, "Key must not be null");
        Assert.notNull(groupName, "GroupName must not be null");
        return this.connection.invoke().from(RedisStreamAsyncCommands::xinfoConsumers, key, LettuceConverters.toBytes(groupName)).get(it -> StreamInfo.XInfoConsumers.fromList(groupName, it));
    }

    /**
     * Invokes {@code xlen} for the given stream.
     *
     * @param key stream key, must not be {@literal null}.
     * @return the value reported by Lettuce for {@code xlen}.
     */
    @Override
    public Long xLen(byte @NonNull [] key) {
        Assert.notNull(key, "Key must not be null");
        return this.connection.invoke().just(RedisStreamAsyncCommands::xpending == null ? null : RedisStreamAsyncCommands::xlen, key);
    }

    /**
     * Invokes the summary form of {@code xpending} for a group and converts the reply with
     * {@code StreamConverters.toPendingMessagesInfo}.
     *
     * @param key stream key.
     * @param groupName group name.
     */
    @Override
    public PendingMessagesSummary xPending(byte @NonNull [] key, @NonNull String groupName) {
        byte[] group = LettuceConverters.toBytes(groupName);
        return this.connection.invoke().from(RedisStreamAsyncCommands::xpending, key, group).get(it -> StreamConverters.toPendingMessagesInfo(groupName, it));
    }

    /**
     * Invokes the extended form of {@code xpending}.
     * <p>
     * The range from {@code options} is converted with {@code "-"} / {@code "+"} as defaults, the
     * limit is {@code options.getCount()} when {@code options.isLimited()} and unlimited otherwise.
     * Consumer and minimum idle time are only applied when present on {@code options}.
     *
     * @param key stream key.
     * @param groupName group name.
     * @param options range, count, consumer and idle-time settings.
     */
    @Override
    public PendingMessages xPending(byte @NonNull [] key, @NonNull String groupName, @NonNull RedisStreamCommands.XPendingOptions options) {
        byte[] group = LettuceConverters.toBytes(groupName);
        Range<String> range = RangeConverter.toRangeWithDefault(options.getRange(), "-", "+", Function.identity());
        io.lettuce.core.Limit limit = options.isLimited() ? io.lettuce.core.Limit.from((long)options.getCount()) : io.lettuce.core.Limit.unlimited();
        XPendingArgs<byte[]> xPendingArgs = XPendingArgs.Builder.xpending(group, range, limit);
        if (options.hasConsumer()) {
            io.lettuce.core.Consumer<byte[]> consumer = io.lettuce.core.Consumer.from(group, LettuceConverters.toBytes(options.getConsumerName()));
            xPendingArgs.consumer(consumer);
        }
        if (options.hasMinIdleTime()) {
            xPendingArgs.idle(options.getMinIdleTime());
        }
        return this.connection.invoke().from(RedisStreamAsyncCommands::xpending, key, xPendingArgs).get(it -> StreamConverters.toPendingMessages(groupName, options.getRange(), it));
    }

    /**
     * Invokes {@code xrange} and converts the returned messages to {@link ByteRecord}s.
     *
     * @param key stream key, must not be {@literal null}.
     * @param range id range, must not be {@literal null}.
     * @param limit limit to apply, must not be {@literal null}.
     */
    @Override
    public List<@NonNull ByteRecord> xRange(byte @NonNull [] key, @NonNull org.springframework.data.domain.Range<String> range, @NonNull Limit limit) {
        Assert.notNull(key, "Key must not be null");
        Assert.notNull(range, "Range must not be null");
        Assert.notNull(limit, "Limit must not be null");
        Range<String> lettuceRange = RangeConverter.toRange(range, Function.identity());
        io.lettuce.core.Limit lettuceLimit = LettuceConverters.toLimit(limit);
        return this.connection.invoke().fromMany(RedisStreamAsyncCommands::xrange, key, lettuceRange, lettuceLimit).toList(StreamConverters.byteRecordConverter());
    }

    /**
     * Invokes {@code xread} for the given stream offsets.
     * <p>
     * When {@code readOptions.isBlocking()} the command is issued on the dedicated connection,
     * otherwise on the regular invoker.
     *
     * @param readOptions read options, must not be {@literal null}.
     * @param streams stream offsets to read from, must not be {@literal null}.
     */
    @Override
    public List<@NonNull ByteRecord> xRead(@NonNull StreamReadOptions readOptions, StreamOffset<byte[]> ... streams) {
        Assert.notNull(readOptions, "StreamReadOptions must not be null");
        Assert.notNull(streams, "StreamOffsets must not be null");
        XReadArgs.StreamOffset<byte[]>[] streamOffsets = toStreamOffsets(streams);
        XReadArgs args = StreamConverters.toReadArgs(readOptions);
        if (readOptions.isBlocking()) {
            return this.connection.invoke(this.getAsyncDedicatedConnection()).fromMany(RedisStreamAsyncCommands::xread, args, streamOffsets).toList(StreamConverters.byteRecordConverter());
        }
        return this.connection.invoke().fromMany(RedisStreamAsyncCommands::xread, args, streamOffsets).toList(StreamConverters.byteRecordConverter());
    }

    /**
     * Invokes {@code xreadgroup} for the given consumer and stream offsets.
     * <p>
     * When {@code readOptions.isBlocking()} the command is issued on the dedicated connection,
     * otherwise on the regular invoker.
     *
     * @param consumer consumer (group and name), must not be {@literal null}.
     * @param readOptions read options, must not be {@literal null}.
     * @param streams stream offsets to read from, must not be {@literal null}.
     */
    @Override
    public List<@NonNull ByteRecord> xReadGroup(@NonNull Consumer consumer, @NonNull StreamReadOptions readOptions, StreamOffset<byte[]> ... streams) {
        Assert.notNull(consumer, "Consumer must not be null");
        Assert.notNull(readOptions, "StreamReadOptions must not be null");
        Assert.notNull(streams, "StreamOffsets must not be null");
        XReadArgs.StreamOffset<byte[]>[] streamOffsets = toStreamOffsets(streams);
        XReadArgs args = StreamConverters.toReadArgs(readOptions);
        io.lettuce.core.Consumer<byte[]> lettuceConsumer = toConsumer(consumer);
        if (readOptions.isBlocking()) {
            return this.connection.invoke(this.getAsyncDedicatedConnection()).fromMany(RedisStreamAsyncCommands::xreadgroup, lettuceConsumer, args, streamOffsets).toList(StreamConverters.byteRecordConverter());
        }
        return this.connection.invoke().fromMany(RedisStreamAsyncCommands::xreadgroup, lettuceConsumer, args, streamOffsets).toList(StreamConverters.byteRecordConverter());
    }

    /**
     * Invokes {@code xrevrange} and converts the returned messages to {@link ByteRecord}s.
     *
     * @param key stream key, must not be {@literal null}.
     * @param range id range, must not be {@literal null}.
     * @param limit limit to apply, must not be {@literal null}.
     */
    @Override
    public List<@NonNull ByteRecord> xRevRange(byte @NonNull [] key, @NonNull org.springframework.data.domain.Range<String> range, @NonNull Limit limit) {
        Assert.notNull(key, "Key must not be null");
        Assert.notNull(range, "Range must not be null");
        Assert.notNull(limit, "Limit must not be null");
        Range<String> lettuceRange = RangeConverter.toRange(range, Function.identity());
        io.lettuce.core.Limit lettuceLimit = LettuceConverters.toLimit(limit);
        return this.connection.invoke().fromMany(RedisStreamAsyncCommands::xrevrange, key, lettuceRange, lettuceLimit).toList(StreamConverters.byteRecordConverter());
    }

    /**
     * Delegates to {@link #xTrim(byte[], long, boolean)} with approximate trimming disabled.
     */
    @Override
    public Long xTrim(byte @NonNull [] key, long count) {
        return this.xTrim(key, count, false);
    }

    /**
     * Invokes {@code xtrim} for the given stream.
     *
     * @param key stream key, must not be {@literal null}.
     * @param count count handed to {@code xtrim}.
     * @param approximateTrimming approximate-trimming flag handed to {@code xtrim}.
     * @return the value reported by Lettuce for {@code xtrim}.
     */
    @Override
    public Long xTrim(byte @NonNull [] key, long count, boolean approximateTrimming) {
        Assert.notNull(key, "Key must not be null");
        return this.connection.invoke().just(RedisStreamAsyncCommands::xtrim, key, approximateTrimming, count);
    }

    /**
     * Returns the dedicated async connection of the owning {@link LettuceConnection}; used for
     * blocking reads.
     */
    RedisClusterAsyncCommands<byte[], byte[]> getAsyncDedicatedConnection() {
        return this.connection.getAsyncDedicatedConnection();
    }

    /**
     * Converts Spring Data stream offsets into their Lettuce counterparts, keeping array order.
     */
    @SuppressWarnings("unchecked") // generic array creation through the raw array constructor reference
    private static XReadArgs.StreamOffset<byte[]>[] toStreamOffsets(StreamOffset<byte[]>[] streams) {
        return Arrays.stream(streams).map(it -> XReadArgs.StreamOffset.from(it.getKey(), it.getOffset().getOffset())).toArray(XReadArgs.StreamOffset[]::new);
    }

    /**
     * Converts a Spring Data {@link Consumer} into a Lettuce consumer using the byte representation
     * of its group and name.
     */
    private static io.lettuce.core.Consumer<byte[]> toConsumer(Consumer consumer) {
        return io.lettuce.core.Consumer.from(LettuceConverters.toBytes(consumer.getGroup()), LettuceConverters.toBytes(consumer.getName()));
    }

    /**
     * Extracts the string value of each {@link RecordId}, keeping array order.
     */
    private static String[] entryIdsToString(RecordId[] recordIds) {
        // Single id: build the array directly instead of going through a stream.
        if (recordIds.length == 1) {
            return new String[]{recordIds[0].getValue()};
        }
        return Arrays.stream(recordIds).map(RecordId::getValue).toArray(String[]::new);
    }
}

