package com.solace.messaging.receiver;

import com.solace.messaging.DirectMessagePublisherBuilder;
import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.publisher.DirectMessagePublisher;
import com.solace.messaging.publisher.OutboundMessage;
import com.solace.messaging.receiver.MessageReceiver;
import com.solace.messaging.receiver.RequestReplyMessageReceiver;
import com.solace.messaging.resources.ReceiverInfo;
import com.solace.messaging.resources.ShareName;
import com.solace.messaging.resources.Topic;
import com.solace.messaging.resources.TopicSubscription;
import com.solace.messaging.util.CompletionListener;
import com.solace.messaging.util.LifecycleControl;
import com.solace.messaging.util.Manageable;
import com.solace.messaging.util.TypedProperties;
import com.solace.messaging.util.async.ExtendedCompletableFuture;
import com.solace.messaging.util.internal.Internal;
import com.solace.messaging.util.internal.MessagingServiceInternalView;
import com.solace.messaging.util.internal.SolaceMessageUtil;
import com.solace.messaging.util.internal.Validation;
import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicStampedReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.annotation.versioning.ProviderType;

@Internal
@ProviderType
/* loaded from: input_file:com/solace/messaging/receiver/RequestReplyMessageReceiverImpl.class */
public class RequestReplyMessageReceiverImpl implements RequestReplyMessageReceiver {
    static final int STATE_NOT_STARTED = 0;
    static final int STATE_STARTING = 1;
    static final int STATE_STARTED = 2;
    static final int STATE_TERMINATING = 3;
    static final int STATE_TERMINATED = 4;
    final AtomicStampedReference<CompletableFuture> stateHolder;
    private final long id;
    private final String instanceName;
    private final MessagingServiceInternalView serviceInternalView;
    private final DirectMessageReceiverImpl delegateReceiver;
    private final DirectMessagePublisher delegatePublisher;
    private final MessageCorrelationKeyProvider messageKeyProvider;
    private final ReceiverInfo receiverInfo;
    private LifecycleControl.TerminationNotificationListener applicationTerminationNotificationListener;
    private static final Log logger = LogFactory.getLog(RequestReplyMessageReceiverImpl.class);
    private static final AtomicLong instanceIdGenerator = new AtomicLong(0);

    /* JADX INFO: Access modifiers changed from: package-private */
    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/receiver/RequestReplyMessageReceiverImpl$CorrelationContext.class */
    public static class CorrelationContext implements Serializable {
        private static final long serialVersionUID = 426395239935681793L;
        private final Long correlationKey;
        private volatile Object userContext;
        private volatile OutboundMessage linkedMessage;

        CorrelationContext(Long l, Object obj, OutboundMessage outboundMessage) {
            this.correlationKey = l;
            this.userContext = obj;
            this.linkedMessage = outboundMessage;
        }

        public Long getCorrelationKey() {
            return this.correlationKey;
        }

        public Object getUserContext() {
            return this.userContext;
        }

        public OutboundMessage getLinkedMessage() {
            return this.linkedMessage;
        }

        public void clear() {
            this.linkedMessage = null;
            this.userContext = null;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Objects.equals(this.correlationKey, ((CorrelationContext) obj).correlationKey);
        }

