/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.core.impl;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.api.core.ActiveMQRoutingException;
import org.apache.activemq.artemis.api.core.DisconnectReason;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V3;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.shaded.org.jboss.logging.Logger;
import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;

public class RemotingConnectionImpl
extends AbstractRemotingConnection
implements CoreRemotingConnection {
    /** Class logger; this class only emits trace-level messages through it. */
    private static final Logger logger = Logger.getLogger(RemotingConnectionImpl.class);
    /** Turns each incoming buffer into a {@link Packet} (see {@code bufferReceived}). */
    private final PacketDecoder packetDecoder;
    /** Channels registered on this connection, keyed by channel ID. */
    private final Map<Long, Channel> channels = new ConcurrentHashMap<Long, Channel>();
    /** Value returned by {@code getBlockingCallTimeout()}; -1 when built through the node-ID constructor. */
    private final long blockingCallTimeout;
    /** Value returned by {@code getBlockingCallFailoverTimeout()}; -1 when built through the node-ID constructor. */
    private final long blockingCallFailoverTimeout;
    /** Interceptors run against every decoded packet before it is dispatched to a channel. */
    private final List<Interceptor> incomingInterceptors;
    /** Interceptors handed to every {@link ChannelImpl} created by {@code getChannel}. */
    private final List<Interceptor> outgoingInterceptors;
    /** Set to true exactly once, under {@code failLock}, by {@code fail} or {@code destroy}. */
    private volatile boolean destroyed;
    /** True when built through the public constructor, false when built through the node-ID constructor. */
    private final boolean client;
    /** Written by {@code setChannelVersion}; {@code killMessage} is a no-op while this is below 128. */
    private int channelVersion;
    /** Source of channel IDs; starts at {@code CHANNEL_ID.USER.id} and is replaced at most once by {@code syncIDGeneratorSequence}. */
    private volatile SimpleIDGenerator idGenerator;
    /** Whether {@code syncIDGeneratorSequence} has already replaced the generator; accessed only in that synchronized method. */
    private boolean idGeneratorSynced;
    /** Monitor held while packets are dispatched to channels, while channels are flushed, and while the channel map is cleared by {@code removeAllChannels}. */
    private final Object transferLock;
    /** Monitor guarding the one-time transition of {@code destroyed}. */
    private final Object failLock;
    /** Node ID placed in outgoing disconnect messages; null when built through the public constructor. */
    private final SimpleString nodeID;

    /**
     * Scheduled-flush hook; simply delegates to {@link #flush()}.
     */
    @Override
    public void scheduledFlush() {
        flush();
    }

    /**
     * Creates a connection with the {@code client} flag set to true and no node ID.
     *
     * @param packetDecoder               decoder used for incoming buffers
     * @param transportConnection         underlying transport connection
     * @param blockingCallTimeout         value later returned by {@code getBlockingCallTimeout()}
     * @param blockingCallFailoverTimeout value later returned by {@code getBlockingCallFailoverTimeout()}
     * @param incomingInterceptors        interceptors applied to decoded incoming packets
     * @param outgoingInterceptors        interceptors passed to channels created by this connection
     * @param connectionExecutor          executor forwarded to the superclass constructor
     */
    public RemotingConnectionImpl(final PacketDecoder packetDecoder,
                                  final Connection transportConnection,
                                  final long blockingCallTimeout,
                                  final long blockingCallFailoverTimeout,
                                  final List<Interceptor> incomingInterceptors,
                                  final List<Interceptor> outgoingInterceptors,
                                  final Executor connectionExecutor) {
        this(packetDecoder,
             transportConnection,
             blockingCallTimeout,
             blockingCallFailoverTimeout,
             incomingInterceptors,
             outgoingInterceptors,
             true,
             null,
             connectionExecutor);
    }

    /**
     * Creates a connection with the {@code client} flag set to false, carrying the given node ID.
     * Both blocking-call timeouts are fixed at -1.
     *
     * @param packetDecoder        decoder used for incoming buffers
     * @param transportConnection  underlying transport connection
     * @param incomingInterceptors interceptors applied to decoded incoming packets
     * @param outgoingInterceptors interceptors passed to channels created by this connection
     * @param nodeID               node ID stored on the connection and used in disconnect messages
     * @param connectionExecutor   executor forwarded to the superclass constructor
     */
    RemotingConnectionImpl(final PacketDecoder packetDecoder,
                           final Connection transportConnection,
                           final List<Interceptor> incomingInterceptors,
                           final List<Interceptor> outgoingInterceptors,
                           final SimpleString nodeID,
                           final Executor connectionExecutor) {
        this(packetDecoder,
             transportConnection,
             -1L,
             -1L,
             incomingInterceptors,
             outgoingInterceptors,
             false,
             nodeID,
             connectionExecutor);
    }

    /**
     * Shared constructor that both other constructors delegate to.
     *
     * <p>After all fields are assigned, this instance is registered on the transport connection via
     * {@code setProtocolConnection(this)}, and a trace message is logged when trace is enabled.
     *
     * @param packetDecoder               decoder used for incoming buffers
     * @param transportConnection         underlying transport connection
     * @param blockingCallTimeout         value later returned by {@code getBlockingCallTimeout()}
     * @param blockingCallFailoverTimeout value later returned by {@code getBlockingCallFailoverTimeout()}
     * @param incomingInterceptors        interceptors applied to decoded incoming packets
     * @param outgoingInterceptors        interceptors passed to channels created by this connection
     * @param client                      value later returned by {@code isClient()}
     * @param nodeID                      node ID used in disconnect messages; may be null
     * @param connectionExecutor          executor forwarded to the superclass constructor
     */
    private RemotingConnectionImpl(final PacketDecoder packetDecoder,
                                   final Connection transportConnection,
                                   final long blockingCallTimeout,
                                   final long blockingCallFailoverTimeout,
                                   final List<Interceptor> incomingInterceptors,
                                   final List<Interceptor> outgoingInterceptors,
                                   final boolean client,
                                   final SimpleString nodeID,
                                   final Executor connectionExecutor) {
        super(transportConnection, connectionExecutor);

        // Caller-supplied configuration.
        this.packetDecoder = packetDecoder;
        this.incomingInterceptors = incomingInterceptors;
        this.outgoingInterceptors = outgoingInterceptors;
        this.blockingCallTimeout = blockingCallTimeout;
        this.blockingCallFailoverTimeout = blockingCallFailoverTimeout;
        this.client = client;
        this.nodeID = nodeID;

        // Internally created state.
        this.transferLock = new Object();
        this.failLock = new Object();
        this.idGenerator = new SimpleIDGenerator(ChannelImpl.CHANNEL_ID.USER.id);
        this.idGeneratorSynced = false;

        // Kept after the field assignments: this call hands a reference to this object to the transport.
        transportConnection.setProtocolConnection(this);

        if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionImpl created: " + this);
        }
    }

    /**
     * Builds a diagnostic description containing the connection ID, client ID, node ID and the
     * underlying transport connection.
     *
     * @return the description string
     */
    @Override
    public String toString() {
        return "RemotingConnectionImpl [ID=" + this.getID()
            + ", clientID=" + this.getClientID()
            + ", nodeID=" + this.nodeID
            + ", transportConnection=" + this.getTransportConnection()
            + "]";
    }

    /**
     * @return the version last stored through {@code setChannelVersion}, or 0 if it was never set
     */
    @Override
    public int getChannelVersion() {
        return channelVersion;
    }

    /**
     * Stores the channel version for this connection.
     *
     * @param clientVersion the version to record
     */
    @Override
    public void setChannelVersion(int clientVersion) {
        channelVersion = clientVersion;
    }

    /**
     * Returns the channel registered under {@code channelID}, creating and registering a new
     * {@link ChannelImpl} when none exists yet.
     *
     * @param channelID      ID of the channel to look up
     * @param confWindowSize confirmation window size, used only when a new channel is created
     * @return the existing or newly created channel
     */
    @Override
    public synchronized Channel getChannel(long channelID, int confWindowSize) {
        final Channel existing = channels.get(channelID);
        if (existing != null) {
            return existing;
        }
        final Channel created = new ChannelImpl(this, channelID, confWindowSize, outgoingInterceptors);
        channels.put(channelID, created);
        return created;
    }

    /**
     * Unregisters the channel stored under {@code channelID}.
     *
     * @param channelID ID of the channel to remove
     * @return true if a channel was registered under that ID, false otherwise
     */
    @Override
    public synchronized boolean removeChannel(long channelID) {
        final Channel removed = channels.remove(channelID);
        return removed != null;
    }

    /**
     * Registers {@code channel} under {@code channelID}, replacing any channel already stored there.
     *
     * @param channelID ID to register the channel under
     * @param channel   the channel to register
     */
    @Override
    public synchronized void putChannel(long channelID, Channel channel) {
        channels.put(channelID, channel);
    }

    /**
     * @return the incoming-interceptor list supplied at construction (the same instance, not a copy)
     */
    public List<Interceptor> getIncomingInterceptors() {
        return incomingInterceptors;
    }

    /**
     * @return the outgoing-interceptor list supplied at construction (the same instance, not a copy)
     */
    public List<Interceptor> getOutgoingInterceptors() {
        return outgoingInterceptors;
    }

    /**
     * Fails this connection. Only the first call to either this method or {@link #destroy()} has
     * any effect; later calls return immediately.
     *
     * <p>Steps, in order: log the failure (skipped for remote-disconnect and routing exceptions),
     * force-close the transport (a failure there is logged and otherwise ignored), notify failure
     * listeners, notify closing listeners, close the transport and all channels, and finally call
     * {@code returnBlocking(me)} on every channel still registered.
     *
     * @param me                    the exception describing the failure
     * @param scaleDownTargetNodeID passed through to the failure listeners
     */
    @Override
    public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
        // failLock is a plain Object monitor; synchronize on it directly so the
        // check-and-set of 'destroyed' happens at most once across fail()/destroy().
        synchronized (this.failLock) {
            if (this.destroyed) {
                return;
            }
            this.destroyed = true;
        }

        if (!(me instanceof ActiveMQRemoteDisconnectException) && !(me instanceof ActiveMQRoutingException)) {
            ActiveMQClientLogger.LOGGER.connectionFailureDetected(this.transportConnection.getRemoteAddress(), me.getMessage(), me.getType());
        }

        try {
            this.transportConnection.forceClose();
        }
        catch (Throwable e) {
            // Best effort: a failing force-close must not prevent listener notification and cleanup.
            ActiveMQClientLogger.LOGGER.failedForceClose(e);
        }

        this.callFailureListeners(me, scaleDownTargetNodeID);
        this.callClosingListeners();
        this.internalClose();

        for (Channel channel : this.channels.values()) {
            channel.returnBlocking(me);
        }
    }

    /**
     * Destroys this connection. Only the first call to either this method or {@code fail} has any
     * effect; later calls return immediately.
     *
     * <p>Closes the transport connection and all channels, then notifies the closing listeners.
     */
    @Override
    public void destroy() {
        synchronized (failLock) {
            if (destroyed) {
                return;
            }
            destroyed = true;
        }

        internalClose();
        callClosingListeners();
    }

    /**
     * Forwards to the transport connection's {@code blockUntilWritable}, passing the timeout in
     * milliseconds.
     *
     * @param timeout timeout in milliseconds
     * @return whatever the transport connection returns
     */
    @Override
    public boolean blockUntilWritable(long timeout) {
        final boolean writable = transportConnection.blockUntilWritable(timeout, TimeUnit.MILLISECONDS);
        return writable;
    }

    /**
     * Disconnects with a shut-down reason and no target node or connector.
     *
     * @param criticalError selects {@code SHUT_DOWN_ON_CRITICAL_ERROR} when true, {@code SHUT_DOWN} otherwise
     */
    @Override
    public void disconnect(boolean criticalError) {
        final DisconnectReason reason;
        if (criticalError) {
            reason = DisconnectReason.SHUT_DOWN_ON_CRITICAL_ERROR;
        } else {
            reason = DisconnectReason.SHUT_DOWN;
        }
        disconnect(reason, null, null);
    }

    /**
     * Disconnects with a scale-down reason, naming the scale-down target node; no connector is given.
     *
     * @param scaleDownNodeID node ID passed on as the target node
     * @param criticalError   selects {@code SCALE_DOWN_ON_CRITICAL_ERROR} when true, {@code SCALE_DOWN} otherwise
     */
    @Override
    public void disconnect(String scaleDownNodeID, boolean criticalError) {
        final DisconnectReason reason;
        if (criticalError) {
            reason = DisconnectReason.SCALE_DOWN_ON_CRITICAL_ERROR;
        } else {
            reason = DisconnectReason.SCALE_DOWN;
        }
        disconnect(reason, scaleDownNodeID, null);
    }

    /**
     * Empties the channel map, flushes confirmations on the previously registered channels (unless
     * the reason is a critical error), and sends a disconnect message on the ping channel.
     *
     * <p>The message type depends on what the ping channel reports through {@code supports}:
     * {@link DisconnectMessage_V3} if packet type -19 is supported, else {@link DisconnectMessage_V2}
     * if packet type 124 is supported, else the plain {@link DisconnectMessage}.
     *
     * @param reason          why the connection is being disconnected
     * @param targetNodeID    target node ID; sent in the V3 message, and in the V2 message only when
     *                        {@code reason.isScaleDown()} is true
     * @param targetConnector connector sent in the V3 message only
     */
    @Override
    public void disconnect(DisconnectReason reason, String targetNodeID, TransportConfiguration targetConnector) {
        // Looked up (or created) before the map is emptied so it can still be used for sending.
        final Channel pingChannel = getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1);

        // Snapshot of the registered channels, taken before the map is cleared.
        final HashSet<Channel> snapshot = new HashSet<>(channels.values());

        if (reason.isCriticalError()) {
            // Critical-error path clears the map without acquiring transferLock.
            channels.clear();
        } else {
            removeAllChannels();
        }

        if (!reason.isCriticalError()) {
            for (Channel registered : snapshot) {
                registered.flushConfirmations();
            }
        }

        // NOTE(review): -19 and 124 are presumably the V3 and V2 disconnect packet type codes — confirm.
        final DisconnectMessage message;
        if (pingChannel.supports((byte)-19)) {
            message = new DisconnectMessage_V3(nodeID, reason, SimpleString.toSimpleString(targetNodeID), targetConnector);
        } else if (pingChannel.supports((byte)124)) {
            message = new DisconnectMessage_V2(nodeID, reason.isScaleDown() ? targetNodeID : null);
        } else {
            message = new DisconnectMessage(nodeID);
        }
        pingChannel.sendAndFlush(message);
    }

    /**
     * @return the next ID produced by the current ID generator
     */
    @Override
    public long generateChannelID() {
        return idGenerator.generateID();
    }

    /**
     * Replaces the ID generator with one created from {@code id}. Only the first call has any
     * effect; subsequent calls are ignored.
     *
     * @param id value handed to the new {@link SimpleIDGenerator}
     */
    @Override
    public synchronized void syncIDGeneratorSequence(long id) {
        if (idGeneratorSynced) {
            return;
        }
        idGenerator = new SimpleIDGenerator(id);
        idGeneratorSynced = true;
    }

    /**
     * @return the current ID reported by the ID generator
     */
    @Override
    public long getIDGeneratorSequence() {
        return idGenerator.getCurrentID();
    }

    /**
     * @return the monitor object this connection synchronizes on while dispatching packets and
     *         flushing channels
     */
    @Override
    public Object getTransferLock() {
        return transferLock;
    }

    /**
     * @return the {@code client} flag fixed at construction
     */
    @Override
    public boolean isClient() {
        return client;
    }

    /**
     * @return true once {@code fail} or {@code destroy} has been called
     */
    @Override
    public boolean isDestroyed() {
        return destroyed;
    }

    /**
     * @return the blocking-call timeout fixed at construction
     */
    @Override
    public long getBlockingCallTimeout() {
        return blockingCallTimeout;
    }

    /**
     * @return the blocking-call failover timeout fixed at construction
     */
    @Override
    public long getBlockingCallFailoverTimeout() {
        return blockingCallFailoverTimeout;
    }

    /**
     * Calls {@code flushConfirmations()} on every registered channel while holding the transfer lock.
     */
    @Override
    public void flush() {
        synchronized (transferLock) {
            channels.values().forEach(Channel::flushConfirmations);
        }
    }

    /**
     * @return the default principal reported by the underlying transport connection
     */
    @Override
    public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
        final Connection transport = getTransportConnection();
        return transport.getDefaultActiveMQPrincipal();
    }

    /**
     * @return true if at least one registered channel has a confirmation window size greater than
     *         zero, false otherwise (including when no channels are registered)
     */
    @Override
    public boolean isSupportReconnect() {
        for (Channel candidate : channels.values()) {
            if (candidate.getConfirmationWindowSize() > 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the fixed protocol name {@code "CORE"}
     */
    @Override
    public String getProtocolName() {
        return "CORE";
    }

    /**
     * Handles a buffer arriving from the transport: decodes it into a packet, marks that data was
     * received, dispatches the packet, and then invokes the superclass handler.
     *
     * @param connectionID identifier passed through to the superclass handler
     * @param buffer       the raw incoming buffer
     * @throws IllegalStateException wrapping any {@link Throwable} raised during the steps above,
     *                               after it has been logged
     */
    @Override
    public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
        try {
            final Packet decoded = packetDecoder.decode(buffer, (CoreRemotingConnection)this);

            if (logger.isTraceEnabled()) {
                logger.trace("RemotingConnectionID=" + getID() + " handling packet " + decoded);
            }

            dataReceived = true;
            doBufferReceived(decoded);

            super.bufferReceived(connectionID, buffer);
        }
        catch (Throwable failure) {
            ActiveMQClientLogger.LOGGER.errorDecodingPacket(failure);
            throw new IllegalStateException(failure);
        }
    }

    /**
     * Invokes the superclass end-of-batch handling, then calls {@code endOfBatch()} on every
     * registered channel while holding the transfer lock.
     *
     * @param connectionID identifier passed through to the superclass handler
     */
    @Override
    public void endOfBatch(Object connectionID) {
        super.endOfBatch(connectionID);

        synchronized (transferLock) {
            for (Channel registered : channels.values()) {
                registered.endOfBatch();
            }
        }
    }

    /**
     * @return the local address reported by the underlying transport connection
     */
    @Override
    public String getTransportLocalAddress() {
        final Connection transport = getTransportConnection();
        return transport.getLocalAddress();
    }

    /**
     * Runs the incoming interceptors against {@code packet} and, if they return null, hands the
     * packet to the channel registered under the packet's channel ID. A packet whose channel is not
     * registered is silently dropped.
     *
     * @param packet the decoded packet to dispatch
     */
    private void doBufferReceived(Packet packet) {
        final boolean intercepted = ChannelImpl.invokeInterceptors(packet, incomingInterceptors, this) != null;
        if (intercepted) {
            // A non-null result from the interceptor chain stops further processing.
            return;
        }

        synchronized (transferLock) {
            final Channel target = channels.get(packet.getChannelID());
            if (target == null) {
                return;
            }
            target.handlePacket(packet);
        }
    }

    /**
     * Empties the channel map while holding the transfer lock.
     */
    protected void removeAllChannels() {
        synchronized (transferLock) {
            channels.clear();
        }
    }

    /**
     * Closes the underlying transport connection first, then closes every registered channel.
     */
    private void internalClose() {
        final Connection transport = getTransportConnection();
        transport.close();

        channels.values().forEach(Channel::close);
    }

    /**
     * Sends a {@link DisconnectConsumerWithKillMessage} for {@code nodeID} on channel 1, creating
     * that channel if it is not registered yet. Does nothing when the channel version is below 128.
     *
     * @param nodeID node ID carried by the message
     */
    @Override
    public void killMessage(SimpleString nodeID) {
        // NOTE(review): 128 is presumably the version that introduced this message — confirm.
        if (channelVersion >= 128) {
            final Channel target = getChannel(1L, -1);
            // Declared as Packet so the send(Packet, int) overload is selected.
            final Packet killPacket = new DisconnectConsumerWithKillMessage(nodeID);
            target.send(killPacket, -1);
        }
    }
}

