package io.smallrye.reactive.messaging.mqtt;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.mqtt.Clients;
import io.smallrye.reactive.messaging.mqtt.MqttFailureHandler;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttExceptions;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging;
import io.smallrye.reactive.messaging.mqtt.internal.MqttHelpers;
import io.smallrye.reactive.messaging.mqtt.internal.MqttTopicHelper;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.mqtt.session.RequestedQoS;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.inject.Instance;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

/* loaded from: input_file:io/smallrye/reactive/messaging/mqtt/MqttSource.class */
/**
 * Incoming side of an MQTT channel.
 *
 * <p>On construction this class obtains a shared client holder, starts it, subscribes to the
 * configured topic and builds the publisher returned by {@link #getSource()}. It also tracks
 * three flags that are reported through the {@code isStarted}/{@code isReady}/{@code isAlive}
 * health callbacks when health reporting is enabled for the channel.
 */
public class MqttSource {
    private final Flow.Publisher<ReceivingMqttMessage> source;
    private final String channel;
    /** Regex used to match incoming topics; {@code null} when the subscribed topic has no wildcard. */
    private final Pattern pattern;
    private final boolean healthEnabled;
    private final Clients.ClientHolder holder;
    // NOTE(review): this flag is neither read nor written anywhere in this class; readiness is
    // derived from the client connection state in isReady(). Kept to leave the class layout untouched.
    private final AtomicBoolean ready = new AtomicBoolean();
    /** Set to {@code true} once the client holder reports a successful start. */
    private final AtomicBoolean started = new AtomicBoolean();
    /** {@code true} after a successful subscription; reset on stream cancellation or failure. */
    private final AtomicBoolean alive = new AtomicBoolean();

    /**
     * Creates the source, starts the underlying client holder and subscribes to the topic.
     *
     * @param vertx the Vert.x instance used to obtain the client holder
     * @param mqttConnectorIncomingConfiguration the incoming channel configuration (channel name,
     *        topic, QoS, broadcast, buffer size, failure strategy, health and unsubscribe settings)
     * @param instance candidate client session options passed to {@code MqttHelpers.createClientOptions}
     */
    public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration mqttConnectorIncomingConfiguration, Instance<MqttClientSessionOptions> instance) {
        MqttClientSessionOptions options = MqttHelpers.createClientOptions(mqttConnectorIncomingConfiguration, instance);
        this.channel = mqttConnectorIncomingConfiguration.getChannel();
        // The topic defaults to the channel name when none is configured.
        String topic = mqttConnectorIncomingConfiguration.getTopic().orElse(this.channel);
        int qos = mqttConnectorIncomingConfiguration.getQos().intValue();
        boolean broadcast = mqttConnectorIncomingConfiguration.getBroadcast().booleanValue();
        this.healthEnabled = mqttConnectorIncomingConfiguration.getHealthEnabled().booleanValue();
        MqttFailureHandler.Strategy strategy = MqttFailureHandler.Strategy.from(mqttConnectorIncomingConfiguration.getFailureStrategy());
        MqttFailureHandler onNack = createFailureHandler(strategy, mqttConnectorIncomingConfiguration.getChannel());
        this.pattern = compileTopicPattern(topic);

        this.holder = Clients.getHolder(vertx, options);
        this.holder.start().onSuccess(ignored -> this.started.set(true));
        this.holder.getClient()
                .subscribe(topic, RequestedQoS.valueOf(Integer.valueOf(qos)))
                // Pass the cause along so a failed subscription can actually be diagnosed.
                .onFailure(failure -> MqttLogging.log.info("Subscription failed!", failure))
                .onSuccess(grantedQos -> {
                    MqttLogging.log.info("Subscription success on topic " + topic + ", Max QoS " + grantedQos + ".");
                    this.alive.set(true);
                });

        this.source = this.holder.stream()
                .select().where(message -> MqttTopicHelper.matches(topic, this.pattern, message))
                .onItem().transform(message -> new ReceivingMqttMessage(message, onNack))
                .stage(multi -> {
                    if (broadcast) {
                        return multi.broadcast().toAllSubscribers();
                    }
                    return multi;
                })
                .onOverflow().buffer(mqttConnectorIncomingConfiguration.getBufferSize().intValue())
                .onCancellation().call(() -> {
                    this.alive.set(false);
                    // The unsubscribe setting is read when the cancellation happens, not at construction.
                    if (mqttConnectorIncomingConfiguration.getUnsubscribeOnDisconnection().booleanValue()) {
                        return Uni.createFrom().completionStage(this.holder.getClient().unsubscribe(topic).toCompletionStage());
                    }
                    return Uni.createFrom().voidItem();
                })
                .onFailure().invoke(failure -> {
                    this.alive.set(false);
                    MqttLogging.log.unableToConnectToBroker(failure);
                });
    }

    /**
     * Builds the regex used to match concrete topics against a wildcard subscription.
     *
     * @param topic the subscribed topic
     * @return a pattern in which {@code +} is replaced by {@code [^/]+} and {@code #} by {@code .+},
     *         or {@code null} when the topic contains neither wildcard
     */
    private static Pattern compileTopicPattern(String topic) {
        if (!topic.contains("#") && !topic.contains("+")) {
            return null;
        }
        String regex = MqttTopicHelper.escapeTopicSpecialWord(MqttHelpers.rebuildMatchesWithSharedSubscription(topic))
                .replace("+", "[^/]+")
                .replace("#", ".+");
        return Pattern.compile(regex);
    }

    /**
     * Instantiates the failure handler matching the given strategy.
     *
     * @param strategy the configured failure strategy
     * @param str the channel name handed to the handler
     * @return the handler for {@code IGNORE} or {@code FAIL}
     * @throws RuntimeException the exception produced by
     *         {@code MqttExceptions.ex.illegalArgumentUnknownStrategy} for any other strategy
     */
    private MqttFailureHandler createFailureHandler(MqttFailureHandler.Strategy strategy, String str) {
        switch (strategy) {
            case IGNORE:
                return new MqttIgnoreFailure(str);
            case FAIL:
                return new MqttFailStop(str);
            default:
                throw MqttExceptions.ex.illegalArgumentUnknownStrategy(strategy.toString());
        }
    }

    /**
     * Returns the publisher of received messages built in the constructor.
     *
     * <p>NOTE(review): the decompiler reported that this accessor's modifier was changed from
     * package-private; it is kept {@code public} here to match the decompiled form.
     *
     * @return the stream of received MQTT messages
     */
    public Flow.Publisher<ReceivingMqttMessage> getSource() {
        return this.source;
    }

    /**
     * Reports the startup state of this channel: whether the client holder start succeeded.
     * Does nothing when health reporting is disabled.
     *
     * @param healthReportBuilder the report builder to add this channel's state to
     */
    public void isStarted(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.healthEnabled) {
            healthReportBuilder.add(this.channel, this.started.get());
        }
    }

    /**
     * Reports the readiness state of this channel: whether the client is currently connected.
     * Does nothing when health reporting is disabled.
     *
     * @param healthReportBuilder the report builder to add this channel's state to
     */
    public void isReady(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.healthEnabled) {
            healthReportBuilder.add(this.channel, this.holder.getClient().isConnected());
        }
    }

    /**
     * Reports the liveness state of this channel: {@code true} after a successful subscription
     * and until the stream is cancelled or fails. Does nothing when health reporting is disabled.
     *
     * @param healthReportBuilder the report builder to add this channel's state to
     */
    public void isAlive(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.healthEnabled) {
            healthReportBuilder.add(this.channel, this.alive.get());
        }
    }
}