        public int hashCode() {
            return this.correlationKey != null ? this.correlationKey.hashCode() : RequestReplyMessageReceiverImpl.STATE_NOT_STARTED;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/receiver/RequestReplyMessageReceiverImpl$MessageCorrelationKeyProvider.class */
    public static class MessageCorrelationKeyProvider implements Serializable {
        final AtomicLong messageKeyProvider = new AtomicLong(ThreadLocalRandom.current().nextLong(1, 1000));

        MessageCorrelationKeyProvider() {
        }

        Long nextLongKey() {
            return Long.valueOf(this.messageKeyProvider.incrementAndGet());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/receiver/RequestReplyMessageReceiverImpl$ReplierImpl.class */
    public class ReplierImpl implements RequestReplyMessageReceiver.Replier {
        private final MessagingServiceInternalView serviceInternalView;
        private final InboundMessage inboundMessage;
        private final Topic replyTo;
        private final String correlationId;

        ReplierImpl(MessagingServiceInternalView messagingServiceInternalView, InboundMessage inboundMessage, Topic topic, String str) {
            this.serviceInternalView = messagingServiceInternalView;
            this.inboundMessage = inboundMessage;
            this.replyTo = topic;
            this.correlationId = str;
        }

        @Override // com.solace.messaging.receiver.RequestReplyMessageReceiver.Replier
        public void reply(OutboundMessage outboundMessage) throws PubSubPlusClientException.PublisherOverflowException, PubSubPlusClientException {
            reply(outboundMessage, null);
        }

        @Override // com.solace.messaging.receiver.RequestReplyMessageReceiver.Replier
        public void reply(OutboundMessage outboundMessage, Properties properties) throws PubSubPlusClientException.PublisherOverflowException, PubSubPlusClientException {
            BytesXMLMessage solaceMessage = SolaceMessageUtil.getSolaceMessage(outboundMessage);
            solaceMessage.setAsReplyMessage(true);
            if (this.correlationId != null) {
                solaceMessage.setCorrelationId(this.correlationId);
            }
            solaceMessage.setCorrelationKey(new CorrelationContext(RequestReplyMessageReceiverImpl.this.messageKeyProvider.nextLongKey(), this.inboundMessage, outboundMessage));
            try {
                RequestReplyMessageReceiverImpl.this.delegatePublisher.publish(outboundMessage, this.replyTo, properties);
            } catch (IllegalStateException e) {
                if (RequestReplyMessageReceiverImpl.this.delegatePublisher.isTerminated()) {
                    RequestReplyMessageReceiverImpl.this.serviceInternalView.getApiMetricsCollector().incrementMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED);
                }
                throw e;
            }
        }
    }

    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/receiver/RequestReplyMessageReceiverImpl$RequestReplyReceiverInfoImpl.class */
    private class RequestReplyReceiverInfoImpl implements ReceiverInfo {
        private RequestReplyReceiverInfoImpl() {
        }

        @Override // com.solace.messaging.resources.ReceiverInfo
        public String getInstanceName() {
            return RequestReplyMessageReceiverImpl.this.instanceName;
        }

        @Override // com.solace.messaging.util.Identifiable
        public long getId() {
            return RequestReplyMessageReceiverImpl.this.id;
        }
    }

    public RequestReplyMessageReceiverImpl(MessagingServiceInternalView messagingServiceInternalView, TypedProperties typedProperties, DirectMessagePublisherBuilder directMessagePublisherBuilder, List<TopicSubscription> list) {
        this(messagingServiceInternalView, typedProperties, directMessagePublisherBuilder, list, ShareName.ShareNameImpl.noOp());
    }

    public RequestReplyMessageReceiverImpl(MessagingServiceInternalView messagingServiceInternalView, TypedProperties typedProperties, DirectMessagePublisherBuilder directMessagePublisherBuilder, List<TopicSubscription> list, ShareName shareName) {
        this.stateHolder = new AtomicStampedReference<>(null, STATE_NOT_STARTED);
        this.id = instanceIdGenerator.incrementAndGet();
        this.instanceName = "RequestReplyMessageReceiver@" + this.id;
        this.receiverInfo = new RequestReplyReceiverInfoImpl();
        this.serviceInternalView = messagingServiceInternalView;
        this.delegateReceiver = new DirectMessageReceiverImpl(messagingServiceInternalView, typedProperties, list, shareName);
        this.delegatePublisher = directMessagePublisherBuilder.build();
        this.messageKeyProvider = new MessageCorrelationKeyProvider();
        this.delegateReceiver.setTerminationNotificationListener(terminationEvent -> {
            if (this.applicationTerminationNotificationListener != null) {
                try {
                    this.applicationTerminationNotificationListener.onTermination(terminationEvent);
                } catch (Exception e) {
                }
            }
            terminateOnUnsolicitedInterruption();
        });
    }

    @Override // com.solace.messaging.receiver.MessageReceiver
    public void setReceiveFailureListener(MessageReceiver.ReceiveFailureListener receiveFailureListener) {
        this.delegateReceiver.setReceiveFailureListener(receiveFailureListener);
    }

    @Override // com.solace.messaging.util.LifecycleControl
    public void setTerminationNotificationListener(LifecycleControl.TerminationNotificationListener terminationNotificationListener) {
        this.applicationTerminationNotificationListener = terminationNotificationListener;
    }

    @Override // com.solace.messaging.receiver.RequestReplyMessageReceiver
    public void setReplyFailureListener(RequestReplyMessageReceiver.ReplyFailureListener replyFailureListener) {
        if (replyFailureListener == null) {
            this.delegatePublisher.setPublishFailureListener(null);
        } else {
            this.delegatePublisher.setPublishFailureListener(failedPublishEvent -> {
                Object correlationKey;
                Object userContext;
                InboundMessage inboundMessage = STATE_NOT_STARTED;
                OutboundMessage message = failedPublishEvent.getMessage();
                if (message != null && (correlationKey = message.getCorrelationKey()) != null && (correlationKey instanceof CorrelationContext) && (userContext = ((CorrelationContext) correlationKey).getUserContext()) != null && (userContext instanceof InboundMessage)) {
                    inboundMessage = (InboundMessage) userContext;
                }
                try {
                    replyFailureListener.onFailedReply(new RequestReplyMessageReceiver.FailedReplyEvent(inboundMessage, failedPublishEvent.getMessage(), failedPublishEvent.getDestination(), failedPublishEvent.getException(), failedPublishEvent.getTimeStamp()));
                } catch (Exception e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Application code threw an unhandled exception while processing FailedReplyEvent in ReplyFailureListener", e);
                    }
                }
            });
        }
    }

    @Override // com.solace.messaging.util.LifecycleControl
    public boolean isRunning() {
        return STATE_STARTED == this.stateHolder.getStamp();
    }

    @Override // com.solace.messaging.util.LifecycleControl
    public boolean isTerminated() {
        return STATE_TERMINATED == this.stateHolder.getStamp();
    }

    @Override // com.solace.messaging.util.LifecycleControl
    public boolean isTerminating() {
        return STATE_TERMINATING == this.stateHolder.getStamp();
    }

    @Override // com.solace.messaging.receiver.RequestReplyMessageReceiver, com.solace.messaging.receiver.MessageReceiver, com.solace.messaging.util.LifecycleControl
    public RequestReplyMessageReceiver start() throws PubSubPlusClientException {
        try {
            startAsync().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver start was canceled", e);
        } catch (CancellationException e2) {
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver start was canceled", e2);
        } catch (ExecutionException e3) {
            Throwable cause = e3.getCause();
            if (cause != null) {
                if (cause instanceof PubSubPlusClientException) {
                    throw ((PubSubPlusClientException) cause);
                }
                if (cause instanceof IllegalStateException) {
                    throw ((IllegalStateException) cause);
                }
                throw new PubSubPlusClientException(cause);
            }
            if (logger.isErrorEnabled()) {
                logger.error(this.instanceName + " failed to start", e3);
            }
        }
        return this;
    }

    @Override // com.solace.messaging.receiver.RequestReplyMessageReceiver, com.solace.messaging.receiver.MessageReceiver, com.solace.messaging.util.AsyncLifecycleControl
    public <RequestReplyMessageReceiver> CompletableFuture<RequestReplyMessageReceiver> startAsync() {
        int stamp = this.stateHolder.getStamp();
        if (STATE_TERMINATING == stamp || STATE_TERMINATED == stamp) {
            throw new IllegalStateException("Message receiver is already terminated");
        }
        if (!this.serviceInternalView.isConnected()) {
            throw new IllegalStateException("Message receiver can't be started when service is not connected");
        }
        while (this.serviceInternalView.isConnected()) {
            int[] iArr = new int[STATE_STARTING];
            CompletableFuture<RequestReplyMessageReceiver> completableFuture = this.stateHolder.get(iArr);
            switch (iArr[STATE_NOT_STARTED]) {
                case STATE_NOT_STARTED /* 0 */:
                    ExtendedCompletableFuture extendedCompletableFuture = new ExtendedCompletableFuture();
                    if (this.stateHolder.compareAndSet(null, extendedCompletableFuture, STATE_NOT_STARTED, STATE_STARTING)) {
                        if (logger.isDebugEnabled()) {
                            logger.debug(this.instanceName + " is being started");
                        }
                        try {
                            if (this.stateHolder.compareAndSet(extendedCompletableFuture, extendedCompletableFuture, STATE_STARTING, STATE_STARTED)) {
                                this.delegatePublisher.start();
                                this.delegateReceiver.start();
                            } else if (this.stateHolder.getStamp() >= STATE_TERMINATING) {
                                onTerminate();
                                extendedCompletableFuture.completeExceptionally(new CancellationException("Starting of message receiver was interrupted"));
                                return extendedCompletableFuture;
                            }
                            extendedCompletableFuture.complete(this);
                            if (logger.isDebugEnabled()) {
                                logger.debug(this.instanceName + " is started");
                            }
                        } catch (Exception e) {
                            this.stateHolder.set(null, STATE_TERMINATED);
                            onTerminate();
                            extendedCompletableFuture.completeExceptionally(PubSubPlusClientException.of(e));
                            if (logger.isErrorEnabled()) {
                                logger.error(this.instanceName + " failed to start and is terminating", e);
                            }
                        }
                        return ExtendedCompletableFuture.onCancellation(extendedCompletableFuture, (obj, th) -> {
                            this.stateHolder.set(null, STATE_TERMINATED);
                            onTerminate();
                            if (logger.isDebugEnabled()) {
                                logger.debug(this.instanceName + " async start was canceled");
                            }
                        });
                    }
                case STATE_STARTING /* 1 */:
                case STATE_STARTED /* 2 */:
                    return completableFuture;
                case STATE_TERMINATING /* 3 */:
                case STATE_TERMINATED /* 4 */:
                default:
                    return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Message receiver is already terminated"));
            }
        }
        return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Message receiver can't be started when service is not connected"));
    }

    @Override // com.solace.messaging.receiver.RequestReplyMessageReceiver, com.solace.messaging.util.AsyncLifecycleControl
    public <RequestReplyMessageReceiver> void startAsync(CompletionListener<RequestReplyMessageReceiver> completionListener) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(completionListener, "Start listener can't be null");
        startAsync().whenComplete((obj, th) -> {
            Throwable cause;
            if (th == null) {
                cause = null;
            } else {
                try {
                    cause = th.getCause();
                } catch (Exception e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Application code throw an unhandled exception by processing async start completion notification", e);
                        return;
                    }
                    return;
                }
            }
            completionListener.onCompletion(obj, cause);
        });
    }

