package com.solace.messaging.publisher;

import com.solace.messaging.MessagingService;
import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.publisher.OutboundMessageBuilder;
import com.solace.messaging.publisher.PublisherBuffers;
import com.solace.messaging.publisher.PublisherHealthCheck;
import com.solace.messaging.publisher.RequestReplyMessagePublisher;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.MessageReceiver;
import com.solace.messaging.resources.Topic;
import com.solace.messaging.util.CompletionListener;
import com.solace.messaging.util.LifecycleControl;
import com.solace.messaging.util.Manageable;
import com.solace.messaging.util.ManageablePublisher;
import com.solace.messaging.util.TypedProperties;
import com.solace.messaging.util.async.ExtendedCompletableFuture;
import com.solace.messaging.util.async.ThreadFactories;
import com.solace.messaging.util.internal.BiTask;
import com.solace.messaging.util.internal.ClientSession;
import com.solace.messaging.util.internal.Internal;
import com.solace.messaging.util.internal.MessagingServiceInternalView;
import com.solace.messaging.util.internal.Task;
import com.solace.messaging.util.internal.TerminationEventImpl;
import com.solace.messaging.util.internal.TerminationNotificationDispatcher;
import com.solace.messaging.util.internal.Validation;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.ClosedFacilityException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPInterruptedException;
import com.solacesystems.jcsmp.JCSMPRequestTimeoutException;
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
import com.solacesystems.jcsmp.JCSMPStreamingPublishEventHandler;
import com.solacesystems.jcsmp.RequestReplyListener;
import com.solacesystems.jcsmp.Requestor;
import com.solacesystems.jcsmp.XMLMessageProducer;
import java.io.Serializable;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.annotation.versioning.ProviderType;

