package dev.langchain4j.reactor;

import dev.langchain4j.service.TokenStream;
import dev.langchain4j.spi.services.TokenStreamAdapter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Objects;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:dev/langchain4j/reactor/TokenStreamToFluxAdapter.class */
public class TokenStreamToFluxAdapter implements TokenStreamAdapter {
    public boolean canAdaptTokenStreamTo(Type type) {
        if (!(type instanceof ParameterizedType)) {
            return false;
        }
        ParameterizedType parameterizedType = (ParameterizedType) type;
        if (parameterizedType.getRawType() != Flux.class) {
            return false;
        }
        Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
        return actualTypeArguments.length == 1 && actualTypeArguments[0] == String.class;
    }

    public Object adapt(TokenStream tokenStream) {
        Sinks.Many onBackpressureBuffer = Sinks.many().unicast().onBackpressureBuffer();
        Objects.requireNonNull(onBackpressureBuffer);
        TokenStream onComplete = tokenStream.onNext((v1) -> {
            r1.tryEmitNext(v1);
        }).onComplete(response -> {
            onBackpressureBuffer.tryEmitComplete();
        });
        Objects.requireNonNull(onBackpressureBuffer);
        onComplete.onError(onBackpressureBuffer::tryEmitError).start();
        return onBackpressureBuffer.asFlux();
    }
}
