package com.couchbase.client.core.io.netty.chunk;

import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBufUtil;
import com.couchbase.client.core.deps.io.netty.channel.ChannelConfig;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpResponse;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.DecodingFailureException;
import com.couchbase.client.core.json.stream.JsonStreamParser;
import com.couchbase.client.core.msg.RequestContext;
import com.couchbase.client.core.msg.chunk.ChunkHeader;
import com.couchbase.client.core.msg.chunk.ChunkRow;
import com.couchbase.client.core.msg.chunk.ChunkTrailer;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:com/couchbase/client/core/io/netty/chunk/BaseChunkResponseParser.class */
public abstract class BaseChunkResponseParser<H extends ChunkHeader, ROW extends ChunkRow, T extends ChunkTrailer> implements ChunkResponseParser<H, ROW, T> {
    private JsonStreamParser parser;
    private CouchbaseException decodingFailure;
    private boolean headerComplete;
    private ChannelConfig channelConfig;
    private Sinks.One<T> trailer;
    private Flux<ROW> rows;
    private Sinks.Many<ROW> rowSink;
    private final AtomicLong requested = new AtomicLong(0);
    private volatile RequestContext requestContext;
    private volatile HttpResponse responseHeader;

    protected abstract JsonStreamParser.Builder parserBuilder();

    @Override // com.couchbase.client.core.io.netty.chunk.ChunkResponseParser
    public final void cleanup() {
        if (this.parser != null) {
            this.parser.close();
        }
        this.parser = null;
        this.decodingFailure = null;
        this.headerComplete = false;
        doCleanup();
    }

    @Override // com.couchbase.client.core.io.netty.chunk.ChunkResponseParser
    public void updateRequestContext(RequestContext requestContext) {
        this.requestContext = requestContext;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RequestContext requestContext() {
        return this.requestContext;
    }

    @Override // com.couchbase.client.core.io.netty.chunk.ChunkResponseParser
    public void updateResponseHeader(HttpResponse httpResponse) {
        this.responseHeader = httpResponse;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public HttpResponse responseHeader() {
        return this.responseHeader;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void markHeaderComplete() {
        this.headerComplete = true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isHeaderComplete() {
        return this.headerComplete;
    }

    protected abstract void doCleanup();

    @Override // com.couchbase.client.core.io.netty.chunk.ChunkResponseParser
    public void feed(ByteBuf byteBuf) {
        try {
            try {
                if (this.decodingFailure != null) {
                    byteBuf.release();
                    return;
                }
                if (byteBuf.nioBufferCount() != -1) {
                    for (ByteBuffer byteBuffer : byteBuf.nioBuffers()) {
                        this.parser.feed(byteBuffer);
                    }
                } else {
                    this.parser.feed(ByteBuffer.wrap(ByteBufUtil.getBytes(byteBuf)));
                }
                byteBuf.release();
            } catch (Exception e) {
                this.decodingFailure = new DecodingFailureException(e);
                failRows(this.decodingFailure);
                failTrailer(this.decodingFailure);
                byteBuf.release();
            }
        } catch (Throwable th) {
            byteBuf.release();
            throw th;
        }
    }

    @Override // com.couchbase.client.core.io.netty.chunk.ChunkResponseParser
    public void initialize(ChannelConfig channelConfig) {
        cleanup();
        this.parser = parserBuilder().build();
        this.channelConfig = channelConfig;
        this.trailer = Sinks.one();
        this.requested.set(0L);
        this.rowSink = Sinks.many().unicast().onBackpressureBuffer();
        this.rows = this.rowSink.asFlux().doOnRequest(j -> {
            this.requested.addAndGet(j);
            if (channelConfig.isAutoRead()) {
                return;
            }
            channelConfig.setAutoRead(true);
        }).doOnTerminate(() -> {
            channelConfig.setAutoRead(true);
        }).doOnCancel(() -> {
            channelConfig.setAutoRead(true);
        }).publish().refCount();
    }

    @Override // com.couchbase.client.core.io.netty.chunk.ChunkResponseParser
    public Flux<ROW> rows() {
        return this.rows;
    }

    @Override // com.couchbase.client.core.io.netty.chunk.ChunkResponseParser
    public Mono<T> trailer() {
        return this.trailer.asMono();
    }

    @Override // com.couchbase.client.core.io.netty.chunk.ChunkResponseParser
    public void endOfInput() {
        if (this.decodingFailure != null) {
            return;
        }
        try {
            this.parser.endOfInput();
            signalComplete();
        } catch (DecodingFailureException e) {
            this.decodingFailure = e;
            failRows(e);
            failTrailer(e);
        }
    }

    @Override // com.couchbase.client.core.io.netty.chunk.ChunkResponseParser
    public Optional<CouchbaseException> decodingFailure() {
        return Optional.ofNullable(this.decodingFailure);
    }

    protected abstract void signalComplete();

    /* JADX INFO: Access modifiers changed from: protected */
    public void emitRow(ROW row) {
        this.rowSink.emitNext(row, Reactor.emitFailureHandler());
        this.requested.decrementAndGet();
        if (this.requested.get() > 0 || !this.channelConfig.isAutoRead() || this.rowSink.currentSubscriberCount() <= 0) {
            return;
        }
        this.channelConfig.setAutoRead(false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void failRows(Throwable th) {
        this.rowSink.emitError(th, Reactor.emitFailureHandler());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void completeRows() {
        this.rowSink.emitComplete(Reactor.emitFailureHandler());
    }

    private void failTrailer(Throwable th) {
        this.trailer.emitError(th, Reactor.emitFailureHandler());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void completeTrailer(T t) {
        this.trailer.emitValue(t, Reactor.emitFailureHandler());
    }
}
