package com.refinitiv.eta.valueadd.reactor;

import com.refinitiv.eta.codec.Buffer;
import com.refinitiv.eta.codec.CodecFactory;
import com.refinitiv.eta.codec.DecodeIterator;
import com.refinitiv.eta.codec.EncodeIterator;
import com.refinitiv.eta.codec.GenericMsg;
import com.refinitiv.eta.codec.Msg;
import com.refinitiv.eta.codec.RefreshMsg;
import com.refinitiv.eta.codec.State;
import com.refinitiv.eta.codec.StatusMsg;
import com.refinitiv.eta.transport.Error;
import com.refinitiv.eta.transport.TransportFactory;
import com.refinitiv.eta.valueadd.domainrep.rdm.queue.QueueMsgFactory;
import com.refinitiv.eta.valueadd.domainrep.rdm.queue.QueueRequest;
import com.refinitiv.eta.valueadd.reactor.TunnelStreamMsg;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.charset.Charset;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/refinitiv/eta/valueadd/reactor/TunnelSubstream.class */
public class TunnelSubstream {
    TunnelStream _tunnelStream;
    TunnelStreamPersistenceFile _persistFile;
    QueueRequestImpl _queueRequest;
    QueueRefreshImpl _queueRefresh;
    QueueStatusImpl _queueStatus;
    QueueAckImpl _queueAck;
    QueueDataImpl _queueData;
    QueueDataExpiredImpl _queueDataExpired;
    QueueCloseImpl _queueClose;
    GenericMsg _genericMsg;
    Msg _msg;
    Msg _substreamMsg;
    DecodeIterator _dIter;
    int _streamId;
    int _domainType;
    int _serviceId;
    Buffer _queueName;
    int _lastOutSeqNum;
    int _lastInSeqNum;
    EncodeIterator _encIter;
    DecodeIterator _decIter;
    TunnelSubstreamState _state;
    Msg _encSubMsg;
    Error _error;
    ByteBuffer _byteBuffer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/refinitiv/eta/valueadd/reactor/TunnelSubstream$TunnelSubstreamState.class */
    public enum TunnelSubstreamState {
        NOT_OPEN,
        WAITING_SUBSTREAM_REFRESH,
        OPEN
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TunnelSubstream(Buffer buffer, int i, int i2, int i3, TunnelStream tunnelStream, Error error) {
        this._tunnelStream = tunnelStream;
        this._queueName = CodecFactory.createBuffer();
        this._queueName.data(ByteBuffer.allocateDirect(buffer.length()));
        buffer.copy(this._queueName);
        this._streamId = i;
        this._domainType = i2;
        this._serviceId = i3;
        this._queueRequest = (QueueRequestImpl) QueueMsgFactory.createQueueRequest();
        this._queueRefresh = (QueueRefreshImpl) QueueMsgFactory.createQueueRefresh();
        this._queueStatus = (QueueStatusImpl) QueueMsgFactory.createQueueStatus();
        this._queueAck = (QueueAckImpl) QueueMsgFactory.createQueueAck();
        this._queueData = (QueueDataImpl) QueueMsgFactory.createQueueData();
        this._queueDataExpired = (QueueDataExpiredImpl) QueueMsgFactory.createQueueDataExpired();
        this._queueClose = (QueueCloseImpl) QueueMsgFactory.createQueueClose();
        this._substreamMsg = CodecFactory.createMsg();
        this._msg = CodecFactory.createMsg();
        this._dIter = CodecFactory.createDecodeIterator();
        error.errorId(0);
        this._state = TunnelSubstreamState.NOT_OPEN;
        this._encIter = CodecFactory.createEncodeIterator();
        this._decIter = CodecFactory.createDecodeIterator();
        this._encSubMsg = CodecFactory.createMsg();
        this._error = TransportFactory.createError();
        this._genericMsg = CodecFactory.createMsg();
        this._genericMsg.msgClass(7);
        this._byteBuffer = ByteBuffer.allocateDirect(4);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TunnelSubstream(Buffer buffer, int i, int i2, int i3, String str, TunnelStream tunnelStream, Error error) {
        this(buffer, i, i2, i3, tunnelStream, error);
        int defaultPersistenceVersion;
        if (error.errorId() != 0) {
            return;
        }
        this._tunnelStream = tunnelStream;
        byte[] bArr = new byte[buffer.length()];
        int position = buffer.position();
        boolean forceFileReset = this._tunnelStream.forceFileReset();
        for (int i4 = 0; i4 < buffer.length(); i4++) {
            bArr[i4] = buffer.data().get(position + i4);
        }
        File file = new File(str, new String(bArr, Charset.forName("UTF-8")));
        forceFileReset = file.exists() ? forceFileReset : true;
        try {
            RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
            FileChannel channel = randomAccessFile.getChannel();
            try {
                FileLock tryLock = channel.tryLock();
                if (tryLock == null) {
                    channel.close();
                    randomAccessFile.close();
                    error.errorId(-1);
                    error.text("Persistence file already in use.");
                    return;
                }
                if (forceFileReset) {
                    defaultPersistenceVersion = TunnelStreamPersistenceFile.defaultPersistenceVersion();
                } else {
                    this._byteBuffer.position(0);
                    try {
                        channel.read(this._byteBuffer, 0L);
                    } catch (IOException e) {
                        error.errorId(-1);
                        error.text("Caught IOException while reading persistence file version.");
                    }
                    defaultPersistenceVersion = this._byteBuffer.getInt(0);
                }
                switch (defaultPersistenceVersion) {
                    case 1:
                        this._persistFile = new TunnelStreamPersistenceFileV1(this, randomAccessFile, channel, tryLock, this._msg, this._encIter, this._dIter, forceFileReset, error);
                        if (error.errorId() != 0) {
                            this._persistFile = null;
                            return;
                        }
                        return;
                    case 33554432:
                        this._persistFile = new TunnelStreamPersistenceFileV2(this, randomAccessFile, channel, tryLock, this._msg, this._encIter, this._dIter, forceFileReset, error);
                        if (error.errorId() != 0) {
                            this._persistFile = null;
                            return;
                        }
                        return;
                    default:
                        try {
                            channel.close();
                            randomAccessFile.close();
                            error.text("Invalid persistence file version.");
                            error.errorId(-1);
                            return;
                        } catch (IOException e2) {
                            error.text("Caught IOException while closing persistence file (due to Invalid persistence file version).");
                            error.errorId(-1);
                            return;
                        }
                }
            } catch (OverlappingFileLockException e3) {
                channel.close();
                randomAccessFile.close();
                error.errorId(-1);
                error.text("Persistence file already in use.");
            }
        } catch (IOException e4) {
            error.errorId(-1);
            error.text("Failed to open persistence file.");
        }
    }

    int lastOutSeqNum() {
        return this._lastOutSeqNum;
    }

    void lastOutSeqNum(int i) {
        this._lastOutSeqNum = i;
        if (this._persistFile != null) {
            this._persistFile.lastOutSeqNum(i);
        }
    }

    int lastInSeqNum() {
        return this._lastInSeqNum;
    }

    void lastInSeqNum(int i) {
        this._lastInSeqNum = i;
        if (this._persistFile != null) {
            this._persistFile.lastInSeqNum(i);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int close(Error error) {
        this._state = TunnelSubstreamState.NOT_OPEN;
        if (this._persistFile == null) {
            return 0;
        }
        if (this._persistFile.close(error) != 0) {
            return error.errorId();
        }
        this._persistFile = null;
        return 0;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendQueueAckToListener(TunnelStreamBuffer tunnelStreamBuffer) {
        tunnelStreamBuffer.setAsInnerReadBuffer();
        this._decIter.clear();
        this._decIter.setBufferAndRWFVersion(tunnelStreamBuffer, this._tunnelStream.classOfService().common().protocolMajorVersion(), this._tunnelStream.classOfService().common().protocolMinorVersion());
        this._genericMsg.decode(this._decIter);
        this._queueData.decode(this._decIter, this._genericMsg);
        this._queueAck.clear();
        this._queueAck.sourceName().data(this._queueData.destName().data(), this._queueData.destName().position(), this._queueData.destName().length());
        this._queueAck.destName().data(this._queueData.sourceName().data(), this._queueData.sourceName().position(), this._queueData.sourceName().length());
        this._queueAck.domainType(this._queueData.domainType());
        this._queueAck.identifier(this._queueData.identifier());
        this._queueAck.serviceId(this._queueData.serviceId());
        this._queueAck.streamId(this._streamId);
        TunnelStreamBuffer buffer = this._tunnelStream.getBuffer(this._queueAck.ackMsgBufferSize(), false, false, this._error);
        this._encIter.clear();
        this._encIter.setBufferAndRWFVersion(buffer, this._tunnelStream.classOfService().common().protocolMajorVersion(), this._tunnelStream.classOfService().common().protocolMinorVersion());
        this._queueAck.encode(this._encIter);
        this._decIter.clear();
        this._decIter.setBufferAndRWFVersion(buffer, this._tunnelStream.classOfService().common().protocolMajorVersion(), this._tunnelStream.classOfService().common().protocolMinorVersion());
        this._genericMsg.clear();
        this._genericMsg.decode(this._decIter);
        this._tunnelStream.queueMsgAcknowledged(this._queueAck, this._genericMsg);
        this._tunnelStream.releaseBuffer(buffer, this._error);
    }

    Buffer queueName() {
        return this._queueName;
    }

    int streamId() {
        return this._streamId;
    }

    int domainType() {
        return this._domainType;
    }

    int serviceId() {
        return this._serviceId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int sendSubstreamRequest(QueueRequest queueRequest, Error error) {
        this._queueRequest.clear();
        this._queueRequest.streamId(queueRequest.streamId());
        this._queueRequest.domainType(queueRequest.domainType());
        this._queueRequest.serviceId(this._tunnelStream.serviceId());
        this._queueRequest.lastOutSeqNum(lastOutSeqNum());
        this._queueRequest.lastInSeqNum(lastInSeqNum());
        this._queueRequest.sourceName(queueRequest.sourceName());
        this._queueRequest.opCode(((QueueRequestImpl) queueRequest).opCode());
        TunnelStreamBuffer buffer = this._tunnelStream.getBuffer(128 + this._queueRequest.requestMsgBufferSize(), false, true, error);
        if (buffer == null) {
            return error.errorId();
        }
        this._encIter.clear();
        this._encIter.setBufferAndRWFVersion(buffer, this._tunnelStream.classOfService().common().protocolMajorVersion(), this._tunnelStream.classOfService().common().protocolMinorVersion());
        int encode = this._queueRequest.encode(this._encIter);
        if (encode < 0) {
            this._tunnelStream.releaseBuffer(buffer, error);
            error.errorId(encode);
            error.text("Substream request encode failed");
            return -1;
        }
        buffer.setCurrentPositionAsEndOfEncoding();
        if ((this._tunnelStream._traceFlags & 1) > 0) {
            System.out.println("<!-- TunnelTrace: Sending substream request. -->");
        }
        this._tunnelStream._outboundTransmitList.push(buffer, TunnelStreamBuffer.RETRANS_LINK);
        this._tunnelStream.tunnelStreamManager().addTunnelStreamToDispatchList(this._tunnelStream);
        this._state = TunnelSubstreamState.WAITING_SUBSTREAM_REFRESH;
        error.errorId(0);
        return 0;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int readMsg(Msg msg, Error error) {
        int sendLocalQueueAcks;
        int retransmitBuffers;
        switch (this._state) {
            case WAITING_SUBSTREAM_REFRESH:
                if (msg.containerType() != 141) {
                    error.errorId(-1);
                    error.text("Received unexpected container type " + msg.containerType() + " while establishing substream.");
                    return -1;
                }
                this._decIter.clear();
                this._decIter.setBufferAndRWFVersion(msg.encodedDataBody(), this._tunnelStream.classOfService().common().protocolMajorVersion(), this._tunnelStream.classOfService().common().protocolMinorVersion());
                int decode = this._encSubMsg.decode(this._decIter);
                if (decode < 0) {
                    error.errorId(decode);
                    error.text("Failed to decode substream Msg.");
                    return -1;
                }
                switch (this._encSubMsg.msgClass()) {
                    case 2:
                        Msg msg2 = (RefreshMsg) this._encSubMsg;
                        State state = msg2.state();
                        switch (state.streamState()) {
                            case 1:
                                if (state.dataState() == 1) {
                                    this._state = TunnelSubstreamState.OPEN;
                                    break;
                                } else {
                                    return 0;
                                }
                            default:
                                this._tunnelStream._streamIdtoQueueSubstreamTable.remove(Integer.valueOf(this._streamId));
                                int close = close(error);
                                if (close != 0) {
                                    return close;
                                }
                                break;
                        }
                        int decode2 = this._queueRefresh.decode(this._decIter, msg2);
                        if (decode2 != 0) {
                            error.errorId(decode2);
                            error.text("Failed to decode substream refresh header.");
                            return -1;
                        }
                        if (state.streamState() != 1) {
                            this._tunnelStream.queueMsgReceived(this._queueRefresh, msg2);
                            return 0;
                        }
                        if (this._persistFile != null && (retransmitBuffers = this._persistFile.retransmitBuffers(this._queueRefresh.lastInSeqNum(), this._msg, this._encIter, this._dIter, error)) != 0) {
                            return retransmitBuffers;
                        }
                        lastOutSeqNum(this._queueRefresh.lastInSeqNum());
                        if (lastInSeqNum() == 0 || this._queueRefresh.lastOutSeqNum() == 0) {
                            lastInSeqNum(this._queueRefresh.lastOutSeqNum());
                        }
                        this._tunnelStream.queueMsgReceived(this._queueRefresh, msg2);
                        if (state.streamState() != 1) {
                            return 0;
                        }
                        if (this._persistFile != null && (sendLocalQueueAcks = this._persistFile.sendLocalQueueAcks(this._encIter, this._decIter, error)) != 0) {
                            return sendLocalQueueAcks;
                        }
                        break;
                    case 3:
                        Msg msg3 = (StatusMsg) this._encSubMsg;
                        State state2 = msg3.state();
                        if (!msg3.checkHasState()) {
                            this._queueStatus.clear();
                            this._queueStatus.streamId(msg3.streamId());
                            this._queueStatus.domainType(msg3.domainType());
                            this._tunnelStream.queueMsgReceived(this._queueStatus, msg3);
                            break;
                        } else {
                            switch (state2.streamState()) {
                                case 1:
                                    if (state2.dataState() != 1) {
                                        return 0;
                                    }
                                    break;
                                default:
                                    this._tunnelStream._streamIdtoQueueSubstreamTable.remove(Integer.valueOf(this._streamId));
                                    int close2 = close(error);
                                    if (close2 != 0) {
                                        return close2;
                                    }
                                    break;
                            }
                            int decode3 = this._queueStatus.decode(this._decIter, msg3);
                            if (decode3 != 0) {
                                error.errorId(decode3);
                                error.text("Failed to decode substream refresh header.");
                                return -1;
                            }
                            this._tunnelStream.queueMsgReceived(this._queueStatus, msg3);
                            if (state2.streamState() != 1) {
                                return 0;
                            }
                        }
                        break;
                    default:
                        error.errorId(-1);
                        error.text("Received unexpected substream MsgClass " + this._encSubMsg.msgClass() + " while establishing substream.");
                        return -1;
                }
                if ((this._tunnelStream._traceFlags & 1) > 0) {
                    System.out.println("<!-- TunnelTrace: Substream established on stream " + msg.streamId() + ", queue ready -->");
                }
                this._tunnelStream._recvLastSeqNum = ((TunnelStreamMsg.TunnelStreamData) this._tunnelStream._tunnelStreamMsg).seqNum();
                this._tunnelStream.tunnelStreamManager().addTunnelStreamToDispatchList(this._tunnelStream);
                return 0;
            case OPEN:
                if (msg.containerType() != 141) {
                    error.errorId(-1);
                    error.text("Unexpected container type: " + msg.containerType());
                    return -1;
                }
                this._decIter.clear();
                this._decIter.setBufferAndRWFVersion(msg.encodedDataBody(), this._tunnelStream.classOfService().common().protocolMajorVersion(), this._tunnelStream.classOfService().common().protocolMinorVersion());
                int decode4 = this._encSubMsg.decode(this._decIter);
                if (decode4 < 0) {
                    error.errorId(decode4);
                    error.text("Failed to decode substream message.");
                    return -1;
                }
                switch (this._encSubMsg.msgClass()) {
                    case 7:
                        switch (getSubstreamOpcode(this._encSubMsg)) {
                            case 1:
                                int decode5 = this._queueData.decode(this._decIter, this._encSubMsg);
                                if (decode5 != 0) {
                                    error.errorId(decode5);
                                    error.text("Substream header decode failed");
                                    return -1;
                                }
                                lastInSeqNum(this._queueData.seqNum());
                                error.errorId(0);
                                this._tunnelStream.queueMsgReceived(this._queueData, this._encSubMsg);
                                this._queueAck.clear();
                                this._queueAck.streamId(streamId());
                                this._queueAck.domainType(domainType());
                                this._queueAck.serviceId(this._tunnelStream.serviceId());
                                this._queueAck.seqNum(this._queueData.seqNum());
                                this._queueAck.sourceName().data(this._queueData.destName().data().duplicate(), this._queueData.destName().position(), this._queueData.destName().length());
                                this._queueAck.destName().data(this._queueData.sourceName().data().duplicate(), this._queueData.sourceName().position(), this._queueData.sourceName().length());
                                this._queueAck.identifier(this._queueData.identifier());
                                TunnelStreamBuffer buffer = this._tunnelStream.getBuffer(this._queueAck.ackMsgBufferSize(), false, true, error);
                                if (buffer == null) {
                                    return error.errorId();
                                }
                                buffer.isApplicationBuffer(false);
                                this._encIter.clear();
                                this._encIter.setBufferAndRWFVersion(buffer, this._tunnelStream.classOfService().common().protocolMajorVersion(), this._tunnelStream.classOfService().common().protocolMinorVersion());
                                int encode = this._queueAck.encode(this._encIter);
                                if (encode != 0) {
                                    this._tunnelStream.releaseBuffer(buffer, error);
                                    error.errorId(encode);
                                    error.text("Substream ack header encode failed.");
                                    return -1;
                                }
                                buffer.setCurrentPositionAsEndOfEncoding();
                                this._tunnelStream._outboundTransmitList.push(buffer, TunnelStreamBuffer.RETRANS_LINK);
                                this._tunnelStream.tunnelStreamManager().addTunnelStreamToDispatchList(this._tunnelStream);
                                return 0;
                            case 2:
                                int decode6 = this._queueAck.decode(this._decIter, this._encSubMsg);
                                if (decode6 == 0) {
                                    int seqNum = this._queueAck.seqNum();
                                    this._tunnelStream.queueMsgAcknowledged(this._queueAck, this._encSubMsg);
                                    if (this._persistFile != null) {
                                        this._persistFile.releasePersistenceBuffers(seqNum);
                                        break;
                                    }
                                } else {
                                    error.errorId(decode6);
                                    error.text("Substream header decode failed");
                                    return -1;
                                }
                                break;
                            case 3:
                                break;
                            case 4:
                                int decode7 = this._queueDataExpired.decode(this._decIter, this._encSubMsg);
                                if (decode7 != 0) {
                                    error.errorId(decode7);
                                    error.text("Substream header decode failed");
                                    return -1;
                                }
                                lastInSeqNum(this._queueDataExpired.seqNum());
                                error.errorId(0);
                                this._tunnelStream.queueMsgReceived(this._queueDataExpired, this._encSubMsg);
                                this._queueAck.clear();
                                this._queueAck.streamId(streamId());
                                this._queueAck.domainType(domainType());
                                this._queueAck.serviceId(this._tunnelStream.serviceId());
                                this._queueAck.seqNum(this._queueDataExpired.seqNum());
                                this._queueAck.sourceName().data(queueName().data());
                                this._queueAck.destName().data(this._queueDataExpired.sourceName().data(), this._queueDataExpired.sourceName().position(), this._queueDataExpired.sourceName().length());
                                this._queueAck.identifier(this._queueDataExpired.identifier());
                                TunnelStreamBuffer buffer2 = this._tunnelStream.getBuffer(this._queueAck.ackMsgBufferSize(), false, true, error);
                                if (buffer2 == null) {
                                    return error.errorId();
                                }
                                buffer2.isApplicationBuffer(false);
                                this._encIter.clear();
                                this._encIter.setBufferAndRWFVersion(buffer2, this._tunnelStream.classOfService().common().protocolMajorVersion(), this._tunnelStream.classOfService().common().protocolMinorVersion());
                                int encode2 = this._queueAck.encode(this._encIter);
                                if (encode2 != 0) {
                                    this._tunnelStream.releaseBuffer(buffer2, error);
                                    error.errorId(encode2);
                                    error.text("Substream ack header encode failed.");
                                    return -1;
                                }
                                buffer2.setCurrentPositionAsEndOfEncoding();
                                this._tunnelStream._outboundTransmitList.push(buffer2, TunnelStreamBuffer.RETRANS_LINK);
                                this._tunnelStream.tunnelStreamManager().addTunnelStreamToDispatchList(this._tunnelStream);
                                return 0;
                            default:
                                error.errorId(-1);
                                error.text("Unhandled substream header opcode.");
                                return -1;
                        }
                        this._tunnelStream.tunnelStreamManager().addTunnelStreamToDispatchList(this._tunnelStream);
                        return 0;
                    default:
                        error.errorId(-1);
                        error.text("Unhandled substream message class.");
                        return -1;
                }
            case NOT_OPEN:
                error.errorId(0);
                return 0;
            default:
                error.errorId(-1);
                error.text("Unknown queue substream state.");
                return -1;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int saveMsg(TunnelStreamBuffer tunnelStreamBuffer, Error error) {
        if (this._persistFile != null) {
            return this._persistFile.saveMsg(tunnelStreamBuffer, error);
        }
        tunnelStreamBuffer.persistenceBuffer(this, null);
        return 0;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int setBufferAsTransmitted(TunnelStreamBuffer tunnelStreamBuffer, Error error) {
        int lastOutSeqNum = lastOutSeqNum() + 1;
        TunnelStreamPersistenceBuffer persistenceBuffer = tunnelStreamBuffer.persistenceBuffer();
        if (persistenceBuffer != null) {
            this._persistFile.setBufferAsTransmitted(persistenceBuffer);
            tunnelStreamBuffer.persistenceBuffer(null, null);
        } else {
            lastOutSeqNum = lastOutSeqNum() + 1;
        }
        lastOutSeqNum(lastOutSeqNum);
        tunnelStreamBuffer.setAsInnerReadBuffer();
        this._encIter.clear();
        this._encIter.setBufferAndRWFVersion(tunnelStreamBuffer, this._tunnelStream.classOfService().common().protocolMajorVersion(), this._tunnelStream.classOfService().common().protocolMinorVersion());
        int replaceSeqNum = this._encIter.replaceSeqNum(lastOutSeqNum);
        if (replaceSeqNum != 0) {
            error.errorId(replaceSeqNum);
            error.text("Failed to update sequence number on substream message.");
            return -1;
        }
        if (!tunnelStreamBuffer.timeoutIsCode()) {
            long timeoutNsec = tunnelStreamBuffer.timeoutNsec() - System.nanoTime();
            if (timeoutNsec < 1000000) {
                timeoutNsec = 1000000;
            }
            TunnelStreamUtil.replaceQueueDataTimeout(tunnelStreamBuffer.data(), timeoutNsec / 1000000);
        }
        tunnelStreamBuffer.setToFullWritebuffer();
        return 0;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void releasePersistenceBuffer(TunnelStreamPersistenceBuffer tunnelStreamPersistenceBuffer) {
        if (this._persistFile != null) {
            this._persistFile.releasePersistenceBuffer(tunnelStreamPersistenceBuffer);
        }
    }

    private int getSubstreamOpcode(Msg msg) {
        byte b = 0;
        if (msg.extendedHeader() != null) {
            b = msg.extendedHeader().data().get(msg.extendedHeader().position());
        }
        return b;
    }
}
