/*
 * Decompiled with CFR 0.152.
 */
package com.solacesystems.jcsmp.impl.flow;

import com.solacesystems.common.util.LogWrapper;
import com.solacesystems.common.util.NetworkByteOrderNumberUtil;
import com.solacesystems.common.util.ThreadUtil;
import com.solacesystems.jcsmp.CapabilityType;
import com.solacesystems.jcsmp.InvalidMessageReceivedException;
import com.solacesystems.jcsmp.JCSMPErrorResponseException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPProducerEventHandler;
import com.solacesystems.jcsmp.JCSMPStreamingPublishEventHandler;
import com.solacesystems.jcsmp.JCSMPTransportException;
import com.solacesystems.jcsmp.ProducerFlowProperties;
import com.solacesystems.jcsmp.i18n.JCSMPRB;
import com.solacesystems.jcsmp.impl.JCSMPBasicSession;
import com.solacesystems.jcsmp.impl.JCSMPErrorResponseSubcodeMapper;
import com.solacesystems.jcsmp.impl.JCSMPXMLMessage;
import com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer;
import com.solacesystems.jcsmp.impl.MsgIdInfo;
import com.solacesystems.jcsmp.impl.OpenFlowWireMessageEncoder;
import com.solacesystems.jcsmp.impl.PubADManager;
import com.solacesystems.jcsmp.protocol.CSMPPublisherChannelObserver;
import com.solacesystems.jcsmp.protocol.WireMessage;
import com.solacesystems.jcsmp.protocol.impl.TcpChannel;
import com.solacesystems.jcsmp.protocol.impl.TcpClientChannel;
import com.solacesystems.jcsmp.protocol.smf.AssuredCtrlHeaderBean;
import com.solacesystems.jcsmp.protocol.smf.SMFHeaderBean;
import com.solacesystems.jcsmp.protocol.smf.SmfTLVParameter;
import com.solacesystems.jcsmp.protocol.smf.impl.TlvCoderUtil;
import com.solacesystems.jcsmp.protocol.smf.impl.TlvParameterParser;
import com.solacesystems.jcsmp.protocol.smf.impl.WireMessageFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.osgi.annotation.versioning.ProviderType;