    @Override // com.solace.messaging.util.LifecycleControl
    public void terminate(long j) throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        if (j == 0) {
            terminateNow();
            return;
        }
        try {
            terminateAsync(j).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver termination was interrupted", e);
        } catch (ExecutionException e2) {
            Throwable cause = e2.getCause();
            if (cause == null) {
                throw new PubSubPlusClientException(e2);
            }
            if (!(cause instanceof PubSubPlusClientException)) {
                throw new PubSubPlusClientException(cause);
            }
            throw ((PubSubPlusClientException) cause);
        } catch (Exception e3) {
            if (logger.isWarnEnabled()) {
                logger.warn(this.instanceName + " encountered problem during termination.", e3);
            }
        }
    }

    void terminateNow() throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        this.stateHolder.set(null, STATE_TERMINATED);
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " is being non gracefully terminated");
        }
        int i = STATE_NOT_STARTED;
        PubSubPlusClientException.IncompleteMessageDeliveryException incompleteMessageDeliveryException = STATE_NOT_STARTED;
        try {
            this.delegateReceiver.terminateNow();
        } catch (PubSubPlusClientException.IncompleteMessageDeliveryException e) {
            i += e.getMessageCount();
            incompleteMessageDeliveryException = e;
        }
        try {
            this.delegatePublisher.terminate(0L);
        } catch (PubSubPlusClientException.IncompleteMessageDeliveryException e2) {
            i += e2.getMessageCount();
            incompleteMessageDeliveryException = e2;
        }
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " is terminated");
        }
        if (incompleteMessageDeliveryException != null) {
            throw new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %s messages could not be completed due to non graceful termination", Integer.valueOf(i)), i);
        }
    }

    /* JADX WARN: Finally extract failed */
    @Override // com.solace.messaging.util.AsyncLifecycleControl
    public CompletableFuture<Void> terminateAsync(long j) throws PubSubPlusClientException, IllegalStateException {
        Validation.smallerThanNumbersIllegal(1L, j, "Grace period < 1");
        while (true) {
            int[] iArr = new int[STATE_STARTING];
            CompletableFuture completableFuture = this.stateHolder.get(iArr);
            switch (iArr[STATE_NOT_STARTED]) {
                case STATE_NOT_STARTED /* 0 */:
                    if (!this.stateHolder.compareAndSet(null, null, STATE_NOT_STARTED, STATE_TERMINATED)) {
                        break;
                    } else {
                        onTerminate();
                        return CompletableFuture.completedFuture(null);
                    }
                case STATE_STARTING /* 1 */:
                    this.stateHolder.set(null, STATE_TERMINATED);
                    completableFuture.cancel(true);
                    return CompletableFuture.completedFuture(null);
                case STATE_STARTED /* 2 */:
                    if (!this.stateHolder.compareAndSet(completableFuture, new ExtendedCompletableFuture(), STATE_STARTED, STATE_TERMINATING)) {
                        break;
                    } else {
                        try {
                            CompletableFuture<Void> allOf = CompletableFuture.allOf(this.delegateReceiver.terminateAsync(j), this.delegatePublisher.terminateAsync(j));
                            this.stateHolder.set(null, STATE_TERMINATED);
                            return allOf;
                        } catch (Throwable th) {
                            this.stateHolder.set(null, STATE_TERMINATED);
                            throw th;
                        }
                    }
                case STATE_TERMINATING /* 3 */:
                case STATE_TERMINATED /* 4 */:
                default:
                    return CompletableFuture.completedFuture(null);
            }
        }
    }

    @Override // com.solace.messaging.util.AsyncLifecycleControl
    public void terminateAsync(CompletionListener<Void> completionListener, long j) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(completionListener, "Termination listener can't be null");
        terminateAsync(j).whenComplete((r5, th) -> {
            Throwable cause;
            if (th == null) {
                cause = null;
            } else {
                try {
                    cause = th.getCause();
                } catch (Exception e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Application code throw an unhandled exception by processing termination completion notification", e);
                        return;
                    }
                    return;
                }
            }
            completionListener.onCompletion(null, cause);
        });
    }

    @Override // com.solace.messaging.receiver.RequestReplyMessageReceiver
    public void receiveMessage(RequestReplyMessageReceiver.RequestMessageHandler requestMessageHandler) throws PubSubPlusClientException {
        InboundMessage receiveMessage = this.delegateReceiver.receiveMessage();
        if (receiveMessage != null) {
            doRequestReply(requestMessageHandler, receiveMessage);
        }
    }

    @Override // com.solace.messaging.receiver.RequestReplyMessageReceiver
    public void receiveMessage(RequestReplyMessageReceiver.RequestMessageHandler requestMessageHandler, long j) throws PubSubPlusClientException {
        InboundMessage receiveMessage = this.delegateReceiver.receiveMessage(j);
        if (receiveMessage != null) {
            doRequestReply(requestMessageHandler, receiveMessage);
        }
    }

    @Override // com.solace.messaging.receiver.RequestReplyMessageReceiver
    public void receiveAsync(RequestReplyMessageReceiver.RequestMessageHandler requestMessageHandler) throws PubSubPlusClientException {
        this.delegateReceiver.receiveAsync(inboundMessage -> {
            doRequestReply(requestMessageHandler, inboundMessage);
        });
    }

    @Override // com.solace.messaging.receiver.RequestReplyMessageReceiver
    public void receiveAsync(RequestReplyMessageReceiver.RequestMessageHandler requestMessageHandler, ExecutorService executorService) throws PubSubPlusClientException {
        this.delegateReceiver.receiveAsync(inboundMessage -> {
            doRequestReply(requestMessageHandler, inboundMessage);
        }, executorService);
    }

    @Override // com.solace.messaging.util.ManageableReceiver
    public ReceiverInfo receiverInfo() {
        return this.receiverInfo;
    }

    @Internal
    void doRequestReply(RequestReplyMessageReceiver.RequestMessageHandler requestMessageHandler, InboundMessage inboundMessage) {
        if (inboundMessage != null) {
            BytesXMLMessage solaceMessage = SolaceMessageUtil.getSolaceMessage(inboundMessage);
            Topic of = solaceMessage.getReplyTo() == null ? null : Topic.of(solaceMessage.getReplyTo().getName());
            String correlationId = solaceMessage.getCorrelationId();
            try {
                requestMessageHandler.onMessage(inboundMessage, (of == null || correlationId == null || correlationId.isEmpty()) ? null : new ReplierImpl(this.serviceInternalView, inboundMessage, of, correlationId));
            } catch (Exception e) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Application code threw an unhandled exception while processing request message in RequestMessageHandler", e);
                }
            }
        }
    }

    @Internal
    void terminateOnUnsolicitedInterruption() throws PubSubPlusClientException {
        this.stateHolder.set(null, STATE_TERMINATED);
    }

    @Internal
    void onTerminate() {
        this.delegateReceiver.terminateNow();
        this.delegatePublisher.terminate(0L);
    }
}
