package io.moquette.broker;

import io.moquette.broker.SessionRegistry;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/moquette/broker/Session.class */
/**
 * Broker-side state of one MQTT client session: connection status, will message,
 * subscriptions, the queue of messages waiting to be sent and the window of
 * messages that were sent but not yet acknowledged.
 *
 * <p>Outgoing QoS 1 and QoS 2 publishes go through an "in-flight window": each send takes
 * one slot from {@link #inflightSlots}, is recorded in {@link #inflightWindow} by packet id
 * and is scheduled in {@link #inflightTimeouts} so that {@link #resendInflightNotAcked()}
 * can send it again once its delay has expired. Messages that cannot be sent immediately
 * are appended to the session queue and drained when slots free up.
 *
 * <p>NOTE(review): apart from {@code status} and {@code inflightSlots}, the state is held in
 * plain, unsynchronized collections; the threading model is not visible from this class.
 */
public class Session {
    private static final Logger LOG = LoggerFactory.getLogger(Session.class);

    /** Delay, in milliseconds, after which an unacknowledged in-flight packet becomes eligible for resend. */
    private static final int FLIGHT_BEFORE_RESEND_MS = 5000;

    /** Number of in-flight slots a freshly created session starts with. */
    private static final int INFLIGHT_WINDOW_SIZE = 10;

    private final String clientId;
    private boolean clean;
    private Will will;
    private final Queue<SessionRegistry.EnqueuedMessage> sessionQueue;
    private final AtomicReference<SessionStatus> status = new AtomicReference<>(SessionStatus.DISCONNECTED);
    private MQTTConnection mqttConnection;
    private final List<Subscription> subscriptions = new ArrayList<>();
    private final Map<Integer, SessionRegistry.EnqueuedMessage> inflightWindow = new HashMap<>();
    private final DelayQueue<InFlightPacket> inflightTimeouts = new DelayQueue<>();
    private final Map<Integer, MqttPublishMessage> qos2Receiving = new HashMap<>();
    private final AtomicInteger inflightSlots = new AtomicInteger(INFLIGHT_WINDOW_SIZE);

    /**
     * Entry of the resend delay queue: a packet id together with the wall-clock instant
     * at which the packet's delay expires.
     */
    public static class InFlightPacket implements Delayed {
        final int packetId;
        /** Absolute expiry instant, in milliseconds as reported by {@link System#currentTimeMillis()}. */
        private final long startTime;

        /**
         * @param packetId            identifier of the in-flight packet.
         * @param delayInMilliseconds how long from now the packet should stay in the delay queue.
         */
        InFlightPacket(int packetId, long delayInMilliseconds) {
            this.packetId = packetId;
            this.startTime = System.currentTimeMillis() + delayInMilliseconds;
        }

        /**
         * @return the time left before expiry, converted to {@code unit}; zero or negative once expired.
         */
        @Override
        public long getDelay(TimeUnit unit) {
            final long remainingMillis = startTime - System.currentTimeMillis();
            return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
        }