@Internal
@ProviderType
/* loaded from: input_file:com/solace/messaging/publisher/RequestReplyMessagePublisherImpl.class */
public class RequestReplyMessagePublisherImpl implements RequestReplyMessagePublisher {
    /** Lifecycle state: the publisher has not been started yet (initial value of {@code stateHolder}). */
    static final int STATE_NOT_STARTED = 0;
    /** Lifecycle state: a start has been claimed via compare-and-set from {@link #STATE_NOT_STARTED}. */
    static final int STATE_STARTED = 1;
    /** Lifecycle state: the publisher is terminated; {@code startAsync()} refuses to run in this state. */
    static final int STATE_TERMINATED = 2;
    /** Configuration handed to the constructor; its use is outside the visible part of this file. */
    private final TypedProperties publisherConfiguration;
    /** Internal view of the owning messaging service; used for the connectivity check and API metrics. */
    private final MessagingServiceInternalView serviceInternalView;
    // NOTE(review): presumably assigned during start-up (not visible in this chunk) - confirm.
    private volatile XMLMessageProducer sessionDefaultProducer;
    // NOTE(review): presumably the JCSMP requestor used to send requests - assigned outside this chunk, confirm.
    private volatile Requestor requestor;
    /** Optional application listener invoked by {@code notifyWhenReady()}; {@code null} until set. */
    private PublisherHealthCheck.PublisherReadinessListener publisherReadinessListener;
    private static final Log logger = LogFactory.getLog(RequestReplyMessagePublisherImpl.class);
    /** Process-wide counter that supplies the numeric {@code id} of each publisher instance. */
    private static final AtomicLong instanceIdGenerator = new AtomicLong(0);
    /**
     * Task that records how many requests were still outstanding for the given publisher:
     * the count is stored in the supplied counter, added to the
     * {@code PUBLISH_MESSAGES_TERMINATION_DISCARDED} metric and reported with a warning.
     * Nothing happens when no request is outstanding.
     */
    private static final BiTask<RequestReplyMessagePublisherImpl, AtomicInteger> postTerminateNowTask = (publisher, discardedCounter) -> {
        final int pending = publisher.outstandingRequestsTracker.getOutStandingRequests();
        if (pending <= 0) {
            return;
        }
        discardedCounter.set(pending);
        publisher.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, pending);
        if (logger.isWarnEnabled()) {
            logger.warn(publisher.instanceName + " not gracefully terminated before all messages were processed.");
        }
    };
    /** Current lifecycle state; one of STATE_NOT_STARTED, STATE_STARTED, STATE_TERMINATED. */
    final AtomicInteger stateHolder = new AtomicInteger(STATE_NOT_STARTED);
    /**
     * Reported by {@code isTerminating()}; while {@code true}, {@code isReady()} returns false and
     * the outstanding-request tracker signals waiters on every decrement.
     */
    private volatile boolean gracefulShutdownInProgress = false;
    /** Numeric identity of this instance, taken from {@code instanceIdGenerator}. */
    private final long id = instanceIdGenerator.incrementAndGet();
    /** Human-readable instance name used in log messages and thread names. */
    private final String instanceName = "RequestReplyMessagePublisher@" + this.id;
    /** Exposes {@code instanceName} and {@code id} through the PublisherInfo interface. */
    private final ManageablePublisher.PublisherInfo publisherInfo = new RequestReplyPublisherInfoImpl();
    // NOTE(review): presumably supplies correlation keys for outgoing requests - usage is outside this chunk.
    private final MessageCorrelationKeyProvider messageKeyProvider = new MessageCorrelationKeyProvider();
    /** Counts requests that have not yet received a reply, timeout or error. */
    private final OutStandingRequestTracker outstandingRequestsTracker = new OutStandingRequestTracker();
    /** Forwards termination events to the listener registered via setTerminationNotificationListener. */
    private final TerminationNotificationDispatcher terminationNotificationDispatcher = new TerminationNotificationDispatcher();
    /**
     * Post-termination task used on non-graceful shutdown paths: accounts for the requests
     * that are still outstanding (metric + warning) and then closes the tracker, which
     * resets its counter.
     */
    private final Task<RequestReplyMessagePublisherImpl> postTerminationClearOutstandingRequestsSilentTask = publisher -> {
        final int pending = this.outstandingRequestsTracker.getOutStandingRequests();
        final boolean requestsDiscarded = pending > 0;
        if (requestsDiscarded) {
            publisher.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, pending);
            if (logger.isWarnEnabled()) {
                logger.warn(publisher.instanceName + " non-gracefully terminated before all messages were processed.");
            }
        }
        // Always reset the tracker, whether or not anything was discarded.
        this.outstandingRequestsTracker.close();
    };
    /**
     * Reacts to a service interruption: moves the publisher to STATE_TERMINATED and, if it
     * was not already terminated, dispatches a termination event and runs the termination
     * routine with the silent clean-up task.
     */
    private final MessagingService.ServiceInterruptionListener serviceInterruptionListener = interruption -> {
        if (logger.isWarnEnabled()) {
            logger.warn("Shutting down publisher due to Service interruption");
        }
        final int previousState = this.stateHolder.getAndSet(STATE_TERMINATED);
        if (previousState >= STATE_TERMINATED) {
            // Already terminated by someone else; nothing left to do.
            return;
        }
        final TerminationEventImpl terminationEvent = new TerminationEventImpl(interruption.getTimestamp(), interruption.getMessage(), interruption.getCause());
        this.terminationNotificationDispatcher.onTermination(terminationEvent);
        onTerminate(null, this.postTerminationClearOutstandingRequestsSilentTask);
    };
    /**
     * Reacts to the closure of the underlying client session: moves the publisher to
     * STATE_TERMINATED and, if it was not already terminated, dispatches a termination
     * event and runs the termination routine with the silent clean-up task.
     */
    private final ClientSession.ClientSessionStateListener closedSessionListener = clientSessionStateChangeEvent -> {
        // The message is logged at INFO, so the guard must test the INFO level (it used to test WARN).
        if (logger.isInfoEnabled()) {
            logger.info("Shutting down publisher due to service closure");
        }
        if (this.stateHolder.getAndSet(STATE_TERMINATED) < STATE_TERMINATED) {
            this.terminationNotificationDispatcher.onTermination(new TerminationEventImpl(clientSessionStateChangeEvent.getTimestamp(), clientSessionStateChangeEvent.getMessage(), clientSessionStateChangeEvent.getCause()));
            onTerminate(null, this.postTerminationClearOutstandingRequestsSilentTask);
        }
    };
    /** Delivers publish failures to the application's reply handler on a dedicated single-thread executor. */
    private final PublishFailureNotificationDispatcher errorNotificationDispatcher = new PublishFailureNotificationDispatcher();
    /**
     * JCSMP publish event handler. Only {@code handleErrorEx} does any work: it logs the
     * failure and, when the correlation object is a {@link CorrelationContext}, forwards the
     * error to the failure notification dispatcher. All other callbacks are no-ops.
     */
    private final JCSMPStreamingPublishCorrelatingEventHandler publishEventHandler = new JCSMPStreamingPublishCorrelatingEventHandler() { // from class: com.solace.messaging.publisher.RequestReplyMessagePublisherImpl.1
        public void responseReceivedEx(Object obj) {
            // intentionally empty
        }

        public void handleErrorEx(Object obj, JCSMPException jCSMPException, long j) {
            final RequestReplyMessagePublisherImpl owner = RequestReplyMessagePublisherImpl.this;
            if (RequestReplyMessagePublisherImpl.logger.isErrorEnabled()) {
                RequestReplyMessagePublisherImpl.logger.error(owner.instanceName + " encountered problem during message publishing.", jCSMPException);
            }
            // instanceof is false for null, so this also covers a missing correlation object.
            if (obj instanceof CorrelationContext) {
                final CorrelationContext context = (CorrelationContext) obj;
                owner.errorNotificationDispatcher.onException(jCSMPException, context.userContext, context.replyMessageHandler);
            }
        }

        public void handleError(String str, JCSMPException jCSMPException, long j) {
            // intentionally empty
        }

        public void responseReceived(String str) {
            // intentionally empty
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/publisher/RequestReplyMessagePublisherImpl$CorrelationContext.class */
    public static class CorrelationContext implements Serializable {
        private static final long serialVersionUID = -6070196827721125521L;

        /** Identity of this context; the only field taking part in equals/hashCode. */
        private final Long correlationKey;
        private volatile Object userContext;
        private volatile PubSubPlusClientException exception;
        private volatile OutboundMessage linkedMessage;
        /** Not serialized. */
        private transient RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler;

        /**
         * Creates a context binding a correlation key to the request's user context,
         * the outbound message and the handler that should receive the reply.
         */
        CorrelationContext(Long l, Object obj, OutboundMessage outboundMessage, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler) {
            this.correlationKey = l;
            this.userContext = obj;
            this.linkedMessage = outboundMessage;
            this.replyMessageHandler = replyMessageHandler;
        }

        /** @return the correlation key given at construction (may be {@code null}) */
        public Long getCorrelationKey() {
            return correlationKey;
        }

        /** @return the user context, or {@code null} after {@link #clear()} */
        public Object getUserContext() {
            return userContext;
        }

        /** @return the exception stored via {@link #setException}, or {@code null} */
        public PubSubPlusClientException getException() {
            return exception;
        }

        /** @return the reply handler, or {@code null} after {@link #clear()} */
        public RequestReplyMessagePublisher.ReplyMessageHandler getReplyMessageHandler() {
            return replyMessageHandler;
        }

        /** Stores an exception associated with this request. */
        public void setException(PubSubPlusClientException pubSubPlusClientException) {
            exception = pubSubPlusClientException;
        }

        /** @return the outbound message linked to this context, or {@code null} after {@link #clear()} */
        public OutboundMessage getLinkedMessage() {
            return linkedMessage;
        }

        /** Drops every reference held by this context except the correlation key. */
        public void clear() {
            linkedMessage = null;
            exception = null;
            userContext = null;
            replyMessageHandler = null;
        }

        /** Two contexts are equal when they are of the same class and carry equal correlation keys. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj != null && obj.getClass() == getClass()) {
                final CorrelationContext other = (CorrelationContext) obj;
                return Objects.equals(correlationKey, other.correlationKey);
            }
            return false;
        }

        /** Hash of the correlation key; 0 when the key is {@code null}. */
        @Override
        public int hashCode() {
            return Objects.hashCode(correlationKey);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/publisher/RequestReplyMessagePublisherImpl$MessageCorrelationKeyProvider.class */
    public static class MessageCorrelationKeyProvider implements Serializable {
        private static final long serialVersionUID = -5420202750378500494L;

        /** Key counter, seeded with a random value in the range [1, 1000). */
        final AtomicLong messageKeyProvider = new AtomicLong(ThreadLocalRandom.current().nextLong(1, 1000));

        MessageCorrelationKeyProvider() {
            // nothing to initialise beyond the field initialiser above
        }

        /**
         * Produces the next key.
         *
         * @return the counter value after incrementing it by one
         */
        Long nextLongKey() {
            final long next = messageKeyProvider.incrementAndGet();
            return next;
        }
    }

    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/publisher/RequestReplyMessagePublisherImpl$OutStandingRequestTracker.class */
    public class OutStandingRequestTracker {
        private final AtomicInteger outstandingRequests = new AtomicInteger(RequestReplyMessagePublisherImpl.STATE_NOT_STARTED);
        private final Lock outstandingRequestsLock = new ReentrantLock();
        private final Condition zeroOutstandingRequestsCondition = this.outstandingRequestsLock.newCondition();

        public OutStandingRequestTracker() {
        }

        public void incrementOutstandingRequestCount() {
            this.outstandingRequests.incrementAndGet();
        }

        public void decrementOutstandingRequestCount() {
            this.outstandingRequests.decrementAndGet();
            if (RequestReplyMessagePublisherImpl.this.gracefulShutdownInProgress) {
                this.outstandingRequestsLock.lock();
                try {
                    this.zeroOutstandingRequestsCondition.signalAll();
                } finally {
                    this.outstandingRequestsLock.unlock();
                }
            }
        }

        public void awaitTermination(long j) throws InterruptedException {
            if (this.outstandingRequests.get() < RequestReplyMessagePublisherImpl.STATE_STARTED) {
                return;
            }
            long currentTimeMillis = System.currentTimeMillis() + j;
            this.outstandingRequestsLock.lockInterruptibly();
            for (long j2 = j; this.outstandingRequests.get() >= RequestReplyMessagePublisherImpl.STATE_STARTED && j2 > 0; j2 = currentTimeMillis - System.currentTimeMillis()) {
                try {
                    if (this.zeroOutstandingRequestsCondition.await(j2, TimeUnit.MILLISECONDS) && this.outstandingRequests.get() < RequestReplyMessagePublisherImpl.STATE_STARTED) {
                        this.outstandingRequestsLock.unlock();
                        return;
                    }
                } finally {
                    this.outstandingRequestsLock.unlock();
                }
            }
        }

        public int getOutStandingRequests() {
            return this.outstandingRequests.get();
        }

        public void close() {
            this.outstandingRequests.set(RequestReplyMessagePublisherImpl.STATE_NOT_STARTED);
            try {
                this.zeroOutstandingRequestsCondition.signalAll();
            } finally {
                this.outstandingRequestsLock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/publisher/RequestReplyMessagePublisherImpl$PublishFailureNotificationDispatcher.class */
    public class PublishFailureNotificationDispatcher {
        private final ExecutorService failureNotificationExecutorService;
        private final ReentrantLock closeLock;
        private final AtomicBoolean closed;

        /* JADX INFO: Access modifiers changed from: package-private */
        @Internal
        @ProviderType
        /* loaded from: input_file:com/solace/messaging/publisher/RequestReplyMessagePublisherImpl$PublishFailureNotificationDispatcher$ScheduledFailureNotification.class */
        public class ScheduledFailureNotification implements Callable<Void> {
            final Exception e;
            final PublisherBuffers.Publishable<Topic> publishable;
            final Object userContextObject;
            final RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler;

            ScheduledFailureNotification(PublisherBuffers.Publishable<Topic> publishable, Exception exc, Object obj, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler) {
                this.publishable = publishable;
                this.e = exc;
                this.userContextObject = obj;
                this.replyMessageHandler = replyMessageHandler;
            }

            ScheduledFailureNotification(Exception exc, Object obj, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler) {
                this.e = exc;
                this.userContextObject = obj;
                this.replyMessageHandler = replyMessageHandler;
                this.publishable = PublisherBuffers.Publishable.none();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler = this.replyMessageHandler;
                if (replyMessageHandler == null) {
                    return null;
                }
                replyMessageHandler.onMessage(null, this.userContextObject, mapException(this.e));
                return null;
            }

            PubSubPlusClientException mapException(Exception exc) {
                return exc instanceof PubSubPlusClientException ? (PubSubPlusClientException) exc : new PubSubPlusClientException(exc);
            }
        }

        private PublishFailureNotificationDispatcher() {
            this.failureNotificationExecutorService = Executors.newSingleThreadExecutor(new ThreadFactories.NamedDaemonThreadFactory(RequestReplyMessagePublisherImpl.this.instanceName + "-error-dispatcher"));
            this.closeLock = new ReentrantLock();
            this.closed = new AtomicBoolean(false);
        }

        void onException(PublisherBuffers.Publishable<Topic> publishable, Exception exc, Object obj, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler) {
            if (exc == null || publishable == null || replyMessageHandler == null) {
                return;
            }
            ScheduledFailureNotification scheduledFailureNotification = new ScheduledFailureNotification(publishable, exc, obj, replyMessageHandler);
            ReentrantLock reentrantLock = this.closeLock;
            reentrantLock.lock();
            try {
                try {
                    if (this.closed.get()) {
                        reentrantLock.unlock();
                    } else {
                        this.failureNotificationExecutorService.submit(scheduledFailureNotification);
                        reentrantLock.unlock();
                    }
                } catch (RejectedExecutionException e) {
                    if (RequestReplyMessagePublisherImpl.logger.isWarnEnabled()) {
                        RequestReplyMessagePublisherImpl.logger.warn(RequestReplyMessagePublisherImpl.this.instanceName + " could not schedule publisher failure notification, processing notification on a dispatcher thread");
                    }
                    try {
                        scheduledFailureNotification.call();
                    } catch (Exception e2) {
                        RequestReplyMessagePublisherImpl.logger.debug("Exception by customer callback during publish error notification processing", e2);
                        reentrantLock.unlock();
                    }
                    reentrantLock.unlock();
                }
            } catch (Throwable th) {
                reentrantLock.unlock();
                throw th;
            }
        }

        void onException(Exception exc, Object obj, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler) {
            if (exc == null || replyMessageHandler == null) {
                return;
            }
            ScheduledFailureNotification scheduledFailureNotification = new ScheduledFailureNotification(exc, obj, replyMessageHandler);
            ReentrantLock reentrantLock = this.closeLock;
            reentrantLock.lock();
            try {
                try {
                    if (this.closed.get()) {
                        reentrantLock.unlock();
                    } else {
                        this.failureNotificationExecutorService.submit(scheduledFailureNotification);
                        reentrantLock.unlock();
                    }
                } catch (RejectedExecutionException e) {
                    RequestReplyMessagePublisherImpl.logger.warn(RequestReplyMessagePublisherImpl.this.instanceName + " could not schedule publisher failure notification, processing notification on a dispatcher thread");
                    try {
                        scheduledFailureNotification.call();
                    } catch (Exception e2) {
                        RequestReplyMessagePublisherImpl.logger.debug("Exception by customer callback during publish error notification processing", e2);
                    }
                    reentrantLock.unlock();
                }
            } catch (Throwable th) {
                reentrantLock.unlock();
                throw th;
            }
        }

        void close() {
            ReentrantLock reentrantLock = this.closeLock;
            reentrantLock.lock();
            try {
                try {
                    if (this.closed.compareAndSet(false, true) && !this.failureNotificationExecutorService.isShutdown()) {
                        this.failureNotificationExecutorService.shutdown();
                    }
                } catch (Exception e) {
                    RequestReplyMessagePublisherImpl.logger.warn("Problem with closing internal failure notification dispatcher", e);
                }
            } finally {
                reentrantLock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/publisher/RequestReplyMessagePublisherImpl$RequestReplyListenerImpl.class */
    public static class RequestReplyListenerImpl implements RequestReplyListener {
        /** Application callback that receives the reply, timeout or error. */
        final RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler;
        /** Publisher whose outstanding-request counter is decremented after each callback. */
        final RequestReplyMessagePublisherImpl parentPublisher;

        RequestReplyListenerImpl(RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler, RequestReplyMessagePublisherImpl requestReplyMessagePublisherImpl) {
            this.replyMessageHandler = replyMessageHandler;
            this.parentPublisher = requestReplyMessagePublisherImpl;
        }

        /** Reports a request failure to the application, wrapped in a PubSubPlusClientException. */
        public void onException(JCSMPException jCSMPException, Object obj) {
            invokeCallback(null, obj, new PubSubPlusClientException((Throwable) jCSMPException));
        }

        /** Delivers a reply message to the application. */
        public void onReply(BytesXMLMessage bytesXMLMessage, Object obj) {
            invokeCallback(bytesXMLMessage, obj, null);
        }

        /** Reports a request timeout to the application. */
        public void onTimeout(Object obj) {
            invokeCallback(null, obj, new PubSubPlusClientException.TimeoutException("Request Timeout"));
        }

        /**
         * Invokes the application's reply handler and then decrements the parent publisher's
         * outstanding-request counter (always, even when the handler throws).
         *
         * @param bytesXMLMessage           reply message, or {@code null} for timeouts and errors
         * @param obj                       correlation object; expected to be a {@link CorrelationContext}.
         *                                  When it is {@code null} or of another type, the handler
         *                                  is called without a user context and - unless an explicit
         *                                  exception was supplied - with an exception describing the problem
         * @param pubSubPlusClientException failure to report, or {@code null} on success
         */
        void invokeCallback(BytesXMLMessage bytesXMLMessage, Object obj, PubSubPlusClientException pubSubPlusClientException) {
            PubSubPlusClientException correlationProblem = null;
            CorrelationContext correlationContext = null;
            try {
                if (obj == null) {
                    correlationProblem = new PubSubPlusClientException("correlation key can't be null");
                } else if (!(obj instanceof CorrelationContext)) {
                    correlationProblem = new PubSubPlusClientException("Unknown correlation key type: " + obj.getClass().getCanonicalName());
                } else {
                    // Cast only after the type check so a foreign object cannot raise ClassCastException.
                    correlationContext = (CorrelationContext) obj;
                }
                try {
                    this.replyMessageHandler.onMessage(bytesXMLMessage != null ? MessageReceiver.InboundMessageImpl.toInboundMessage(bytesXMLMessage) : null, correlationContext != null ? correlationContext.userContext : null, pubSubPlusClientException != null ? pubSubPlusClientException : correlationProblem);
                } catch (Exception e) {
                    if (RequestReplyMessagePublisherImpl.logger.isWarnEnabled()) {
                        RequestReplyMessagePublisherImpl.logger.warn("Application code threw an unhandled exception while processing reply in ReplyMessageHandler", e);
                    }
                }
            } finally {
                this.parentPublisher.outstandingRequestsTracker.decrementOutstandingRequestCount();
            }
        }
    }

    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/publisher/RequestReplyMessagePublisherImpl$RequestReplyPublisherInfoImpl.class */
    private class RequestReplyPublisherInfoImpl implements ManageablePublisher.PublisherInfo {

        /** @return the instance name of the enclosing publisher */
        @Override // com.solace.messaging.util.ManageablePublisher.PublisherInfo
        public String getInstanceName() {
            final RequestReplyMessagePublisherImpl owner = RequestReplyMessagePublisherImpl.this;
            return owner.instanceName;
        }

        /** @return the numeric id of the enclosing publisher */
        @Override // com.solace.messaging.util.Identifiable
        public long getId() {
            final RequestReplyMessagePublisherImpl owner = RequestReplyMessagePublisherImpl.this;
            return owner.id;
        }
    }

    /**
     * Creates a publisher in the not-started state.
     *
     * @param messagingServiceInternalView internal view of the messaging service this publisher belongs to
     * @param typedProperties              publisher configuration
     */
    public RequestReplyMessagePublisherImpl(MessagingServiceInternalView messagingServiceInternalView, TypedProperties typedProperties) {
        publisherConfiguration = typedProperties;
        serviceInternalView = messagingServiceInternalView;
    }

    /**
     * Registers the listener that is notified when this publisher terminates.
     * The listener is handed straight to the internal termination dispatcher.
     */
    @Override // com.solace.messaging.util.LifecycleControl
    public void setTerminationNotificationListener(LifecycleControl.TerminationNotificationListener terminationNotificationListener) {
        final TerminationNotificationDispatcher dispatcher = terminationNotificationDispatcher;
        dispatcher.setTerminationNotificationListener(terminationNotificationListener);
    }

    /**
     * @return {@code true} when the publisher is running and no graceful shutdown is in progress
     */
    @Override // com.solace.messaging.publisher.PublisherHealthCheck
    public boolean isReady() {
        if (!isRunning()) {
            return false;
        }
        return !gracefulShutdownInProgress;
    }

    /**
     * Stores the listener that {@code notifyWhenReady()} will invoke; replaces any
     * previously stored listener.
     */
    @Override // com.solace.messaging.publisher.PublisherHealthCheck
    public void setPublisherReadinessListener(PublisherHealthCheck.PublisherReadinessListener publisherReadinessListener) {
        final PublisherHealthCheck.PublisherReadinessListener replacement = publisherReadinessListener;
        this.publisherReadinessListener = replacement;
    }

    /**
     * Invokes the registered readiness listener if the publisher is currently ready.
     * Without a listener a warning is logged; when the publisher is not ready the
     * notification is skipped with a debug message. Exceptions thrown by the listener
     * are logged and not propagated.
     */
    @Override // com.solace.messaging.publisher.PublisherHealthCheck
    public void notifyWhenReady() {
        if (this.publisherReadinessListener != null) {
            try {
                if (!isReady()) {
                    logger.debug("Skip notification on a PublisherReadinessListener, publisher is not ready");
                } else {
                    this.publisherReadinessListener.ready();
                }
            } catch (Exception callbackFailure) {
                logger.error("Client code in PublisherReadinessListener:ready() thrown an exception", callbackFailure);
            }
        } else {
            logger.warn("Skip notification on a PublisherReadinessListener, listener is not set");
        }
    }

    /** @return {@code true} when the lifecycle state is {@code STATE_STARTED} */
    @Override // com.solace.messaging.util.LifecycleControl
    public boolean isRunning() {
        final int currentState = stateHolder.get();
        return currentState == STATE_STARTED;
    }

    /** @return {@code true} when the lifecycle state is {@code STATE_TERMINATED} */
    @Override // com.solace.messaging.util.LifecycleControl
    public boolean isTerminated() {
        final int currentState = stateHolder.get();
        return currentState == STATE_TERMINATED;
    }

    /** @return the current value of the graceful-shutdown-in-progress flag */
    @Override // com.solace.messaging.util.LifecycleControl
    public boolean isTerminating() {
        final boolean shuttingDown = gracefulShutdownInProgress;
        return shuttingDown;
    }

    /**
     * Starts the publisher synchronously by waiting for {@code startAsync()} to complete.
     * Returns immediately when the publisher is already started.
     *
     * @return this publisher
     * @throws PubSubPlusClientException.RequestInterruptedException if the calling thread is
     *         interrupted (the interrupt flag is restored) or the start future is cancelled
     * @throws PubSubPlusClientException if the asynchronous start failed; a failure that is
     *         not already a PubSubPlusClientException or IllegalStateException is wrapped
     * @throws IllegalStateException if the asynchronous start failed with one
     */
    @Override // com.solace.messaging.publisher.RequestReplyMessagePublisher, com.solace.messaging.util.LifecycleControl
    public RequestReplyMessagePublisher start() {
        if (this.stateHolder.get() != STATE_STARTED) {
            try {
                startAsync().get();
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new PubSubPlusClientException.RequestInterruptedException("Publisher start was canceled", interrupted);
            } catch (CancellationException cancelled) {
                throw new PubSubPlusClientException.RequestInterruptedException("Publisher start was canceled", cancelled);
            } catch (ExecutionException failure) {
                final Throwable rootCause = failure.getCause();
                if (rootCause instanceof PubSubPlusClientException) {
                    throw (PubSubPlusClientException) rootCause;
                }
                if (rootCause instanceof IllegalStateException) {
                    throw (IllegalStateException) rootCause;
                }
                if (rootCause != null) {
                    throw new PubSubPlusClientException(rootCause);
                }
                // No cause available: only log, the method still returns normally.
                if (logger.isErrorEnabled()) {
                    logger.error(this.instanceName + " failed to start");
                }
            }
        }
        return this;
    }

    /**
     * Terminates the publisher synchronously. A grace period of {@code 0} terminates
     * immediately; any other value waits for {@code terminateAsync(j)} to complete.
     *
     * @param j grace period passed on to {@code terminateAsync}
     * @throws PubSubPlusClientException.RequestInterruptedException if the calling thread is
     *         interrupted while waiting (the interrupt flag is restored)
     * @throws PubSubPlusClientException if the asynchronous termination failed; a failure that
     *         is not already a PubSubPlusClientException or IllegalStateException is wrapped
     * @throws IllegalStateException if the asynchronous termination failed with one
     */
    @Override // com.solace.messaging.util.LifecycleControl
    public void terminate(long j) throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        if (j == 0) {
            terminateNow();
            return;
        }
        try {
            terminateAsync(j).get();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Publisher termination was canceled", interrupted);
        } catch (ExecutionException failure) {
            final Throwable rootCause = failure.getCause();
            if (rootCause == null) {
                throw new PubSubPlusClientException(failure);
            }
            if (rootCause instanceof PubSubPlusClientException) {
                throw (PubSubPlusClientException) rootCause;
            }
            if (rootCause instanceof IllegalStateException) {
                throw (IllegalStateException) rootCause;
            }
            throw new PubSubPlusClientException(rootCause);
        }
    }

    /**
     * Starts the publisher and returns a future that is completed with this publisher.
     *
     * <p>The start is claimed with a compare-and-set from STATE_NOT_STARTED to STATE_STARTED;
     * the thread that wins runs {@code onStart()}. A call that finds the publisher already
     * started gets a future that is completed immediately.
     *
     * <p>Note: the method-level type parameter is named {@code RequestReplyMessagePublisher}
     * and therefore shadows the interface of the same name inside this method.
     *
     * @return a future completed with this publisher, or completed exceptionally with an
     *         IllegalStateException when the publisher is terminated or {@code onStart()} fails
     * @throws IllegalStateException synchronously, when the service is not connected or the
     *         publisher is already terminated at the time of the call
     */
    @Override // com.solace.messaging.publisher.RequestReplyMessagePublisher, com.solace.messaging.util.AsyncLifecycleControl
    public <RequestReplyMessagePublisher> CompletableFuture<RequestReplyMessagePublisher> startAsync() throws PubSubPlusClientException {
        ExtendedCompletableFuture extendedCompletableFuture;
        if (!this.serviceInternalView.isConnected()) {
            throw new IllegalStateException("Publisher can't be started before it is connected to a messaging service");
        }
        if (this.stateHolder.get() == STATE_TERMINATED) {
            throw new IllegalStateException("Message publisher is already terminated");
        }
        do {
            // A fresh future per attempt; only the one from the final attempt is returned.
            extendedCompletableFuture = new ExtendedCompletableFuture();
            // Termination may have happened concurrently since the check above.
            if (STATE_TERMINATED == this.stateHolder.get()) {
                return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Publisher is already terminated"));
            }
            // Only the caller that wins this compare-and-set performs the actual start.
            if (this.stateHolder.compareAndSet(STATE_NOT_STARTED, STATE_STARTED)) {
                if (logger.isDebugEnabled()) {
                    logger.debug(this.instanceName + " is being started");
                }
                try {
                    onStart();
                    extendedCompletableFuture.complete(this);
                    if (logger.isDebugEnabled()) {
                        logger.debug(this.instanceName + " is started");
                    }
                } catch (Exception e) {
                    // The original exception is logged here but not attached to the
                    // IllegalStateException the future is completed with.
                    logger.error(this.instanceName + " failed to start and is terminating", e);
                    onTerminate(null, this.postTerminationClearOutstandingRequestsSilentTask);
                    extendedCompletableFuture.completeExceptionally(new IllegalStateException("Publisher is already closed due to internal error"));
                }
                // NOTE(review): presumably the callback below runs when the returned future is
                // cancelled - confirm against ExtendedCompletableFuture.onCancellation.
                return ExtendedCompletableFuture.onCancellation(extendedCompletableFuture, (obj, th) -> {
                    this.stateHolder.set(STATE_TERMINATED);
                    // NOTE(review): onTerminate is invoked twice in a row (with and without the
                    // clean-up task) - verify that the second call is intended.
                    onTerminate(null, this.postTerminationClearOutstandingRequestsSilentTask);
                    onTerminate(null, null);
                    if (logger.isDebugEnabled()) {
                        logger.debug(this.instanceName + " async start was canceled");
                    }
                });
            }
            // Compare-and-set lost: loop again unless the publisher is now started.
        } while (!(STATE_STARTED == this.stateHolder.get()));
        // Already started (by this or another caller): nothing to do, report success.
        extendedCompletableFuture.complete(this);
        return extendedCompletableFuture;
    }

    /**
     * Starts the publisher asynchronously and reports the outcome to the given listener.
     *
     * <p>On success the listener receives the publisher and a {@code null} throwable. On
     * failure it receives the cause of the failure when one is available, otherwise the
     * failure itself, so a failed start is never reported as a success. Exceptions thrown
     * by the listener are logged and not propagated.
     *
     * @param completionListener listener to notify; must not be {@code null}
     * @throws IllegalStateException when {@code startAsync()} rejects the call synchronously
     */
    @Override // com.solace.messaging.publisher.RequestReplyMessagePublisher, com.solace.messaging.util.AsyncLifecycleControl
    public <RequestReplyMessagePublisher> void startAsync(CompletionListener<RequestReplyMessagePublisher> completionListener) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(completionListener, "startListener can't be null");
        // Explicit type witness: without a target type the result type would be inferred as Object.
        this.<RequestReplyMessagePublisher>startAsync().whenComplete((publisher, th) -> {
            Throwable failure = null;
            if (th != null) {
                // whenComplete hands over the raw exception when the future itself was completed
                // exceptionally; such an exception may have no cause, so fall back to it.
                final Throwable cause = th.getCause();
                failure = cause != null ? cause : th;
            }
            try {
                completionListener.onCompletion(publisher, failure);
            } catch (Exception e) {
                if (logger.isErrorEnabled()) {
                    logger.error(this.instanceName + " start completion listener threw an exception", e);
                }
            }
        });
    }

    /**
     * Terminates this publisher, waiting up to the given grace period for outstanding requests.
     *
     * <p>The work is performed on the calling thread through {@code onTerminate}; the returned
     * future is already completed when this method returns. It completes normally when no
     * outstanding requests remain after the wait, and exceptionally with an
     * {@code IncompleteMessageDeliveryException} carrying the remaining count otherwise.
     *
     * @param j grace period passed to the outstanding-requests tracker; checked against a minimum of 1
     * @return a future describing the outcome of the termination
     * @throws PubSubPlusClientException a {@code RequestInterruptedException} is thrown from the
     *                                   wait step when the waiting thread is interrupted
     */
    @Override // com.solace.messaging.util.AsyncLifecycleControl
    public CompletableFuture<Void> terminateAsync(long j) throws PubSubPlusClientException, IllegalStateException {
        Validation.smallerThanNumbersIllegal(1L, j, "Grace period < 1");
        ExtendedCompletableFuture terminationFuture = new ExtendedCompletableFuture();
        // NOTE(review): the counter is seeded with STATE_NOT_STARTED, a lifecycle constant; this
        // only behaves as "start at zero" if that constant is 0 — confirm.
        AtomicInteger undeliveredCount = new AtomicInteger(STATE_NOT_STARTED);

        // Step 1: flag the publisher as terminated with a graceful shutdown in progress.
        Task<RequestReplyMessagePublisherImpl> markTerminating = publisher -> {
            publisher.stateHolder.set(STATE_TERMINATED);
            publisher.gracefulShutdownInProgress = true;
        };

        // Step 2: wait for outstanding requests and record how many are still pending.
        Task<RequestReplyMessagePublisherImpl> awaitOutstandingRequests = publisher -> {
            try {
                this.outstandingRequestsTracker.awaitTermination(j);
                int stillOutstanding = this.outstandingRequestsTracker.getOutStandingRequests();
                publisher.gracefulShutdownInProgress = false;
                if (stillOutstanding > 0) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(publisher.instanceName + " gracefully terminated before all messages were processed, not sufficient grace period of " + j);
                    }
                    undeliveredCount.set(stillOutstanding);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag before translating to the API exception.
                Thread.currentThread().interrupt();
                throw new PubSubPlusClientException.RequestInterruptedException("Graceful termination of " + publisher.instanceName + " was interrupted", e);
            }
        };

        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " is being terminated");
        }
        onTerminate(markTerminating, awaitOutstandingRequests);

        int discarded = undeliveredCount.get();
        if (discarded <= 0) {
            terminationFuture.complete(null);
        } else {
            this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, discarded);
            terminationFuture.completeExceptionally(new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %d messages could not be completed due to expiration of a grace period", Integer.valueOf(discarded)), discarded));
        }
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " is terminated");
        }
        return terminationFuture;
    }

    /**
     * Terminates this publisher via {@link #terminateAsync(long)} and reports the outcome to the
     * given listener.
     *
     * <p>On success the listener receives {@code (null, null)}. On failure it receives the cause of
     * the reported throwable, or the throwable itself when it carries no cause, so a failed
     * termination is never reported as a success.
     *
     * @param completionListener callback notified once termination finishes; checked with
     *                           {@code Validation.nullIllegal}
     * @param j                  grace period forwarded to {@link #terminateAsync(long)}
     */
    @Override // com.solace.messaging.util.AsyncLifecycleControl
    public void terminateAsync(CompletionListener<Void> completionListener, long j) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(completionListener, "Termination listener can't be null");
        terminateAsync(j).whenComplete((ignored, failure) -> {
            Throwable reported = null;
            if (failure != null) {
                try {
                    // A future failed directly with an exception that has no cause yields
                    // getCause() == null; passing that on would make the listener see
                    // (null, null), which is indistinguishable from success.
                    final Throwable cause = failure.getCause();
                    reported = (cause != null) ? cause : failure;
                } catch (Exception e) {
                    logger.warn(this.instanceName + "  encountered problem during termination.", e);
                    return;
                }
            }
            completionListener.onCompletion(null, reported);
        });
    }

    /**
     * Publishes a request without a user context object.
     *
     * <p>Delegates to the five-argument overload, passing {@code null} as the context.
     *
     * @param outboundMessage     request message to send
     * @param replyMessageHandler handler for the reply
     * @param topic               destination of the request
     * @param j                   reply timeout
     */
    @Override // com.solace.messaging.publisher.RequestReplyMessagePublisher
    public void publish(OutboundMessage outboundMessage, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler, Topic topic, long j) {
        final Object noUserContext = null;
        publish(outboundMessage, replyMessageHandler, noUserContext, topic, j);
    }

    /**
     * Publishes a request with a user context object and no additional message properties.
     *
     * <p>Delegates to the six-argument overload, passing {@code null} as the properties.
     *
     * @param outboundMessage     request message to send
     * @param replyMessageHandler handler for the reply
     * @param obj                 user context forwarded unchanged to the six-argument overload
     * @param topic               destination of the request
     * @param j                   reply timeout
     */
    @Override // com.solace.messaging.publisher.RequestReplyMessagePublisher
    public void publish(OutboundMessage outboundMessage, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler, Object obj, Topic topic, long j) {
        final Properties noAdditionalProperties = null;
        publish(outboundMessage, noAdditionalProperties, replyMessageHandler, obj, topic, j);
    }

    /**
     * Publishes a request, optionally with additional message properties and a user context.
     *
     * <p>Arguments are validated first, then the publisher state is checked with
     * {@link #validatePublisher()}, and finally the request is handed to
     * {@code publishExternalMessage}.
     *
     * @param outboundMessage     request message to send; must not be {@code null}
     * @param properties          additional properties; may be {@code null}
     * @param replyMessageHandler handler for the reply; must not be {@code null}
     * @param obj                 user context forwarded unchanged
     * @param topic               destination of the request; must not be {@code null}
     * @param j                   reply timeout; checked against a minimum of 1
     */
    @Override // com.solace.messaging.publisher.RequestReplyMessagePublisher
    public void publish(OutboundMessage outboundMessage, Properties properties, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler, Object obj, Topic topic, long j) {
        requireValidRequestArguments(outboundMessage, replyMessageHandler, topic, j);
        validatePublisher();
        publishExternalMessage(outboundMessage, properties, replyMessageHandler, obj, topic, j);
    }

    /** Runs the argument checks for an asynchronous request, in a fixed order. */
    private void requireValidRequestArguments(OutboundMessage requestMessage, RequestReplyMessagePublisher.ReplyMessageHandler handler, Topic destination, long replyTimeout) {
        Validation.nullIllegal(requestMessage, "requestMessage can't be null");
        Validation.nullIllegal(handler, "replyMessageHandler can't be null");
        Validation.nullIllegal(destination, "requestDestination can't be null");
        Validation.smallerThanNumbersIllegal(1L, replyTimeout, "replyTimeout < 1");
    }

    /**
     * Sends a request and blocks for the reply, without additional message properties.
     *
     * <p>Delegates to the four-argument overload, passing {@code null} as the properties.
     *
     * @param outboundMessage request message to send
     * @param topic           destination of the request
     * @param j               reply timeout
     * @return the reply message
     */
    @Override // com.solace.messaging.publisher.RequestReplyMessagePublisher
    public InboundMessage publishAwaitResponse(OutboundMessage outboundMessage, Topic topic, long j) throws PubSubPlusClientException.TimeoutException, PubSubPlusClientException.MessageRejectedByBrokerException, PubSubPlusClientException.PublisherOverflowException, PubSubPlusClientException, InterruptedException, IllegalArgumentException {
        final Properties noAdditionalProperties = null;
        return publishAwaitResponse(outboundMessage, noAdditionalProperties, topic, j);
    }

    /**
     * Sends a request and blocks until a reply arrives or the request fails.
     *
     * <p>When {@code properties} is non-empty the request is deep-copied with those properties
     * applied; otherwise the given message is used as is. The outstanding-request counter is
     * incremented for the duration of the call and decremented exactly once in a {@code finally}
     * block, whatever the outcome.
     *
     * @param outboundMessage request message to send; must not be {@code null}
     * @param properties      additional properties; may be {@code null} or empty
     * @param topic           destination of the request; must not be {@code null}
     * @param j               reply timeout; checked against a minimum of 1
     * @return the reply converted to an {@link InboundMessage}
     * @throws PubSubPlusClientException.TimeoutException when the underlying request reports a
     *                                                    {@code JCSMPRequestTimeoutException}
     * @throws PubSubPlusClientException                  for every other failure, including a
     *                                                    failed {@link #validatePublisher()} check,
     *                                                    which is wrapped like any other exception
     */
    @Override // com.solace.messaging.publisher.RequestReplyMessagePublisher
    public InboundMessage publishAwaitResponse(OutboundMessage outboundMessage, Properties properties, Topic topic, long j) throws PubSubPlusClientException.TimeoutException, PubSubPlusClientException.MessageRejectedByBrokerException, PubSubPlusClientException.PublisherOverflowException, PubSubPlusClientException, InterruptedException, IllegalArgumentException {
        Validation.nullIllegal(outboundMessage, "requestMessage can't be null");
        Validation.nullIllegal(topic, "requestDestination can't be null");
        Validation.smallerThanNumbersIllegal(1L, j, "replyTimeout < 1");
        final OutboundMessage effectiveRequest = (properties == null || properties.isEmpty()) ? outboundMessage : OutboundMessageBuilder.deepCopy(outboundMessage, properties);
        final BytesXMLMessage byteMessage = OutboundMessageBuilder.OutboundMessageBuilderImpl.OutboundMessageImpl.toByteMessage(effectiveRequest);
        try {
            this.outstandingRequestsTracker.incrementOutstandingRequestCount();
            validatePublisher();
            return MessageReceiver.InboundMessageImpl.toInboundMessage(this.requestor.request(byteMessage, j, topic));
        } catch (Exception e) {
            // Translate transport-level failures into the public exception hierarchy.
            if (e instanceof ClosedFacilityException) {
                throw new PubSubPlusClientException(this.instanceName + " publisher is down.", e);
            }
            if (e instanceof JCSMPInterruptedException) {
                throw new PubSubPlusClientException.RequestInterruptedException(e.getMessage(), e);
            }
            if (e instanceof JCSMPRequestTimeoutException) {
                throw new PubSubPlusClientException.TimeoutException(e.getMessage());
            }
            throw new PubSubPlusClientException(e);
        } finally {
            // Single release point: the counter is decremented once on success and on failure,
            // and a failure while decrementing can no longer be caught, re-wrapped and followed
            // by a second decrement.
            this.outstandingRequestsTracker.decrementOutstandingRequestCount();
        }
    }

    /**
     * Returns the publisher info object held by this publisher.
     *
     * @return the value of the {@code publisherInfo} field
     */
    @Override // com.solace.messaging.util.ManageablePublisher
    public ManageablePublisher.PublisherInfo publisherInfo() {
        final ManageablePublisher.PublisherInfo info = this.publisherInfo;
        return info;
    }

    /**
     * Builds the wire message for an asynchronous request and hands it to the requestor.
     *
     * <p>The request is deep-copied with {@code properties} applied when they are non-empty. A
     * correlation context is created, injected into the message and passed along with the request.
     * The outstanding-request counter is incremented before sending and decremented again when
     * sending fails. An {@link IllegalArgumentException} is rethrown to the caller; any other
     * exception is reported through the error notification dispatcher instead of being thrown.
     *
     * @param outboundMessage     request message to send
     * @param properties          additional properties; may be {@code null} or empty
     * @param replyMessageHandler handler for the reply
     * @param obj                 user context stored in the correlation context
     * @param topic               destination of the request
     * @param j                   reply timeout
     */
    @Internal
    void publishExternalMessage(OutboundMessage outboundMessage, Properties properties, RequestReplyMessagePublisher.ReplyMessageHandler replyMessageHandler, Object obj, Topic topic, long j) {
        final boolean hasExtraProperties = properties != null && !properties.isEmpty();
        final OutboundMessage request = hasExtraProperties ? OutboundMessageBuilder.deepCopy(outboundMessage, properties) : outboundMessage;
        final CorrelationContext context = new CorrelationContext(this.messageKeyProvider.nextLongKey(), obj, request, replyMessageHandler);
        OutboundMessageBuilder.OutboundMessageBuilderImpl.injectCorrelationKey(request, context);
        final BytesXMLMessage wireMessage = OutboundMessageBuilder.OutboundMessageBuilderImpl.OutboundMessageImpl.toByteMessage(request);
        try {
            this.outstandingRequestsTracker.incrementOutstandingRequestCount();
            validatePublisher();
            this.requestor.request(wireMessage, new RequestReplyListenerImpl(replyMessageHandler, this), j, topic, context);
        } catch (Exception failure) {
            // The request never left, so it must not stay counted as outstanding.
            this.outstandingRequestsTracker.decrementOutstandingRequestCount();
            if (failure instanceof IllegalArgumentException) {
                throw (IllegalArgumentException) failure;
            }
            this.errorNotificationDispatcher.onException(PublisherBuffers.Publishable.of(request, topic), failure, obj, replyMessageHandler);
        }
    }

    /**
     * Returns the client session's default producer, requesting one first when none is present.
     *
     * @param jCSMPStreamingPublishEventHandler event handler passed to the session when a producer
     *                                          has to be requested
     * @return whatever {@code getDefaultProducer()} reports after the optional request
     * @throws PubSubPlusClientException wrapping any exception raised while talking to the session
     */
    @Internal
    XMLMessageProducer createSessionDefaultProducer(JCSMPStreamingPublishEventHandler jCSMPStreamingPublishEventHandler) throws PubSubPlusClientException {
        try {
            final ClientSession session = this.serviceInternalView.getClientSession();
            final boolean producerMissing = session.getDefaultProducer() == null;
            if (producerMissing) {
                // The returned producer is not used directly; the default producer is re-read below.
                session.getMessageProducer(jCSMPStreamingPublishEventHandler);
            }
            return session.getDefaultProducer();
        } catch (Exception cause) {
            throw new PubSubPlusClientException("Failed to create message publisher", cause);
        }
    }

    /**
     * Creates a requestor on the current client session.
     *
     * @return the requestor returned by the session
     * @throws PubSubPlusClientException wrapping any exception raised while creating it
     */
    @Internal
    Requestor createRequester() throws PubSubPlusClientException {
        try {
            final ClientSession session = this.serviceInternalView.getClientSession();
            return session.createRequestor();
        } catch (Exception cause) {
            throw new PubSubPlusClientException("Failed to create message publisher", cause);
        }
    }

    /**
     * Verifies that this publisher can currently be used for publishing.
     *
     * @throws IllegalStateException when the publisher is terminating or terminated, or when it
     *                               is not running
     */
    @Internal
    void validatePublisher() {
        final boolean terminatingOrTerminated = isTerminating() || isTerminated();
        if (terminatingOrTerminated) {
            throw new IllegalStateException("Message publisher was terminated");
        }
        if (isRunning()) {
            return;
        }
        throw new IllegalStateException("Message publisher not started");
    }

    /**
     * Performs the start-up work for this publisher.
     *
     * <p>In order: registers the service-interruption and session-state listeners on the client
     * session, obtains the session default producer, creates the requestor, and enables the
     * consumer API on the service view.
     *
     * @throws PubSubPlusClientException when the producer or the requestor cannot be created
     */
    @Internal
    void onStart() throws PubSubPlusClientException {
        // Listeners first, so they are in place before producer and requestor are created.
        this.serviceInternalView.getClientSession().addServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().addClientSessionStateListener(this.closedSessionListener);

        final XMLMessageProducer defaultProducer = createSessionDefaultProducer(this.publishEventHandler);
        this.sessionDefaultProducer = defaultProducer;

        final Requestor newRequestor = createRequester();
        this.requestor = newRequestor;

        this.serviceInternalView.enableConsumerAPI();
    }

    /**
     * Terminates this publisher immediately, without a grace period.
     *
     * <p>The state is set to terminated, then {@code onTerminate} is run with
     * {@code postTerminateNowTask} and a counter that the task may fill. When the counter ends up
     * positive an {@code IncompleteMessageDeliveryException} carrying that count is thrown. If
     * {@code onTerminate} itself failed in that situation, its exception is attached as a
     * suppressed exception rather than being dropped.
     *
     * @throws PubSubPlusClientException.IncompleteMessageDeliveryException when the counter is
     *                                                                      positive afterwards
     * @throws PubSubPlusClientException                                    when {@code onTerminate}
     *                                                                      fails and the counter
     *                                                                      is not positive
     */
    @Internal
    void terminateNow() throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        this.stateHolder.set(STATE_TERMINATED);
        // NOTE(review): the counter is seeded with STATE_NOT_STARTED, a lifecycle constant; this
        // only behaves as "start at zero" if that constant is 0 — confirm.
        final AtomicInteger discardedCounter = new AtomicInteger(STATE_NOT_STARTED);
        try {
            if (logger.isDebugEnabled()) {
                logger.debug(this.instanceName + " is being non gracefully terminated");
            }
            onTerminate(null, postTerminateNowTask, discardedCounter);
            if (logger.isDebugEnabled()) {
                logger.debug(this.instanceName + " is terminated");
            }
        } catch (Throwable failure) {
            final int discardedOnFailure = discardedCounter.get();
            if (discardedOnFailure <= 0) {
                throw failure;
            }
            // Incomplete delivery takes precedence, but keep the original failure reachable.
            final PubSubPlusClientException.IncompleteMessageDeliveryException incomplete = incompleteDeliveryAfterTerminateNow(discardedOnFailure);
            incomplete.addSuppressed(failure);
            throw incomplete;
        }
        final int discarded = discardedCounter.get();
        if (discarded > 0) {
            throw incompleteDeliveryAfterTerminateNow(discarded);
        }
    }

    /** Builds the exception reporting how many messages were not delivered on non-graceful termination. */
    private PubSubPlusClientException.IncompleteMessageDeliveryException incompleteDeliveryAfterTerminateNow(int discarded) {
        return new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %d messages could not be completed due to non graceful termination", Integer.valueOf(discarded)), discarded);
    }

    /**
     * Runs the termination sequence for this publisher.
     *
     * <p>The optional pre-task runs first. Afterwards, whether or not a pre-task was supplied and
     * whether or not it threw, both notification dispatchers are closed and the optional
     * post-task runs. Finally the state is set to terminated and the session listeners are removed.
     *
     * <p>Previously the dispatcher close and the post-task were nested under the pre-task null
     * check, so calls with a {@code null} pre-task and a non-null post-task silently skipped both.
     *
     * <p>NOTE(review): callers that invoke this method more than once now reach
     * {@code close()} on each call — confirm the dispatchers tolerate repeated close.
     *
     * @param task  work to run before the dispatchers are closed; may be {@code null}
     * @param task2 work to run after the dispatchers are closed; may be {@code null}
     */
    @Internal
    void onTerminate(Task<RequestReplyMessagePublisherImpl> task, Task<RequestReplyMessagePublisherImpl> task2) {
        try {
            if (task != null) {
                task.run(this);
            }
        } finally {
            this.terminationNotificationDispatcher.close();
            this.errorNotificationDispatcher.close();
            if (task2 != null) {
                task2.run(this);
            }
        }
        this.stateHolder.set(STATE_TERMINATED);
        this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " publisher is shutdown");
        }
    }

    /**
     * Runs the termination sequence for this publisher, passing a context value to the post-task.
     *
     * <p>The optional pre-task runs first. Afterwards, whether or not a pre-task was supplied and
     * whether or not it threw, both notification dispatchers are closed and the optional
     * post-task runs with {@code c}. Finally the state is set to terminated and the session
     * listeners are removed.
     *
     * <p>Previously the dispatcher close and the post-task were nested under the pre-task null
     * check, so calls with a {@code null} pre-task and a non-null post-task silently skipped both.
     *
     * <p>NOTE(review): callers that invoke this method more than once now reach
     * {@code close()} on each call — confirm the dispatchers tolerate repeated close.
     *
     * @param task   work to run before the dispatchers are closed; may be {@code null}
     * @param biTask work to run after the dispatchers are closed; may be {@code null}
     * @param c      context value handed to {@code biTask}
     * @param <C>    type of the context value
     */
    @Internal
    <C> void onTerminate(Task<RequestReplyMessagePublisherImpl> task, BiTask<RequestReplyMessagePublisherImpl, C> biTask, C c) {
        try {
            if (task != null) {
                task.run(this);
            }
        } finally {
            this.terminationNotificationDispatcher.close();
            this.errorNotificationDispatcher.close();
            if (biTask != null) {
                biTask.run(this, c);
            }
        }
        this.stateHolder.set(STATE_TERMINATED);
        this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " publisher is shutdown");
        }
    }
}
