package io.moquette.broker;

import io.moquette.broker.Session;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/moquette/broker/SessionRegistry.class */
public class SessionRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(SessionRegistry.class);
    /** Directory consulted for stored session ids and used to remove subscriptions. */
    private final ISubscriptionsDirectory subscriptionsDirectory;
    /** Source of the stored queues and factory for new per-client queues. */
    private final IQueueRepository queueRepository;
    /** Used to re-check read permission on subscriptions when a session is reopened. */
    private final Authorizator authorizator;
    /** Sessions known to this registry, keyed by client id. */
    private final ConcurrentMap<String, Session> pool = new ConcurrentHashMap<>();
    /** Per-client message queues, keyed by client id. */
    private final ConcurrentMap<String, Queue<EnqueuedMessage>> queues = new ConcurrentHashMap<>();

    /* loaded from: input_file:io/moquette/broker/SessionRegistry$CreationModeEnum.class */
    /**
     * Outcome reported by {@code createOrReopenSession} describing how the returned
     * session was obtained.
     */
    public enum CreationModeEnum {
        // No session was in the pool for the client id, or a disconnected one was reset
        // (queue mapping dropped, subscriptions removed) for a clean-session connect.
        CREATED_CLEAN_NEW,
        // A disconnected session was taken over as-is for a non-clean connect.
        REOPEN_EXISTING,
        // A still-connected session with the same client id was closed and replaced.
        DROP_EXISTING
    }

    /* loaded from: input_file:io/moquette/broker/SessionRegistry$EnqueuedMessage.class */
    /**
     * Base type of the entries stored in the per-client queues. Serializable so that
     * entries can be written through Java serialization.
     */
    public static abstract class EnqueuedMessage implements Serializable {
        /**
         * Hook for freeing resources held by the entry. The default implementation does
         * nothing; subclasses holding buffers override it.
         */
        public void release() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/moquette/broker/SessionRegistry$PubRelMarker.class */
    /**
     * Queue entry without any state of its own.
     * NOTE(review): presumably marks a pending PUBREL in the queue - confirm against usages.
     */
    public static final class PubRelMarker extends EnqueuedMessage {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/moquette/broker/SessionRegistry$PublishedMessage.class */
    /**
     * Queue entry holding a published message: its topic, the QoS it was published with
     * and the payload buffer.
     *
     * <p>The payload is {@code transient}; it is serialized by hand as a length-prefixed
     * byte array in {@link #writeObject} and restored in {@link #readObject}.
     */
    public static class PublishedMessage extends EnqueuedMessage {
        Topic topic;
        MqttQoS publishingQos;
        // Excluded from default serialization; written and read manually below.
        transient ByteBuf payload;

        /** Creates an empty message; topic and QoS are null and no payload is set. */
        public PublishedMessage() {
            this.topic = null;
            this.publishingQos = null;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Creates a message from its parts.
         *
         * @param topic the topic of the publish
         * @param mqttQoS the QoS the message was published with
         * @param byteBuf the payload; kept by reference, not copied
         */
        public PublishedMessage(Topic topic, MqttQoS mqttQoS, ByteBuf byteBuf) {
            this.topic = topic;
            this.publishingQos = mqttQoS;
            this.payload = byteBuf;
        }

        /**
         * Writes the default fields followed by the payload length and the payload bytes.
         *
         * @param objectOutputStream the stream to write to
         * @throws IOException if writing to the stream fails
         */
        private void writeObject(ObjectOutputStream objectOutputStream) throws IOException {
            objectOutputStream.defaultWriteObject();
            final byte[] bytes = new byte[this.payload.readableBytes()];
            // Absolute read: copies the readable bytes without moving the reader index
            // and without allocating an intermediate buffer copy.
            this.payload.getBytes(this.payload.readerIndex(), bytes);
            objectOutputStream.writeInt(bytes.length);
            objectOutputStream.write(bytes);
        }

        /**
         * Reads the default fields, then the length-prefixed payload, and wraps it in a buffer.
         *
         * @param objectInputStream the stream to read from
         * @throws ClassNotFoundException if a serialized class cannot be resolved
         * @throws IOException if the stream fails, ends before the full payload was read,
         *         or carries a negative payload length
         */
        private void readObject(ObjectInputStream objectInputStream) throws ClassNotFoundException, IOException {
            objectInputStream.defaultReadObject();
            final int length = objectInputStream.readInt();
            if (length < 0) {
                throw new IOException("Invalid payload length in serialized PublishedMessage: " + length);
            }
            final byte[] bytes = new byte[length];
            // read(byte[]) may return after a partial read; readFully blocks until the whole
            // array is filled or fails with EOFException.
            objectInputStream.readFully(bytes);
            this.payload = Unpooled.wrappedBuffer(bytes);
        }

        /** Releases the payload buffer. */
        @Override // io.moquette.broker.SessionRegistry.EnqueuedMessage
        public void release() {
            this.payload.release();
        }
    }

    /* loaded from: input_file:io/moquette/broker/SessionRegistry$SessionCreationResult.class */
    /**
     * Immutable result of {@code createOrReopenSession}: the session to use, how it was
     * obtained, and whether a session for the client id was already in the pool.
     */
    public static class SessionCreationResult {
        // Session the caller should continue with.
        final Session session;
        // How the session was obtained.
        final CreationModeEnum mode;
        // false only when no session for the client id was in the pool beforehand.
        final boolean alreadyStored;

        /**
         * @param session the session to hand to the caller
         * @param creationModeEnum how the session was obtained
         * @param z whether a session for the client id was already in the pool
         */
        public SessionCreationResult(Session session, CreationModeEnum creationModeEnum, boolean z) {
            this.session = session;
            this.mode = creationModeEnum;
            this.alreadyStored = z;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the registry and immediately restores state from the given collaborators.
     *
     * @param iSubscriptionsDirectory directory listing stored session ids and holding subscriptions
     * @param iQueueRepository repository providing the stored queues and creating new ones
     * @param authorizator used to re-check read permissions when sessions are reopened
     */
    public SessionRegistry(ISubscriptionsDirectory iSubscriptionsDirectory, IQueueRepository iQueueRepository, Authorizator authorizator) {
        this.subscriptionsDirectory = iSubscriptionsDirectory;
        this.queueRepository = iQueueRepository;
        this.authorizator = authorizator;
        // Order matters: recreateSessionPool() looks queues up in the map filled here.
        reloadPersistentQueues();
        recreateSessionPool();
    }

    /**
     * Copies every queue returned by the queue repository's {@code listAllQueues()} into
     * the in-memory {@code queues} map, under the same keys.
     */
    private void reloadPersistentQueues() {
        this.queueRepository.listAllQueues().forEach(this.queues::put);
    }

    /**
     * Rebuilds the session pool from stored state: every session id listed by the
     * subscriptions directory that also has a queue in {@code queues} gets a session
     * (created with the clean flag off) bound to that queue. Ids without a queue are skipped.
     */
    private void recreateSessionPool() {
        for (String clientId : this.subscriptionsDirectory.listAllSessionIds()) {
            final Queue<EnqueuedMessage> storedQueue = this.queues.get(clientId);
            if (storedQueue == null) {
                continue;
            }
            final Session restored = new Session(clientId, false, storedQueue);
            this.pool.put(clientId, restored);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers a session for a connecting client, or takes over the one already in the pool.
     *
     * <p>A fresh session is always built first. If no session is pooled for the client id
     * and the fresh one is successfully published, it is returned as
     * {@code CREATED_CLEAN_NEW} with {@code alreadyStored == false}. Otherwise the decision
     * is delegated to {@code reopenExistingSession} with the pooled session.
     *
     * @param mqttConnectMessage the CONNECT message of the client
     * @param str the client id
     * @param str2 forwarded to the authorizator as the second argument of {@code canRead}
     *        when subscriptions are re-checked
     * @return the session to use together with the way it was obtained
     */
    public SessionCreationResult createOrReopenSession(MqttConnectMessage mqttConnectMessage, String str, String str2) {
        final Session candidate = createNewSession(mqttConnectMessage, str);
        Session existing = this.pool.get(str);
        if (existing == null) {
            // Nothing pooled at lookup time: try to publish the fresh session. A non-null
            // return means another session was stored in the meantime.
            existing = this.pool.putIfAbsent(str, candidate);
            if (existing == null) {
                LOG.trace("case 1, not existing session with CId {}", str);
                return new SessionCreationResult(candidate, CreationModeEnum.CREATED_CLEAN_NEW, false);
            }
        }
        return reopenExistingSession(mqttConnectMessage, str, existing, candidate, str2);
    }

    /**
     * Decides what to do when a session for the client id is already pooled.
     *
     * <ul>
     *   <li>Pooled session not disconnected: it is closed immediately and replaced in the
     *       pool by the fresh session ({@code DROP_EXISTING}).</li>
     *   <li>Pooled session disconnected, clean session requested: the pooled session is moved
     *       to CONNECTING, its queue mapping and subscriptions are dropped, its configuration
     *       is refreshed from the CONNECT message ({@code CREATED_CLEAN_NEW}).</li>
     *   <li>Pooled session disconnected, clean session not requested: the pooled session is
     *       moved to CONNECTING and its subscriptions are re-checked ({@code REOPEN_EXISTING}).</li>
     * </ul>
     *
     * @param mqttConnectMessage the CONNECT message of the client
     * @param str the client id
     * @param session the session currently pooled for the client id
     * @param session2 the freshly created session
     * @param str2 forwarded to the authorizator when subscriptions are re-checked
     * @return the resulting session and creation mode, always with {@code alreadyStored == true}
     * @throws SessionCorruptedException if the pooled session could not be moved from
     *         DISCONNECTED to CONNECTING, or is no longer the pooled value when reused
     */
    private SessionCreationResult reopenExistingSession(MqttConnectMessage mqttConnectMessage, String str, Session session, Session session2, String str2) {
        final boolean cleanRequested = mqttConnectMessage.variableHeader().isCleanSession();

        if (!session.disconnected()) {
            LOG.trace("case 4, oldSession with same CId {} still connected, force to close", str);
            session.closeImmediately();
            final SessionCreationResult dropped = new SessionCreationResult(session2, CreationModeEnum.DROP_EXISTING, true);
            LOG.debug("Drop session of already connected client with same id");
            // Swap old for new; if the pooled value changed in the meantime, store the new one anyway.
            if (!this.pool.replace(str, session, session2)) {
                this.pool.put(str, session2);
            }
            return dropped;
        }

        // Both remaining cases start by claiming the disconnected session.
        final boolean claimed = session.assignState(Session.SessionStatus.DISCONNECTED, Session.SessionStatus.CONNECTING);
        final CreationModeEnum mode;
        if (cleanRequested) {
            if (!claimed) {
                throw new SessionCorruptedException("old session was already changed state");
            }
            dropQueuesForClient(str);
            unsubscribe(session);
            copySessionConfig(mqttConnectMessage, session);
            LOG.trace("case 2, oldSession with same CId {} disconnected", str);
            mode = CreationModeEnum.CREATED_CLEAN_NEW;
        } else {
            if (!claimed) {
                throw new SessionCorruptedException("old session moved in connected state by other thread");
            }
            reactivateSubscriptions(session, str2);
            LOG.trace("case 3, oldSession with same CId {} disconnected", str);
            mode = CreationModeEnum.REOPEN_EXISTING;
        }

        final SessionCreationResult reused = new SessionCreationResult(session, mode, true);
        LOG.debug("Replace session of client with same id");
        // Replacing the session with itself succeeds only while it is still the pooled value.
        if (!this.pool.replace(str, session, session)) {
            throw new SessionCorruptedException("old session was already removed");
        }
        return reused;
    }

    /**
     * Re-checks read permission for every subscription of the session and removes from the
     * subscriptions directory those the authorizator no longer allows.
     *
     * @param session the session whose subscriptions are checked
     * @param str passed to the authorizator as the second argument of {@code canRead}
     */
    private void reactivateSubscriptions(Session session, String str) {
        final Iterator<Subscription> subscriptions = session.getSubscriptions().iterator();
        while (subscriptions.hasNext()) {
            final Subscription candidate = subscriptions.next();
            final boolean readable = this.authorizator.canRead(candidate.getTopicFilter(), str, session.getClientID());
            if (readable) {
                continue;
            }
            this.subscriptionsDirectory.removeSubscription(candidate.getTopicFilter(), session.getClientID());
        }
    }

    /**
     * Removes every subscription of the session from the subscriptions directory.
     *
     * @param session the session whose subscriptions are removed
     */
    private void unsubscribe(Session session) {
        for (Subscription existing : session.getSubscriptions()) {
            this.subscriptionsDirectory.removeSubscription(existing.getTopicFilter(), session.getClientID());
        }
    }

    /**
     * Builds a session for the client id in the connecting state. The queue is taken from
     * {@code queues}, or created through the queue repository when none is mapped yet.
     * A will is attached when the CONNECT message carries the will flag.
     *
     * @param mqttConnectMessage the CONNECT message of the client
     * @param str the client id
     * @return the new session, already marked as connecting
     */
    private Session createNewSession(MqttConnectMessage mqttConnectMessage, String str) {
        final boolean clean = mqttConnectMessage.variableHeader().isCleanSession();
        final Queue<EnqueuedMessage> sessionQueue =
            this.queues.computeIfAbsent(str, id -> this.queueRepository.createQueue(id, clean));

        final Session created;
        if (mqttConnectMessage.variableHeader().isWillFlag()) {
            created = new Session(str, clean, createWill(mqttConnectMessage), sessionQueue);
        } else {
            created = new Session(str, clean, sessionQueue);
        }
        created.markConnecting();
        return created;
    }

    /**
     * Updates the session with the clean-session flag and the will taken from the CONNECT
     * message; the will is {@code null} when the will flag is not set.
     *
     * @param mqttConnectMessage the CONNECT message to read the settings from
     * @param session the session to update
     */
    private void copySessionConfig(MqttConnectMessage mqttConnectMessage, Session session) {
        final boolean clean = mqttConnectMessage.variableHeader().isCleanSession();
        final Session.Will will;
        if (mqttConnectMessage.variableHeader().isWillFlag()) {
            will = createWill(mqttConnectMessage);
        } else {
            will = null;
        }
        session.update(clean, will);
    }

    /**
     * Builds the will described by the CONNECT message: topic, a copy of the will message
     * bytes, QoS and retain flag.
     *
     * @param mqttConnectMessage the CONNECT message carrying the will
     * @return the will to attach to the session
     */
    private Session.Will createWill(MqttConnectMessage mqttConnectMessage) {
        // The will bytes are copied into a buffer of their own.
        final ByteBuf willPayload = Unpooled.copiedBuffer(mqttConnectMessage.payload().willMessageInBytes());
        final String willTopic = mqttConnectMessage.payload().willTopic();
        final MqttQoS willQos = MqttQoS.valueOf(mqttConnectMessage.variableHeader().willQos());
        final boolean retained = mqttConnectMessage.variableHeader().isWillRetain();
        return new Session.Will(willTopic, willPayload, willQos, retained);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Looks up the pooled session of a client.
     *
     * @param str the client id
     * @return the pooled session, or {@code null} when none is stored for the id
     */
    public Session retrieve(String str) {
        return this.pool.get(str);
    }

    /**
     * Removes the session from the pool, but only if its client id is currently mapped to
     * the given session; a different session stored under the same id is left untouched.
     *
     * @param session the session to remove
     */
    public void remove(Session session) {
        this.pool.remove(session.getClientID(), session);
    }

    /**
     * Removes the in-memory queue mapping of the client. Only the {@code queues} map is
     * touched here; the queue repository is not called.
     *
     * @param str the client id
     */
    private void dropQueuesForClient(String str) {
        this.queues.remove(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Lists a descriptor for every pooled session that reports itself as connected and
     * exposes a remote address; connected sessions without a remote address are left out.
     *
     * @return the descriptors of the connected clients, possibly empty
     */
    public Collection<ClientDescriptor> listConnectedClients() {
        return this.pool.values().stream()
            .filter(Session::connected)
            .map(this::createClientDescriptor)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());
    }

    /**
     * Builds a descriptor (client id, host string, port) from the session's remote address.
     *
     * @param session the session to describe
     * @return the descriptor, or an empty Optional when the session has no remote address
     */
    private Optional<ClientDescriptor> createClientDescriptor(Session session) {
        final String id = session.getClientID();
        return session.remoteAddress()
            .map(address -> new ClientDescriptor(id, address.getHostString(), address.getPort()));
    }
}
