package com.couchbase.client.core.endpoint.dcp;

import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.endpoint.AbstractEndpoint;
import com.couchbase.client.core.endpoint.AbstractGenericHandler;
import com.couchbase.client.core.endpoint.ResponseStatusConverter;
import com.couchbase.client.core.endpoint.kv.KeyValueStatus;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.AbstractCouchbaseRequest;
import com.couchbase.client.core.message.CouchbaseResponse;
import com.couchbase.client.core.message.dcp.AbstractDCPRequest;
import com.couchbase.client.core.message.dcp.AbstractDCPResponse;
import com.couchbase.client.core.message.dcp.ControlParameter;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.FailoverLogEntry;
import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.RemoveMessage;
import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
import com.couchbase.client.core.message.dcp.StreamEndMessage;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.deps.com.lmax.disruptor.EventSink;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.DefaultFullBinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.FullBinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.FullBinaryMemcacheResponse;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import rx.functions.Action1;

/* loaded from: input_file:com/couchbase/client/core/endpoint/dcp/DCPHandler.class */
/**
 * Handler that encodes {@link DCPRequest}s into binary memcache requests and decodes the
 * binary memcache responses and stream messages that come back on a DCP channel.
 *
 * <p>Open connections are tracked by name in {@link #connections}. Messages that are not a
 * direct reply to the current request (stream end, snapshot marker, mutation, remove) are
 * published on the subject of the {@link DCPConnection} identified by the message opaque.
 */
public class DCPHandler extends AbstractGenericHandler<FullBinaryMemcacheResponse, BinaryMemcacheRequest, DCPRequest> {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(DCPHandler.class);

    /** Opcode used when encoding an {@link OpenConnectionRequest} and matching its reply. */
    public static final byte OP_OPEN_CONNECTION = 80;
    /** Opcode used when encoding a {@link StreamRequestRequest} and matching its reply. */
    public static final byte OP_STREAM_REQUEST = 83;
    /** Opcode of messages decoded into a {@link StreamEndMessage}. */
    public static final byte OP_STREAM_END = 85;
    /** Opcode of messages decoded into a {@link SnapshotMarkerMessage}. */
    public static final byte OP_SNAPSHOT_MARKER = 86;
    /** Opcode of messages decoded into a {@link MutationMessage}. */
    public static final byte OP_MUTATION = 87;
    /** Opcode of messages decoded into a {@link RemoveMessage}. */
    public static final byte OP_REMOVE = 88;
    /** Opcode of the control requests built by this handler. */
    public static final byte OP_CONTROL = 94;
    /** Opcode of the buffer acknowledgement requests built by this handler. */
    public static final byte OP_BUFFER_ACK = 93;

    /** Connections registered by {@link #encodeRequest}, keyed by {@link DCPConnection#name()}. */
    private final Map<String, DCPConnection> connections;

