package com.solace.messaging.publisher;

import com.solace.messaging.MessagingService;
import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.publisher.OutboundMessageBuilder;
import com.solace.messaging.publisher.PersistentMessagePublisher;
import com.solace.messaging.publisher.PublisherBuffers;
import com.solace.messaging.publisher.PublisherHealthCheck;
import com.solace.messaging.resources.Topic;
import com.solace.messaging.util.CompletionListener;
import com.solace.messaging.util.LifecycleControl;
import com.solace.messaging.util.Manageable;
import com.solace.messaging.util.ManageablePublisher;
import com.solace.messaging.util.PublisherCongestionNotificationDispatcher;
import com.solace.messaging.util.TypedProperties;
import com.solace.messaging.util.async.DiscardOldestConcurrentBuffer;
import com.solace.messaging.util.async.ExtendedCompletableFuture;
import com.solace.messaging.util.async.ThreadFactories;
import com.solace.messaging.util.async.ToggleLatch;
import com.solace.messaging.util.internal.BiTask;
import com.solace.messaging.util.internal.ClientSession;
import com.solace.messaging.util.internal.Internal;
import com.solace.messaging.util.internal.MessagingServiceInternalView;
import com.solace.messaging.util.internal.Task;
import com.solace.messaging.util.internal.TerminationEventImpl;
import com.solace.messaging.util.internal.TerminationNotificationDispatcher;
import com.solace.messaging.util.internal.TriTask;
import com.solace.messaging.util.internal.Validation;
import com.solacesystems.jcsmp.ClosedFacilityException;
import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPProducerEventHandler;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
import com.solacesystems.jcsmp.ProducerEventArgs;
import com.solacesystems.jcsmp.ProducerFlowProperties;
import com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer;
import com.solacesystems.jcsmp.statistics.StatType;
import java.io.Serializable;
import java.time.Instant;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.annotation.versioning.ProviderType;

