package com.couchbase.client.core.io.netty.kv;

import com.couchbase.client.core.cnc.CbTracing;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.cnc.TracingIdentifiers;
import com.couchbase.client.core.cnc.events.io.ChannelClosedProactivelyEvent;
import com.couchbase.client.core.cnc.events.io.CollectionOutdatedHandledEvent;
import com.couchbase.client.core.cnc.events.io.InvalidRequestDetectedEvent;
import com.couchbase.client.core.cnc.events.io.KeyValueErrorMapCodeHandledEvent;
import com.couchbase.client.core.cnc.events.io.NotMyVbucketReceivedEvent;
import com.couchbase.client.core.cnc.events.io.UnknownResponseReceivedEvent;
import com.couchbase.client.core.cnc.events.io.UnknownResponseStatusReceivedEvent;
import com.couchbase.client.core.cnc.events.io.UnsupportedResponseTypeReceivedEvent;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.MemcachedBucketConfig;
import com.couchbase.client.core.config.ProposedBucketConfigContext;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBufUtil;
import com.couchbase.client.core.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.core.deps.io.netty.channel.ChannelDuplexHandler;
import com.couchbase.client.core.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.core.deps.io.netty.channel.ChannelPromise;
import com.couchbase.client.core.deps.io.netty.util.ReferenceCountUtil;
import com.couchbase.client.core.deps.io.netty.util.collection.IntObjectHashMap;
import com.couchbase.client.core.deps.io.netty.util.collection.IntObjectMap;
import com.couchbase.client.core.endpoint.BaseEndpoint;
import com.couchbase.client.core.endpoint.EndpointContext;
import com.couchbase.client.core.env.CompressionConfig;
import com.couchbase.client.core.error.CollectionNotFoundException;
import com.couchbase.client.core.error.DecodingFailureException;
import com.couchbase.client.core.error.FeatureNotAvailableException;
import com.couchbase.client.core.error.RangeScanPartitionFailedException;
import com.couchbase.client.core.error.context.GenericRequestErrorContext;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.io.CollectionMap;
import com.couchbase.client.core.io.IoContext;
import com.couchbase.client.core.io.netty.HandlerUtils;
import com.couchbase.client.core.io.netty.TracingUtils;
import com.couchbase.client.core.io.netty.kv.ErrorMap;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.Response;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.kv.KeyValueRequest;
import com.couchbase.client.core.msg.kv.RangeScanContinueRequest;
import com.couchbase.client.core.msg.kv.RangeScanContinueResponse;
import com.couchbase.client.core.msg.kv.UnlockRequest;
import com.couchbase.client.core.retry.RetryOrchestrator;
import com.couchbase.client.core.retry.RetryReason;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.transaction.support.TransactionFields;
import com.couchbase.client.core.util.UnsignedLEB128;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/client/core/io/netty/kv/KeyValueMessageHandler.class */
public class KeyValueMessageHandler extends ChannelDuplexHandler {
    private static final Logger log = LoggerFactory.getLogger(KeyValueMessageHandler.class);
    private final EndpointContext endpointContext;
    private final CompressionConfig compressionConfig;
    private final EventBus eventBus;
    private final Optional<String> bucketName;
    private final BaseEndpoint endpoint;
    private IoContext ioContext;
    private KeyValueChannelContext channelContext;
    private ErrorMap errorMap;
    private final boolean isInternalTracer;
    private final IntObjectMap<KeyValueRequest<Response>> writtenRequests = new IntObjectHashMap();
    private final IntObjectMap<Long> writtenRequestDispatchTimings = new IntObjectHashMap();
    private final IntObjectMap<RequestSpan> writtenRequestDispatchSpans = new IntObjectHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/couchbase/client/core/io/netty/kv/KeyValueMessageHandler$OutdatedCollectionContext.class */
    /**
     * I/O context that additionally exports the contents of a {@link CollectionMap}.
     *
     * <p>It copies the local socket, remote socket and bucket of the wrapped {@link IoContext} and
     * adds an {@code "open"} entry to the exported parameters describing every entry of the
     * collection map.
     */
    public static class OutdatedCollectionContext extends IoContext {
        /** Collection map whose entries are exported under the {@code "open"} key. */
        private final CollectionMap collectionMap;