        /**
         * Orders packets by expiry instant. Any other {@link Delayed} implementation is
         * compared by remaining delay instead of being blindly cast.
         */
        @Override
        public int compareTo(Delayed other) {
            if (other instanceof InFlightPacket) {
                return Long.compare(startTime, ((InFlightPacket) other).startTime);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Lifecycle states of a session; transitions are made with {@link #assignState}. */
    public enum SessionStatus {
        CONNECTED,
        CONNECTING,
        DISCONNECTING,
        DISCONNECTED
    }

    /** Immutable holder of the will message data attached to a session. */
    static final class Will {
        final String topic;
        final ByteBuf payload;
        final MqttQoS qos;
        final boolean retained;

        public Will(String topic, ByteBuf payload, MqttQoS qos, boolean retained) {
            this.topic = topic;
            this.payload = payload;
            this.qos = qos;
            this.retained = retained;
        }
    }

    /**
     * Creates a disconnected session that carries a will message.
     *
     * @param clientId     identifier of the client owning the session.
     * @param clean        clean-session flag.
     * @param will         will message, may be {@code null}.
     * @param sessionQueue queue used for messages that cannot be sent immediately.
     * @throws IllegalArgumentException if {@code sessionQueue} is {@code null}.
     */
    public Session(String clientId, boolean clean, Will will, Queue<SessionRegistry.EnqueuedMessage> sessionQueue) {
        this(clientId, clean, sessionQueue);
        this.will = will;
    }

    /**
     * Creates a disconnected session without a will message.
     *
     * @param clientId     identifier of the client owning the session.
     * @param clean        clean-session flag.
     * @param sessionQueue queue used for messages that cannot be sent immediately.
     * @throws IllegalArgumentException if {@code sessionQueue} is {@code null}.
     */
    public Session(String clientId, boolean clean, Queue<SessionRegistry.EnqueuedMessage> sessionQueue) {
        if (sessionQueue == null) {
            throw new IllegalArgumentException("sessionQueue parameter can't be null");
        }
        this.clientId = clientId;
        this.clean = clean;
        this.sessionQueue = sessionQueue;
    }

    /** Replaces the clean flag and the will message of this session. */
    public void update(boolean clean, Will will) {
        this.clean = clean;
        this.will = will;
    }

    /** Attempts the DISCONNECTED -> CONNECTING transition; the outcome is not reported. */
    public void markConnecting() {
        assignState(SessionStatus.DISCONNECTED, SessionStatus.CONNECTING);
    }

    /**
     * Attempts the CONNECTING -> CONNECTED transition.
     *
     * @return {@code true} if the session was in CONNECTING state and is now CONNECTED.
     */
    public boolean completeConnection() {
        return assignState(SessionStatus.CONNECTING, SessionStatus.CONNECTED);
    }

    /** Associates this session with the connection used for every outgoing packet. */
    public void bind(MQTTConnection mqttConnection) {
        this.mqttConnection = mqttConnection;
    }

    public boolean disconnected() {
        return status.get() == SessionStatus.DISCONNECTED;
    }

    public boolean connected() {
        return status.get() == SessionStatus.CONNECTED;
    }

    public String getClientID() {
        return clientId;
    }

    /** @return a copy of the subscriptions; changes to it do not affect the session. */
    public List<Subscription> getSubscriptions() {
        return new ArrayList<>(subscriptions);
    }

    public void addSubscriptions(List<Subscription> newSubscriptions) {
        subscriptions.addAll(newSubscriptions);
    }

    public boolean hasWill() {
        return will != null;
    }

    public Will getWill() {
        return will;
    }

    /**
     * Atomically moves the session from {@code expected} to {@code newState}.
     *
     * @return {@code true} if the current state was {@code expected} and has been replaced.
     */
    public boolean assignState(SessionStatus expected, SessionStatus newState) {
        return status.compareAndSet(expected, newState);
    }

    /** Drops the bound connection. A connection must be bound, otherwise this throws a NullPointerException. */
    public void closeImmediately() {
        mqttConnection.dropConnection();
    }

    /**
     * Moves a CONNECTED session to DISCONNECTED, clearing the bound connection and the will.
     * Does nothing when the session is not currently CONNECTED.
     */
    public void disconnect() {
        if (!assignState(SessionStatus.CONNECTED, SessionStatus.DISCONNECTING)) {
            return;
        }
        mqttConnection = null;
        will = null;
        assignState(SessionStatus.DISCONNECTING, SessionStatus.DISCONNECTED);
    }

    public boolean isClean() {
        return clean;
    }

    /**
     * Handles the PUBREC for {@code pubRecPacketId}: the in-flight entry (if any) is released,
     * a {@code PubRelMarker} takes its place under the same packet id, a PUBREL is sent if the
     * channel is writable, and the session queue is drained.
     *
     * <p>NOTE(review): a slot is taken for the marker even when no in-flight entry matched the
     * packet id, so the slot counter is only rebalanced when the matching PUBCOMP arrives.
     */
    public void processPubRec(int pubRecPacketId) {
        releaseInflight(pubRecPacketId);

        inflightSlots.decrementAndGet();
        inflightWindow.put(pubRecPacketId, new SessionRegistry.PubRelMarker());
        inflightTimeouts.add(new InFlightPacket(pubRecPacketId, FLIGHT_BEFORE_RESEND_MS));
        mqttConnection.sendIfWritableElseDrop(MQTTConnection.pubrel(pubRecPacketId));

        drainQueueToConnection();
    }

    /** Handles the PUBCOMP for {@code pubCompPacketId}: frees its in-flight entry and drains the queue. */
    public void processPubComp(int pubCompPacketId) {
        releaseInflight(pubCompPacketId);
        drainQueueToConnection();
    }

    /**
     * Sends a not-retained publish to this session at the given QoS.
     *
     * <p>QoS 0 messages are sent only while connected, otherwise the payload is released.
     * QoS 1 and QoS 2 messages are sent through the in-flight window or queued.
     * {@code FAILURE} is logged and the payload released. Any other value is ignored.
     */
    public void sendPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload) {
        switch (qos) {
            case AT_MOST_ONCE:
                if (connected()) {
                    mqttConnection.sendPublishNotRetainedQos0(topic, qos, payload);
                } else {
                    payload.release();
                }
                break;
            case AT_LEAST_ONCE:
                sendPublishQos1(topic, qos, payload);
                break;
            case EXACTLY_ONCE:
                sendPublishQos2(topic, qos, payload);
                break;
            case FAILURE:
                LOG.error("Not admissible");
                payload.release();
                break;
            default:
                break;
        }
    }

    /**
     * QoS 1 path: drops the message for a disconnected clean session, otherwise sends it
     * immediately when possible or appends it to the session queue.
     */
    private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload) {
        if (!connected() && isClean()) {
            // nothing is kept for a clean session that is not connected
            payload.release();
            return;
        }
        if (canSkipQueue()) {
            sendThroughInflightWindow(topic, qos, payload);
        } else {
            sessionQueue.add(new SessionRegistry.PublishedMessage(topic, qos, payload));
        }
    }

    /**
     * QoS 2 path: sends the message immediately when possible (then drains the queue),
     * otherwise appends it to the session queue.
     *
     * <p>NOTE(review): unlike the QoS 1 path there is no drop for a disconnected clean
     * session here; confirm whether that asymmetry is intended.
     */
    private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload) {
        if (!canSkipQueue()) {
            sessionQueue.add(new SessionRegistry.PublishedMessage(topic, qos, payload));
            return;
        }
        sendThroughInflightWindow(topic, qos, payload);
        drainQueueToConnection();
    }