    /**
     * Creates a handler backed by a fresh {@link ArrayDeque} of sent requests.
     *
     * @param endpoint endpoint forwarded to the superclass.
     * @param responseBuffer event sink forwarded to the superclass.
     * @param flag boolean option forwarded unchanged to the superclass constructor.
     */
    public DCPHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer, boolean flag) {
        this(endpoint, responseBuffer, new ArrayDeque<DCPRequest>(), flag);
    }

    /**
     * Creates a handler with an explicit queue of sent requests.
     *
     * @param endpoint endpoint forwarded to the superclass.
     * @param responseBuffer event sink forwarded to the superclass.
     * @param queue request queue forwarded to the superclass.
     * @param flag boolean option forwarded unchanged to the superclass constructor.
     */
    public DCPHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer, Queue<DCPRequest> queue, boolean flag) {
        super(endpoint, responseBuffer, queue, flag);
        this.connections = new ConcurrentHashMap<String, DCPConnection>();
    }

    /**
     * Encodes an outgoing DCP request. An {@link OpenConnectionRequest} additionally registers
     * a new {@link DCPConnection} under its connection name.
     *
     * @param ctx channel context used to allocate buffers.
     * @param msg the request to encode.
     * @return the binary memcache request; its reserved field carries the partition when the
     *         request's partition is non-negative.
     * @throws IllegalArgumentException if the request type is not supported.
     * @throws IllegalStateException if a stream is requested on a connection that is not registered.
     */
    @Override // com.couchbase.client.core.endpoint.AbstractGenericHandler
    public BinaryMemcacheRequest encodeRequest(ChannelHandlerContext ctx, DCPRequest msg) throws Exception {
        final BinaryMemcacheRequest encoded;
        if (msg instanceof OpenConnectionRequest) {
            OpenConnectionRequest openRequest = (OpenConnectionRequest) msg;
            encoded = handleOpenConnectionRequest(ctx, openRequest);
            DCPConnection connection = new DCPConnection(env(), openRequest.connectionName(), openRequest.bucket());
            this.connections.put(connection.name(), connection);
        } else if (msg instanceof StreamRequestRequest) {
            encoded = handleStreamRequestRequest(ctx, (StreamRequestRequest) msg);
        } else {
            throw new IllegalArgumentException("Unknown incoming DCPRequest type " + msg.getClass());
        }
        if (msg.partition() >= 0) {
            encoded.setReserved(msg.partition());
        }
        return encoded;
    }

    /**
     * Decodes an incoming message. Replies to the current open-connection or stream-request
     * are turned into a response object; control and buffer-ack replies are only checked for
     * their status; everything else is dispatched to the owning connection's subject.
     *
     * @param ctx channel context.
     * @param msg the incoming binary memcache message.
     * @return the decoded response, or {@code null} when the message was not a direct reply
     *         to the current request.
     */
    @Override // com.couchbase.client.core.endpoint.AbstractGenericHandler
    public CouchbaseResponse decodeResponse(ChannelHandlerContext ctx, FullBinaryMemcacheResponse msg) throws Exception {
        final DCPRequest request = currentRequest();
        final byte opcode = msg.getOpcode();
        AbstractDCPResponse response = null;

        if (opcode == OP_OPEN_CONNECTION && request instanceof OpenConnectionRequest) {
            response = decodeOpenConnectionResponse(ctx, msg, request);
        } else if (opcode == OP_STREAM_REQUEST && request instanceof StreamRequestRequest) {
            response = decodeStreamRequestResponse(msg, request);
        } else if (opcode == OP_CONTROL || opcode == OP_BUFFER_ACK) {
            logServiceResponseStatus(msg);
        } else {
            dispatchStreamMessage(ctx, msg, request);
        }

        if (request != null && request.partition() >= 0 && response != null) {
            response.partition(request.partition());
        }
        if (response != null || request == null) {
            finishedDecoding();
        }
        return response;
    }

    /**
     * Builds the response to an open-connection request and, when a positive connection
     * buffer size is configured, writes the matching control request to the channel.
     */
    private AbstractDCPResponse decodeOpenConnectionResponse(ChannelHandlerContext ctx, FullBinaryMemcacheResponse msg, DCPRequest request) {
        String connectionName = ((OpenConnectionRequest) request).connectionName();
        AbstractDCPResponse response = new OpenConnectionResponse(
                ResponseStatusConverter.fromBinary(msg.getStatus()), this.connections.get(connectionName), request);
        if (env().dcpConnectionBufferSize() > 0) {
            ctx.writeAndFlush(controlRequest(ctx, ControlParameter.CONNECTION_BUFFER_SIZE, env().dcpConnectionBufferSize()));
        }
        return response;
    }

    /**
     * Builds the response to a stream request. On success the content is read as a list of
     * 16-byte entries (two longs each); on rollback a single long is read from the content.
     */
    private AbstractDCPResponse decodeStreamRequestResponse(FullBinaryMemcacheResponse msg, DCPRequest request) {
        ByteBuf content = msg.content();
        ArrayList<FailoverLogEntry> failoverLog = null;
        long rollbackValue = 0;
        KeyValueStatus status = KeyValueStatus.valueOf(msg.getStatus());
        switch (status) {
            case SUCCESS:
                failoverLog = new ArrayList<FailoverLogEntry>(content.readableBytes() / 16);
                while (content.readableBytes() >= 16) {
                    // Arguments are evaluated left to right, so the first long read is the first argument.
                    failoverLog.add(new FailoverLogEntry(content.readLong(), content.readLong()));
                }
                break;
            case ERR_ROLLBACK:
                rollbackValue = content.readLong();
                break;
            default:
                LOGGER.warn("Unexpected status of StreamRequestResponse: {} (0x{}, {})", status, Integer.toHexString(status.code()), status.description());
                break;
        }
        return new StreamRequestResponse(ResponseStatusConverter.fromBinary(msg.getStatus()), failoverLog, rollbackValue, request,
                this.connections.get(DCPConnection.connectionName(msg.getOpaque())));
    }

    /** Logs a warning when a control or buffer-ack reply carries a non-success status. */
    private void logServiceResponseStatus(FullBinaryMemcacheResponse msg) {
        KeyValueStatus status = KeyValueStatus.valueOf(msg.getStatus());
        if (status != KeyValueStatus.SUCCESS) {
            LOGGER.warn("Unexpected status of service response (opcode={}): {} (0x{}, {})", Integer.toHexString(msg.getOpcode()), status, Integer.toHexString(status.code()), status.description());
        }
    }

    /**
     * Hands a message that is not a direct reply over to {@link #handleDCPRequest}. While it is
     * processed, the current request is swapped for a placeholder whose errors are forwarded to
     * the connection's subject; the previous current request is always restored afterwards.
     */
    private void dispatchStreamMessage(ChannelHandlerContext ctx, FullBinaryMemcacheResponse msg, DCPRequest previousRequest) {
        final DCPConnection connection = this.connections.get(DCPConnection.connectionName(msg.getOpaque()));
        if (connection == null) {
            // The connection was never registered or has already been removed after its last
            // stream ended; there is no subject left to publish on, so drop the message.
            LOGGER.warn("Ignoring DCP message for unknown connection (opcode={}, opaque={})", Byte.valueOf(msg.getOpcode()), Integer.valueOf(msg.getOpaque()));
            return;
        }
        AbstractDCPRequest placeholder = new AbstractDCPRequest(connection.bucket(), null) {
        };
        placeholder.observable().subscribe(new Action1<CouchbaseResponse>() {
            @Override
            public void call(CouchbaseResponse couchbaseResponse) {
                // Responses on the placeholder are intentionally ignored.
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable th) {
                connection.subject().onError(th);
            }
        });
        try {
            currentRequest(placeholder);
            handleDCPRequest(ctx, connection, msg);
        } finally {
            currentRequest(previousRequest);
        }
    }

    /**
     * Converts a stream message into its message object, publishes it on the connection's
     * subject, updates the received-bytes accounting and completes/unregisters the connection
     * once it has no streams left.
     *
     * <p>Extras are read through {@code slice()} so the original reader index stays untouched
     * and no temporary buffer has to be allocated and released.
     */
    private void handleDCPRequest(ChannelHandlerContext ctx, DCPConnection connection, FullBinaryMemcacheResponse msg) {
        AbstractCouchbaseRequest message = null;
        switch (msg.getOpcode()) {
            case OP_STREAM_END: {
                int reason = msg.getExtras().slice().readInt();
                message = new StreamEndMessage(StreamEndMessage.Reason.valueOf(reason), connection.bucket());
                connection.removeStream(msg.getOpaque());
                break;
            }
            case OP_SNAPSHOT_MARKER: {
                // NOTE(review): field names follow the DCP snapshot-marker extras layout
                // (start seqno, end seqno, flags) - confirm against SnapshotMarkerMessage.
                long startSequenceNumber = 0;
                long endSequenceNumber = 0;
                int flags = 0;
                if (msg.getExtrasLength() > 0) {
                    ByteBuf extras = msg.getExtras().slice();
                    startSequenceNumber = extras.readLong();
                    endSequenceNumber = extras.readLong();
                    flags = extras.readInt();
                }
                message = new SnapshotMarkerMessage(msg.getStatus(), startSequenceNumber, endSequenceNumber, flags, connection.bucket());
                break;
            }
            case OP_MUTATION: {
                // NOTE(review): field names follow the DCP mutation extras layout (16 bytes of
                // sequence numbers, then flags, expiration, lock time) - confirm against MutationMessage.
                int flags = 0;
                int expiration = 0;
                int lockTime = 0;
                if (msg.getExtrasLength() > 0) {
                    ByteBuf extras = msg.getExtras().slice();
                    extras.skipBytes(16);
                    flags = extras.readInt();
                    expiration = extras.readInt();
                    lockTime = extras.readInt();
                }
                // The content is retained because it is handed to the message published below.
                message = new MutationMessage(msg.getStatus(), msg.getKey(), msg.content().retain(), expiration, flags, lockTime, msg.getCAS(), connection.bucket());
                break;
            }
            case OP_REMOVE:
                message = new RemoveMessage(msg.getStatus(), msg.getKey(), msg.getCAS(), connection.bucket());
                break;
            default:
                LOGGER.info("Unhandled DCP message: {}, {}", Byte.valueOf(msg.getOpcode()), msg);
                break;
        }
        if (message != null) {
            connection.subject().onNext(message);
        }
        updateConnectionStats(ctx, connection, msg);
        if (connection.streamsCount() == 0) {
            connection.subject().onCompleted();
            this.connections.remove(connection.name());
        }
    }

    /**
     * Adds the message's total body length to the connection's received-bytes counter and,
     * once the counter reaches buffer size times ack threshold, acknowledges the bytes and
     * resets the counter.
     */
    private void updateConnectionStats(ChannelHandlerContext ctx, DCPConnection connection, FullBinaryMemcacheResponse msg) {
        connection.inc(msg.getTotalBodyLength());
        int bufferSize = env().dcpConnectionBufferSize();
        if (connection.totalReceivedBytes() >= bufferSize * env().dcpConnectionBufferAckThreshold()) {
            // With a non-positive buffer size the CONNECTION_BUFFER_SIZE control message is
            // never sent, so there is nothing to acknowledge; only the counter is reset.
            if (bufferSize > 0) {
                ctx.writeAndFlush(bufferAckRequest(ctx, connection.totalReceivedBytes()));
            }
            connection.reset();
        }
    }

    /** Builds a buffer-ack request whose 4-byte extras hold the given byte count. */
    private BinaryMemcacheRequest bufferAckRequest(ChannelHandlerContext ctx, int ackedBytes) {
        ByteBuf extras = ctx.alloc().buffer(4).writeInt(ackedBytes);
        DefaultBinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest("", extras);
        request.setOpcode(OP_BUFFER_ACK);
        request.setExtrasLength((byte) extras.readableBytes());
        request.setTotalBodyLength(extras.readableBytes());
        return request;
    }

    /** Builds a control request whose value is the string form of the given boolean. */
    private FullBinaryMemcacheRequest controlRequest(ChannelHandlerContext ctx, ControlParameter parameter, boolean value) {
        return controlRequest(ctx, parameter, Boolean.toString(value));
    }

    /** Builds a control request whose value is the decimal string form of the given int. */
    private FullBinaryMemcacheRequest controlRequest(ChannelHandlerContext ctx, ControlParameter parameter, int value) {
        return controlRequest(ctx, parameter, Integer.toString(value));
    }

    /**
     * Builds a control request with the parameter's value as key and the UTF-8 bytes of
     * {@code value} as content.
     */
    private FullBinaryMemcacheRequest controlRequest(ChannelHandlerContext ctx, ControlParameter parameter, String value) {
        String key = parameter.value();
        short keyLength = (short) key.getBytes(CharsetUtil.UTF_8).length;
        byte[] valueBytes = value.getBytes(CharsetUtil.UTF_8);
        ByteBuf body = ctx.alloc().buffer(valueBytes.length);
        body.writeBytes(valueBytes);
        DefaultFullBinaryMemcacheRequest request = new DefaultFullBinaryMemcacheRequest(key, Unpooled.EMPTY_BUFFER, body);
        request.setOpcode(OP_CONTROL);
        request.setKeyLength(keyLength);
        request.setTotalBodyLength(keyLength + body.readableBytes());
        return request;
    }

    /**
     * Encodes an open-connection request: the connection name is the key and the extras hold
     * the request's sequence number followed by its type flags.
     */
    private BinaryMemcacheRequest handleOpenConnectionRequest(ChannelHandlerContext ctx, OpenConnectionRequest openRequest) {
        ByteBuf extras = ctx.alloc().buffer(8);
        extras.writeInt(openRequest.sequenceNumber()).writeInt(openRequest.type().flags());
        String key = openRequest.connectionName();
        byte extrasLength = (byte) extras.readableBytes();
        short keyLength = (short) key.getBytes(CharsetUtil.UTF_8).length;
        DefaultBinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(key, extras);
        request.setOpcode(OP_OPEN_CONNECTION);
        request.setKeyLength(keyLength);
        request.setExtrasLength(extrasLength);
        request.setTotalBodyLength(keyLength + extrasLength);
        return request;
    }

    /**
     * Encodes a stream request for an already registered connection. The opaque is the value
     * returned by {@link DCPConnection#addStream} for that connection.
     *
     * @throws IllegalStateException if no connection is registered under the request's name.
     */
    private BinaryMemcacheRequest handleStreamRequestRequest(ChannelHandlerContext ctx, StreamRequestRequest streamRequest) {
        DCPConnection connection = this.connections.get(streamRequest.connectionName());
        if (connection == null) {
            // Checked before allocating so that a bad request cannot leak the extras buffer.
            throw new IllegalStateException("No DCP connection registered with name " + streamRequest.connectionName());
        }
        ByteBuf extras = ctx.alloc().buffer(48);
        extras.writeInt(0)
                .writeInt(0)
                .writeLong(streamRequest.startSequenceNumber())
                .writeLong(streamRequest.endSequenceNumber())
                .writeLong(streamRequest.vbucketUUID())
                .writeLong(streamRequest.snapshotStartSequenceNumber())
                .writeLong(streamRequest.snapshotEndSequenceNumber());
        byte extrasLength = (byte) extras.readableBytes();
        DefaultBinaryMemcacheRequest request = new DefaultBinaryMemcacheRequest(extras);
        request.setOpcode(OP_STREAM_REQUEST);
        request.setExtrasLength(extrasLength);
        request.setTotalBodyLength(extrasLength);
        request.setOpaque(connection.addStream(connection.name()));
        return request;
    }

    /** @return {@link ServiceType#DCP}. */
    @Override // com.couchbase.client.core.endpoint.AbstractGenericHandler
    protected ServiceType serviceType() {
        return ServiceType.DCP;
    }
}