        /**
         * @param ioContext the context whose sockets and bucket are reused for this context.
         * @param collectionMap the collection map to export.
         */
        public OutdatedCollectionContext(IoContext ioContext, CollectionMap collectionMap) {
            super(ioContext, ioContext.localSocket(), ioContext.remoteSocket(), ioContext.bucket());
            this.collectionMap = collectionMap;
        }

        /**
         * Adds the parent parameters plus one map per collection-map entry: the identifier's own
         * map representation, extended with the LEB128-decoded value rendered as a hex string.
         *
         * @param map the target map receiving the exported parameters.
         */
        @Override
        public void injectExportableParams(Map<String, Object> map) {
            super.injectExportableParams(map);
            Object openCollections = this.collectionMap.inner().entrySet().stream()
                .map(entry -> {
                    CollectionIdentifier identifier = (CollectionIdentifier) entry.getKey();
                    byte[] encodedId = (byte[]) entry.getValue();
                    Map<String, Object> exported = new HashMap<>(identifier.toMap());
                    String hexId = "0x" + Long.toHexString(UnsignedLEB128.decode(encodedId));
                    exported.put(TransactionFields.ATR_FIELD_PER_DOC_ID, hexId);
                    return exported;
                })
                .collect(Collectors.toList());
            map.put("open", openCollections);
        }
    }

    /**
     * Creates a new handler for key/value traffic.
     *
     * @param baseEndpoint the endpoint this handler belongs to; stored as-is.
     * @param endpointContext the context that supplies the environment (compression config and
     *                        event bus) and the request tracer.
     * @param optional the name of the bucket, if any.
     */
    public KeyValueMessageHandler(BaseEndpoint baseEndpoint, EndpointContext endpointContext, Optional<String> optional) {
        this.bucketName = optional;
        this.endpoint = baseEndpoint;
        this.endpointContext = endpointContext;

        // Values derived from the context are resolved once and cached for the handler's lifetime.
        this.compressionConfig = endpointContext.environment().compressionConfig();
        this.eventBus = endpointContext.environment().eventBus();
        this.isInternalTracer = CbTracing.isInternalTracer(endpointContext.coreResources().requestTracer());
    }

