/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v0_10;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v0_10.CreditCreditManager;
import org.apache.qpid.server.protocol.v0_10.ExplicitAcceptDispositionChangeListener;
import org.apache.qpid.server.protocol.v0_10.FlowCreditManager_0_10;
import org.apache.qpid.server.protocol.v0_10.ImplicitAcceptDispositionChangeListener;
import org.apache.qpid.server.protocol.v0_10.MessageAcceptCompletionListener;
import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
import org.apache.qpid.server.protocol.v0_10.ServerSession;
import org.apache.qpid.server.protocol.v0_10.WindowCreditManager;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Option;
import org.apache.qpid.util.ByteBufferUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerTarget_0_10
extends AbstractConsumerTarget
implements FlowCreditManager.FlowCreditManagerListener {
    /** Class logger; used for the compression/decompression failure warnings. */
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_0_10.class);
    /** Option array handed to the MessageTransfer constructor when doSend is asked to batch. */
    private static final Option[] BATCHED = new Option[]{Option.BATCH};
    /** Set to true by queueDeleted(); isFlowSuspended() reports suspended once it is set. */
    private final AtomicBoolean _deleted = new AtomicBoolean(false);
    /** Name supplied at construction; passed as the first argument of every MessageTransfer built in doSend. */
    private final String _name;
    /** Value of the "local-address" constructor argument when present, otherwise the same as _name. */
    private final String _targetAddress;
    /** Current credit manager; replaced wholesale by setFlowMode(). */
    private FlowCreditManager_0_10 _creditManager;
    /** Accept mode fixed at construction; selects the disposition listener and completion handling in doSend. */
    private final MessageAcceptMode _acceptMode;
    /** Acquire mode fixed at construction; doSend treats PRE_ACQUIRED differently from other modes. */
    private final MessageAcquireMode _acquireMode;
    /** Current flow mode; updated by setFlowMode(). */
    private MessageFlowMode _flowMode;
    /** Session this target delivers through. */
    private final ServerSession _session;
    /** Starts as true; stop() sets it to true and addCredit() sets it to false. */
    private final AtomicBoolean _stopped = new AtomicBoolean(true);
    /** Number of messages currently tracked by addUnacknowledgedMessage/removeUnacknowledgedMessage. */
    private final AtomicLong _unacknowledgedCount = new AtomicLong(0L);
    /** Total size of the messages counted in _unacknowledgedCount. */
    private final AtomicLong _unacknowledgedBytes = new AtomicLong(0L);
    // Credit accumulated by deferredAddCredit() and handed back to the credit manager by flushCreditState().
    // NOTE(review): these two fields are plain (non-volatile, non-atomic); confirm all access happens on one thread.
    private int _deferredMessageCredit;
    private long _deferredSizeCredit;
    /**
     * Listener attached to entries by addUnacknowledgedMessage. When an entry moves from a
     * state acquired by a consumer of this target to any other state, the entry is removed
     * from the unacknowledged counters and this listener detaches itself from the entry.
     */
    private final StateChangeListener<MessageInstance, MessageInstance.EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.EntryState>(){

        public void stateChanged(MessageInstance entry, MessageInstance.EntryState oldState, MessageInstance.EntryState newState) {
            // Only a transition away from "acquired by this target" is of interest.
            if (!this.heldByThisTarget(oldState)) {
                return;
            }
            if (this.heldByThisTarget(newState)) {
                return;
            }
            ConsumerTarget_0_10.this.removeUnacknowledgedMessage(entry);
            entry.removeStateChangeListener((StateChangeListener)this);
        }

        /** True when the state is a consumer-acquired state whose consumer's target is the enclosing instance. */
        private boolean heldByThisTarget(MessageInstance.EntryState state) {
            if (!(state instanceof MessageInstance.ConsumerAcquiredState)) {
                return false;
            }
            MessageInstance.ConsumerAcquiredState acquired = (MessageInstance.ConsumerAcquiredState)state;
            return acquired.getConsumer().getTarget() == ConsumerTarget_0_10.this;
        }
    };
    /** Reusable action handed to the session on each send; its transfer and listener are set and cleared in doSend. */
    private final AddMessageDispositionListenerAction _postIdSettingAction;

    /**
     * Creates a target in the SUSPENDED state and registers it as a listener on the supplied credit manager.
     *
     * @param session the session messages are sent through
     * @param name the target name; also used as the target address when no "local-address" argument is given
     * @param acceptMode the accept mode used for transfers
     * @param acquireMode the acquire mode used for transfers
     * @param flowMode the initial flow mode
     * @param creditManager the initial credit manager
     * @param arguments optional arguments; "x-pull-only" and "local-address" are read, may be {@code null}
     * @param multiQueue passed through to the superclass
     */
    public ConsumerTarget_0_10(ServerSession session, String name, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, MessageFlowMode flowMode, FlowCreditManager_0_10 creditManager, Map<String, Object> arguments, boolean multiQueue) {
        super(ConsumerTarget.State.SUSPENDED, isPullOnly(arguments), multiQueue, (AMQPConnection)session.getAMQPConnection());
        _session = session;
        _postIdSettingAction = new AddMessageDispositionListenerAction(session);
        _acceptMode = acceptMode;
        _acquireMode = acquireMode;
        _creditManager = creditManager;
        _flowMode = flowMode;
        _creditManager.addStateListener(this);
        _name = name;
        // An explicit "local-address" argument overrides the name as the target address.
        if (arguments != null && arguments.containsKey("local-address")) {
            _targetAddress = String.valueOf(arguments.get("local-address"));
        } else {
            _targetAddress = name;
        }
    }

    /**
     * Reads the "x-pull-only" argument.
     *
     * @param arguments consumer arguments, may be {@code null}
     * @return {@code true} only when the key is present and the string form of its value equals "true" ignoring case
     */
    private static boolean isPullOnly(Map<String, Object> arguments) {
        if (arguments == null || !arguments.containsKey("x-pull-only")) {
            return false;
        }
        String rawValue = String.valueOf(arguments.get("x-pull-only"));
        return Boolean.parseBoolean(rawValue);
    }

    /**
     * @return {@code true} when the target is not ACTIVE, its queue has been marked deleted,
     *         the session is closing, or the connection reports itself stopped
     */
    public boolean isFlowSuspended() {
        if (getState() != ConsumerTarget.State.ACTIVE) {
            return true;
        }
        if (_deleted.get()) {
            return true;
        }
        if (_session.isClosing()) {
            return true;
        }
        return _session.getAMQPConnection().isConnectionStopped();
    }

    /** Unregisters this target from the current credit manager. */
    protected void doCloseInternal() {
        _creditManager.removeListener(this);
    }

    /**
     * Credit manager callback.
     *
     * @param hasCredit {@code true} attempts SUSPENDED to ACTIVE, and calls notifyCurrentState()
     *                  if that transition did not happen; {@code false} attempts ACTIVE to SUSPENDED
     */
    public void creditStateChanged(boolean hasCredit) {
        if (!hasCredit) {
            updateState(ConsumerTarget.State.ACTIVE, ConsumerTarget.State.SUSPENDED);
            return;
        }
        boolean resumed = updateState(ConsumerTarget.State.SUSPENDED, ConsumerTarget.State.ACTIVE);
        if (!resumed) {
            notifyCurrentState();
        }
    }

    /** @return the name this target was constructed with */
    public String getName() {
        return _name;
    }

    /** Asks the credit manager to restore zero message and zero byte credit. */
    public void transportStateChanged() {
        final long noMessages = 0L;
        final long noBytes = 0L;
        _creditManager.restoreCredit(noMessages, noBytes);
    }

    /**
     * Sends the message held by {@code entry} to the client as a {@link MessageTransfer}.
     * <p>
     * In order, the method:
     * <ol>
     * <li>obtains a {@link MessageTransferMessage}, converting the server message if it is of another type;</li>
     * <li>copies selected delivery properties into a fresh {@link DeliveryProperties} and sets the redelivered flag;</li>
     * <li>inflates a gzip body when the connection does not support compression, or deflates an
     *     unencoded body larger than the connection's compression threshold when it does;</li>
     * <li>builds the transfer, attaches a completion listener and a disposition listener according to
     *     the accept, acquire and flow modes, and hands it to the session;</li>
     * <li>increments the entry's delivery count and then either dequeues the entry
     *     (accept mode NONE with PRE_ACQUIRED) or records it as unacknowledged (other PRE_ACQUIRED cases).</li>
     * </ol>
     * If inflating or deflating fails, the body and its content encoding are left untouched,
     * so the message really is sent as is, as the warning logged by the helper states.
     *
     * @param consumer the consumer the entry is delivered for
     * @param entry the queue entry to deliver
     * @param batch when {@code true} the transfer is created with the {@link Option#BATCH} option
     */
    public void doSend(ConsumerImpl consumer, MessageInstance entry, boolean batch) {
        ServerMessage serverMsg = entry.getMessage();
        MessageTransferMessage msg;
        if (serverMsg instanceof MessageTransferMessage) {
            msg = (MessageTransferMessage)serverMsg;
        } else {
            // Any other message type is converted to the 0-10 representation first.
            MessageConverter converter = MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
            msg = (MessageTransferMessage)converter.convert(serverMsg, this._session.getAddressSpace());
        }
        DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
        MessageProperties messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
        DeliveryProperties deliveryProps = new DeliveryProperties();
        if (origDeliveryProps != null) {
            if (origDeliveryProps.hasDeliveryMode()) {
                deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
            }
            if (origDeliveryProps.hasExchange()) {
                deliveryProps.setExchange(origDeliveryProps.getExchange());
            }
            if (origDeliveryProps.hasExpiration()) {
                deliveryProps.setExpiration(origDeliveryProps.getExpiration());
            }
            if (origDeliveryProps.hasPriority()) {
                deliveryProps.setPriority(origDeliveryProps.getPriority());
            }
            if (origDeliveryProps.hasRoutingKey()) {
                deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
            }
            if (origDeliveryProps.hasTimestamp()) {
                deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
            }
            if (origDeliveryProps.hasTtl()) {
                deliveryProps.setTtl(origDeliveryProps.getTtl());
            }
        }
        deliveryProps.setRedelivered(entry.isRedelivered());
        boolean msgCompressed = messageProps != null && "gzip".equals(messageProps.getContentEncoding());
        Collection<QpidByteBuffer> bodyBuffers = msg.getBody();
        boolean compressionSupported = this._session.getConnection().getConnectionDelegate().isCompressionSupported();
        if (msgCompressed && !compressionSupported && bodyBuffers != null) {
            Collection<QpidByteBuffer> uncompressedBuffers = this.inflateIfPossible(bodyBuffers);
            // A null result means inflation failed; keep the gzip body and its encoding in that case.
            if (uncompressedBuffers != null) {
                // NOTE(review): messageProps may be the instance held by the message's own header, in which
                // case this setter also affects later deliveries of the same message; confirm.
                messageProps.setContentEncoding(null);
                for (QpidByteBuffer buf : bodyBuffers) {
                    buf.dispose();
                }
                bodyBuffers = uncompressedBuffers;
            }
        } else if (!msgCompressed && compressionSupported && (messageProps == null || messageProps.getContentEncoding() == null) && bodyBuffers != null && ByteBufferUtils.remaining(bodyBuffers) > this._session.getConnection().getMessageCompressionThreshold()) {
            Collection<QpidByteBuffer> compressedBuffers = this.deflateIfPossible(bodyBuffers);
            // A null result means deflation failed; send the original body without claiming gzip encoding.
            if (compressedBuffers != null) {
                if (messageProps == null) {
                    messageProps = new MessageProperties();
                }
                messageProps.setContentEncoding("gzip");
                for (QpidByteBuffer buf : bodyBuffers) {
                    buf.dispose();
                }
                bodyBuffers = compressedBuffers;
            }
        }
        Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
        MessageTransfer xfr = batch ? new MessageTransfer(this._name, this._acceptMode, this._acquireMode, header, bodyBuffers, BATCHED) : new MessageTransfer(this._name, this._acceptMode, this._acquireMode, header, bodyBuffers, new Option[0]);
        // The local buffer references are released once the transfer has been constructed.
        if (bodyBuffers != null) {
            for (QpidByteBuffer buf : bodyBuffers) {
                buf.dispose();
            }
        }
        if (this._acceptMode == MessageAcceptMode.NONE && this._acquireMode != MessageAcquireMode.PRE_ACQUIRED) {
            xfr.setCompletionListener((Method.CompletionListener)new MessageAcceptCompletionListener(this, consumer, this._session, entry, this._flowMode == MessageFlowMode.WINDOW));
        } else if (this._flowMode == MessageFlowMode.WINDOW) {
            // In WINDOW mode the credit for this message is queued up when the transfer completes.
            final long messageSize = entry.getMessage().getSize();
            xfr.setCompletionListener(new Method.CompletionListener(){

                public void onComplete(Method method) {
                    ConsumerTarget_0_10.this.deferredAddCredit(1, messageSize);
                }
            });
        }
        this._postIdSettingAction.setXfr(xfr);
        if (this._acceptMode == MessageAcceptMode.EXPLICIT) {
            this._postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this, consumer));
        } else if (this._acquireMode != MessageAcquireMode.PRE_ACQUIRED) {
            this._postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this, consumer));
        } else {
            this._postIdSettingAction.setAction(null);
        }
        this._session.sendMessage(xfr, this._postIdSettingAction);
        xfr.dispose();
        // The shared action object is reset so it holds no reference to this transfer after the send.
        this._postIdSettingAction.setAction(null);
        this._postIdSettingAction.setXfr(null);
        entry.incrementDeliveryCount();
        if (this._acceptMode == MessageAcceptMode.NONE && this._acquireMode == MessageAcquireMode.PRE_ACQUIRED) {
            this.forceDequeue(entry, false);
        } else if (this._acquireMode == MessageAcquireMode.PRE_ACQUIRED) {
            this.addUnacknowledgedMessage(entry);
        }
    }

    /**
     * Adds the entry to the unacknowledged message and byte counters and attaches the
     * listener that removes it from them again.
     *
     * @param entry the delivered entry
     */
    void addUnacknowledgedMessage(MessageInstance entry) {
        _unacknowledgedCount.incrementAndGet();
        final long messageSize = entry.getMessage().getSize();
        _unacknowledgedBytes.addAndGet(messageSize);
        entry.addStateChangeListener(_unacknowledgedMessageListener);
    }

    /**
     * Subtracts the entry's message size from the unacknowledged byte total and decrements the message count.
     *
     * @param entry the entry that is no longer unacknowledged
     */
    private void removeUnacknowledgedMessage(MessageInstance entry) {
        final long messageSize = entry.getMessage().getSize();
        _unacknowledgedBytes.addAndGet(-messageSize);
        _unacknowledgedCount.decrementAndGet();
    }

    /**
     * No-op in this implementation.
     *
     * @param entry ignored
     */
    public void acquisitionRemoved(MessageInstance entry) {
        // nothing to do
    }

    /**
     * Accumulates credit that flushCreditState later passes to the credit manager.
     *
     * @param deferredMessageCredit message credit to add to the pending total
     * @param deferredSizeCredit byte credit to add to the pending total
     */
    private void deferredAddCredit(int deferredMessageCredit, long deferredSizeCredit) {
        _deferredMessageCredit = _deferredMessageCredit + deferredMessageCredit;
        _deferredSizeCredit = _deferredSizeCredit + deferredSizeCredit;
    }

    /**
     * Passes the accumulated deferred credit to the credit manager and resets the pending totals.
     * <p>
     * The flush is skipped only when all of the following hold: {@code strict} is false, flow is
     * suspended, fewer than 200 message credits are pending, the credit manager is a
     * {@link WindowCreditManager}, and that manager's message credit limit is at least 400.
     *
     * @param strict {@code true} to flush unconditionally
     */
    public void flushCreditState(boolean strict) {
        boolean keepDeferring = !strict
                && isFlowSuspended()
                && _deferredMessageCredit < 200
                && _creditManager instanceof WindowCreditManager
                && ((WindowCreditManager)_creditManager).getMessageCreditLimit() >= 400L;
        if (keepDeferring) {
            return;
        }
        _creditManager.restoreCredit(_deferredMessageCredit, _deferredSizeCredit);
        _deferredMessageCredit = 0;
        _deferredSizeCredit = 0L;
    }

    /**
     * Dequeues the entry through an auto-commit transaction on the address space's message store.
     * After commit the entry is deleted; credit for its message is restored first when requested.
     *
     * @param entry the entry to dequeue
     * @param restoreCredit whether to restore credit for the message after commit
     */
    private void forceDequeue(final MessageInstance entry, final boolean restoreCredit) {
        ServerTransaction.Action afterDequeue = new ServerTransaction.Action(){

            public void postCommit() {
                if (restoreCredit) {
                    ConsumerTarget_0_10.this.restoreCredit(entry.getMessage());
                }
                entry.delete();
            }

            public void onRollback() {
                // nothing to undo
            }
        };
        AutoCommitTransaction txn = new AutoCommitTransaction(_session.getAddressSpace().getMessageStore());
        txn.dequeue(entry.getEnqueueRecord(), afterDequeue);
    }

    /**
     * Marks the entry as redelivered and, if its acquisition can be made unstealable for the
     * consumer, routes it to the alternate destination.
     *
     * @param consumer the consumer holding the entry
     * @param entry the rejected entry
     */
    void reject(ConsumerImpl consumer, MessageInstance entry) {
        entry.setRedelivered();
        boolean locked = entry.makeAcquisitionUnstealable(consumer);
        if (!locked) {
            return;
        }
        entry.routeToAlternate(null, null);
    }

    /**
     * Releases an entry held by the consumer.
     * <p>
     * The redelivered flag is set when requested. The delivery count is decremented when the
     * session is closing or when the redelivered flag was not requested. If the maximum delivery
     * count has then been reached the entry goes through sendToDLQOrDiscard; otherwise it is
     * released back from the consumer.
     *
     * @param consumer the consumer holding the entry
     * @param entry the entry to release
     * @param setRedelivered whether to flag the entry as redelivered
     */
    void release(ConsumerImpl consumer, MessageInstance entry, boolean setRedelivered) {
        if (setRedelivered) {
            entry.setRedelivered();
        }
        boolean undoDeliveryCount = getSessionModel().isClosing() || !setRedelivered;
        if (undoDeliveryCount) {
            entry.decrementDeliveryCount();
        }
        if (!isMaxDeliveryLimitReached(entry)) {
            entry.release(consumer);
            return;
        }
        sendToDLQOrDiscard(consumer, entry);
    }

    /**
     * Routes the entry to its alternate destination and logs the outcome.
     * <p>
     * Each re-enqueue is logged as a dead-letter event. If nothing was re-enqueued and the
     * owning resource is a {@link Queue}, a discard event is logged instead; which one depends
     * on whether the queue has an alternate exchange.
     *
     * @param consumer the consumer holding the entry
     * @param entry the entry that reached its delivery limit
     */
    protected void sendToDLQOrDiscard(ConsumerImpl consumer, MessageInstance entry) {
        final ServerMessage msg = entry.getMessage();
        int requeues = 0;
        if (entry.makeAcquisitionUnstealable(consumer)) {
            Action<MessageInstance> deadLetterLogger = new Action<MessageInstance>(){

                public void performAction(MessageInstance requeueEntry) {
                    ConsumerTarget_0_10.this.getEventLogger().message(ChannelMessages.DEADLETTERMSG((Number)msg.getMessageNumber(), (String)requeueEntry.getOwningResource().getName()));
                }
            };
            requeues = entry.routeToAlternate((Action)deadLetterLogger, null);
        }
        if (requeues != 0) {
            return;
        }
        TransactionLogResource owningResource = entry.getOwningResource();
        if (!(owningResource instanceof Queue)) {
            return;
        }
        Queue queue = (Queue)owningResource;
        Exchange alternateExchange = queue.getAlternateExchange();
        if (alternateExchange == null) {
            getEventLogger().message(ChannelMessages.DISCARDMSG_NOALTEXCH((Number)msg.getMessageNumber(), (String)queue.getName(), (String)msg.getInitialRoutingAddress()));
        } else {
            getEventLogger().message(ChannelMessages.DISCARDMSG_NOROUTE((Number)msg.getMessageNumber(), (String)alternateExchange.getName()));
        }
    }

    /** @return the event logger of the connection that owns this target's session */
    protected EventLogger getEventLogger() {
        ServerSession session = getSessionModel();
        return session.getAMQPConnection().getEventLogger();
    }

    /**
     * @param entry the entry to check
     * @return {@code true} when the entry has a positive maximum delivery count and its
     *         delivery count has reached it
     */
    private boolean isMaxDeliveryLimitReached(MessageInstance entry) {
        final int limit = entry.getMaximumDeliveryCount();
        if (limit <= 0) {
            // A non-positive limit never triggers.
            return false;
        }
        return entry.getDeliveryCount() >= limit;
    }

    /** Sets the deleted flag, after which isFlowSuspended() reports suspended. */
    public void queueDeleted() {
        _deleted.set(true);
    }

    /**
     * Asks the credit manager to use credit for a message of this size.
     *
     * @param message the message about to be delivered
     * @return the credit manager's answer
     */
    public boolean allocateCredit(ServerMessage message) {
        final FlowCreditManager_0_10 manager = _creditManager;
        final long messageSize = message.getSize();
        return manager.useCreditForMessage(messageSize);
    }

    /**
     * Restores one message credit plus the message's size in byte credit.
     *
     * @param message the message whose credit is returned
     */
    public void restoreCredit(ServerMessage message) {
        final FlowCreditManager_0_10 manager = _creditManager;
        final long messageSize = message.getSize();
        manager.restoreCredit(1L, messageSize);
    }

    /** @return the credit manager currently in use; setFlowMode replaces it */
    public FlowCreditManager_0_10 getCreditManager() {
        return _creditManager;
    }

    /**
     * Stops delivery: while holding the send lock, attempts the ACTIVE to SUSPENDED transition,
     * sets the stopped flag and clears all credit on the current credit manager.
     */
    public void stop() {
        // Acquire before entering the try block so that a failed acquisition never reaches
        // the finally clause and releases a lock this thread does not hold.
        this.getSendLock();
        try {
            this.updateState(ConsumerTarget.State.ACTIVE, ConsumerTarget.State.SUSPENDED);
            this._stopped.set(true);
            FlowCreditManager_0_10 creditManager = this.getCreditManager();
            creditManager.clearCredit();
        }
        finally {
            this.releaseSendLock();
        }
    }

    /**
     * Adds message or byte credit, clears the stopped flag, and attempts the SUSPENDED to
     * ACTIVE transition if the credit manager then reports credit.
     *
     * @param unit MESSAGE adds message credit, BYTE adds byte credit; other units add nothing
     * @param value the amount of credit to add
     */
    public void addCredit(MessageCreditUnit unit, long value) {
        final FlowCreditManager_0_10 manager = getCreditManager();
        switch (unit) {
            case MESSAGE:
                manager.addCredit(value, 0L);
                break;
            case BYTE:
                manager.addCredit(0L, value);
                break;
            default:
                break;
        }
        _stopped.set(false);
        if (!manager.hasCredit()) {
            return;
        }
        updateState(ConsumerTarget.State.SUSPENDED, ConsumerTarget.State.ACTIVE);
    }

    /**
     * Switches flow mode by replacing the credit manager with a new one created with zero credit.
     * <p>
     * This target is unregistered from the old manager first. After the swap it attempts the
     * ACTIVE to SUSPENDED transition and registers with the new manager.
     *
     * @param flowMode CREDIT or WINDOW
     * @throws ConnectionScopedRuntimeException for any other flow mode
     */
    public void setFlowMode(MessageFlowMode flowMode) {
        _creditManager.removeListener(this);
        final FlowCreditManager_0_10 replacement;
        switch (flowMode) {
            case CREDIT:
                replacement = new CreditCreditManager(0L, 0L, (ProtocolEngine)_session.getConnection().getAmqpConnection());
                break;
            case WINDOW:
                replacement = new WindowCreditManager(0L, 0L, (ProtocolEngine)_session.getConnection().getAmqpConnection());
                break;
            default:
                throw new ConnectionScopedRuntimeException("Unknown message flow mode: " + flowMode);
        }
        _creditManager = replacement;
        _flowMode = flowMode;
        updateState(ConsumerTarget.State.ACTIVE, ConsumerTarget.State.SUSPENDED);
        _creditManager.addStateListener(this);
    }

    /** @return the stopped flag: initially true, set by stop() and cleared by addCredit() */
    public boolean isStopped() {
        return _stopped.get();
    }

    /** Flushes deferred credit unconditionally, flushes every consumer of this target, then stops the target. */
    public void flush() {
        flushCreditState(true);
        for (ConsumerImpl each : getConsumers()) {
            each.flush();
        }
        stop();
    }

    /** @return the session this target was created with */
    public ServerSession getSessionModel() {
        return _session;
    }

    /** @return always {@code false} */
    public boolean isDurable() {
        final boolean durable = false;
        return durable;
    }

    /** No-op in this implementation. */
    public void queueEmpty() {
        // nothing to do
    }

    /** No-op in this implementation. */
    public void flushBatched() {
        // nothing to do
    }

    /** @return the "local-address" constructor argument if one was supplied, otherwise the target name */
    public String getTargetAddress() {
        return _targetAddress;
    }

    /** @return the current value of the unacknowledged byte counter */
    public long getUnacknowledgedBytes() {
        final long bytes = _unacknowledgedBytes.longValue();
        return bytes;
    }

    /** @return the current value of the unacknowledged message counter */
    public long getUnacknowledgedMessages() {
        final long messages = _unacknowledgedCount.longValue();
        return messages;
    }

    /** No-op in this implementation. */
    protected void processClosed() {
        // nothing to do
    }

    /** No-op in this implementation. */
    protected void processStateChanged() {
        // nothing to do
    }

    /** @return always {@code false} */
    protected boolean hasStateChanged() {
        final boolean changed = false;
        return changed;
    }

    /** @return always {@code false} */
    protected boolean hasClosed() {
        final boolean closed = false;
        return closed;
    }

    /** @return a description containing the target name and the session's log string */
    public String toString() {
        StringBuilder text = new StringBuilder("ConsumerTarget_0_10[name=");
        text.append(_name);
        text.append(", session=");
        text.append(_session.toLogString());
        text.append("]");
        return text.toString();
    }

    /**
     * Attempts to deflate the given buffers.
     *
     * @param buffers the body to compress
     * @return the compressed buffers, or {@code null} if an {@link IOException} occurred (a warning is logged)
     */
    private Collection<QpidByteBuffer> deflateIfPossible(Collection<QpidByteBuffer> buffers) {
        Collection<QpidByteBuffer> compressed = null;
        try {
            compressed = QpidByteBuffer.deflate(buffers);
        }
        catch (IOException e) {
            LOGGER.warn("Unable to compress message payload for consumer with gzip, message will be sent as is", (Throwable)e);
        }
        return compressed;
    }

    /**
     * Attempts to inflate the given buffers.
     *
     * @param buffers the body to decompress
     * @return the decompressed buffers, or {@code null} if an {@link IOException} occurred (a warning is logged)
     */
    private Collection<QpidByteBuffer> inflateIfPossible(Collection<QpidByteBuffer> buffers) {
        Collection<QpidByteBuffer> inflated = null;
        try {
            inflated = QpidByteBuffer.inflate(buffers);
        }
        catch (IOException e) {
            LOGGER.warn("Unable to decompress message payload for consumer with gzip, message will be sent as is", (Throwable)e);
        }
        return inflated;
    }

    /**
     * Runnable that registers a disposition change listener for a transfer on a session.
     * The transfer and the listener are mutable so a single instance can be reused across sends.
     */
    public static class AddMessageDispositionListenerAction
    implements Runnable {
        private final ServerSession _session;
        private MessageTransfer _xfr;
        private ServerSession.MessageDispositionChangeListener _action;

        /**
         * @param session the session the listener is registered on when the action runs
         */
        public AddMessageDispositionListenerAction(ServerSession session) {
            _session = session;
        }

        /** @param xfr the transfer the next run applies to; may be {@code null} to clear it */
        public void setXfr(MessageTransfer xfr) {
            _xfr = xfr;
        }

        /** @param action the listener to register on the next run; {@code null} makes the run a no-op */
        public void setAction(ServerSession.MessageDispositionChangeListener action) {
            _action = action;
        }

        /** Registers the current listener for the current transfer, unless no listener is set. */
        @Override
        public void run() {
            if (_action == null) {
                return;
            }
            _session.onMessageDispositionChange(_xfr, _action);
        }
    }
}

