package com.azure.core.amqp.implementation.handler;

import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.implementation.AmqpLoggingUtils;
import com.azure.core.amqp.implementation.AmqpMetricsProvider;
import com.azure.core.amqp.implementation.ClientConstants;
import com.azure.core.util.logging.LoggingEventBuilder;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.class */
public class ReceiveLinkHandler extends LinkHandler {
    private final String linkName;
    private final AtomicBoolean isRemoteActive;
    private final AtomicBoolean isTerminated;
    private final Sinks.Many<Delivery> deliveries;
    private final Set<Delivery> queuedDeliveries;
    private final String entityPath;

    @Deprecated
    public ReceiveLinkHandler(String str, String str2, String str3, String str4) {
        this(str, str2, str3, str4, new AmqpMetricsProvider(null, str2, str4));
    }

    public ReceiveLinkHandler(String str, String str2, String str3, String str4, AmqpMetricsProvider amqpMetricsProvider) {
        super(str, str2, str4, amqpMetricsProvider);
        this.isRemoteActive = new AtomicBoolean();
        this.isTerminated = new AtomicBoolean();
        this.deliveries = Sinks.many().multicast().onBackpressureBuffer();
        this.queuedDeliveries = Collections.newSetFromMap(new ConcurrentHashMap());
        this.linkName = (String) Objects.requireNonNull(str3, "'linkName' cannot be null.");
        this.entityPath = (String) Objects.requireNonNull(str4, "'entityPath' cannot be null.");
    }

    public String getLinkName() {
        return this.linkName;
    }

    public Flux<Delivery> getDeliveredMessages() {
        Flux asFlux = this.deliveries.asFlux();
        Set<Delivery> set = this.queuedDeliveries;
        Objects.requireNonNull(set);
        return asFlux.doOnNext((v1) -> {
            r1.remove(v1);
        });
    }

    @Override // com.azure.core.amqp.implementation.handler.Handler, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.isTerminated.getAndSet(true)) {
            return;
        }
        clearAndCompleteDeliveries("Could not emit deliveries.close when closing handler.");
        onNext(EndpointState.CLOSED);
    }

    public void onLinkLocalOpen(Event event) {
        Link link = event.getLink();
        if (link instanceof Receiver) {
            this.logger.atVerbose().addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.LINK_NAME_KEY, link.getName()).addKeyValue("localSource", link.getSource()).log("onLinkLocalOpen");
        }
    }

    public void onLinkRemoteOpen(Event event) {
        Link link = event.getLink();
        if (link instanceof Receiver) {
            LoggingEventBuilder addKeyValue = this.logger.atInfo().addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.LINK_NAME_KEY, link.getName());
            if (link.getRemoteSource() != null) {
                addKeyValue.addKeyValue("remoteSource", link.getRemoteSource());
                if (!this.isRemoteActive.getAndSet(true)) {
                    onNext(EndpointState.ACTIVE);
                }
            } else {
                addKeyValue.addKeyValue("action", "waitingForError");
            }
            addKeyValue.log("onLinkRemoteOpen");
        }
    }

    public void onDelivery(Event event) {
        if (!this.isRemoteActive.getAndSet(true)) {
            onNext(EndpointState.ACTIVE);
        }
        Delivery delivery = event.getDelivery();
        Receiver link = delivery.getLink();
        boolean isSettled = delivery.isSettled();
        if (!delivery.isPartial()) {
            if (isSettled) {
                if (link != null) {
                    AmqpLoggingUtils.addErrorCondition(this.logger.atInfo(), link.getRemoteCondition()).addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.LINK_NAME_KEY, this.linkName).addKeyValue(ClientConstants.UPDATED_LINK_CREDIT_KEY, link.getCredit()).addKeyValue(ClientConstants.REMOTE_CREDIT_KEY, link.getRemoteCredit()).addKeyValue(ClientConstants.IS_SETTLED_DELIVERY_KEY, delivery.isSettled()).log("onDelivery. Was already settled.");
                } else {
                    this.logger.atWarning().addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.IS_SETTLED_DELIVERY_KEY, delivery.isSettled()).log("Settled delivery with no link.");
                }
            } else if (link.getLocalState() == EndpointState.CLOSED) {
                delivery.disposition(new Modified());
                delivery.settle();
            } else {
                this.queuedDeliveries.add(delivery);
                this.deliveries.emitNext(delivery, (signalType, emitResult) -> {
                    this.logger.atWarning().addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.LINK_NAME_KEY, this.linkName).addKeyValue(ClientConstants.EMIT_RESULT_KEY, emitResult).log("Could not emit delivery. {}", new Object[]{delivery});
                    if (emitResult != Sinks.EmitResult.FAIL_OVERFLOW || link.getLocalState() == EndpointState.CLOSED) {
                        return false;
                    }
                    link.setCondition(new ErrorCondition(Symbol.getSymbol("delivery-buffer-overflow"), "Deliveries are not processed fast enough. Closing local link."));
                    link.close();
                    return true;
                });
            }
        }
        if (link != null) {
            AmqpLoggingUtils.addErrorCondition(this.logger.atVerbose(), link.getRemoteCondition()).addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.LINK_NAME_KEY, this.linkName).addKeyValue(ClientConstants.UPDATED_LINK_CREDIT_KEY, link.getCredit()).addKeyValue(ClientConstants.REMOTE_CREDIT_KEY, link.getRemoteCredit()).addKeyValue(ClientConstants.IS_PARTIAL_DELIVERY_KEY, delivery.isPartial()).addKeyValue(ClientConstants.IS_SETTLED_DELIVERY_KEY, isSettled).log("onDelivery.");
        }
    }

    @Override // com.azure.core.amqp.implementation.handler.LinkHandler
    public void onLinkLocalClose(Event event) {
        super.onLinkLocalClose(event);
        if (this.isRemoteActive.get()) {
            return;
        }
        this.logger.atInfo().addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.LINK_NAME_KEY, this.linkName).log("Receiver link was never active. Closing endpoint states");
        super.close();
    }

    @Override // com.azure.core.amqp.implementation.handler.LinkHandler
    public void onLinkRemoteClose(Event event) {
        clearAndCompleteDeliveries("Could not complete 'deliveries' when remotely closed.");
        super.onLinkRemoteClose(event);
    }

    @Override // com.azure.core.amqp.implementation.handler.LinkHandler
    public void onLinkFinal(Event event) {
        close();
        super.onLinkFinal(event);
    }

    private void clearAndCompleteDeliveries(String str) {
        this.deliveries.emitComplete((signalType, emitResult) -> {
            this.logger.atVerbose().addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.LINK_NAME_KEY, this.linkName).log(str);
            return false;
        });
        this.queuedDeliveries.forEach(delivery -> {
            delivery.disposition(new Modified());
            delivery.settle();
        });
        this.queuedDeliveries.clear();
    }

    @Override // com.azure.core.amqp.implementation.handler.LinkHandler
    public /* bridge */ /* synthetic */ AmqpErrorContext getErrorContext(Link link) {
        return super.getErrorContext(link);
    }

    @Override // com.azure.core.amqp.implementation.handler.LinkHandler
    public /* bridge */ /* synthetic */ void onLinkRemoteDetach(Event event) {
        super.onLinkRemoteDetach(event);
    }
}