@Internal
@ProviderType
/* loaded from: input_file:com/solace/messaging/publisher/PersistentMessagePublisherImpl.class */
public class PersistentMessagePublisherImpl implements PersistentMessagePublisher {
    // Underlying JCSMP producer. It is closed by the termination tasks below; its assignment is not
    // part of the visible portion of this file (presumably done on start - TODO confirm).
    private volatile JCSMPXMLMessageProducer producer;
    // Service view handed to the constructor; used here to reach the API metrics collector.
    private final MessagingServiceInternalView serviceInternalView;
    // Publisher configuration handed to the constructor; both buffers are created from it.
    private final TypedProperties publisherConfiguration;
    // Result of configureMessageBuilder(...); builds outbound messages from byte[] / String payloads.
    private final OutboundMessageBuilder messageBuilder;
    // Buffer of messages waiting to be published; termination tasks wait for it to drain or empty it.
    private final PublisherBuffers.PublisherBuffer<Topic> buffer;
    // Correlation keys of messages awaiting a broker response; a key is removed when its ack/nack arrives.
    private final DiscardOldestConcurrentBuffer<Long> inFlightMessageIdBuffer;
    // Lifecycle states stored in stateHolder. Transitions to STATE_TERMINATED are detected with a
    // "previous state < STATE_TERMINATED" comparison, so the numeric ordering is significant.
    static final int STATE_NOT_STARTED = 0;
    static final int STATE_STARTED = 1;
    static final int STATE_TERMINATED = 2;
    // Source of the per-instance id that is embedded in instanceName.
    private static final AtomicLong instanceIdGenerator = new AtomicLong(0);
    private static final Log logger = LogFactory.getLog(PersistentMessagePublisherImpl.class);
    /**
     * Task run for a graceful termination with a grace period.
     *
     * <p>Arguments: the publisher being terminated, the grace period in milliseconds, and a counter
     * that receives the number of messages still pending (buffered plus in-flight) when the grace
     * period was not sufficient.
     *
     * <p>The task first waits for the publish buffer to drain and then, with whatever time is left,
     * for the in-flight buffer to drain. Regardless of the outcome (success, timeout, interruption
     * or any other failure) the {@code finally} section runs exactly once: it resets the
     * graceful-shutdown flag, empties both buffers, closes the producer and shuts the dispatcher
     * executor down. Producer and executor are released even if emptying the buffers fails.
     */
    private static final TriTask<PersistentMessagePublisherImpl, Long, AtomicInteger> postTerminationTerminateAsyncTask = (publisher, gracePeriodMillis, discardedCounter) -> {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(gracePeriodMillis.longValue());
        try {
            final boolean bufferDrained = publisher.buffer.awaitEmpty(gracePeriodMillis.longValue(), TimeUnit.MILLISECONDS);
            final long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                // Grace period fully consumed by the buffer wait: no time left to wait for acks/nacks.
                publisher.gracefulShutdownInProgress = false;
                if (!bufferDrained || !publisher.inFlightMessageIdBuffer.isEmpty()) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(publisher.instanceName + " gracefully terminated before all buffered messages were processed, not sufficient grace period of " + gracePeriodMillis);
                    }
                    discardedCounter.set(publisher.buffer.size() + publisher.inFlightMessageIdBuffer.size());
                }
            } else {
                // Spend the remainder of the grace period waiting for outstanding broker responses.
                final boolean inFlightDrained = publisher.inFlightMessageIdBuffer.awaitEmpty(remainingNanos, TimeUnit.NANOSECONDS);
                publisher.gracefulShutdownInProgress = false;
                if (!bufferDrained || !inFlightDrained) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(publisher.instanceName + " gracefully terminated before all broker acks/nacks were received, not sufficient grace period of " + gracePeriodMillis);
                    }
                    discardedCounter.set(publisher.buffer.size() + publisher.inFlightMessageIdBuffer.size());
                }
            }
        } catch (PubSubPlusClientException.RequestInterruptedException interrupted) {
            if (logger.isWarnEnabled()) {
                logger.warn("Graceful termination of " + publisher.instanceName + " was interrupted");
            }
        } finally {
            publisher.gracefulShutdownInProgress = false;
            try {
                publisher.emptyBuffer(publisher.buffer, publisher.inFlightMessageIdBuffer);
            } finally {
                // Read the volatile field once so the null check and close() act on the same object.
                final JCSMPXMLMessageProducer activeProducer = publisher.producer;
                try {
                    if (activeProducer != null) {
                        activeProducer.close();
                    }
                } catch (Exception closeFailure) {
                    logger.warn("Problem with closing underlying IO after termination of " + publisher.instanceName, closeFailure);
                }
                if (!publisher.publisherBotExecutorService.isShutdown()) {
                    try {
                        publisher.publisherBotExecutorService.shutdown();
                    } catch (Exception shutdownFailure) {
                        logger.warn("Problem with shutdown of executor service after termination of " + publisher.instanceName, shutdownFailure);
                    }
                }
            }
        }
    };
    /**
     * Task run for an immediate (non-graceful) termination.
     *
     * <p>Arguments: the publisher being terminated and a counter that receives the number of
     * messages (buffered plus in-flight) that were pending when the buffers were emptied.
     *
     * <p>When at least one message was pending, the count is also added to the
     * {@code PUBLISH_MESSAGES_TERMINATION_DISCARDED} metric and a warning is logged. The producer is
     * closed and the dispatcher executor shut down in a {@code finally} section, so they are
     * released even when emptying the buffers fails. Close/shutdown problems are reported through
     * the logger only.
     */
    private static final BiTask<PersistentMessagePublisherImpl, AtomicInteger> postTerminateNowTask = (publisher, discardedCounter) -> {
        try {
            // Capture the count before emptying; afterwards both buffers report nothing pending.
            final int pending = publisher.buffer.size() + publisher.inFlightMessageIdBuffer.size();
            publisher.emptyBuffer(publisher.buffer, publisher.inFlightMessageIdBuffer);
            if (pending > 0) {
                discardedCounter.set(pending);
                publisher.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, pending);
                if (logger.isWarnEnabled()) {
                    logger.warn(publisher.instanceName + " non-gracefully terminated before all buffered messages were processed.");
                }
            }
        } catch (PubSubPlusClientException.RequestInterruptedException interrupted) {
            if (logger.isWarnEnabled()) {
                logger.warn("Non-graceful termination of " + publisher.instanceName + " was interrupted");
            }
        } finally {
            // Read the volatile field once so the null check and close() act on the same object.
            final JCSMPXMLMessageProducer activeProducer = publisher.producer;
            try {
                if (activeProducer != null) {
                    activeProducer.close();
                }
            } catch (Exception closeFailure) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Problem closing underlying IO during termination of " + publisher.instanceName, closeFailure);
                }
            }
            if (!publisher.publisherBotExecutorService.isShutdown()) {
                try {
                    publisher.publisherBotExecutorService.shutdown();
                } catch (Exception shutdownFailure) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Problem shutting down executor service during termination of " + publisher.instanceName, shutdownFailure);
                    }
                }
            }
        }
    };
    // Value reported by isTerminating(); while true, broker acks/nacks are still dispatched even
    // though the publisher is no longer in STATE_STARTED.
    private volatile boolean gracefulShutdownInProgress = false;
    // Current lifecycle state: one of STATE_NOT_STARTED, STATE_STARTED, STATE_TERMINATED.
    final AtomicInteger stateHolder = new AtomicInteger(STATE_NOT_STARTED);
    // NOTE: id, instanceName and publisherBotExecutorService depend on each other in this order;
    // keep the declaration order intact.
    private final long id = instanceIdGenerator.incrementAndGet();
    private final String instanceName = "PersistentMessagePublisher@" + this.id;
    // Exposes id and instanceName of this publisher.
    private final ManageablePublisher.PersistentPublisherInfo publisherInfo = new PersistentPublisherInfoImpl();
    // Single-thread executor whose thread name is derived from instanceName; it is shut down by
    // every termination task.
    private final ExecutorService publisherBotExecutorService = Executors.newSingleThreadExecutor(new ThreadFactories.NamedDaemonThreadFactory(this.instanceName + "-message-dispatcher"));
    // Generates correlation keys for published messages.
    private final MessageCorrelationKeyProvider messageKeyProvider = new MessageCorrelationKeyProvider();
    // Forwards termination events to the listener registered via setTerminationNotificationListener.
    private final TerminationNotificationDispatcher terminationNotificationDispatcher = new TerminationNotificationDispatcher();
    /**
     * Reacts to a service interruption: moves the publisher to {@code STATE_TERMINATED} and, if it
     * was not terminated before, publishes a termination event and runs the silent buffer-clearing
     * termination task.
     */
    private final MessagingService.ServiceInterruptionListener serviceInterruptionListener = new MessagingService.ServiceInterruptionListener() {
        @Override
        public void onServiceInterrupted(MessagingService.ServiceEvent serviceEvent) {
            final PersistentMessagePublisherImpl owner = PersistentMessagePublisherImpl.this;
            if (PersistentMessagePublisherImpl.logger.isWarnEnabled()) {
                PersistentMessagePublisherImpl.logger.warn("Shutting down publisher due to Service interruption");
            }
            final int previousState = owner.stateHolder.getAndSet(PersistentMessagePublisherImpl.STATE_TERMINATED);
            if (previousState >= PersistentMessagePublisherImpl.STATE_TERMINATED) {
                // Already terminated earlier: nothing left to notify or clean up.
                return;
            }
            owner.terminationNotificationDispatcher.onTermination(
                    new TerminationEventImpl(serviceEvent.getTimestamp(), serviceEvent.getMessage(), serviceEvent.getCause()));
            owner.onTerminate(null, owner.postTerminationClearBufferSilentTask);
        }
    };
    /**
     * Reacts to a client session state change reported to this listener: moves the publisher to
     * {@code STATE_TERMINATED} and, if it was not terminated before, publishes a termination event
     * and runs the silent buffer-clearing termination task.
     */
    private final ClientSession.ClientSessionStateListener closedSessionListener = new ClientSession.ClientSessionStateListener() {
        @Override
        public void onClientSessionStateChange(ClientSession.ClientSessionStateChangeEvent clientSessionStateChangeEvent) {
            final PersistentMessagePublisherImpl owner = PersistentMessagePublisherImpl.this;
            // The message is logged at INFO, so the guard must check the INFO level as well.
            if (PersistentMessagePublisherImpl.logger.isInfoEnabled()) {
                PersistentMessagePublisherImpl.logger.info("Shutting down publisher due to service closure");
            }
            final int previousState = owner.stateHolder.getAndSet(PersistentMessagePublisherImpl.STATE_TERMINATED);
            if (previousState >= PersistentMessagePublisherImpl.STATE_TERMINATED) {
                // Already terminated earlier: nothing left to notify or clean up.
                return;
            }
            owner.terminationNotificationDispatcher.onTermination(
                    new TerminationEventImpl(clientSessionStateChangeEvent.getTimestamp(), clientSessionStateChangeEvent.getMessage(), clientSessionStateChangeEvent.getCause()));
            owner.onTerminate(null, owner.postTerminationClearBufferSilentTask);
        }
    };
    // Routes broker delivery confirmations and errors to either a blocked publishing thread or the
    // registered publish-receipt listener, depending on the type of the correlation context.
    private final PublisherNotificationDispatcher notificationDispatcher = new PublisherNotificationDispatcher();
    /**
     * JCSMP callback handler for broker responses. Only the correlating ("Ex") callbacks do work:
     * while the publisher is running or gracefully shutting down they are forwarded to the
     * notification dispatcher, otherwise a warning is logged. The non-correlating callbacks are
     * empty.
     */
    private final JCSMPStreamingPublishCorrelatingEventHandler messageCorrelationHandler = new JCSMPStreamingPublishCorrelatingEventHandler() {
        public void handleError(String messageId, JCSMPException cause, long timestamp) {
            // No action: errors are processed in handleErrorEx.
        }

        public void responseReceived(String messageId) {
            // No action: confirmations are processed in responseReceivedEx.
        }

        public void responseReceivedEx(Object correlationKey) {
            final PersistentMessagePublisherImpl owner = PersistentMessagePublisherImpl.this;
            final boolean acceptingResponses = owner.isRunning() || owner.gracefulShutdownInProgress;
            if (acceptingResponses) {
                owner.notificationDispatcher.onDeliveryConfirmation(correlationKey);
                return;
            }
            if (PersistentMessagePublisherImpl.logger.isWarnEnabled()) {
                PersistentMessagePublisherImpl.logger.warn(owner.instanceName + " received 'ack' message response from a broker after termination");
            }
        }

        public void handleErrorEx(Object correlationKey, JCSMPException cause, long timestamp) {
            final PersistentMessagePublisherImpl owner = PersistentMessagePublisherImpl.this;
            final boolean acceptingResponses = owner.isRunning() || owner.gracefulShutdownInProgress;
            if (acceptingResponses) {
                owner.notificationDispatcher.onException(correlationKey, cause, timestamp);
                return;
            }
            if (PersistentMessagePublisherImpl.logger.isWarnEnabled()) {
                PersistentMessagePublisherImpl.logger.warn(owner.instanceName + " received 'nack' message response from a broker after termination", cause);
            }
        }
    };
    /** Producer event handler whose callback performs no action. */
    private final JCSMPProducerEventHandler publisherEventHandler = new JCSMPProducerEventHandler() {
        public void handleEvent(ProducerEventArgs event) {
            // Producer events are not processed by this publisher.
        }
    };
    /**
     * Action applied to each buffered publishable that is dropped when the buffer is closed:
     * reports an {@link IllegalStateException} for the message's correlation key through the
     * notification dispatcher, stamped with the current time.
     */
    private final Consumer<PublisherBuffers.Publishable<Topic>> bufferCleaningAction = dropped -> {
        final Object correlationKey = dropped.getMessage().getCorrelationKey();
        final IllegalStateException alreadyTerminated = new IllegalStateException("Message publisher is already terminated");
        final long nowMillis = Instant.now().toEpochMilli();
        this.notificationDispatcher.onException(correlationKey, alreadyTerminated, nowMillis);
    };
    /**
     * Termination task used when the publisher is shut down by a service interruption or session
     * state change.
     *
     * <p>It closes the publish buffer (applying {@code bufferCleaningAction} to it), clears the
     * in-flight buffer, adds the number of messages that were pending beforehand to the
     * {@code PUBLISH_MESSAGES_TERMINATION_DISCARDED} metric, shuts down the dispatcher executor and
     * closes the producer if it is not closed yet.
     */
    private final Task<PersistentMessagePublisherImpl> postTerminationClearBufferSilentTask = publisher -> {
        // Count before closing/clearing; afterwards the buffers no longer hold the messages.
        final long discarded = publisher.buffer.size() + publisher.inFlightMessageIdBuffer.size();
        try {
            publisher.buffer.close(this.bufferCleaningAction);
        } catch (PubSubPlusClientException.RequestInterruptedException interrupted) {
            if (logger.isWarnEnabled()) {
                logger.warn("Non-graceful termination of " + publisher.instanceName + " was interrupted");
            }
        }
        publisher.inFlightMessageIdBuffer.clear();
        publisher.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, discarded);
        if (!this.publisherBotExecutorService.isShutdown()) {
            this.publisherBotExecutorService.shutdown();
        }
        // Read the volatile field once so the null check, isClosed() and close() all act on the
        // same producer instance.
        final JCSMPXMLMessageProducer activeProducer = this.producer;
        try {
            if (activeProducer != null && !activeProducer.isClosed()) {
                activeProducer.close();
            }
        } catch (Exception closeFailure) {
            logger.warn("Problem with closing underlying IO after termination of " + this.instanceName, closeFailure);
        }
    };
    // Congestion dispatcher bound to this publisher; the constructor registers it as the buffer's
    // congestion monitor. NOTE(review): `this` is passed out before construction has finished.
    private final PublisherCongestionNotificationDispatcher bufferCongestionNotificationDispatcher = new PublisherCongestionNotificationDispatcher(this);

    /**
     * Correlation context that additionally carries a latch on which a thread can block until
     * {@link #unlock()} or {@link #clear()} opens it.
     *
     * <p>The latch reference is volatile and dropped by {@link #clear()}; every method reads it
     * exactly once into a local so that the null check and the subsequent call act on the same
     * object.
     */
    @ProviderType
    public static class BlockingCorrelationContext extends CorrelationContext {
        private static final long serialVersionUID = -7494601208708091659L;
        private volatile ToggleLatch locker;

        BlockingCorrelationContext(Long correlationKey, Object userContext, OutboundMessage linkedMessage) {
            super(correlationKey, userContext, linkedMessage);
            this.locker = new ToggleLatch();
        }

        /**
         * Locks the latch and waits on it for at most the given time.
         *
         * @param j        maximum time to wait
         * @param timeUnit unit of {@code j}
         * @return the result of the latch's timed await
         * @throws InterruptedException  if the waiting thread is interrupted
         * @throws IllegalStateException if the context has already been cleared
         */
        public boolean lock(long j, TimeUnit timeUnit) throws InterruptedException {
            final ToggleLatch latch = this.locker;
            if (latch == null) {
                throw new IllegalStateException("Thread can't be locked after context is cleared");
            }
            latch.lock();
            return latch.await(j, timeUnit);
        }

        /** Opens the latch; does nothing when the context has already been cleared. */
        public void unlock() {
            final ToggleLatch latch = this.locker;
            if (latch != null) {
                latch.open();
            }
        }

        /**
         * Clears the inherited state, opens the latch and drops the reference to it. Safe to call
         * more than once: later calls find no latch and only repeat the inherited clearing.
         */
        @Override
        public void clear() {
            super.clear();
            final ToggleLatch latch = this.locker;
            if (latch != null) {
                latch.open();
                this.locker = null;
            }
        }

        /** Equal only to another {@code BlockingCorrelationContext} that the superclass considers equal. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return (obj instanceof BlockingCorrelationContext) && super.equals(obj);
        }

        @Override
        public int hashCode() {
            return super.hashCode();
        }
    }

    /**
     * Per-message correlation data: the correlation key, the caller-supplied user context, the
     * message the key belongs to and, optionally, an exception recorded for it.
     *
     * <p>Identity ({@link #equals(Object)} / {@link #hashCode()}) is based on the correlation key
     * only.
     */
    @Internal
    @ProviderType
    public static class CorrelationContext implements Serializable {
        private static final long serialVersionUID = 426395239935681793L;
        private final Long correlationKey;
        private volatile Object userContext;
        private volatile PubSubPlusClientException exception;
        private volatile OutboundMessage linkedMessage;

        CorrelationContext(Long correlationKey, Object userContext, OutboundMessage linkedMessage) {
            this.correlationKey = correlationKey;
            this.userContext = userContext;
            this.linkedMessage = linkedMessage;
        }

        public Long getCorrelationKey() {
            return this.correlationKey;
        }

        public Object getUserContext() {
            return this.userContext;
        }

        public PubSubPlusClientException getException() {
            return this.exception;
        }

        public void setException(PubSubPlusClientException pubSubPlusClientException) {
            this.exception = pubSubPlusClientException;
        }

        public OutboundMessage getLinkedMessage() {
            return this.linkedMessage;
        }

        /** Drops the references to message, exception and user context; the key is retained. */
        public void clear() {
            this.linkedMessage = null;
            this.exception = null;
            this.userContext = null;
        }

        /** Two contexts are equal when they have exactly the same class and equal correlation keys. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            final CorrelationContext other = (CorrelationContext) obj;
            if (this.correlationKey == null) {
                return other.correlationKey == null;
            }
            return this.correlationKey.equals(other.correlationKey);
        }

        /** Hash of the correlation key, or 0 when there is no key. */
        @Override
        public int hashCode() {
            return this.correlationKey == null ? 0 : this.correlationKey.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/publisher/PersistentMessagePublisherImpl$MessageCorrelationKeyProvider.class */
    // Hands out correlation keys from a counter that starts at a random value in [1, 1000) and is
    // incremented before every key is returned.
    public static class MessageCorrelationKeyProvider implements Serializable {
        final AtomicLong messageKeyProvider = new AtomicLong(ThreadLocalRandom.current().nextLong(1, 1000));

        MessageCorrelationKeyProvider() {
        }

        // Returns the next key: the counter value after incrementing it by one.
        Long nextLongKey() {
            final long next = this.messageKeyProvider.incrementAndGet();
            return next;
        }
    }

    /** Publisher info view that reports the id and instance name of the enclosing publisher. */
    @ProviderType
    private class PersistentPublisherInfoImpl implements ManageablePublisher.PersistentPublisherInfo {
        private PersistentPublisherInfoImpl() {
        }

        /** Returns the numeric id of the enclosing publisher. */
        @Override
        public long getId() {
            final PersistentMessagePublisherImpl owner = PersistentMessagePublisherImpl.this;
            return owner.id;
        }

        /** Returns the instance name of the enclosing publisher. */
        @Override
        public String getInstanceName() {
            final PersistentMessagePublisherImpl owner = PersistentMessagePublisherImpl.this;
            return owner.instanceName;
        }
    }

    /** {@link JCSMPProperties} specialization that is created directly from a property map. */
    @ProviderType
    static class PropertyConfiguration extends JCSMPProperties {
        private static final long serialVersionUID = 7052052844320400643L;

        /** Passes the given map straight to the {@link JCSMPProperties} constructor. */
        PropertyConfiguration(Map<String, Object> properties) {
            super(properties);
        }
    }

    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/publisher/PersistentMessagePublisherImpl$PublisherNotificationDispatcher.class */
    private class PublisherNotificationDispatcher {
        private volatile PersistentMessagePublisher.MessagePublishReceiptListener messageDeliveryListener;

        private PublisherNotificationDispatcher() {
        }

        public void onDeliveryConfirmation(Object obj) {
            if (obj == null) {
                return;
            }
            if (obj instanceof BlockingCorrelationContext) {
                handleSyncDeliveryConfirmation((BlockingCorrelationContext) obj);
            } else if (obj instanceof CorrelationContext) {
                handleAsyncDeliveryConfirmation((CorrelationContext) obj);
            } else {
                PersistentMessagePublisherImpl.logger.error(PersistentMessagePublisherImpl.this.instanceName + "received unknown type of message correlation object with message acknowledgement: " + obj);
            }
        }

        public void onException(Object obj, Exception exc, long j) {
            if (exc == null || obj == null) {
                return;
            }
            PubSubPlusClientException mapException = mapException(exc);
            if (obj instanceof BlockingCorrelationContext) {
                handleSyncDeliveryError((BlockingCorrelationContext) obj, mapException);
            } else if (obj instanceof CorrelationContext) {
                handleAsyncDeliveryError((CorrelationContext) obj, mapException, j);
            } else {
                PersistentMessagePublisherImpl.logger.error(PersistentMessagePublisherImpl.this.instanceName + "received unknown type of message correlation object with message acknowledgement: " + obj);
            }
        }

        void handleAsyncDeliveryConfirmation(CorrelationContext correlationContext) {
            cleanInFlightMessageIdBuffer(correlationContext);
            PersistentMessagePublisher.MessagePublishReceiptListener messagePublishReceiptListener = this.messageDeliveryListener;
            try {
                if (messagePublishReceiptListener == null) {
                    return;
                }
                try {
                    messagePublishReceiptListener.onPublishReceipt(new PersistentMessagePublisher.PublishReceipt(correlationContext.getLinkedMessage(), null, Instant.now().toEpochMilli(), true, correlationContext.getUserContext()));
                    correlationContext.clear();
                } catch (Exception e) {
                    PersistentMessagePublisherImpl.logger.error("Application code throw an unhandled exception by processing of Publish Receipt notification", e);
                    correlationContext.clear();
                }
            } catch (Throwable th) {
                correlationContext.clear();
                throw th;
            }
        }

        void handleSyncDeliveryConfirmation(BlockingCorrelationContext blockingCorrelationContext) {
            cleanInFlightMessageIdBuffer(blockingCorrelationContext);
            blockingCorrelationContext.locker.open();
        }

        void handleAsyncDeliveryError(CorrelationContext correlationContext, PubSubPlusClientException pubSubPlusClientException, long j) {
            cleanInFlightMessageIdBuffer(correlationContext);
            PersistentMessagePublisher.MessagePublishReceiptListener messagePublishReceiptListener = this.messageDeliveryListener;
            try {
                if (messagePublishReceiptListener == null) {
                    return;
                }
                try {
                    messagePublishReceiptListener.onPublishReceipt(new PersistentMessagePublisher.PublishReceipt(correlationContext.getLinkedMessage(), pubSubPlusClientException, j, false, correlationContext.getUserContext()));
                    correlationContext.clear();
                } catch (Exception e) {
                    PersistentMessagePublisherImpl.logger.error("Application code throw an unhandled exception by processing of Publish Receipt notification", e);
                    correlationContext.clear();
                }
            } catch (Throwable th) {
                correlationContext.clear();
                throw th;
            }
        }

        void handleSyncDeliveryError(BlockingCorrelationContext blockingCorrelationContext, PubSubPlusClientException pubSubPlusClientException) {
            cleanInFlightMessageIdBuffer(blockingCorrelationContext);
            blockingCorrelationContext.setException(pubSubPlusClientException);
            blockingCorrelationContext.locker.open();
        }

        void cleanInFlightMessageIdBuffer(CorrelationContext correlationContext) {
            PersistentMessagePublisherImpl.this.inFlightMessageIdBuffer.remove(correlationContext.getCorrelationKey());
        }

        void setMessageDeliveryListener(PersistentMessagePublisher.MessagePublishReceiptListener messagePublishReceiptListener) {
            this.messageDeliveryListener = messagePublishReceiptListener;
        }

        PubSubPlusClientException mapException(Exception exc) {
            return exc instanceof PubSubPlusClientException ? (PubSubPlusClientException) exc : new PubSubPlusClientException(exc);
        }
    }

    /**
     * Creates a publisher in the not-started state.
     *
     * <p>Stores the service view and configuration, creates the publish buffer and the in-flight
     * buffer from the configuration, derives the message builder from the supplied one and
     * registers the congestion dispatcher on the publish buffer.
     *
     * @param messagingServiceInternalView service view; also supplies the API metrics collector
     * @param typedProperties              publisher configuration
     * @param outboundMessageBuilder       builder passed to {@code configureMessageBuilder}
     */
    public PersistentMessagePublisherImpl(MessagingServiceInternalView messagingServiceInternalView, TypedProperties typedProperties, OutboundMessageBuilder outboundMessageBuilder) {
        this.serviceInternalView = messagingServiceInternalView;
        this.publisherConfiguration = typedProperties;
        this.buffer = PublisherBuffers.createBuffer(typedProperties, messagingServiceInternalView.getApiMetricsCollector());
        this.inFlightMessageIdBuffer = createInFlightMessageIdBuffer(typedProperties);
        this.messageBuilder = configureMessageBuilder(outboundMessageBuilder);
        // NOTE(review): the second argument reuses the lifecycle constant (value 1); its meaning for
        // the congestion monitor is not visible here - confirm against PublisherBuffer.
        this.buffer.setBufferCongestionMonitor(this.bufferCongestionNotificationDispatcher, STATE_STARTED);
    }

    /** Returns {@code true} while the lifecycle state is {@code STATE_STARTED}. */
    @Override
    public boolean isRunning() {
        final int currentState = this.stateHolder.get();
        return currentState == STATE_STARTED;
    }

    /** Returns {@code true} once the lifecycle state is {@code STATE_TERMINATED}. */
    @Override
    public boolean isTerminated() {
        final int currentState = this.stateHolder.get();
        return currentState == STATE_TERMINATED;
    }

    /** Returns the current value of the graceful-shutdown-in-progress flag. */
    @Override
    public boolean isTerminating() {
        final boolean shuttingDownGracefully = this.gracefulShutdownInProgress;
        return shuttingDownGracefully;
    }

    /**
     * Hands the listener to the termination notification dispatcher.
     *
     * @param terminationNotificationListener listener to register
     */
    @Override
    public void setTerminationNotificationListener(LifecycleControl.TerminationNotificationListener terminationNotificationListener) {
        final TerminationNotificationDispatcher dispatcher = this.terminationNotificationDispatcher;
        dispatcher.setTerminationNotificationListener(terminationNotificationListener);
    }

    /**
     * Hands the listener to the notification dispatcher, which delivers publish receipts to it.
     *
     * @param messagePublishReceiptListener listener to register
     */
    @Override
    public void setMessagePublishReceiptListener(PersistentMessagePublisher.MessagePublishReceiptListener messagePublishReceiptListener) {
        final PublisherNotificationDispatcher dispatcher = this.notificationDispatcher;
        dispatcher.setMessageDeliveryListener(messagePublishReceiptListener);
    }

    /**
     * Publishes a byte-array payload without a user context by delegating to
     * {@code publish(byte[], Topic, Object)} with a {@code null} context.
     *
     * @param bArr  message payload
     * @param topic message destination
     */
    @Override
    public void publish(byte[] bArr, Topic topic) throws PubSubPlusClientException, IllegalStateException, IllegalArgumentException, PubSubPlusClientException.PublisherOverflowException {
        final Object noUserContext = null;
        publish(bArr, topic, noUserContext);
    }

    /**
     * Publishes a byte-array payload: validates the publisher, rejects a {@code null} payload or
     * destination, builds a message with the configured builder and passes it to the internal
     * publish routine.
     *
     * @param bArr  message payload; must not be {@code null}
     * @param topic message destination; must not be {@code null}
     * @param obj   user context handed on to the internal publish routine
     */
    @Override
    public void publish(byte[] bArr, Topic topic, Object obj) throws PubSubPlusClientException, IllegalStateException, IllegalArgumentException, PubSubPlusClientException.PublisherOverflowException {
        validatePublisher();
        Validation.nullIllegal(bArr, "Message array can't be null");
        Validation.nullIllegal(topic, "Message destination can't be null");
        final OutboundMessage message = this.messageBuilder.build(bArr);
        publishInternalMessage(message, topic, null, obj);
    }

    /**
     * Publishes a string payload to the given topic without a user context.
     * Delegates to {@link #publish(String, Topic, Object)} with a {@code null} context.
     *
     * @param str   message payload
     * @param topic message destination
     */
    @Override
    public void publish(String str, Topic topic) throws PubSubPlusClientException, IllegalStateException, IllegalArgumentException, PubSubPlusClientException.PublisherOverflowException {
        final Object noUserContext = null;
        publish(str, topic, noUserContext);
    }

    /**
     * Publishes a string payload to the given topic together with a user context.
     * The publisher state is validated first, then both mandatory arguments are
     * null-checked, and finally a message built by the configured builder is queued.
     *
     * @param str   message payload, must not be {@code null}
     * @param topic message destination, must not be {@code null}
     * @param obj   user context attached to the message correlation data, may be {@code null}
     */
    @Override
    public void publish(String str, Topic topic, Object obj) throws PubSubPlusClientException, IllegalStateException, IllegalArgumentException, PubSubPlusClientException.PublisherOverflowException {
        validatePublisher();
        Validation.nullIllegal(str, "Message string can't be null");
        Validation.nullIllegal(topic, "Message destination can't be null");
        final OutboundMessage builtMessage = this.messageBuilder.build(str);
        publishInternalMessage(builtMessage, topic, null, obj);
    }

    /**
     * Publishes a pre-built message to the given topic with neither a user context
     * nor additional properties. Delegates to the four-argument overload.
     *
     * @param outboundMessage message to publish
     * @param topic           message destination
     */
    @Override
    public void publish(OutboundMessage outboundMessage, Topic topic) throws PubSubPlusClientException, IllegalStateException, IllegalArgumentException, PubSubPlusClientException.PublisherOverflowException {
        final Object noUserContext = null;
        final Properties noAdditionalProperties = null;
        publish(outboundMessage, topic, noUserContext, noAdditionalProperties);
    }

    /**
     * Publishes a pre-built message to the given topic with a user context and no
     * additional properties. Delegates to the four-argument overload.
     *
     * @param outboundMessage message to publish
     * @param topic           message destination
     * @param obj             user context, may be {@code null}
     */
    @Override
    public void publish(OutboundMessage outboundMessage, Topic topic, Object obj) throws PubSubPlusClientException, IllegalStateException, IllegalArgumentException, PubSubPlusClientException.PublisherOverflowException {
        final Properties noAdditionalProperties = null;
        publish(outboundMessage, topic, obj, noAdditionalProperties);
    }

    /**
     * Publishes a pre-built message to the given topic with an optional user context
     * and optional additional properties. The publisher state is validated before the
     * mandatory arguments are null-checked; the message is then handed to
     * {@code publishExternalMessage}.
     *
     * @param outboundMessage message to publish, must not be {@code null}
     * @param topic           message destination, must not be {@code null}
     * @param obj             user context, may be {@code null}
     * @param properties      additional message properties, may be {@code null}
     */
    @Override
    public void publish(OutboundMessage outboundMessage, Topic topic, Object obj, Properties properties) throws PubSubPlusClientException, IllegalStateException, IllegalArgumentException, PubSubPlusClientException.PublisherOverflowException {
        // State check comes first so a stopped publisher is reported before argument problems.
        validatePublisher();

        Validation.nullIllegal(outboundMessage, "Message can't be null");
        Validation.nullIllegal(topic, "Message destination can't be null");

        publishExternalMessage(outboundMessage, topic, properties, obj);
    }

    /**
     * Publishes a message and blocks until it is acknowledged or the timeout elapses,
     * without additional properties. Delegates to the four-argument overload.
     *
     * @param outboundMessage message to publish
     * @param topic           message destination
     * @param j               maximum wait in milliseconds
     */
    @Override
    public void publishAwaitAcknowledgement(OutboundMessage outboundMessage, Topic topic, long j) throws PubSubPlusClientException.TimeoutException, PubSubPlusClientException.MessageRejectedByBrokerException, PubSubPlusClientException.PublisherOverflowException, PubSubPlusClientException.MessageDestinationDoesNotExistException, PubSubPlusClientException.MessageNotAcknowledgedByBrokerException, PubSubPlusClientException, InterruptedException {
        final Properties noAdditionalProperties = null;
        publishAwaitAcknowledgement(outboundMessage, topic, j, noAdditionalProperties);
    }

    /**
     * Publishes a message and blocks until it is acknowledged or the timeout elapses.
     * The publisher state is validated before the mandatory arguments are null-checked;
     * the blocking publish is always requested with the immediate-acknowledgement flag set.
     *
     * @param outboundMessage message to publish, must not be {@code null}
     * @param topic           message destination, must not be {@code null}
     * @param j               maximum wait in milliseconds
     * @param properties      additional message properties, may be {@code null}
     */
    @Override
    public void publishAwaitAcknowledgement(OutboundMessage outboundMessage, Topic topic, long j, Properties properties) throws PubSubPlusClientException.TimeoutException, PubSubPlusClientException.MessageRejectedByBrokerException, PubSubPlusClientException.PublisherOverflowException, PubSubPlusClientException.MessageDestinationDoesNotExistException, PubSubPlusClientException.MessageNotAcknowledgedByBrokerException, PubSubPlusClientException, InterruptedException {
        validatePublisher();

        Validation.nullIllegal(outboundMessage, "Message can't be null");
        Validation.nullIllegal(topic, "Message destination can't be null");

        final boolean ackImmediately = true;
        publishBlockingExternalMessage(outboundMessage, topic, properties, j, ackImmediately);
    }

    /**
     * Starts this publisher and returns a future that completes with this instance.
     *
     * <p>Fails fast with {@link IllegalStateException} when the messaging service is not
     * connected or the publisher is already terminated. Otherwise the state is switched
     * from not-started to started with a compare-and-set; the thread that wins the switch
     * runs {@code onStart()}. When {@code onStart()} fails, the publisher is terminated and
     * the returned future is completed exceptionally with an {@link IllegalStateException}
     * that carries the original failure as its cause. Callers that lose the
     * compare-and-set loop until the state reads started and then receive an already
     * completed future.
     *
     * @return future completed with this publisher, or completed exceptionally on failure
     * @throws IllegalStateException when the service is not connected or the publisher is terminated
     */
    @Override
    public <PersistentMessagePublisher> CompletableFuture<PersistentMessagePublisher> startAsync() throws PubSubPlusClientException {
        ExtendedCompletableFuture extendedCompletableFuture;
        if (!this.serviceInternalView.isConnected()) {
            throw new IllegalStateException("Publisher can't be started before it is connected to a messaging service");
        }
        if (this.stateHolder.get() == STATE_TERMINATED) {
            throw new IllegalStateException("Message publisher is already terminated");
        }
        do {
            extendedCompletableFuture = new ExtendedCompletableFuture();
            // Termination may race with this loop, so re-check on every iteration.
            if (STATE_TERMINATED == this.stateHolder.get()) {
                return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Publisher is already terminated"));
            }
            if (this.stateHolder.compareAndSet(STATE_NOT_STARTED, STATE_STARTED)) {
                if (logger.isDebugEnabled()) {
                    logger.debug(this.instanceName + " is being started");
                }
                try {
                    onStart();
                    extendedCompletableFuture.complete(this);
                    if (logger.isDebugEnabled()) {
                        logger.debug(this.instanceName + " is started");
                    }
                } catch (Exception e) {
                    logger.error(this.instanceName + " failed to start and is terminating", e);
                    onTerminate(null, this.postTerminationClearBufferSilentTask);
                    // Keep the root cause so callers can see why the start failed.
                    extendedCompletableFuture.completeExceptionally(new IllegalStateException("Publisher is already closed due to internal error", e));
                }
                return ExtendedCompletableFuture.onCancellation(extendedCompletableFuture, (obj, th) -> {
                    this.stateHolder.set(STATE_TERMINATED);
                    onTerminate(null, this.postTerminationClearBufferSilentTask);
                    if (logger.isDebugEnabled()) {
                        logger.debug(this.instanceName + " async start was canceled");
                    }
                });
            }
        } while (!(STATE_STARTED == this.stateHolder.get()));
        extendedCompletableFuture.complete(this);
        return extendedCompletableFuture;
    }

    /**
     * Terminates this publisher, allowing the given grace period, and reports the outcome
     * through the returned future.
     *
     * <p>The termination itself is executed on the calling thread via {@code onTerminate};
     * the returned future is already completed when this method returns. When the
     * termination task reports undelivered messages through the shared counter, the
     * discard metric is increased and the future is completed exceptionally with
     * {@link PubSubPlusClientException.IncompleteMessageDeliveryException}.
     *
     * @param j grace period, validated against a lower bound of 1
     * @return a completed future; exceptional when messages could not be delivered
     */
    @Override
    public CompletableFuture<Void> terminateAsync(long j) throws PubSubPlusClientException, IllegalStateException {
        Validation.smallerThanNumbersIllegal(1L, j, "Grace period < 1");

        // Runs first inside onTerminate: flips the state and flags the graceful shutdown.
        final Task<PersistentMessagePublisherImpl> markTerminating = publisher -> {
            publisher.stateHolder.set(STATE_TERMINATED);
            publisher.gracefulShutdownInProgress = true;
        };
        // Counter the termination task can use to report undelivered messages.
        final AtomicInteger undeliveredCounter = new AtomicInteger(STATE_NOT_STARTED);

        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " is being terminated");
        }
        onTerminate(markTerminating, postTerminationTerminateAsyncTask, Long.valueOf(j), undeliveredCounter);

        final ExtendedCompletableFuture outcome = new ExtendedCompletableFuture();
        final int undelivered = undeliveredCounter.get();
        if (undelivered <= 0) {
            outcome.complete(null);
        } else {
            this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, undelivered);
            final String reason = String.format("Delivery of %d messages could not be completed due to expiration of a grace period", Integer.valueOf(undelivered));
            outcome.completeExceptionally(new PubSubPlusClientException.IncompleteMessageDeliveryException(reason, undelivered));
        }

        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " is terminated");
        }
        return outcome;
    }

    /**
     * Terminates this publisher and reports the outcome to the given listener.
     *
     * <p>The failure passed to the listener is the throwable the future completed with;
     * only {@code CompletionException}/{@code ExecutionException} wrappers are unwrapped
     * to their cause. A failure that has no cause is therefore still reported instead of
     * being turned into {@code null}. Exceptions thrown by the listener itself are caught
     * and logged so that they do not propagate into the completion pipeline.
     *
     * @param completionListener listener notified with {@code (null, failure)}, must not be {@code null}
     * @param j                  grace period forwarded to {@link #terminateAsync(long)}
     */
    @Override
    public void terminateAsync(CompletionListener<Void> completionListener, long j) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(completionListener, "Termination listener can't be null");
        terminateAsync(j).whenComplete((r5, th) -> {
            Throwable failure = th;
            boolean wrapper = (th instanceof java.util.concurrent.CompletionException)
                    || (th instanceof java.util.concurrent.ExecutionException);
            if (wrapper && th.getCause() != null) {
                failure = th.getCause();
            }
            try {
                completionListener.onCompletion(null, failure);
            } catch (Exception e) {
                // Application callbacks must not break the termination flow.
                if (logger.isWarnEnabled()) {
                    logger.warn("Application code throw an unhandled exception by processing async termination completion notification", e);
                }
            }
        });
    }

    /**
     * Starts this publisher and reports the outcome to the given listener.
     *
     * <p>The failure passed to the listener is the throwable the future completed with;
     * only {@code CompletionException}/{@code ExecutionException} wrappers are unwrapped
     * to their cause. A failure that has no cause is therefore still reported instead of
     * being turned into {@code null}. Exceptions thrown by the listener itself are caught
     * and logged so that they do not propagate into the completion pipeline.
     *
     * @param completionListener listener notified with {@code (result, failure)}, must not be {@code null}
     */
    @Override
    public <PersistentMessagePublisher> void startAsync(CompletionListener<PersistentMessagePublisher> completionListener) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(completionListener, "Start listener can't be null");
        startAsync().whenComplete((obj, th) -> {
            Throwable failure = th;
            boolean wrapper = (th instanceof java.util.concurrent.CompletionException)
                    || (th instanceof java.util.concurrent.ExecutionException);
            if (wrapper && th.getCause() != null) {
                failure = th.getCause();
            }
            try {
                completionListener.onCompletion(obj, failure);
            } catch (Exception e) {
                // Application callbacks must not break the start flow.
                if (logger.isWarnEnabled()) {
                    logger.warn("Application code throw an unhandled exception by processing async start completion notification", e);
                }
            }
        });
    }

    /**
     * Gives access to the publisher information object held by this instance.
     *
     * @return the {@code publisherInfo} field
     */
    @Override
    public ManageablePublisher.PersistentPublisherInfo publisherInfo() {
        final ManageablePublisher.PersistentPublisherInfo info = this.publisherInfo;
        return info;
    }

    /**
     * Starts this publisher and blocks until the start has finished.
     *
     * <p>Returns immediately when the publisher is already started. Otherwise waits on
     * {@link #startAsync()} and translates failures: interruption and cancellation become
     * {@link PubSubPlusClientException.RequestInterruptedException}; a
     * {@link PubSubPlusClientException} or {@link IllegalStateException} cause is rethrown
     * as-is; any other failure is wrapped in a {@link PubSubPlusClientException}. A failed
     * start never returns normally.
     *
     * @return this publisher once started
     */
    @Override
    public PersistentMessagePublisher start() {
        if (STATE_STARTED == this.stateHolder.get()) {
            return this;
        }
        try {
            startAsync().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Publisher start was canceled", e);
        } catch (CancellationException e2) {
            throw new PubSubPlusClientException.RequestInterruptedException("Publisher start was canceled", e2);
        } catch (ExecutionException e3) {
            Throwable cause = e3.getCause();
            if (cause != null) {
                if (cause instanceof PubSubPlusClientException) {
                    throw ((PubSubPlusClientException) cause);
                }
                if (cause instanceof IllegalStateException) {
                    throw ((IllegalStateException) cause);
                }
                throw new PubSubPlusClientException(cause);
            }
            if (logger.isErrorEnabled()) {
                logger.error(this.instanceName + " failed to start");
            }
            // Without a cause the start still failed; do not pretend it succeeded.
            throw new PubSubPlusClientException(e3);
        }
        return this;
    }

    /**
     * Terminates this publisher and blocks until termination has finished.
     *
     * <p>A grace period of exactly {@code 0} triggers an immediate, non-graceful
     * termination. Any other value is forwarded to {@link #terminateAsync(long)} and the
     * result is awaited. Interruption becomes
     * {@link PubSubPlusClientException.RequestInterruptedException}; a
     * {@link PubSubPlusClientException} or {@link IllegalStateException} cause is rethrown
     * as-is; everything else is wrapped in a {@link PubSubPlusClientException}.
     *
     * @param j grace period; {@code 0} means terminate immediately
     */
    @Override
    public void terminate(long j) throws PubSubPlusClientException {
        if (j == 0) {
            terminateNow();
            return;
        }
        try {
            terminateAsync(j).get();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Publisher termination was canceled", interrupted);
        } catch (ExecutionException failed) {
            final Throwable rootCause = failed.getCause();
            if (rootCause == null) {
                throw new PubSubPlusClientException(failed);
            }
            if (rootCause instanceof PubSubPlusClientException) {
                throw (PubSubPlusClientException) rootCause;
            }
            if (rootCause instanceof IllegalStateException) {
                throw (IllegalStateException) rootCause;
            }
            throw new PubSubPlusClientException(rootCause);
        }
    }

    /**
     * Terminates this publisher immediately, without a grace period.
     *
     * <p>The termination task reports the number of undelivered messages through a shared
     * counter. When that number is positive, an
     * {@link PubSubPlusClientException.IncompleteMessageDeliveryException} is thrown. This
     * also applies when termination fails with another error while messages were already
     * counted; in that case the original error is attached as a suppressed exception so it
     * is not lost.
     *
     * @throws PubSubPlusClientException.IncompleteMessageDeliveryException when messages could not be delivered
     */
    void terminateNow() throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        AtomicInteger atomicInteger = new AtomicInteger(STATE_NOT_STARTED);
        try {
            if (logger.isDebugEnabled()) {
                logger.debug(this.instanceName + " is being non gracefully terminated");
            }
            onTerminate(null, postTerminateNowTask, atomicInteger);
            if (logger.isDebugEnabled()) {
                logger.debug(this.instanceName + " is terminated");
            }
            int i = atomicInteger.get();
            if (i > 0) {
                throw new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %d messages could not be completed due to non graceful termination", Integer.valueOf(i)), i);
            }
        } catch (Throwable th) {
            int i2 = atomicInteger.get();
            if (i2 <= 0) {
                throw th;
            }
            PubSubPlusClientException.IncompleteMessageDeliveryException incomplete = new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %d messages could not be completed due to non graceful termination", Integer.valueOf(i2)), i2);
            // Preserve an unrelated failure; the delivery exception thrown above carries no extra information.
            if (!(th instanceof PubSubPlusClientException.IncompleteMessageDeliveryException)) {
                incomplete.addSuppressed(th);
            }
            throw incomplete;
        }
    }

    /**
     * Reports whether this publisher can currently accept messages.
     *
     * @return {@code true} only when the publisher is running, the buffer has remaining
     *         capacity, and no graceful shutdown is in progress
     */
    @Override
    public boolean isReady() {
        if (!isRunning()) {
            return false;
        }
        if (this.buffer.remainingCapacity() <= 0) {
            return false;
        }
        return !this.gracefulShutdownInProgress;
    }

    /**
     * Registers the readiness listener by forwarding it to the buffer congestion
     * notification dispatcher.
     *
     * @param publisherReadinessListener listener forwarded to the dispatcher as-is
     */
    @Override
    public void setPublisherReadinessListener(
            PublisherHealthCheck.PublisherReadinessListener publisherReadinessListener) {
        this.bufferCongestionNotificationDispatcher
                .setPublisherReadinessListener(publisherReadinessListener);
    }

    /**
     * Requests a readiness notification by delegating to the buffer congestion
     * notification dispatcher.
     */
    @Override
    public void notifyWhenReady() {
        this.bufferCongestionNotificationDispatcher
                .notifyWhenReady();
    }

    /**
     * Queues a caller-supplied message for publishing.
     *
     * <p>The message is deep-copied (merging {@code properties} when they are present and
     * non-empty), marked as persistent, tagged with a fresh correlation context carrying
     * the user context, and inserted into the publisher buffer.
     *
     * @param outboundMessage message supplied by the caller
     * @param topic           message destination
     * @param properties      additional properties, may be {@code null} or empty
     * @param obj             user context, may be {@code null}
     */
    @Internal
    void publishExternalMessage(OutboundMessage outboundMessage, Topic topic, Properties properties, Object obj) {
        final OutboundMessage copy;
        if (properties == null || properties.isEmpty()) {
            copy = OutboundMessageBuilder.deepCopy(outboundMessage);
        } else {
            copy = OutboundMessageBuilder.deepCopy(outboundMessage, properties);
        }
        OutboundMessageBuilder.OutboundMessageBuilderImpl.injectDeliveryMode(copy, DeliveryMode.PERSISTENT);

        final CorrelationContext correlation = new CorrelationContext(this.messageKeyProvider.nextLongKey(), obj, copy);
        OutboundMessageBuilder.OutboundMessageBuilderImpl.injectCorrelationKey(copy, correlation);

        this.buffer.insert(PublisherBuffers.Publishable.of(copy, topic));
    }

    /**
     * Queues a message that was built by this publisher's own message builder.
     *
     * <p>No copy is taken here. Extended properties are injected only when
     * {@code properties} is present and non-empty; the resulting message is tagged with a
     * fresh correlation context carrying the user context and inserted into the buffer.
     *
     * @param outboundMessage message built internally
     * @param topic           message destination
     * @param properties      additional properties, may be {@code null} or empty
     * @param obj             user context, may be {@code null}
     */
    @Internal
    void publishInternalMessage(OutboundMessage outboundMessage, Topic topic, Properties properties, Object obj) {
        final OutboundMessage message;
        if (properties == null || properties.isEmpty()) {
            message = outboundMessage;
        } else {
            message = OutboundMessageBuilder.OutboundMessageBuilderImpl.injectExtendedMessageProperties(outboundMessage, properties);
        }

        final CorrelationContext correlation = new CorrelationContext(this.messageKeyProvider.nextLongKey(), obj, message);
        OutboundMessageBuilder.OutboundMessageBuilderImpl.injectCorrelationKey(message, correlation);

        this.buffer.insert(PublisherBuffers.Publishable.of(message, topic));
    }

    /**
     * Queues a caller-supplied message and blocks until its outcome is known.
     *
     * <p>The message is deep-copied, marked as persistent, flagged with the given
     * acknowledge-immediately setting, tagged with a blocking correlation context, and
     * inserted into the buffer. The calling thread then waits on the context's locker for
     * at most {@code j} milliseconds. The context is always cleared before returning.
     *
     * @param outboundMessage message supplied by the caller
     * @param topic           message destination
     * @param properties      additional properties, may be {@code null} or empty
     * @param j               maximum wait in milliseconds
     * @param z               value passed to {@code injectAckImmediately}
     * @throws PubSubPlusClientException.TimeoutException when the wait elapses first
     * @throws PubSubPlusClientException.RequestInterruptedException when the waiting thread is interrupted
     * @throws PubSubPlusClientException when the context recorded a failure
     */
    @Internal
    void publishBlockingExternalMessage(OutboundMessage outboundMessage, Topic topic, Properties properties, long j, boolean z) {
        final boolean withoutExtraProperties = properties == null || properties.isEmpty();
        final OutboundMessage copy = withoutExtraProperties
                ? OutboundMessageBuilder.deepCopy(outboundMessage)
                : OutboundMessageBuilder.deepCopy(outboundMessage, properties);
        OutboundMessageBuilder.OutboundMessageBuilderImpl.injectDeliveryMode(copy, DeliveryMode.PERSISTENT);
        OutboundMessageBuilder.OutboundMessageBuilderImpl.injectAckImmediately(copy, z);

        final BlockingCorrelationContext context = new BlockingCorrelationContext(this.messageKeyProvider.nextLongKey(), null, copy);
        OutboundMessageBuilder.OutboundMessageBuilderImpl.injectCorrelationKey(copy, context);
        this.buffer.insert(PublisherBuffers.Publishable.of(copy, topic));

        try {
            context.locker.lock();
            final boolean signalledInTime = context.locker.await(j, TimeUnit.MILLISECONDS);
            if (!signalledInTime) {
                throw new PubSubPlusClientException.TimeoutException(String.format("Message was not acknowledged within %d milliseconds", Long.valueOf(j)));
            }
            final PubSubPlusClientException failure = context.getException();
            if (failure != null) {
                throw failure;
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Blocking publish was interrupted", interrupted);
        } finally {
            context.clear();
        }
    }

    /**
     * Verifies that this publisher may be used for publishing.
     *
     * @throws IllegalStateException when the publisher is terminating or terminated, or
     *                               when it is not running
     */
    void validatePublisher() {
        final boolean shutDownOrShuttingDown = isTerminating() || isTerminated();
        if (shutDownOrShuttingDown) {
            throw new IllegalStateException("Message publisher was terminated");
        }
        if (isRunning()) {
            return;
        }
        throw new IllegalStateException("Message publisher not running");
    }

    /**
     * Prepares the given message builder for use by a persistent publisher.
     *
     * <p>Only builders of the library's own implementation type are switched into
     * persistent-publisher mode; any other builder is returned untouched.
     *
     * @param outboundMessageBuilder builder to configure
     * @return the same builder instance that was passed in
     */
    OutboundMessageBuilder configureMessageBuilder(OutboundMessageBuilder outboundMessageBuilder) {
        if (!(outboundMessageBuilder instanceof OutboundMessageBuilder.OutboundMessageBuilderImpl)) {
            return outboundMessageBuilder;
        }
        final OutboundMessageBuilder.OutboundMessageBuilderImpl nativeBuilder =
                (OutboundMessageBuilder.OutboundMessageBuilderImpl) outboundMessageBuilder;
        nativeBuilder.forPersistentMessagePublisher();
        return outboundMessageBuilder;
    }

    /**
     * Performs the actual start: registers session listeners, creates the message
     * producer, and submits the worker that drains the publisher buffer.
     *
     * <p>The worker loops until the publisher is neither running nor shutting down
     * gracefully, or until the buffer is closed. Each consumed message has its correlation
     * key recorded as in-flight before it is sent. A failed send is counted as discarded,
     * removed from the in-flight record, logged, and reported through the notification
     * dispatcher. While the producer is closed the worker polls every 500 ms for a
     * replacement, unless a graceful shutdown is in progress.
     *
     * @throws PubSubPlusClientException when the message producer can't be created
     */
    void onStart() throws PubSubPlusClientException {
        this.serviceInternalView.getClientSession().addServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().addClientSessionStateListener(this.closedSessionListener);
        this.producer = createMessageProducer(this.messageCorrelationHandler, this.publisherEventHandler, this.publisherConfiguration);
        this.publisherBotExecutorService.submit(() -> {
            JCSMPXMLMessageProducer jCSMPXMLMessageProducer = this.producer;
            while (true) {
                if ((!isRunning() && !this.gracefulShutdownInProgress) || this.buffer.isClosed()) {
                    return;
                }
                if (jCSMPXMLMessageProducer == null) {
                    // Terminate regardless of the configured log level.
                    onTerminate(null, null);
                    if (logger.isErrorEnabled()) {
                        logger.error(this.instanceName + " could not create an internal service to publish messages to a broker.");
                    }
                    return;
                }
                if (!jCSMPXMLMessageProducer.isClosed()) {
                    PublisherBuffers.Publishable<Topic> consume = this.buffer.consume();
                    if (consume != null) {
                        Object correlationKey = consume.getMessage().getCorrelationKey();
                        // instanceof is false for null, so no separate null check is needed.
                        if (correlationKey instanceof CorrelationContext) {
                            this.inFlightMessageIdBuffer.add(((CorrelationContext) correlationKey).getCorrelationKey());
                        }
                        try {
                            jCSMPXMLMessageProducer.send(OutboundMessageBuilder.OutboundMessageBuilderImpl.OutboundMessageImpl.toByteMessage(consume.getMessage()), consume.getDestination());
                        } catch (Exception e) {
                            this.serviceInternalView.getClientSession().getSessionStats().incStat(StatType.MESSAGES_DISCARDED_INTERNAL);
                            if (correlationKey instanceof CorrelationContext) {
                                this.inFlightMessageIdBuffer.remove(((CorrelationContext) correlationKey).getCorrelationKey());
                            }
                            if ((e instanceof ClosedFacilityException) && this.gracefulShutdownInProgress) {
                                return;
                            }
                            if (logger.isErrorEnabled()) {
                                logger.error(this.instanceName + " could not publish message to a broker. Message:" + consume.getMessage() + ", destination: " + consume.getDestination(), e);
                            }
                            // Pick up a possibly replaced producer before the next send.
                            jCSMPXMLMessageProducer = this.producer;
                            this.notificationDispatcher.onException(consume.getMessage().getCorrelationKey(), e, Instant.now().toEpochMilli());
                        }
                    } else if (this.buffer.isClosed()) {
                        return;
                    }
                } else {
                    if (this.gracefulShutdownInProgress) {
                        return;
                    }
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: the exit conditions at the top of the loop decide
                        // whether the worker stops. Re-setting the interrupt flag here would make
                        // every following sleep return immediately and turn this poll into a busy spin.
                    }
                    jCSMPXMLMessageProducer = this.producer;
                }
            }
        });
    }

    /**
     * Runs the termination sequence with an optional pre-task and an optional post-task.
     *
     * <p>The pre-task (when given) runs first. Whether or not a pre-task was supplied, and
     * even when it throws, the termination notification dispatcher is closed and the
     * post-task (when given) is executed. Afterwards the state is set to terminated, the
     * session listeners are removed, and buffer congestion monitoring is shut down.
     *
     * @param task  optional task executed before termination, may be {@code null}
     * @param task2 optional task executed after the pre-task, may be {@code null}
     */
    void onTerminate(Task<PersistentMessagePublisherImpl> task, Task<PersistentMessagePublisherImpl> task2) {
        try {
            if (task != null) {
                task.run(this);
            }
        } finally {
            // Must not depend on the presence of a pre-task: callers pass null as the
            // pre-task and still rely on the post-task for cleanup.
            this.terminationNotificationDispatcher.close();
            if (task2 != null) {
                task2.run(this);
            }
        }
        this.stateHolder.set(STATE_TERMINATED);
        this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
        this.buffer.setBufferCongestionMonitor(null, -1);
        this.bufferCongestionNotificationDispatcher.close();
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " publisher is shutdown");
        }
    }

    /**
     * Runs the termination sequence with an optional pre-task and an optional post-task
     * that receives one extra argument.
     *
     * <p>The pre-task (when given) runs first. Whether or not a pre-task was supplied, and
     * even when it throws, the termination notification dispatcher is closed and the
     * post-task (when given) is executed. Afterwards the state is set to terminated, the
     * session listeners are removed, and buffer congestion monitoring is shut down.
     *
     * @param task   optional task executed before termination, may be {@code null}
     * @param biTask optional task executed after the pre-task, may be {@code null}
     * @param p2     argument handed to {@code biTask}
     * @param <P2>   type of the extra post-task argument
     */
    <P2> void onTerminate(Task<PersistentMessagePublisherImpl> task, BiTask<PersistentMessagePublisherImpl, P2> biTask, P2 p2) {
        try {
            if (task != null) {
                task.run(this);
            }
        } finally {
            // Must not depend on the presence of a pre-task: callers pass null as the
            // pre-task and still rely on the post-task for cleanup and result reporting.
            this.terminationNotificationDispatcher.close();
            if (biTask != null) {
                biTask.run(this, p2);
            }
        }
        this.stateHolder.set(STATE_TERMINATED);
        this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
        this.buffer.setBufferCongestionMonitor(null, -1);
        this.bufferCongestionNotificationDispatcher.close();
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " publisher is shutdown");
        }
    }

    /**
     * Runs the termination sequence with an optional pre-task and an optional post-task
     * that receives two extra arguments.
     *
     * <p>The pre-task (when given) runs first. Whether or not a pre-task was supplied, and
     * even when it throws, the termination notification dispatcher is closed and the
     * post-task (when given) is executed. Afterwards the state is set to terminated, the
     * session listeners are removed, and buffer congestion monitoring is shut down.
     *
     * @param task    optional task executed before termination, may be {@code null}
     * @param triTask optional task executed after the pre-task, may be {@code null}
     * @param p2      first argument handed to {@code triTask}
     * @param p3      second argument handed to {@code triTask}
     * @param <P2>    type of the first extra post-task argument
     * @param <P3>    type of the second extra post-task argument
     */
    <P2, P3> void onTerminate(Task<PersistentMessagePublisherImpl> task, TriTask<PersistentMessagePublisherImpl, P2, P3> triTask, P2 p2, P3 p3) {
        try {
            if (task != null) {
                task.run(this);
            }
        } finally {
            // Must not depend on the presence of a pre-task, so that all onTerminate
            // overloads behave the same when only a post-task is supplied.
            this.terminationNotificationDispatcher.close();
            if (triTask != null) {
                triTask.run(this, p2, p3);
            }
        }
        this.stateHolder.set(STATE_TERMINATED);
        this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
        this.buffer.setBufferCongestionMonitor(null, -1);
        this.bufferCongestionNotificationDispatcher.close();
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " publisher is shutdown");
        }
    }

    /**
     * Creates the buffer that records correlation keys of messages currently in flight.
     *
     * @param typedProperties publisher configuration; not consulted by this implementation
     * @return a new buffer constructed with the fixed argument {@code 255}
     */
    DiscardOldestConcurrentBuffer<Long> createInFlightMessageIdBuffer(TypedProperties typedProperties) {
        final int bufferSize = 255;
        return new DiscardOldestConcurrentBuffer<>(bufferSize);
    }

    /**
     * Creates the JCSMP producer used by this publisher.
     *
     * <p>When the client session has no default producer yet, one is requested first with
     * no-op handlers. The dedicated producer is then created from the flow properties
     * derived from {@code typedProperties} and the given handlers.
     *
     * @param jCSMPStreamingPublishCorrelatingEventHandler handler for publish acknowledgements and errors
     * @param jCSMPProducerEventHandler                    handler for producer events
     * @param typedProperties                              publisher configuration translated into flow properties
     * @return the newly created producer
     * @throws PubSubPlusClientException when producer creation fails for any reason; the
     *                                   original failure is attached as the cause
     */
    JCSMPXMLMessageProducer createMessageProducer(JCSMPStreamingPublishCorrelatingEventHandler jCSMPStreamingPublishCorrelatingEventHandler, JCSMPProducerEventHandler jCSMPProducerEventHandler, TypedProperties typedProperties) throws PubSubPlusClientException {
        try {
            ClientSession clientSession = this.serviceInternalView.getClientSession();
            if (clientSession.getDefaultProducer() == null) {
                // NOTE(review): presumably the session needs a default producer before a flow
                // producer can be created; its callbacks are intentionally no-ops. Confirm.
                clientSession.getMessageProducer(new JCSMPStreamingPublishCorrelatingEventHandler() {
                    public void responseReceivedEx(Object obj) {
                    }

                    public void handleErrorEx(Object obj, JCSMPException jCSMPException, long j) {
                    }

                    public void handleError(String str, JCSMPException jCSMPException, long j) {
                    }

                    public void responseReceived(String str) {
                    }
                }, new JCSMPProducerEventHandler() {
                    public void handleEvent(ProducerEventArgs producerEventArgs) {
                    }
                });
            }
            return clientSession.createProducer(toFlowProperties(typedProperties), jCSMPStreamingPublishCorrelatingEventHandler, jCSMPProducerEventHandler);
        } catch (Exception e) {
            // Use the logger instead of printing to stderr; the cause also travels with the rethrown exception.
            if (logger.isDebugEnabled()) {
                logger.debug(this.instanceName + " failed to create message producer", e);
            }
            throw new PubSubPlusClientException("Failed to create message publisher", e);
        }
    }

    /**
     * Translates publisher configuration into JCSMP producer flow properties.
     *
     * <p>Each supported key is looked up individually; a setting is applied only when the
     * lookup yields a non-{@code null} value, so absent keys keep the defaults of
     * {@link ProducerFlowProperties}.
     *
     * @param typedProperties publisher configuration to read from
     * @return a new flow properties object with all configured settings applied
     */
    ProducerFlowProperties toFlowProperties(TypedProperties typedProperties) {
        final ProducerFlowProperties flowProperties = new ProducerFlowProperties();

        final Integer pubAckTime = typedProperties.getIntegerProperty("pub_ack_time");
        if (pubAckTime != null) {
            flowProperties.setPubAckTime(pubAckTime);
        }

        final Boolean generateSenderId = typedProperties.getBooleanProperty("generate_sender_id");
        if (generateSenderId != null) {
            flowProperties.setGenerateSenderId(generateSenderId);
        }

        final Boolean generateSendTimestamps = typedProperties.getBooleanProperty("generate_send_timestamps");
        if (generateSendTimestamps != null) {
            flowProperties.setGenerateSendTimeStamp(generateSendTimestamps);
        }

        final Boolean generateSequenceNumbers = typedProperties.getBooleanProperty("generate_sequence_numbers");
        if (generateSequenceNumbers != null) {
            flowProperties.setGenerateSequenceNumber(generateSequenceNumbers);
        }

        final Boolean calculateExpiration = typedProperties.getBooleanProperty("calculate_message_expiration");
        if (calculateExpiration != null) {
            flowProperties.setCalculateMessageExpiration(calculateExpiration);
        }

        final Boolean pubMultiThreaded = typedProperties.getBooleanProperty("pub_multi_thread");
        if (pubMultiThreaded != null) {
            flowProperties.setPubMultiThreaded(pubMultiThreaded);
        }

        final Integer ackWindowSize = typedProperties.getIntegerProperty("pub_ack_window_size");
        if (ackWindowSize != null) {
            flowProperties.setWindowSize(ackWindowSize.intValue());
        }

        final Boolean routerWindowedAck = typedProperties.getBooleanProperty("ad_pub_router_windowed_ack");
        if (routerWindowedAck != null) {
            flowProperties.setRtrWindowedAck(routerWindowedAck.booleanValue());
        }

        final String ackEventMode = typedProperties.getProperty("ACK_EVENT_MODE");
        if (ackEventMode != null) {
            flowProperties.setAckEventMode(ackEventMode);
        }

        return flowProperties;
    }

    /**
     * Closes the publisher buffer using the configured cleaning action and clears the
     * record of in-flight message ids.
     *
     * <p>An interruption while closing the buffer is logged, including its stack trace,
     * and otherwise ignored; the in-flight record is cleared afterwards in either case.
     *
     * @param publisherBuffer               buffer to close
     * @param discardOldestConcurrentBuffer in-flight id record to clear
     */
    void emptyBuffer(PublisherBuffers.PublisherBuffer<Topic> publisherBuffer, DiscardOldestConcurrentBuffer<Long> discardOldestConcurrentBuffer) {
        try {
            publisherBuffer.close(this.bufferCleaningAction);
        } catch (PubSubPlusClientException.RequestInterruptedException e) {
            if (logger.isWarnEnabled()) {
                // Keep the exception in the log so the interruption can be traced.
                logger.warn("Non-graceful termination of " + this.instanceName + " was interrupted", e);
            }
        }
        discardOldestConcurrentBuffer.clear();
    }
}
