/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.qpid.protonj2.engine.impl;

import com.rabbitmq.qpid.protonj2.buffer.ProtonBuffer;
import com.rabbitmq.qpid.protonj2.engine.Connection;
import com.rabbitmq.qpid.protonj2.engine.ConnectionState;
import com.rabbitmq.qpid.protonj2.engine.EventHandler;
import com.rabbitmq.qpid.protonj2.engine.HeaderEnvelope;
import com.rabbitmq.qpid.protonj2.engine.Receiver;
import com.rabbitmq.qpid.protonj2.engine.Scheduler;
import com.rabbitmq.qpid.protonj2.engine.Sender;
import com.rabbitmq.qpid.protonj2.engine.Session;
import com.rabbitmq.qpid.protonj2.engine.SessionState;
import com.rabbitmq.qpid.protonj2.engine.TransactionManager;
import com.rabbitmq.qpid.protonj2.engine.exceptions.EngineFailedException;
import com.rabbitmq.qpid.protonj2.engine.exceptions.EngineStateException;
import com.rabbitmq.qpid.protonj2.engine.exceptions.ProtocolViolationException;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonEndpoint;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonEngine;
import com.rabbitmq.qpid.protonj2.engine.impl.ProtonSession;
import com.rabbitmq.qpid.protonj2.logging.ProtonLogger;
import com.rabbitmq.qpid.protonj2.logging.ProtonLoggerFactory;
import com.rabbitmq.qpid.protonj2.types.Symbol;
import com.rabbitmq.qpid.protonj2.types.transport.AMQPHeader;
import com.rabbitmq.qpid.protonj2.types.transport.AmqpError;
import com.rabbitmq.qpid.protonj2.types.transport.Attach;
import com.rabbitmq.qpid.protonj2.types.transport.Begin;
import com.rabbitmq.qpid.protonj2.types.transport.Close;
import com.rabbitmq.qpid.protonj2.types.transport.ConnectionError;
import com.rabbitmq.qpid.protonj2.types.transport.Detach;
import com.rabbitmq.qpid.protonj2.types.transport.Disposition;
import com.rabbitmq.qpid.protonj2.types.transport.End;
import com.rabbitmq.qpid.protonj2.types.transport.ErrorCondition;
import com.rabbitmq.qpid.protonj2.types.transport.Flow;
import com.rabbitmq.qpid.protonj2.types.transport.Open;
import com.rabbitmq.qpid.protonj2.types.transport.Performative;
import com.rabbitmq.qpid.protonj2.types.transport.Transfer;
import java.lang.ref.SoftReference;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;