    /**
     * Initializes the per-channel state once the channel becomes active, then forwards the event.
     *
     * <p>The error map and the negotiated server features are read from channel attributes. A
     * missing feature set is treated as empty, and compression is only handed to the channel
     * context when the feature set contains {@code SNAPPY}.
     *
     * @param channelHandlerContext the context of the channel that became active.
     */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) {
        this.ioContext = new IoContext(
            this.endpointContext,
            channelHandlerContext.channel().localAddress(),
            channelHandlerContext.channel().remoteAddress(),
            this.endpointContext.bucket()
        );
        this.errorMap = (ErrorMap) channelHandlerContext.channel().attr(ChannelAttributes.ERROR_MAP_KEY).get();

        @SuppressWarnings("unchecked")
        Set<ServerFeature> negotiated = (Set<ServerFeature>) channelHandlerContext.channel().attr(ChannelAttributes.SERVER_FEATURE_KEY).get();
        Set<ServerFeature> features = negotiated != null ? negotiated : Collections.emptySet();

        CompressionConfig effectiveCompression = null;
        if (features.contains(ServerFeature.SNAPPY)) {
            effectiveCompression = this.compressionConfig;
        }

        this.channelContext = new KeyValueChannelContext(
            effectiveCompression,
            this.bucketName,
            this.ioContext.core().configurationProvider().collectionMap(),
            channelHandlerContext.channel().id(),
            features
        );
        channelHandlerContext.fireChannelActive();
    }

    /**
     * Encodes an outgoing {@link KeyValueRequest}, writes it to the channel and registers it
     * under its opaque so the matching response can be correlated later.
     *
     * <p>Anything that is not a {@link KeyValueRequest} is reported on the event bus and the
     * channel is closed proactively.
     *
     * <p>If encoding or dispatching throws, the request is unregistered again (including any
     * dispatch timing already recorded) and then either failed, retried, or sent through the
     * outdated-collection handling, depending on the failure and the state of the collection map.
     *
     * @param channelHandlerContext the channel handler context.
     * @param obj the message to write, expected to be a {@link KeyValueRequest}.
     * @param channelPromise the promise passed on to the next outbound handler together with the
     *                       encoded request.
     */
    @Override
    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) {
        if (!(obj instanceof KeyValueRequest)) {
            this.eventBus.publish(new InvalidRequestDetectedEvent(this.ioContext, ServiceType.KV, obj));
            channelHandlerContext.channel().close().addListener(future ->
                this.eventBus.publish(new ChannelClosedProactivelyEvent(this.ioContext, ChannelClosedProactivelyEvent.Reason.INVALID_REQUEST_DETECTED))
            );
            return;
        }

        @SuppressWarnings("unchecked")
        KeyValueRequest<Response> keyValueRequest = (KeyValueRequest<Response>) obj;
        int opaque = keyValueRequest.opaque();
        // Register before encoding so the catch block below has a single place to undo it.
        this.writtenRequests.put(opaque, keyValueRequest);
        try {
            channelHandlerContext.write(keyValueRequest.encode(channelHandlerContext.alloc(), opaque, this.channelContext), channelPromise);
            this.writtenRequestDispatchTimings.put(opaque, Long.valueOf(System.nanoTime()));
            if (keyValueRequest.requestSpan() != null) {
                RequestSpan requestSpan = this.endpointContext.coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_DISPATCH, keyValueRequest.requestSpan());
                if (!this.isInternalTracer) {
                    TracingUtils.setCommonDispatchSpanAttributes(requestSpan, (String) channelHandlerContext.channel().attr(ChannelAttributes.CHANNEL_ID_KEY).get(), this.ioContext.localHostname(), this.ioContext.localPort(), this.endpoint.remoteHostname(), this.endpoint.remotePort(), null);
                    TracingUtils.setNumericOperationId(requestSpan, keyValueRequest.opaque());
                    TracingUtils.setCommonKVSpanAttributes(requestSpan, keyValueRequest);
                }
                this.writtenRequestDispatchSpans.put(opaque, requestSpan);
            }
        } catch (Throwable th) {
            this.writtenRequests.remove(opaque);
            // The timing entry may already exist if the failure happened after the write;
            // drop it so it does not linger without a registered request.
            this.writtenRequestDispatchTimings.remove(opaque);

            if (!(th instanceof CollectionNotFoundException) || !this.channelContext.collectionsEnabled()) {
                keyValueRequest.fail(th);
                return;
            }
            ConfigurationProvider configurationProvider = this.ioContext.core().configurationProvider();
            if (configurationProvider.collectionRefreshInProgress(keyValueRequest.collectionIdentifier())) {
                RetryOrchestrator.maybeRetry(this.ioContext, keyValueRequest, RetryReason.COLLECTION_MAP_REFRESH_IN_PROGRESS);
            } else if (configurationProvider.config().bucketConfig(keyValueRequest.bucket()) instanceof MemcachedBucketConfig) {
                keyValueRequest.fail(FeatureNotAvailableException.collectionsForMemcached());
            } else {
                handleOutdatedCollection(keyValueRequest, RetryReason.COLLECTION_NOT_FOUND);
            }
        }
    }

    /**
     * Handles an inbound message: {@link ByteBuf} messages are decoded as key/value responses,
     * anything else is reported and causes the channel to be closed.
     *
     * <p>Regardless of the outcome, the endpoint (if present) is told that a request completed
     * and the message is released.
     *
     * @param channelHandlerContext the channel handler context.
     * @param obj the inbound message.
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        try {
            if (!(obj instanceof ByteBuf)) {
                this.ioContext.environment().eventBus().publish(new UnsupportedResponseTypeReceivedEvent(this.ioContext, obj));
                HandlerUtils.closeChannelWithReason(this.ioContext, channelHandlerContext, ChannelClosedProactivelyEvent.Reason.INVALID_RESPONSE_FORMAT_DETECTED);
                return;
            }
            decode(channelHandlerContext, (ByteBuf) obj);
        } finally {
            if (this.endpoint != null) {
                this.endpoint.markRequestCompletion();
            }
            ReferenceCountUtil.release(obj);
        }
    }

    /**
     * Hands every request that is still waiting for a response to the retry orchestrator when
     * the channel goes inactive, then forwards the event.
     *
     * <p>Dispatch spans that were opened for those requests are ended here, because no response
     * will arrive on this channel to end them. The bookkeeping maps are cleared afterwards so the
     * handler no longer references the requests it has handed off.
     *
     * @param channelHandlerContext the context of the channel that became inactive.
     */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        for (KeyValueRequest<Response> inFlight : this.writtenRequests.values()) {
            RetryOrchestrator.maybeRetry(this.ioContext, inFlight, RetryReason.CHANNEL_CLOSED_WHILE_IN_FLIGHT);
        }

        // Spans still in this map have not been ended yet: the response path removes a span
        // from the map before ending it.
        for (RequestSpan dispatchSpan : this.writtenRequestDispatchSpans.values()) {
            dispatchSpan.end();
        }

        this.writtenRequests.clear();
        this.writtenRequestDispatchTimings.clear();
        this.writtenRequestDispatchSpans.clear();

        channelHandlerContext.fireChannelInactive();
    }

    /**
     * Correlates a response buffer with its request via the opaque and decides how to proceed:
     * retry, refresh collection/bucket information, close the channel, or complete the request.
     *
     * <p>Responses with an unknown opaque are delegated to
     * {@link #handleUnknownResponseReceived(ChannelHandlerContext, ByteBuf)}.
     *
     * @param channelHandlerContext the channel handler context.
     * @param byteBuf the full response as received from the channel.
     */
    private void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        final int opaque = MemcacheProtocol.opaque(byteBuf);
        final KeyValueRequest<Response> request = this.writtenRequests.remove(opaque);
        if (request == null) {
            handleUnknownResponseReceived(channelHandlerContext, byteBuf);
            return;
        }

        final long dispatchStart = completeRequestTimings(request, byteBuf, opaque);
        final short rawStatus = MemcacheProtocol.status(byteBuf);
        ResponseStatus status = MemcacheProtocol.decodeStatus(rawStatus);

        // The error map is only consulted for non-successful responses.
        ErrorMap.ErrorCode errorCode = null;
        if (status != ResponseStatus.SUCCESS) {
            errorCode = decodeErrorCode(rawStatus);
        }
        if (errorCode != null) {
            request.errorCode(errorCode);
        }

        // Remember whether the status was unknown *before* the error map possibly remaps it.
        final boolean statusWasUnknown = status == ResponseStatus.UNKNOWN;
        if (statusWasUnknown) {
            if (errorCode != null) {
                this.ioContext.environment().eventBus().publish(new KeyValueErrorMapCodeHandledEvent(this.ioContext, errorCode));
                status = handleErrorCode(channelHandlerContext, errorCode);
            }
            this.ioContext.environment().eventBus().publish(new UnknownResponseStatusReceivedEvent(this.ioContext, rawStatus));
        }

        if (statusWasUnknown && errorCode != null && errorCode.attributes().contains(ErrorMap.ErrorAttribute.FETCH_CONFIG)) {
            log.warn("Refreshing cluster config in response to error code {} ; details={}", Short.valueOf(rawStatus), errorCode);
            this.endpointContext.core().configurationProvider().signalConfigChanged();
        }

        final boolean rangeScanContinue = request instanceof RangeScanContinueRequest;
        if (status == ResponseStatus.NOT_MY_VBUCKET && !rangeScanContinue) {
            handleNotMyVbucket(request, byteBuf);
        } else if (status == ResponseStatus.UNKNOWN_COLLECTION) {
            handleOutdatedCollection(request, RetryReason.KV_COLLECTION_OUTDATED);
        } else if (statusWasUnknown && errorMapIndicatesRetry(errorCode)) {
            RetryOrchestrator.maybeRetry(this.ioContext, request, RetryReason.KV_ERROR_MAP_INDICATED);
        } else if (statusIndicatesInvalidChannel(status)) {
            // NOTE(review): the request has already been unregistered above and is neither
            // retried nor completed in this branch - confirm that this is intended.
            HandlerUtils.closeChannelWithReason(this.ioContext, channelHandlerContext, ChannelClosedProactivelyEvent.Reason.KV_RESPONSE_CONTAINED_CLOSE_INDICATION);
        } else {
            retryOrComplete(request, byteBuf, status, rangeScanContinue, dispatchStart);
        }
    }

    /**
     * Retries the request if its status asks for it; otherwise completes it.
     *
     * <p>Completion takes one of three paths: range-scan-continue handling, orphan reporting for
     * requests that are already completed, or regular decode-and-complete.
     *
     * @param keyValueRequest the request the response belongs to.
     * @param byteBuf the raw response.
     * @param responseStatus the (possibly remapped) response status.
     * @param z whether the request is a range scan continue request.
     * @param j the dispatch start timestamp, passed on to the range scan continue handling.
     */
    private void retryOrComplete(KeyValueRequest<Response> keyValueRequest, ByteBuf byteBuf, ResponseStatus responseStatus, boolean z, long j) {
        RetryReason retryReason = statusCodeIndicatesRetry(responseStatus, keyValueRequest);
        if (retryReason != null) {
            RetryOrchestrator.maybeRetry(this.ioContext, keyValueRequest, retryReason);
        } else if (z) {
            decodeAndCompleteRangeScanContinue(keyValueRequest, byteBuf, j);
        } else if (!keyValueRequest.completed()) {
            decodeAndComplete(keyValueRequest, byteBuf);
        } else {
            // Already completed elsewhere, so the response is reported as orphaned.
            this.ioContext.environment().orphanReporter().report(keyValueRequest);
        }
    }

    /**
     * Records server and dispatch latency on the request context and ends the dispatch span
     * registered for the opaque, if there is one.
     *
     * @param keyValueRequest the request whose context receives the latencies.
     * @param byteBuf the raw response from which the server duration is parsed.
     * @param i the opaque under which timing and span were registered.
     * @return the {@link System#nanoTime()} value stored when the request was dispatched.
     */
    private long completeRequestTimings(KeyValueRequest<Response> keyValueRequest, ByteBuf byteBuf, int i) {
        final long serverDuration = MemcacheProtocol.parseServerDurationFromResponse(byteBuf);
        keyValueRequest.context().serverLatency(serverDuration);

        final long dispatchStart = this.writtenRequestDispatchTimings.remove(i).longValue();
        keyValueRequest.context().dispatchLatency(System.nanoTime() - dispatchStart);

        final RequestSpan dispatchSpan = this.writtenRequestDispatchSpans.remove(i);
        if (dispatchSpan == null) {
            return dispatchStart;
        }
        if (!this.isInternalTracer) {
            TracingUtils.setServerDurationAttribute(dispatchSpan, serverDuration);
        }
        dispatchSpan.end();
        return dispatchStart;
    }

    /**
     * Decodes the response and completes the request with it; any failure while doing so fails
     * the request with a {@link DecodingFailureException} wrapping the cause.
     *
     * @param keyValueRequest the request to complete.
     * @param byteBuf the raw response to decode.
     */
    private void decodeAndComplete(KeyValueRequest<Response> keyValueRequest, ByteBuf byteBuf) {
        try {
            Response decoded = keyValueRequest.decode(byteBuf, this.channelContext);
            keyValueRequest.succeed(decoded);
        } catch (Throwable cause) {
            DecodingFailureException failure = new DecodingFailureException(
                cause.getMessage(),
                cause,
                new GenericRequestErrorContext(keyValueRequest)
            );
            keyValueRequest.fail(failure);
        }
    }

    /**
     * Decodes a range scan continue response, completes the request on first use and feeds the
     * response body into the decoded response's item feed.
     *
     * <p>A decoding failure fails the request with a {@link DecodingFailureException}, mirroring
     * {@link #decodeAndComplete(KeyValueRequest, ByteBuf)}, instead of letting the exception
     * escape while the request is no longer registered.
     *
     * <p>When the status is {@code SUCCESS}, the request and its dispatch timestamp are
     * registered again under the same opaque, so that a further response for that opaque is
     * matched to this request.
     *
     * @param keyValueRequest the range scan continue request.
     * @param byteBuf the raw response.
     * @param j the dispatch start timestamp to restore when the request is registered again.
     */
    private void decodeAndCompleteRangeScanContinue(KeyValueRequest<Response> keyValueRequest, ByteBuf byteBuf, long j) {
        final RangeScanContinueResponse rangeScanContinueResponse;
        try {
            rangeScanContinueResponse = (RangeScanContinueResponse) keyValueRequest.decode(byteBuf, this.channelContext);
        } catch (Throwable th) {
            // NOTE(review): presumably a no-op if the request was already completed by an
            // earlier response for the same opaque - confirm against KeyValueRequest#fail.
            keyValueRequest.fail(new DecodingFailureException(th.getMessage(), th, new GenericRequestErrorContext(keyValueRequest)));
            return;
        }

        if (!keyValueRequest.completed()) {
            keyValueRequest.succeed(rangeScanContinueResponse);
        }

        final ResponseStatus status = rangeScanContinueResponse.status();
        final boolean complete = status == ResponseStatus.COMPLETE;
        final boolean more = status == ResponseStatus.CONTINUE;
        if (!(complete || more || status == ResponseStatus.SUCCESS)) {
            rangeScanContinueResponse.failFeed(new RangeScanPartitionFailedException("Stream continue failed with non-successful response status", status));
            return;
        }

        rangeScanContinueResponse.feedItems(MemcacheProtocol.body(byteBuf).orElse(Unpooled.EMPTY_BUFFER), complete, complete || more);

        if (status == ResponseStatus.SUCCESS) {
            this.writtenRequests.put(keyValueRequest.opaque(), keyValueRequest);
            this.writtenRequestDispatchTimings.put(keyValueRequest.opaque(), Long.valueOf(j));
        }
    }

    /**
     * Tells whether the given status means the channel must not be used any longer.
     *
     * @param responseStatus the status to inspect.
     * @return true for {@code INTERNAL_SERVER_ERROR} and {@code NOT_INITIALIZED}, and for
     *         {@code NO_BUCKET} when a bucket name is present on this handler.
     */
    private boolean statusIndicatesInvalidChannel(ResponseStatus responseStatus) {
        if (responseStatus == ResponseStatus.INTERNAL_SERVER_ERROR) {
            return true;
        }
        if (responseStatus == ResponseStatus.NO_BUCKET && this.bucketName.isPresent()) {
            return true;
        }
        return responseStatus == ResponseStatus.NOT_INITIALIZED;
    }

    /**
     * Reports a response that has no registered request and closes the channel.
     *
     * @param channelHandlerContext the channel handler context.
     * @param byteBuf the response; its bytes are copied into the published event.
     */
    private void handleUnknownResponseReceived(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        byte[] responseBytes = ByteBufUtil.getBytes(byteBuf);
        UnknownResponseReceivedEvent event = new UnknownResponseReceivedEvent(this.ioContext, responseBytes);
        this.ioContext.environment().eventBus().publish(event);
        HandlerUtils.closeChannelWithReason(this.ioContext, channelHandlerContext, ChannelClosedProactivelyEvent.Reason.KV_RESPONSE_CONTAINED_UNKNOWN_OPAQUE);
    }

    /**
     * Maps an error map entry onto a response status based on its attributes.
     *
     * <p>{@code CONN_STATE_INVALIDATED} takes precedence: the channel is closed and
     * {@code UNKNOWN} is returned. Otherwise the first matching attribute wins, in the order
     * {@code TEMP}, {@code AUTH}, {@code ITEM_LOCKED}.
     *
     * @param channelHandlerContext the channel handler context, used to close the channel.
     * @param errorCode the error map entry for the received status code.
     * @return the mapped status, or {@code UNKNOWN} if no attribute matched.
     */
    private ResponseStatus handleErrorCode(ChannelHandlerContext channelHandlerContext, ErrorMap.ErrorCode errorCode) {
        if (errorCode.attributes().contains(ErrorMap.ErrorAttribute.CONN_STATE_INVALIDATED)) {
            HandlerUtils.closeChannelWithReason(this.ioContext, channelHandlerContext, ChannelClosedProactivelyEvent.Reason.KV_RESPONSE_CONTAINED_CLOSE_INDICATION);
            return ResponseStatus.UNKNOWN;
        }
        if (errorCode.attributes().contains(ErrorMap.ErrorAttribute.TEMP)) {
            return ResponseStatus.TEMPORARY_FAILURE;
        }
        if (errorCode.attributes().contains(ErrorMap.ErrorAttribute.AUTH)) {
            return ResponseStatus.NO_ACCESS;
        }
        if (errorCode.attributes().contains(ErrorMap.ErrorAttribute.ITEM_LOCKED)) {
            return ResponseStatus.LOCKED;
        }
        return ResponseStatus.UNKNOWN;
    }

    /**
     * Tells whether the error map entry carries a retry attribute.
     *
     * @param errorCode the error map entry, may be null.
     * @return true if the entry is non-null and contains {@code RETRY_NOW} or {@code RETRY_LATER}.
     */
    private boolean errorMapIndicatesRetry(ErrorMap.ErrorCode errorCode) {
        if (errorCode == null) {
            return false;
        }
        if (errorCode.attributes().contains(ErrorMap.ErrorAttribute.RETRY_NOW)) {
            return true;
        }
        return errorCode.attributes().contains(ErrorMap.ErrorAttribute.RETRY_LATER);
    }

    /**
     * Translates a response status into the reason for which the request should be retried.
     *
     * @param responseStatus the status to inspect.
     * @param request the request the status belongs to; a {@code LOCKED} status is not treated
     *                as retryable for an {@link UnlockRequest}.
     * @return the retry reason, or null if the status does not ask for a retry.
     */
    private RetryReason statusCodeIndicatesRetry(ResponseStatus responseStatus, Request<?> request) {
        RetryReason reason;
        switch (responseStatus) {
            case LOCKED:
                reason = request instanceof UnlockRequest ? null : RetryReason.KV_LOCKED;
                break;
            case TEMPORARY_FAILURE:
            case SERVER_BUSY:
                reason = RetryReason.KV_TEMPORARY_FAILURE;
                break;
            case SYNC_WRITE_IN_PROGRESS:
                reason = RetryReason.KV_SYNC_WRITE_IN_PROGRESS;
                break;
            case SYNC_WRITE_RE_COMMIT_IN_PROGRESS:
                reason = RetryReason.KV_SYNC_WRITE_RE_COMMIT_IN_PROGRESS;
                break;
            default:
                reason = null;
                break;
        }
        return reason;
    }

    /**
     * Looks up the error map entry for a raw status code.
     *
     * @param s the raw status code from the response.
     * @return the matching entry, or null if no error map is available or it has no entry for
     *         the code.
     */
    private ErrorMap.ErrorCode decodeErrorCode(short s) {
        return this.errorMap == null ? null : this.errorMap.errors().get(Short.valueOf(s));
    }

    /**
     * Handles a "not my vbucket" response: marks the request, publishes an event, schedules a
     * retry and, if the response body looks like JSON, proposes it as a new bucket config.
     *
     * @param keyValueRequest the rejected request.
     * @param byteBuf the raw response whose body may carry a bucket config.
     */
    private void handleNotMyVbucket(KeyValueRequest<? extends Response> keyValueRequest, ByteBuf byteBuf) {
        keyValueRequest.indicateRejectedWithNotMyVbucket();
        this.eventBus.publish(new NotMyVbucketReceivedEvent(this.ioContext, keyValueRequest.partition()));

        // Capture the origin host before the retry is handed to the orchestrator.
        String origin = null;
        if (keyValueRequest.context().lastDispatchedTo() != null) {
            origin = keyValueRequest.context().lastDispatchedTo().host();
        }

        RetryOrchestrator.maybeRetry(this.ioContext, keyValueRequest, RetryReason.KV_NOT_MY_VBUCKET);

        String body = MemcacheProtocol.bodyAsString(byteBuf).trim();
        if (!body.startsWith("{")) {
            return;
        }
        ProposedBucketConfigContext proposal = new ProposedBucketConfigContext(keyValueRequest.bucket(), body, origin);
        this.ioContext.core().configurationProvider().proposeBucketConfig(proposal);
    }

    /**
     * Handles a request that hit an outdated or unknown collection: publishes an event carrying
     * the current collection map, asks for a refresh of the collection id and schedules a retry.
     *
     * @param keyValueRequest the affected request.
     * @param retryReason the reason used both for the event and for the retry.
     */
    private void handleOutdatedCollection(KeyValueRequest<? extends Response> keyValueRequest, RetryReason retryReason) {
        ConfigurationProvider configurationProvider = this.ioContext.core().configurationProvider();
        CollectionIdentifier collection = keyValueRequest.collectionIdentifier();

        OutdatedCollectionContext eventContext = new OutdatedCollectionContext(this.ioContext, configurationProvider.collectionMap());
        this.eventBus.publish(new CollectionOutdatedHandledEvent(collection, retryReason, eventContext));

        configurationProvider.refreshCollectionId(collection);
        RetryOrchestrator.maybeRetry(this.ioContext, keyValueRequest, retryReason);
    }
}