@ProviderType
public class PubFlowManager
implements CSMPPublisherChannelObserver {
    private final LogWrapper Trace = new LogWrapper(PubFlowManager.class);
    private JCSMPBasicSession _session = null;
    private TcpClientChannel _channel = null;
    private Map<Long, JCSMPXMLMessageProducer> activeFlows = new HashMap<Long, JCSMPXMLMessageProducer>();
    private List<JCSMPXMLMessageProducer> inactiveFlows = new ArrayList<JCSMPXMLMessageProducer>();

    public PubFlowManager(JCSMPBasicSession session) {
        this._session = session;
        this.Trace.setContextInfo(this._session.getLogContextInfo() + ":" + Integer.toHexString(this.hashCode()));
        if (this.Trace.isDebugEnabled()) {
            this.Trace.debug("PubFlowManager created ");
        }
    }

    public void setChannel(TcpClientChannel channel) {
        this._channel = channel;
        if (this._channel != null) {
            this._channel.setObserver(this);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int countAll() {
        Map<Long, JCSMPXMLMessageProducer> map = this.activeFlows;
        synchronized (map) {
            List<JCSMPXMLMessageProducer> list = this.inactiveFlows;
            synchronized (list) {
                return this.activeFlows.size() + this.inactiveFlows.size();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyReconnectAborted() {
        Map<Long, JCSMPXMLMessageProducer> map = this.activeFlows;
        synchronized (map) {
            List<JCSMPXMLMessageProducer> list = this.inactiveFlows;
            synchronized (list) {
                for (JCSMPXMLMessageProducer pubFlow : this.activeFlows.values()) {
                    pubFlow.notifyReconnectAborted();
                }
                for (JCSMPXMLMessageProducer pubFlow : this.inactiveFlows) {
                    pubFlow.notifyReconnectAborted();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JCSMPXMLMessageProducer createDefaultProducer(JCSMPStreamingPublishEventHandler callback, JCSMPProducerEventHandler eventCallback, ProducerFlowProperties fprop) throws JCSMPException {
        JCSMPXMLMessageProducer nprod = new JCSMPXMLMessageProducer(this._session, this._channel, callback, eventCallback, this, fprop, null);
        nprod.setDirectPermitted(true);
        nprod.open();
        long flowId = -1L;
        if (this._session.isCapable(CapabilityType.PUB_GUARANTEED)) {
            flowId = this.doPubAssuredCtrl(nprod, true, null);
        } else {
            this.skipPubAssuredCtrl(nprod);
        }
        Map<Long, JCSMPXMLMessageProducer> map = this.activeFlows;
        synchronized (map) {
            this.activeFlows.put(flowId, nprod);
            this.activeFlows.put(-1L, nprod);
        }
        return nprod;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JCSMPXMLMessageProducer createProducerFlow(JCSMPStreamingPublishEventHandler callback, JCSMPProducerEventHandler eventCallback, ProducerFlowProperties fprop, JCSMPBasicSession.InternalBindProperties internalBindProp) throws JCSMPException {
        long flowId;
        JCSMPXMLMessageProducer nprod = new JCSMPXMLMessageProducer(this._session, this._channel, callback, eventCallback, this, fprop, internalBindProp);
        nprod.setDirectPermitted(false);
        nprod.open();
        Integer idTag = null;
        if (internalBindProp != null) {
            idTag = internalBindProp.connCounterTag;
        }
        if ((flowId = this.doPubAssuredCtrl(nprod, true, idTag)) == -1L) {
            nprod.close();
            throw new InvalidMessageReceivedException("Router does not support producer flow, unable to get flowID.");
        }
        Map<Long, JCSMPXMLMessageProducer> map = this.activeFlows;
        synchronized (map) {
            this.activeFlows.put(flowId, nprod);
        }
        if (this.Trace.isDebugEnabled()) {
            this.Trace.debug("createProducerFlow: " + flowId);
        }
        return nprod;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void closeAllFlows(boolean graceful) {
        ArrayList<JCSMPXMLMessageProducer> prods = new ArrayList<JCSMPXMLMessageProducer>();
        Map<Long, JCSMPXMLMessageProducer> map = this.activeFlows;
        synchronized (map) {
            List<JCSMPXMLMessageProducer> list = this.inactiveFlows;
            synchronized (list) {
                for (JCSMPXMLMessageProducer p : this.activeFlows.values()) {
                    prods.add(p);
                }
                for (JCSMPXMLMessageProducer p : this.inactiveFlows) {
                    prods.add(p);
                }
            }
        }
        for (JCSMPXMLMessageProducer p : prods) {
            p.close();
        }
    }

    public void closeFlow(JCSMPXMLMessageProducer producer, boolean graceful) {
        long flowId = producer.getPubADManager().getFlowId();
        try {
            this._session.waitUntilSessionReconnectDone("closeFlow");
        }
        catch (JCSMPException jCSMPException) {
            // empty catch block
        }
        if (graceful && flowId != -1L) {
            SMFHeaderBean smfHeader = new SMFHeaderBean().setProtocol(9).setTtl(1);
            AssuredCtrlHeaderBean assBean = this._session.getAssuredCtrlFactory().createCloseFlowRequest(flowId);
            WireMessage msgReq = WireMessageFactory.createWith(smfHeader, assBean);
            msgReq.setFriendlyName(String.format("ProducerCloseFlow[flowid=%s]", flowId));
            try {
                this._channel.setReqCorrelationTag(msgReq);
                int writecode = this._channel.sendAdCtrlRequest(msgReq, false, TcpChannel.WriteBlockPolicy.DROP_AND_IGNORE, false);
                if (writecode == 1) {
                    this._channel.enqueuePriorityData(msgReq);
                }
            }
            catch (JCSMPException ex) {
                this.Trace.info(String.format("Error occurred closing flow %s, ignoring: %s", flowId, ex));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void deregister(JCSMPXMLMessageProducer producer) {
        Map<Long, JCSMPXMLMessageProducer> map = this.activeFlows;
        synchronized (map) {
            long flowId = producer.getPubADManager().getFlowId();
            this.activeFlows.remove(flowId);
            if (producer == this.activeFlows.get(-1L)) {
                this.activeFlows.remove(-1L);
            }
            List<JCSMPXMLMessageProducer> list = this.inactiveFlows;
            synchronized (list) {
                this.inactiveFlows.remove(producer);
            }
        }
    }

    @Override
    public void handleCloseFlowResponse(WireMessage wireMsg) {
        AssuredCtrlHeaderBean actrl_bean = (AssuredCtrlHeaderBean)wireMsg.getHeaderBean();
        SmfTLVParameter flowId_param = (SmfTLVParameter)actrl_bean.findFirstParameter(6);
        JCSMPXMLMessageProducer producer = this.activeFlows.get(NetworkByteOrderNumberUtil.fourByteToUInt(flowId_param.value));
        if (producer != null) {
            this.deregister(producer);
        }
    }

    private void skipPubAssuredCtrl(JCSMPXMLMessageProducer producer) {
        PubADManager admgr = producer.getPubADManager();
        admgr.setPub_Ack_Window_Size(0);
        admgr.initMessageQueue();
    }

    private long doPubAssuredCtrl(JCSMPXMLMessageProducer producer, boolean retryable, Integer idTag) throws JCSMPException {
        int pubWinSz;
        WireMessage response = null;
        PubADManager admgr = producer.getPubADManager();
        SMFHeaderBean smfHeader = new SMFHeaderBean().setProtocol(9).setTtl(1);
        long max_post_tries = this._channel.getRequestMaxRetries();
        int tries = 0;
        int n = pubWinSz = admgr.getPub_Ack_Window_Size() != -1 ? admgr.getPub_Ack_Window_Size() : admgr.configured_Pub_Ack_Window_Size;
        if (!admgr.isRtr_Windowed_Ack()) {
            pubWinSz = 1;
        }
        boolean resetADState = !this._session.getSessionStats().hasPublishedAD();
        boolean waitForResponse = true;
        AssuredCtrlHeaderBean assBean = null;
        while (waitForResponse) {
            Long transactedSessionId;
            Long l = transactedSessionId = producer.getTransactedSession() == null ? null : Long.valueOf(producer.getTransactedSession().getTransactedSessionId());
            if (resetADState) {
                admgr.resetAdFlow();
            }
            assBean = this._session.getAssuredCtrlFactory().createOpenFlowRequest(admgr.getLastMessageIdSent(), admgr.getLastMessageIdAcked(), pubWinSz, admgr.flow_Name, transactedSessionId);
            WireMessage msgReq = WireMessageFactory.createWith(smfHeader, assBean);
            msgReq.setFriendlyName("ADCTRL-ProducerOpenFlow");
            if (this.Trace.isDebugEnabled()) {
                this.Trace.debug(String.format("Created ADCTRL Handshake/OpenFlow Request [TransactedSessionId=%d, flowName=%s, lastSent=%s  lastAck=%s  winSz=%s]", transactedSessionId == null ? -1L : transactedSessionId, admgr.flow_Name == null ? "null" : admgr.flow_Name, admgr.getLastMessageIdSent(), admgr.getLastMessageIdAcked(), pubWinSz));
            }
            msgReq.encoder = new OpenFlowWireMessageEncoder(this._session, producer, pubWinSz);
            try {
                if (retryable) {
                    try {
                        response = this._channel.doSmfSharedRequest(msgReq, 1L, this._channel.getConnCounterTag());
                    }
                    catch (JCSMPTransportException e) {
                        ++tries;
                        if (this.Trace.isDebugEnabled()) {
                            this.Trace.debug(String.format("doPubAssuredCtrl try=%s / max=%s %n", tries, max_post_tries));
                        }
                        if ((long)tries < max_post_tries) {
                            this._session.waitUntilSessionReconnectDone("doPubAssuredCtrl");
                            continue;
                        }
                        throw e;
                    }
                } else {
                    response = this._channel.doSmfSubSingleShotRequest(msgReq, true, true, TcpChannel.WriteBlockPolicy.RESCHED_OK_BUT_NO_BLOCK_ON_STATE, this._channel.getConnCounterTag(), null);
                }
                SMFHeaderBean smfh = response.getSmfHeader();
                int resp_code = smfh.getPm_respcode();
                if (resp_code != 200) {
                    JCSMPErrorResponseException respEx;
                    long txSessionId = -1L;
                    SmfTLVParameter param = (SmfTLVParameter)assBean.findFirstParameter(24);
                    if (param != null) {
                        txSessionId = TlvParameterParser.getAssuredTransactedSessionId(param);
                    }
                    String flowName = " ";
                    param = (SmfTLVParameter)assBean.findFirstParameter(10);
                    if (param != null) {
                        flowName = TlvCoderUtil.nullTermUtf8ToString(param.value);
                    }
                    if (this.Trace.isInfoEnabled()) {
                        this.Trace.info(String.format("Error Response [%d %s], TransactedSessionId=%d, flowName=%s", resp_code, smfh.getPm_respstr(), txSessionId, flowName));
                    }
                    if (producer.getTransactedSession() != null) {
                        producer.getTransactedSession().setRollbackOnly(producer);
                    }
                    if ((respEx = new JCSMPErrorResponseException(resp_code, smfh.getPm_respstr(), "", this._channel == null ? "" : this._channel.getNetworkInfoString(), JCSMPErrorResponseSubcodeMapper.ErrorContext.CONTROL)).getResponseCode() == 400 && respEx.getSubcodeEx() == 55 && this._session.getGdReconnectFailAction().equals("gd_reconnect_fail_action_auto_retry")) {
                        resetADState = true;
                        continue;
                    }
                    throw respEx;
                }
                waitForResponse = false;
            }
            catch (JCSMPErrorResponseException e) {
                if (e.getResponsePhrase().toUpperCase().indexOf("UNKNOWN PROTOCOL") == -1) {
                    throw e;
                }
                admgr.setPub_Ack_Window_Size(0);
                this.setSessionCapability(CapabilityType.PUB_GUARANTEED, Boolean.FALSE);
                this.setSessionCapability(CapabilityType.PUB_FLOW_GUARANTEED, Boolean.FALSE);
                this.Trace.warn(JCSMPRB.BUNDLE.getStringSafely("TcpPublisherChannel.routerDoesNotSupportAD"));
                this.Trace.warn(ThreadUtil.getMyStackTrace());
                return -1L;
            }
        }
        smfHeader = response.getSmfHeader();
        if (smfHeader.getProtocol() != 9 && smfHeader.getProtocol() != 19) {
            throw new InvalidMessageReceivedException(JCSMPRB.BUNDLE.getStringSafely("TcpPublisherChannel.expectedAssuredCtrlResponseGotWrongType"));
        }
        if (!(response.getHeaderBean() instanceof AssuredCtrlHeaderBean)) {
            throw new InvalidMessageReceivedException(JCSMPRB.BUNDLE.getStringSafely("TcpPublisherChannel.expectedAssredCtrlResponseBlockNotFound"));
        }
        assBean = (AssuredCtrlHeaderBean)response.getHeaderBean();
        this.processOpenFlowResponse(assBean, admgr, resetADState);
        return admgr.flow_Id;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processOpenFlowResponse(AssuredCtrlHeaderBean assBean, PubADManager admgr, boolean resetADState) throws JCSMPException {
        long respLastIdAcked = 0L;
        long respLastIdReceived = 0L;
        long pubId = -1L;
        int respWindowsSz = 0;
        long lastMsgIdSentSaved = admgr.getLastMessageIdSent();
        SmfTLVParameter param = (SmfTLVParameter)assBean.findFirstParameter(10);
        if (param != null) {
            String newFlowName;
            admgr.flow_Name = newFlowName = TlvCoderUtil.nullTermUtf8ToString(param.value);
        }
        long respFlowId = (param = (SmfTLVParameter)assBean.findFirstParameter(6)) == null ? -1L : TlvParameterParser.getAssuredFlowId(param);
        param = (SmfTLVParameter)assBean.findFirstParameter(3);
        SmfTLVParameter smfTLVParameter = param = param != null ? param : (SmfTLVParameter)assBean.findFirstParameter(14);
        if (param != null) {
            respWindowsSz = TlvParameterParser.getAssuredWindowSize(param);
            if (admgr.getPub_Ack_Window_Size() == -1) {
                if (admgr.isRtr_Windowed_Ack()) {
                    admgr.setPub_Ack_Window_Size(respWindowsSz);
                } else {
                    admgr.setPub_Ack_Window_Size(admgr.configured_Pub_Ack_Window_Size);
                }
            }
        }
        if ((param = (SmfTLVParameter)assBean.findFirstParameter(16)) != null) {
            respLastIdReceived = TlvParameterParser.getAssuredLastMsgIdReceived(param);
        } else if (this.Trace.isDebugEnabled()) {
            this.Trace.debug("lastMsgIdReceived not present in OpenFlow Response");
        }
        param = (SmfTLVParameter)assBean.findFirstParameter(2);
        if (param != null) {
            respLastIdAcked = TlvParameterParser.getAssuredLastMsgIdAcked(param);
            admgr.setLastMessageIdAcked(respLastIdAcked);
            if (resetADState) {
                admgr.setLastMessageIdSent(respLastIdAcked);
                admgr.idAllocator.setToNoCheck(respLastIdAcked + 1L);
            } else if (respLastIdAcked > admgr.getLastMessageIdSent()) {
                admgr.setLastMessageIdSent(respLastIdAcked);
                admgr.idAllocator.setTo(respLastIdAcked + 1L);
            }
            if (respLastIdReceived == 0L) {
                respLastIdReceived = respLastIdAcked;
            }
            if (respLastIdReceived < admgr.getLastMessageIdSent() && admgr.getMessageProducer().isTransacted() && admgr.noFirstRetransmitMsg(respLastIdReceived)) {
                admgr.getMessageProducer().getTransactedSession().rollbackCurrentTransaction();
                admgr.clearMessageQueue(false);
            }
        }
        admgr.setLastTransportAcked(respLastIdReceived);
        param = (SmfTLVParameter)assBean.findFirstParameter(35);
        if (param != null) {
            admgr.pub_Id = pubId = TlvParameterParser.getAssuredPublisherId(param);
        }
        long txSessionId = -1L;
        param = (SmfTLVParameter)assBean.findFirstParameter(24);
        if (param != null) {
            txSessionId = TlvParameterParser.getAssuredTransactedSessionId(param);
        }
        admgr.initMessageQueue();
        if (this.Trace.isDebugEnabled()) {
            this.Trace.debug(String.format("Got ADCTRL Response [TransactedSessionId=%d, flowName=%s, flowId=%d, lastAck=%s lastRcv=%s winSz=%s]", txSessionId, admgr.flow_Name, respFlowId, respLastIdAcked, respLastIdReceived, respWindowsSz));
            this.Trace.debug(String.format("Client AD state is now: lastAcked=%s lastSent=%s winSz=%s, flowId=%s", admgr.getLastMessageIdAcked(), admgr.getLastMessageIdSent(), admgr.getPub_Ack_Window_Size(), respFlowId));
        }
        if (admgr.getPub_Ack_Window_Size() > 0) {
            admgr.setFlowId(respFlowId);
            if (resetADState || lastMsgIdSentSaved == 0L) {
                admgr.getMessageProducer().suspend();
                try {
                    admgr.renumberMessageIdParamsOnPubMessages(respLastIdAcked);
                    this._session.xaSessionMessageIdRenumbering(admgr);
                }
                finally {
                    admgr.getMessageProducer().resume(1);
                }
            } else {
                admgr.handleClientAck(admgr.getLastMessageIdAcked(), null, false, false);
            }
        } else {
            admgr.setFlowId(-1L);
            this.Trace.warn(JCSMPRB.BUNDLE.getStringSafely("TcpPublisherChannel.routerDoesNotSupportAD"));
            this.setSessionCapability(CapabilityType.PUB_GUARANTEED, Boolean.FALSE);
            this.setSessionCapability(CapabilityType.PUB_FLOW_GUARANTEED, Boolean.FALSE);
        }
    }

    private void setSessionCapability(CapabilityType cap, Object val) {
        Map tmpCapabilities = (Map)this._session.getTransientData(JCSMPBasicSession.TransientData.CAP);
        tmpCapabilities.put(cap, val);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void notifyReconnected() throws JCSMPException {
        ArrayList<JCSMPXMLMessageProducer> inactiveCopy = new ArrayList<JCSMPXMLMessageProducer>();
        Map<Long, JCSMPXMLMessageProducer> map = this.activeFlows;
        synchronized (map) {
            List<JCSMPXMLMessageProducer> list = this.inactiveFlows;
            synchronized (list) {
                Iterator<Map.Entry<Long, JCSMPXMLMessageProducer>> it_active_entries = this.activeFlows.entrySet().iterator();
                while (it_active_entries.hasNext()) {
                    Map.Entry<Long, JCSMPXMLMessageProducer> e = it_active_entries.next();
                    if (e.getKey() == -1L) continue;
                    it_active_entries.remove();
                    this.inactiveFlows.add(e.getValue());
                }
                inactiveCopy.addAll(this.inactiveFlows);
                this.Trace.debug(String.format("PubFlowManager.notifyReconnected ClientChannel %s", this._channel.getDbgId()));
                for (JCSMPXMLMessageProducer pr : inactiveCopy) {
                    long flowId = this.doPubAssuredCtrl(pr, false, null);
                    if (flowId != -1L) {
                        this.activeFlows.put(flowId, pr);
                    }
                    this.inactiveFlows.remove(pr);
                    this.Trace.debug(String.format("PubFlowManager.notifyReconnected - rebound flow on ClientChannel %s", this._channel.getDbgId()));
                    if (pr.isClosed()) {
                        this.Trace.debug(String.format("PubFlowManager.notifyReconnected - rebound flow is closed locally. Closing [flowId=%d]", flowId));
                        this.closeFlow(pr, true);
                        continue;
                    }
                    pr.notifyPubFlowResumed();
                }
                for (Map.Entry<Long, JCSMPXMLMessageProducer> e : this.activeFlows.entrySet()) {
                    if (e.getKey() == -1L) continue;
                    e.getValue().getPubADManager().startADTimer();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void notifyPreReconnect() {
        Map<Long, JCSMPXMLMessageProducer> map = this.activeFlows;
        synchronized (map) {
            for (Map.Entry<Long, JCSMPXMLMessageProducer> e : this.activeFlows.entrySet()) {
                e.getValue().notifyPreReconnect();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void buildDispatchProducerList(List<JCSMPXMLMessageProducer> toNotify) {
        Map<Long, JCSMPXMLMessageProducer> map = this.activeFlows;
        synchronized (map) {
            List<JCSMPXMLMessageProducer> list = this.inactiveFlows;
            synchronized (list) {
                for (JCSMPXMLMessageProducer p : this.inactiveFlows) {
                    if (toNotify.contains(p)) continue;
                    toNotify.add(p);
                }
                for (JCSMPXMLMessageProducer p : this.activeFlows.values()) {
                    if (toNotify.contains(p)) continue;
                    toNotify.add(p);
                }
            }
        }
    }

    @Override
    public void handleException(MsgIdInfo msgId, JCSMPException e, long producerId, boolean forceNotifyIfClosed, List<JCSMPXMLMessageProducer> toNotify) {
        this.Trace.debug(String.format("PubFlowManager.handleException() ClientChannel %s, stack:(%s)", this._channel.getDbgId(), ThreadUtil.getMyStackTraceOneLine()));
        for (JCSMPXMLMessageProducer p : toNotify) {
            p.handleException(msgId, e, producerId, forceNotifyIfClosed);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handlePubMsgResponse(WireMessage resp) {
        long flowId = -1L;
        SMFHeaderBean smf = resp.getSmfHeader();
        switch (smf.getProtocol()) {
            case 9: {
                AssuredCtrlHeaderBean actrl_bean = (AssuredCtrlHeaderBean)resp.getHeaderBean();
                if (actrl_bean.getMsgType() != 3) break;
                SmfTLVParameter flowId_param = (SmfTLVParameter)actrl_bean.findFirstParameter(6);
                flowId = NetworkByteOrderNumberUtil.fourByteToUInt(flowId_param.value);
                SmfTLVParameter msgId_param = (SmfTLVParameter)actrl_bean.findFirstParameter(2);
                long msgId = NetworkByteOrderNumberUtil.eightByteToUInt(msgId_param.value);
                smf.setPm_ad_msgid(msgId);
                SmfTLVParameter retransmitRequest_param = (SmfTLVParameter)actrl_bean.findFirstParameter(53);
                if (retransmitRequest_param == null) break;
                smf.setRetransmitRequired(true);
                break;
            }
            case 3: 
            case 13: {
                flowId = -1L;
            }
        }
        JCSMPXMLMessageProducer prod = null;
        Map<Long, JCSMPXMLMessageProducer> map = this.activeFlows;
        synchronized (map) {
            prod = this.activeFlows.get(flowId);
        }
        if (prod != null) {
            prod.handlePubMsgResponse(resp);
        } else {
            this.Trace.warn(String.format("PubFlowManager got response for FlowId %s, no such active flow.", flowId));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handleAsyncCloseFlow(WireMessage message) {
        AssuredCtrlHeaderBean actrl_bean = (AssuredCtrlHeaderBean)message.getHeaderBean();
        SmfTLVParameter flowId_param = (SmfTLVParameter)actrl_bean.findFirstParameter(6);
        long flowId = NetworkByteOrderNumberUtil.fourByteToUInt(flowId_param.value);
        JCSMPXMLMessageProducer prod = null;
        Map<Long, JCSMPXMLMessageProducer> map = this.activeFlows;
        synchronized (map) {
            prod = this.activeFlows.get(flowId);
        }
        if (prod != null) {
            prod.handleAsyncCloseFlow(message);
        } else {
            this.Trace.debug(String.format("PubFlowManager got CloseFlow for FlowId %s, no such active flow.", flowId));
        }
    }

    @Override
    public void handlePubMsgSent(JCSMPXMLMessage xmlMsg, JCSMPXMLMessageProducer prod) {
        if (prod != null) {
            prod.handlePubMsgSent(xmlMsg, prod);
        }
    }
}