public class ProtonConnection
extends ProtonEndpoint<Connection>
implements Connection,
AMQPHeader.HeaderHandler<ProtonEngine>,
Performative.PerformativeHandler<ProtonEngine> {
    private static final ProtonLogger LOG = ProtonLoggerFactory.getLogger(ProtonConnection.class);
    // Local Open performative; populated by the setters before open() and written by syncLocalStateWithRemote().
    private final Open localOpen = new Open();
    // Open received from the remote peer; null until handleOpen() runs.
    private Open remoteOpen;
    // AMQP header received from the remote peer; null until handleAMQPHeader() runs.
    private AMQPHeader remoteHeader;
    // Sessions keyed by the locally assigned channel (see session()).
    private Map<Integer, ProtonSession> localSessions = new LinkedHashMap<Integer, ProtonSession>();
    // Sessions keyed by the channel the remote used on its Begin (see handleBegin()).
    private Map<Integer, ProtonSession> remoteSessions = new LinkedHashMap<Integer, ProtonSession>();
    // Locally freed sessions whose remote state was still IDLE when freed (see freeLocalChannel()).
    private Map<Integer, SoftReference<ProtonSession>> zombieSessions = new LinkedHashMap<Integer, SoftReference<ProtonSession>>();
    private ConnectionState localState = ConnectionState.IDLE;
    private ConnectionState remoteState = ConnectionState.IDLE;
    // Flags recording which frames this connection has already written through the engine.
    private boolean headerSent;
    private boolean localOpenSent;
    private boolean localCloseSent;
    // One-shot handler invoked when the remote header arrives; cleared after use in handleAMQPHeader().
    private EventHandler<AMQPHeader> remoteHeaderHandler;
    private EventHandler<Session> remoteSessionOpenEventHandler;
    private EventHandler<Sender> remoteSenderOpenEventHandler;
    private EventHandler<Receiver> remoteReceiverOpenEventHandler;
    private EventHandler<TransactionManager> remoteTxnManagerOpenEventHandler;

    /**
     * Creates a connection bound to the given engine and seeds the local Open
     * performative with a max frame size of 65535.
     *
     * @param engine the engine handed to the superclass constructor
     */
    ProtonConnection(ProtonEngine engine) {
        super(engine);
        localOpen.setMaxFrameSize(65535);
    }

    /**
     * @return this connection; a connection acts as its own parent
     */
    @Override
    public Connection getParent() {
        final Connection parent = this;
        return parent;
    }

    /**
     * @return this instance typed as {@link ProtonConnection}
     */
    @Override
    ProtonConnection self() {
        final ProtonConnection me = this;
        return me;
    }

    /**
     * @return the current local {@link ConnectionState}
     */
    @Override
    public ConnectionState getState() {
        return localState;
    }

    /**
     * Opens the connection locally. Does nothing unless the local state is
     * still IDLE. The local open event and the engine notification are fired
     * even if synchronizing with the remote throws.
     *
     * @return this connection
     * @throws EngineStateException declared by the interface; the engine check may reject the call
     */
    @Override
    public ProtonConnection open() throws EngineStateException {
        if (getState() != ConnectionState.IDLE) {
            return this;
        }

        engine.checkShutdownOrFailed("Cannot open a connection when Engine is shutdown or failed.");
        localState = ConnectionState.ACTIVE;
        try {
            syncLocalStateWithRemote();
        } finally {
            fireLocalOpen();
            engine.handleLocalOpen(this);
        }
        return this;
    }

    /**
     * Closes the connection locally. Does nothing unless the local state is
     * ACTIVE. The local state is switched to CLOSED before the engine is
     * checked, and every tracked session is told about the local close and
     * the local close event is fired even if the check or the state sync throws.
     *
     * @return this connection
     * @throws EngineFailedException declared by the interface; the engine failure check may reject the call
     */
    @Override
    public ProtonConnection close() throws EngineFailedException {
        if (getState() != ConnectionState.ACTIVE) {
            return this;
        }

        localState = ConnectionState.CLOSED;
        try {
            // The original message was truncated ("... while engine ."); completed here.
            getEngine().checkFailed("Connection close called while engine is in a failed state.");
            syncLocalStateWithRemote();
        } finally {
            for (ProtonSession session : allSessions()) {
                session.handleConnectionLocallyClosed(this);
            }
            fireLocalClose();
        }
        return this;
    }

    /**
     * Starts header negotiation with a handler that only trace-logs the
     * header returned by the remote.
     *
     * @return the result of {@link #negotiate(EventHandler)}
     */
    @Override
    public Connection negotiate() {
        final EventHandler<AMQPHeader> traceOnly = header ->
            LOG.trace("Negotiation completed with remote returning AMQP Header: {}", header);
        return negotiate(traceOnly);
    }

    /**
     * Starts header negotiation. If the remote header has already arrived the
     * handler is invoked immediately; otherwise it is stored until the header
     * is received. Local state is synchronized with the remote either way.
     *
     * @param remoteAMQPHeaderHandler handler for the remote AMQP header; must not be null
     * @return this connection
     * @throws NullPointerException if the handler is null
     * @throws IllegalStateException if the connection is locally or remotely closed
     */
    @Override
    public Connection negotiate(EventHandler<AMQPHeader> remoteAMQPHeaderHandler) {
        Objects.requireNonNull(remoteAMQPHeaderHandler, "Provided AMQP Header received handler cannot be null");
        checkConnectionClosed("Cannot start header negotiation on a closed connection");

        if (remoteHeader == null) {
            remoteHeaderHandler = remoteAMQPHeaderHandler;
        } else {
            remoteAMQPHeaderHandler.handle(remoteHeader);
        }

        syncLocalStateWithRemote();
        return this;
    }

    /**
     * Forwards a tick to the engine.
     *
     * @param current the value passed through to the engine's tick
     * @return whatever the engine's tick returns
     * @throws IllegalStateException if the connection is locally or remotely closed
     */
    @Override
    public long tick(long current) {
        checkConnectionClosed("Cannot call tick on an already closed Connection");
        final long engineResult = engine.tick(current);
        return engineResult;
    }

    /**
     * Asks the engine to tick automatically using the given executor.
     *
     * @param executor the executor handed to the engine
     * @return this connection
     * @throws IllegalStateException if the connection is locally or remotely closed
     */
    @Override
    public Connection tickAuto(ScheduledExecutorService executor) {
        checkConnectionClosed("Cannot call tickAuto on an already closed Connection");
        engine.tickAuto(executor);
        return this;
    }

    /**
     * Asks the engine to tick automatically using the given scheduler.
     *
     * @param scheduler the scheduler handed to the engine
     * @return this connection
     * @throws IllegalStateException if the connection is locally or remotely closed
     */
    @Override
    public Connection tickAuto(Scheduler scheduler) {
        checkConnectionClosed("Cannot call tickAuto on an already closed Connection");
        engine.tickAuto(scheduler);
        return this;
    }

    /**
     * @return true when the local state is CLOSED
     */
    @Override
    public boolean isLocallyClosed() {
        final ConnectionState local = getState();
        return local == ConnectionState.CLOSED;
    }

    /**
     * @return true when the remote state is CLOSED
     */
    @Override
    public boolean isRemotelyClosed() {
        final ConnectionState remote = getRemoteState();
        return remote == ConnectionState.CLOSED;
    }

    /**
     * Stores the container id on the local Open.
     *
     * @param containerId the container id to store
     * @return this connection
     * @throws IllegalStateException if the connection has left the IDLE state
     */
    @Override
    public ProtonConnection setContainerId(String containerId) {
        checkNotOpened("Cannot set Container Id on already opened Connection");
        localOpen.setContainerId(containerId);
        return this;
    }

    /**
     * @return the container id held by the local Open
     */
    @Override
    public String getContainerId() {
        return localOpen.getContainerId();
    }

    /**
     * Stores the hostname on the local Open.
     *
     * @param hostname the hostname to store
     * @return this connection
     * @throws IllegalStateException if the connection has left the IDLE state
     */
    @Override
    public ProtonConnection setHostname(String hostname) {
        checkNotOpened("Cannot set Hostname on already opened Connection");
        localOpen.setHostname(hostname);
        return this;
    }

    /**
     * @return the hostname held by the local Open
     */
    @Override
    public String getHostname() {
        return localOpen.getHostname();
    }

    /**
     * Stores the max frame size on the local Open.
     *
     * @param maxFrameSize the max frame size; must not exceed {@link Integer#MAX_VALUE}
     * @return this connection
     * @throws IllegalStateException if the connection has left the IDLE state
     * @throws IllegalArgumentException if the value exceeds {@link Integer#MAX_VALUE}
     */
    @Override
    public Connection setMaxFrameSize(long maxFrameSize) {
        checkNotOpened("Cannot set Max Frame Size on already opened Connection");

        final boolean withinLimit = maxFrameSize <= Integer.MAX_VALUE;
        if (!withinLimit) {
            throw new IllegalArgumentException(String.format(
                "Given max frame size value %d larger than this implementations limit of %d", maxFrameSize, Integer.MAX_VALUE));
        }

        localOpen.setMaxFrameSize(maxFrameSize);
        return this;
    }

    /**
     * @return the max frame size held by the local Open
     */
    @Override
    public long getMaxFrameSize() {
        return localOpen.getMaxFrameSize();
    }

    /**
     * Stores the channel max on the local Open.
     *
     * @param channelMax the channel max to store
     * @return this connection
     * @throws IllegalStateException if the connection has left the IDLE state
     */
    @Override
    public ProtonConnection setChannelMax(int channelMax) {
        checkNotOpened("Cannot set Channel Max on already opened Connection");
        localOpen.setChannelMax(channelMax);
        return this;
    }

    /**
     * @return the channel max held by the local Open
     */
    @Override
    public int getChannelMax() {
        return localOpen.getChannelMax();
    }

    /**
     * Stores the idle timeout on the local Open.
     *
     * @param idleTimeout the idle timeout to store
     * @return this connection
     * @throws IllegalStateException if the connection has left the IDLE state
     */
    @Override
    public ProtonConnection setIdleTimeout(long idleTimeout) {
        checkNotOpened("Cannot set Idle Timeout on already opened Connection");
        localOpen.setIdleTimeout(idleTimeout);
        return this;
    }

    /**
     * @return the idle timeout held by the local Open
     */
    @Override
    public long getIdleTimeout() {
        return localOpen.getIdleTimeout();
    }

    /**
     * Stores a defensive copy of the offered capabilities on the local Open;
     * a null array is passed through as null.
     *
     * @param capabilities the capabilities to offer, may be null
     * @return this connection
     * @throws IllegalStateException if the connection has left the IDLE state
     */
    @Override
    public ProtonConnection setOfferedCapabilities(Symbol ... capabilities) {
        checkNotOpened("Cannot set Offered Capabilities on already opened Connection");

        final Symbol[] snapshot = capabilities == null
            ? null
            : Arrays.copyOf(capabilities, capabilities.length);
        localOpen.setOfferedCapabilities(snapshot);
        return this;
    }

    /**
     * @return a copy of the offered capabilities on the local Open, or null if none are set
     */
    @Override
    public Symbol[] getOfferedCapabilities() {
        final Symbol[] offered = localOpen.getOfferedCapabilities();
        if (offered == null) {
            return null;
        }
        return Arrays.copyOf(offered, offered.length);
    }

    /**
     * Stores a defensive copy of the desired capabilities on the local Open;
     * a null array is passed through as null.
     *
     * @param capabilities the capabilities desired, may be null
     * @return this connection
     * @throws IllegalStateException if the connection has left the IDLE state
     */
    @Override
    public ProtonConnection setDesiredCapabilities(Symbol ... capabilities) {
        checkNotOpened("Cannot set Desired Capabilities on already opened Connection");

        final Symbol[] snapshot = capabilities == null
            ? null
            : Arrays.copyOf(capabilities, capabilities.length);
        localOpen.setDesiredCapabilities(snapshot);
        return this;
    }

    /**
     * @return a copy of the desired capabilities on the local Open, or null if none are set
     */
    @Override
    public Symbol[] getDesiredCapabilities() {
        final Symbol[] desired = localOpen.getDesiredCapabilities();
        if (desired == null) {
            return null;
        }
        return Arrays.copyOf(desired, desired.length);
    }

    /**
     * Stores a copy of the given properties on the local Open; a null map is
     * passed through as null.
     *
     * @param properties the connection properties, may be null
     * @return this connection
     * @throws IllegalStateException if the connection has left the IDLE state
     */
    @Override
    public ProtonConnection setProperties(Map<Symbol, Object> properties) {
        checkNotOpened("Cannot set Properties on already opened Connection");

        final Map<Symbol, Object> snapshot = properties == null
            ? null
            : new LinkedHashMap<Symbol, Object>(properties);
        localOpen.setProperties(snapshot);
        return this;
    }

    /**
     * @return an unmodifiable view of the local Open's properties, or null if none are set
     */
    @Override
    public Map<Symbol, Object> getProperties() {
        final Map<Symbol, Object> current = localOpen.getProperties();
        if (current == null) {
            return null;
        }
        return Collections.unmodifiableMap(current);
    }

    /**
     * @return true when the local state is ACTIVE
     */
    @Override
    public boolean isLocallyOpen() {
        final ConnectionState local = getState();
        return local == ConnectionState.ACTIVE;
    }

    /**
     * @return true when the remote state is ACTIVE
     */
    @Override
    public boolean isRemotelyOpen() {
        final ConnectionState remote = getRemoteState();
        return remote == ConnectionState.ACTIVE;
    }

    /**
     * @return the container id from the remote Open, or null if no remote Open has been received
     */
    @Override
    public String getRemoteContainerId() {
        if (remoteOpen == null) {
            return null;
        }
        return remoteOpen.getContainerId();
    }

    /**
     * @return the hostname from the remote Open, or null if no remote Open has been received
     */
    @Override
    public String getRemoteHostname() {
        if (remoteOpen == null) {
            return null;
        }
        return remoteOpen.getHostname();
    }

    /**
     * @return the max frame size from the remote Open, or 512 if no remote Open has been received
     */
    @Override
    public long getRemoteMaxFrameSize() {
        if (remoteOpen == null) {
            return 512L;
        }
        return remoteOpen.getMaxFrameSize();
    }

    /**
     * @return the idle timeout from the remote Open, or -1 if no remote Open has been received
     */
    @Override
    public long getRemoteIdleTimeout() {
        if (remoteOpen == null) {
            return -1L;
        }
        return remoteOpen.getIdleTimeout();
    }

    /**
     * @return a copy of the remote's offered capabilities, or null when no
     *         remote Open has been received or it carries none
     */
    @Override
    public Symbol[] getRemoteOfferedCapabilities() {
        if (remoteOpen == null) {
            return null;
        }
        final Symbol[] offered = remoteOpen.getOfferedCapabilities();
        if (offered == null) {
            return null;
        }
        return Arrays.copyOf(offered, offered.length);
    }

    /**
     * @return a copy of the remote's desired capabilities, or null when no
     *         remote Open has been received or it carries none
     */
    @Override
    public Symbol[] getRemoteDesiredCapabilities() {
        if (remoteOpen == null) {
            return null;
        }
        final Symbol[] desired = remoteOpen.getDesiredCapabilities();
        if (desired == null) {
            return null;
        }
        return Arrays.copyOf(desired, desired.length);
    }

    /**
     * @return an unmodifiable view of the remote Open's properties, or null
     *         when no remote Open has been received or it carries none
     */
    @Override
    public Map<Symbol, Object> getRemoteProperties() {
        if (remoteOpen == null) {
            return null;
        }
        final Map<Symbol, Object> remoteProps = remoteOpen.getProperties();
        if (remoteProps == null) {
            return null;
        }
        return Collections.unmodifiableMap(remoteProps);
    }

    /**
     * @return the current remote {@link ConnectionState}
     */
    @Override
    public ConnectionState getRemoteState() {
        return remoteState;
    }

    /**
     * Creates a new session on the first free local channel and tracks it in
     * the local session map.
     *
     * @return the newly created session
     * @throws IllegalStateException if the connection is closed or no local channel is free
     */
    @Override
    public ProtonSession session() throws IllegalStateException {
        checkConnectionClosed("Cannot create a Session from a Connection that is already closed");

        final int channel = findFreeLocalChannel();
        final ProtonSession created = new ProtonSession(this, channel);
        localSessions.put(channel, created);
        return created;
    }

    /**
     * @return an unmodifiable set holding every session tracked locally or remotely
     */
    @Override
    public Set<Session> sessions() throws IllegalStateException {
        final Set<ProtonSession> tracked = allSessions();
        return Collections.unmodifiableSet(tracked);
    }

    /**
     * Records the remote AMQP header, fires and then clears any pending
     * header handler, and synchronizes local state with the remote.
     *
     * @param header the header received from the remote
     * @param context the engine delivering the header (unused here)
     */
    @Override
    public void handleAMQPHeader(AMQPHeader header, ProtonEngine context) {
        remoteHeader = header;

        final boolean handlerPending = remoteHeaderHandler != null;
        if (handlerPending) {
            remoteHeaderHandler.handle(remoteHeader);
            // One-shot: the handler is dropped once it has been notified.
            remoteHeaderHandler = null;
        }

        syncLocalStateWithRemote();
    }

    /**
     * Fails the engine: a SASL header is not expected by the connection.
     *
     * @param header the SASL header received (unused)
     * @param context the engine that is marked failed
     */
    @Override
    public void handleSASLHeader(AMQPHeader header, ProtonEngine context) {
        final ProtocolViolationException violation =
            new ProtocolViolationException("Received unexpected SASL Header");
        context.engineFailed(violation);
    }

    /**
     * Handles the remote Open. A second Open fails the engine; otherwise the
     * remote state becomes ACTIVE, the Open is stored, and the remote open
     * event and engine notification are fired.
     *
     * @param open the Open performative from the remote
     * @param payload frame payload (unused)
     * @param channel channel the frame arrived on (unused)
     * @param context the engine delivering the frame
     */
    @Override
    public void handleOpen(Open open, ProtonBuffer payload, int channel, ProtonEngine context) {
        final boolean alreadyOpenedByRemote = remoteOpen != null;
        if (alreadyOpenedByRemote) {
            context.engineFailed(new ProtocolViolationException("Received second Open for Connection from remote"));
            return;
        }

        remoteState = ConnectionState.ACTIVE;
        remoteOpen = open;
        fireRemoteOpen();
        engine.handleRemoteOpen(this);
    }

    /**
     * Handles the remote Close: marks the remote state CLOSED, records the
     * remote error condition, notifies every tracked session, and fires the
     * remote close event.
     *
     * @param close the Close performative from the remote
     * @param payload frame payload (unused)
     * @param channel channel the frame arrived on (unused)
     * @param context the engine delivering the frame (unused)
     */
    @Override
    public void handleClose(Close close, ProtonBuffer payload, int channel, ProtonEngine context) {
        remoteState = ConnectionState.CLOSED;
        setRemoteCondition(close.getError());

        for (ProtonSession session : allSessions()) {
            session.handleConnectionRemotelyClosed(this);
        }

        fireRemoteClose();
    }

    /**
     * Handles a remote Begin by correlating it with a session and recording
     * that session under the remote's channel.
     *
     * If the Begin names a remote-channel, the session is looked up among the
     * local sessions and then among the zombie sessions; otherwise a new
     * session is created via {@link #session()}.
     *
     * @param begin the Begin performative from the remote
     * @param payload frame payload (unused)
     * @param channel the channel the remote used for this Begin
     * @param context the engine delivering the frame
     */
    @Override
    public void handleBegin(Begin begin, ProtonBuffer payload, int channel, ProtonEngine context) {
        ProtonSession session = null;
        // A channel above the locally configured channel max closes the connection with a framing error.
        if (channel > this.localOpen.getChannelMax()) {
            ((Connection)this.setCondition(new ErrorCondition(ConnectionError.FRAMING_ERROR, "Channel Max Exceeded for session Begin"))).close();
            return;
        }
        // A Begin on a channel that is already mapped to a session fails the engine.
        if (this.remoteSessions.containsKey(channel)) {
            context.engineFailed(new ProtocolViolationException("Received second begin for Session from remote"));
            return;
        }
        if (begin.hasRemoteChannel()) {
            // The Begin's remote-channel is used as the key into our local session map.
            int localSessionChannel = begin.getRemoteChannel();
            session = this.localSessions.get(localSessionChannel);
            if (session == null) {
                if (!this.zombieSessions.containsKey(localSessionChannel)) {
                    // Neither a live nor a zombie session matches: close with an error and fail the engine.
                    ((Connection)this.setCondition(new ErrorCondition(AmqpError.PRECONDITION_FAILED, "No matching session found for remote channel given"))).close();
                    this.engine.engineFailed(new ProtocolViolationException("Received uncorrelated channel on Begin from remote: " + localSessionChannel));
                    return;
                }
                session = this.zombieSessions.get(localSessionChannel).get();
                // NOTE(review): when the SoftReference has been cleared we return here and the
                // zombie entry stays in the map — confirm that leaving it behind is intended.
                if (session == null) return;
                this.zombieSessions.remove(localSessionChannel);
            }
        } else {
            session = this.session();
        }
        this.remoteSessions.put(channel, session);
        session.remoteBegin(begin, channel);
        // The session-open handler is only notified while the session's local state is still IDLE.
        if (session.getState() != SessionState.IDLE) return;
        if (this.remoteSessionOpenEventHandler == null) return;
        this.remoteSessionOpenEventHandler.handle(session);
    }

    /**
     * Handles a remote End. The session mapped to the channel is removed and
     * told about the End; if none is mapped, a zombie entry under the same
     * key is removed instead, and if neither exists the engine is failed.
     *
     * @param end the End performative from the remote
     * @param payload frame payload (unused)
     * @param channel the channel the frame arrived on
     * @param context the engine delivering the frame (unused; the owning engine is failed on error)
     */
    @Override
    public void handleEnd(End end, ProtonBuffer payload, int channel, ProtonEngine context) {
        final ProtonSession ended = remoteSessions.remove(channel);
        if (ended != null) {
            ended.remoteEnd(end, channel);
            return;
        }

        final boolean zombieRemoved = zombieSessions.remove(channel) != null;
        if (!zombieRemoved) {
            engine.engineFailed(new ProtocolViolationException("Received uncorrelated channel on End from remote: " + channel));
        }
    }

    /**
     * Routes a remote Attach to the session mapped to the channel, failing
     * the engine when no session is mapped.
     *
     * @param attach the Attach performative from the remote
     * @param payload frame payload (unused)
     * @param channel the channel the frame arrived on
     * @param context the engine delivering the frame (unused)
     */
    @Override
    public void handleAttach(Attach attach, ProtonBuffer payload, int channel, ProtonEngine context) {
        final ProtonSession target = remoteSessions.get(channel);
        if (target == null) {
            engine.engineFailed(new ProtocolViolationException("Received uncorrelated channel on Attach from remote: " + channel));
            return;
        }
        target.remoteAttach(attach, channel);
    }

    /**
     * Routes a remote Detach to the session mapped to the channel, failing
     * the engine when no session is mapped.
     *
     * @param detach the Detach performative from the remote
     * @param payload frame payload (unused)
     * @param channel the channel the frame arrived on
     * @param context the engine delivering the frame (unused)
     */
    @Override
    public void handleDetach(Detach detach, ProtonBuffer payload, int channel, ProtonEngine context) {
        final ProtonSession target = remoteSessions.get(channel);
        if (target == null) {
            engine.engineFailed(new ProtocolViolationException("Received uncorrelated channel on Detach from remote: " + channel));
            return;
        }
        target.remoteDetach(detach, channel);
    }

    /**
     * Routes a remote Flow to the session mapped to the channel, failing the
     * engine when no session is mapped.
     *
     * @param flow the Flow performative from the remote
     * @param payload frame payload (unused)
     * @param channel the channel the frame arrived on
     * @param context the engine delivering the frame (unused)
     */
    @Override
    public void handleFlow(Flow flow, ProtonBuffer payload, int channel, ProtonEngine context) {
        final ProtonSession target = remoteSessions.get(channel);
        if (target == null) {
            engine.engineFailed(new ProtocolViolationException("Received uncorrelated channel on Flow from remote: " + channel));
            return;
        }
        target.remoteFlow(flow, channel);
    }

    /**
     * Routes a remote Transfer and its payload to the session mapped to the
     * channel, failing the engine when no session is mapped.
     *
     * @param transfer the Transfer performative from the remote
     * @param payload the frame payload forwarded to the session
     * @param channel the channel the frame arrived on
     * @param context the engine delivering the frame (unused)
     */
    @Override
    public void handleTransfer(Transfer transfer, ProtonBuffer payload, int channel, ProtonEngine context) {
        final ProtonSession target = remoteSessions.get(channel);
        if (target == null) {
            engine.engineFailed(new ProtocolViolationException("Received uncorrelated channel on Transfer from remote: " + channel));
            return;
        }
        target.remoteTransfer(transfer, payload, channel);
    }

    /**
     * Routes a remote Disposition to the session mapped to the channel,
     * failing the engine when no session is mapped.
     *
     * @param disposition the Disposition performative from the remote
     * @param payload frame payload (unused)
     * @param channel the channel the frame arrived on
     * @param context the engine delivering the frame (unused)
     */
    @Override
    public void handleDisposition(Disposition disposition, ProtonBuffer payload, int channel, ProtonEngine context) {
        final ProtonSession target = remoteSessions.get(channel);
        if (target == null) {
            engine.engineFailed(new ProtocolViolationException("Received uncorrelated channel on Disposition from remote: " + channel));
            return;
        }
        target.remoteDisposition(disposition, channel);
    }

    /**
     * Registers the handler notified from {@code handleBegin} when a remote
     * Begin arrives for a session whose local state is still IDLE.
     *
     * @param remoteSessionOpenEventHandler the handler to store, may be null
     * @return this connection
     */
    @Override
    public ProtonConnection sessionOpenHandler(EventHandler<Session> remoteSessionOpenEventHandler) {
        // Qualified with this: the parameter shadows the field of the same name.
        this.remoteSessionOpenEventHandler = remoteSessionOpenEventHandler;
        return this;
    }

    /**
     * Stores the sender-open handler exposed through {@code senderOpenEventHandler()}.
     *
     * @param remoteSenderOpenEventHandler the handler to store, may be null
     * @return this connection
     */
    @Override
    public ProtonConnection senderOpenHandler(EventHandler<Sender> remoteSenderOpenEventHandler) {
        // Qualified with this: the parameter shadows the field of the same name.
        this.remoteSenderOpenEventHandler = remoteSenderOpenEventHandler;
        return this;
    }

    /**
     * @return the stored sender-open handler, or null if none was registered
     */
    EventHandler<Sender> senderOpenEventHandler() {
        return remoteSenderOpenEventHandler;
    }

    /**
     * Stores the receiver-open handler exposed through {@code receiverOpenEventHandler()}.
     *
     * @param remoteReceiverOpenEventHandler the handler to store, may be null
     * @return this connection
     */
    @Override
    public ProtonConnection receiverOpenHandler(EventHandler<Receiver> remoteReceiverOpenEventHandler) {
        // Qualified with this: the parameter shadows the field of the same name.
        this.remoteReceiverOpenEventHandler = remoteReceiverOpenEventHandler;
        return this;
    }

    /**
     * @return the stored receiver-open handler, or null if none was registered
     */
    EventHandler<Receiver> receiverOpenEventHandler() {
        return remoteReceiverOpenEventHandler;
    }

    /**
     * Stores the transaction-manager-open handler exposed through the
     * no-argument {@code transactionManagerOpenHandler()} accessor.
     *
     * @param remoteTxnManagerOpenEventHandler the handler to store, may be null
     * @return this connection
     */
    @Override
    public ProtonConnection transactionManagerOpenHandler(EventHandler<TransactionManager> remoteTxnManagerOpenEventHandler) {
        // Qualified with this: the parameter shadows the field of the same name.
        this.remoteTxnManagerOpenEventHandler = remoteTxnManagerOpenEventHandler;
        return this;
    }

    /**
     * @return the stored transaction-manager-open handler, or null if none was registered
     */
    EventHandler<TransactionManager> transactionManagerOpenHandler() {
        return remoteTxnManagerOpenEventHandler;
    }

    /**
     * Rejects configuration changes once the local state has moved past IDLE.
     *
     * @param errorMessage message for the thrown exception
     * @throws IllegalStateException if the local state's ordinal is greater than IDLE's
     */
    private void checkNotOpened(String errorMessage) {
        // Ordinal comparison kept as-is; the enum's declaration order is not visible here.
        final boolean pastIdle = localState.ordinal() > ConnectionState.IDLE.ordinal();
        if (pastIdle) {
            throw new IllegalStateException(errorMessage);
        }
    }

    /**
     * Rejects an operation when either end of the connection is closed.
     *
     * @param errorMessage message for the thrown exception
     * @throws IllegalStateException if the connection is locally or remotely closed
     */
    private void checkConnectionClosed(String errorMessage) {
        if (!isLocallyClosed() && !isRemotelyClosed()) {
            return;
        }
        throw new IllegalStateException(errorMessage);
    }

    /**
     * Writes whichever of the AMQP header, local Open and local Close are
     * still outstanding, provided the engine is writable. Each frame is
     * written at most once, tracked by the headerSent / localOpenSent /
     * localCloseSent flags.
     */
    private void syncLocalStateWithRemote() {
        if (this.engine.isWritable()) {
            if (this.headerSent) {
                ConnectionState state = this.getState();
                // Open/Close are only written once we are no longer IDLE and the remote header has arrived.
                if (state != ConnectionState.IDLE && this.remoteHeader != null) {
                    boolean resourceSyncNeeded = false;
                    if (!this.localOpenSent && !this.engine.isShutdown()) {
                        this.engine.fireWrite(this.localOpen, 0);
                        this.engine.configuration().recomputeEffectiveFrameSizeLimits();
                        this.localOpenSent = true;
                        resourceSyncNeeded = true;
                    }
                    if (this.isLocallyClosed() && !this.localCloseSent && !this.engine.isShutdown()) {
                        Close localClose = new Close().setError(this.getCondition());
                        this.engine.fireWrite(localClose, 0);
                        this.localCloseSent = true;
                        // Once the Close has been written the sessions are not synced, even if the Open was just sent.
                        resourceSyncNeeded = false;
                    }
                    if (resourceSyncNeeded) {
                        this.allSessions().forEach(session -> session.trySyncLocalStateWithRemote());
                    }
                }
            } else if (this.remoteHeader != null || this.getState() == ConnectionState.ACTIVE || this.remoteHeaderHandler != null) {
                // Header not yet sent: send it when the remote header arrived, we opened locally, or negotiate() registered a handler.
                this.headerSent = true;
                this.engine.fireWrite(HeaderEnvelope.AMQP_HEADER_ENVELOPE);
            }
        }
    }

    /**
     * Called when the engine starts; synchronizes local state with the remote.
     *
     * @param protonEngine the engine that started (unused)
     */
    void handleEngineStarted(ProtonEngine protonEngine) {
        syncLocalStateWithRemote();
    }

    /**
     * Called when the engine shuts down. Fires the engine-shutdown event,
     * ignoring any exception it raises, then forwards the shutdown to every
     * tracked session.
     *
     * @param protonEngine the engine that shut down, forwarded to each session
     */
    void handleEngineShutdown(ProtonEngine protonEngine) {
        try {
            fireEngineShutdown();
        } catch (Exception ignored) {
            // Deliberately ignored so the sessions below are still notified.
        }

        for (ProtonSession session : allSessions()) {
            session.handleEngineShutdown(protonEngine);
        }
    }

    /**
     * Called when the engine fails. If the local Open was sent and no Close
     * has been sent yet, attempts to write a Close carrying the connection's
     * condition (derived from the failure cause when none is set). Any
     * exception raised while doing so is ignored.
     *
     * @param protonEngine the engine that failed (unused)
     * @param cause the failure that triggered this call
     */
    void handleEngineFailed(ProtonEngine protonEngine, Throwable cause) {
        final boolean closeOutstanding = localOpenSent && !localCloseSent;
        if (!closeOutstanding) {
            return;
        }

        // Marked as sent up front so the Close is never attempted twice.
        localCloseSent = true;
        try {
            if (getCondition() == null) {
                setCondition(errorConditionFromFailureCause(cause));
            }
            final Close failureClose = new Close().setError(getCondition());
            engine.fireWrite(failureClose, 0);
        } catch (Exception ignored) {
            // Best effort only: the engine has already failed.
        }
    }

    /**
     * Builds an error condition from a failure cause. A
     * {@link ProtocolViolationException} supplies its own condition symbol;
     * anything else maps to {@code AmqpError.INTERNAL_ERROR}. The description
     * is the cause's message.
     *
     * @param cause the failure to translate
     * @return the resulting error condition
     */
    private ErrorCondition errorConditionFromFailureCause(Throwable cause) {
        final String description = cause.getMessage();
        final Symbol condition = cause instanceof ProtocolViolationException
            ? ((ProtocolViolationException) cause).getErrorCondition()
            : AmqpError.INTERNAL_ERROR;
        return new ErrorCondition(condition, description);
    }

    /**
     * Collects every session tracked in the local or remote session maps into
     * a fresh insertion-ordered set, local sessions first. A session present
     * in both maps appears once.
     *
     * @return a snapshot set of tracked sessions; an immutable empty set when none are tracked
     */
    private Set<ProtonSession> allSessions() {
        if (localSessions.isEmpty() && remoteSessions.isEmpty()) {
            // Typed factory instead of the raw Collections.EMPTY_SET constant.
            return Collections.emptySet();
        }

        final Set<ProtonSession> result =
            new LinkedHashSet<ProtonSession>(localSessions.size() + remoteSessions.size());
        result.addAll(localSessions.values());
        result.addAll(remoteSessions.values());
        return result;
    }

    /**
     * Finds the lowest free local channel in the range 0 to the local channel
     * max inclusive. The first pass avoids channels still held by zombie
     * sessions; if every channel is taken by a live or zombie session, the
     * second pass accepts a channel that is only held by a zombie.
     *
     * @return the channel number to allocate
     * @throws IllegalStateException if every channel holds a live local session
     */
    private int findFreeLocalChannel() {
        final int channelMax = localOpen.getChannelMax();

        for (int candidate = 0; candidate <= channelMax; ++candidate) {
            if (!localSessions.containsKey(candidate) && !zombieSessions.containsKey(candidate)) {
                return candidate;
            }
        }

        for (int candidate = 0; candidate <= channelMax; ++candidate) {
            if (!localSessions.containsKey(candidate)) {
                return candidate;
            }
        }

        throw new IllegalStateException("no local channel available for allocation");
    }

    /**
     * Releases a local channel. If the removed session's remote state is
     * still IDLE, it is parked in the zombie map behind a soft reference so a
     * later Begin from the remote can still be correlated.
     *
     * Freeing a channel that has no tracked session is a no-op.
     *
     * @param localChannel the local channel to release; must be within 0..65535
     * @throws IllegalArgumentException if the channel is outside 0..65535
     */
    void freeLocalChannel(int localChannel) {
        if (localChannel < 0 || localChannel > 65535) {
            throw new IllegalArgumentException("Specified local channel is out of range: " + localChannel);
        }

        final ProtonSession session = localSessions.remove(localChannel);
        if (session == null) {
            // Nothing tracked on this channel; previously this dereferenced null.
            return;
        }

        if (session.getRemoteState() == SessionState.IDLE) {
            zombieSessions.put(localChannel, new SoftReference<ProtonSession>(session));
        }
    }

    /**
     * @return true once this connection has written the AMQP header
     */
    boolean wasHeaderSent() {
        return headerSent;
    }

    /**
     * @return true once this connection has written its local Open
     */
    boolean wasLocalOpenSent() {
        return localOpenSent;
    }

    /**
     * @return true once the local Close has been written or marked as sent
     */
    boolean wasLocalCloseSent() {
        return localCloseSent;
    }
}