    /** Takes an in-flight slot, records the message under a new packet id, schedules its resend and sends it. */
    private void sendThroughInflightWindow(Topic topic, MqttQoS qos, ByteBuf payload) {
        inflightSlots.decrementAndGet();
        final int packetId = mqttConnection.nextPacketId();
        inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));
        inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));

        // The in-flight entry keeps referencing the payload, so take an extra reference for the
        // outgoing message (presumably released once written - confirm against MQTTConnection).
        payload.retain();
        mqttConnection.sendPublish(
            MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos, payload, packetId));
    }

    /** Removes the in-flight entry for {@code packetId}; when one existed it is released and its slot is returned. */
    private void releaseInflight(int packetId) {
        final SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(packetId);
        if (removed != null) {
            removed.release();
            inflightSlots.incrementAndGet();
        }
    }

    /** @return {@code true} when nothing is queued and a message can go straight to the connection. */
    private boolean canSkipQueue() {
        return sessionQueue.isEmpty() && inflightHasSlotsAndConnectionIsUp();
    }

    /** @return {@code true} when a slot is free, the session is connected and the channel is writable. */
    private boolean inflightHasSlotsAndConnectionIsUp() {
        return inflightSlots.get() > 0 && connected() && mqttConnection.channel.isWritable();
    }

    /** Handles the PUBACK for {@code ackPacketId}: frees its in-flight entry and drains the queue. */
    public void pubAckReceived(int ackPacketId) {
        releaseInflight(ackPacketId);
        drainQueueToConnection();
    }

    public void flushAllQueuedMessages() {
        drainQueueToConnection();
    }

    /**
     * Resends every in-flight packet whose delay has expired and that is still present in the
     * in-flight window, and schedules it again. Expired packets no longer in the window
     * (already acknowledged) are simply discarded.
     */
    public void resendInflightNotAcked() {
        final Collection<InFlightPacket> expired = new ArrayList<>(INFLIGHT_WINDOW_SIZE);
        inflightTimeouts.drainTo(expired);

        debugLogPacketIds(expired);

        for (InFlightPacket notAckPacketId : expired) {
            final SessionRegistry.EnqueuedMessage msg = inflightWindow.get(notAckPacketId.packetId);
            if (msg == null) {
                // acknowledged in the meantime, nothing to resend
                continue;
            }
            if (msg instanceof SessionRegistry.PubRelMarker) {
                final MqttMessage pubRel = MQTTConnection.pubrel(notAckPacketId.packetId);
                inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
                mqttConnection.sendIfWritableElseDrop(pubRel);
            } else {
                final SessionRegistry.PublishedMessage pubMsg = (SessionRegistry.PublishedMessage) msg;
                final MqttPublishMessage publishMsg = publishNotRetainedDuplicated(
                    notAckPacketId, pubMsg.topic, pubMsg.publishingQos, pubMsg.payload.retainedDuplicate());
                inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
                mqttConnection.sendPublish(publishMsg);
            }
        }
    }

    /** Logs, at debug level, the ids of the packets about to be resent; silent for an empty collection. */
    private void debugLogPacketIds(Collection<InFlightPacket> expired) {
        if (!LOG.isDebugEnabled() || expired.isEmpty()) {
            return;
        }
        final StringBuilder packetIds = new StringBuilder();
        for (InFlightPacket packet : expired) {
            packetIds.append(packet.packetId).append(", ");
        }
        LOG.debug("Resending {} in flight packets [{}]", expired.size(), packetIds);
    }

    /** Builds a PUBLISH carrying the packet id of {@code notAckPacketId} with the DUP flag set and retain unset. */
    private MqttPublishMessage publishNotRetainedDuplicated(InFlightPacket notAckPacketId, Topic topic, MqttQoS qos,
                                                            ByteBuf payload) {
        final MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, true, qos, false, 0);
        final MqttPublishVariableHeader varHeader =
            new MqttPublishVariableHeader(topic.toString(), notAckPacketId.packetId);
        return new MqttPublishMessage(fixedHeader, varHeader, payload);
    }

    /**
     * Moves messages from the session queue to the connection for as long as the queue has
     * elements, a slot is free, the session is connected and the channel is writable.
     */
    private void drainQueueToConnection() {
        while (!sessionQueue.isEmpty() && inflightHasSlotsAndConnectionIsUp()) {
            final SessionRegistry.EnqueuedMessage msg = sessionQueue.poll();
            if (msg == null) {
                // the queue got emptied between the isEmpty() check and the poll()
                return;
            }

            final int sendPacketId = mqttConnection.nextPacketId();
            inflightSlots.decrementAndGet();
            inflightWindow.put(sendPacketId, msg);
            inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS));

            if (msg instanceof SessionRegistry.PubRelMarker) {
                mqttConnection.sendIfWritableElseDrop(MQTTConnection.pubrel(sendPacketId));
            } else {
                final SessionRegistry.PublishedMessage pubMsg = (SessionRegistry.PublishedMessage) msg;
                // same extra reference as for a direct send: the in-flight entry keeps the payload
                pubMsg.payload.retain();
                mqttConnection.sendPublish(MQTTConnection.notRetainedPublishWithMessageId(
                    pubMsg.topic.toString(), pubMsg.publishingQos, pubMsg.payload, sendPacketId));
            }
        }
    }

    public void writabilityChanged() {
        drainQueueToConnection();
    }

    public void sendQueuedMessagesWhileOffline() {
        LOG.trace("Republishing all saved messages for session {}", this);
        drainQueueToConnection();
    }

    /**
     * Sends a retained publish through the bound connection: with a packet id for any QoS
     * other than {@code AT_MOST_ONCE}, without one for {@code AT_MOST_ONCE}.
     */
    public void sendRetainedPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload) {
        if (qos == MqttQoS.AT_MOST_ONCE) {
            mqttConnection.sendPublishRetainedQos0(topic, qos, payload);
        } else {
            mqttConnection.sendPublishRetainedWithPacketId(topic, qos, payload);
        }
    }

    /**
     * Records an incoming QoS 2 publish under {@code messageID} until
     * {@link #receivedPubRelQos2(int)} is called for it, then notifies the connection
     * that the publish was received.
     *
     * <p>A message already stored under the same id (for example a re-sent publish) is
     * released, so its retained reference is not leaked.
     */
    public void receivedPublishQos2(int messageID, MqttPublishMessage msg) {
        // take our reference before the message becomes reachable through the map
        msg.retain();

        final MqttPublishMessage previous = qos2Receiving.put(messageID, msg);
        // release(null) is a no-op, so no explicit null check is needed
        ReferenceCountUtil.release(previous);

        mqttConnection.sendPublishReceived(messageID);
    }

    /** Forgets the QoS 2 publish stored under {@code messageID} and releases it; unknown ids are ignored. */
    public void receivedPubRelQos2(int messageID) {
        final MqttPublishMessage removed = qos2Receiving.remove(messageID);
        ReferenceCountUtil.release(removed);
    }

    /**
     * @return the remote address reported by the bound connection while connected; empty when
     *         the session is not connected or the connection reports no address.
     */
    public Optional<InetSocketAddress> remoteAddress() {
        if (!connected()) {
            return Optional.empty();
        }
        return Optional.ofNullable(mqttConnection.remoteAddress());
    }

    @Override
    public String toString() {
        return "Session{clientId='" + clientId + "', clean=" + clean + ", status=" + status
            + ", inflightSlots=" + inflightSlots + '}';
    }
}
