package io.smallrye.reactive.messaging.rabbitmq;

import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.impl.CredentialsProvider;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnFailure;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAck;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAckHandler;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAutoAck;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQTrace;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.mutiny.rabbitmq.RabbitMQConsumer;
import io.vertx.mutiny.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQOptions;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.event.Reception;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

@ApplicationScoped
@Connector(RabbitMQConnector.CONNECTOR_NAME)
@ConnectorAttributes({@ConnectorAttribute(name = "username", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The username used to authenticate to the broker", type = "string", alias = "rabbitmq-username"), @ConnectorAttribute(name = "password", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The password used to authenticate to the broker", type = "string", alias = "rabbitmq-password"), @ConnectorAttribute(name = "host", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The broker hostname", type = "string", alias = "rabbitmq-host", defaultValue = "localhost"), @ConnectorAttribute(name = "port", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The broker port", type = "int", alias = "rabbitmq-port", defaultValue = "5672"), @ConnectorAttribute(name = "ssl", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether or not the connection should use SSL", type = "boolean", alias = "rabbitmq-ssl", defaultValue = "false"), @ConnectorAttribute(name = "trust-all", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether to skip trust certificate verification", type = "boolean", alias = "rabbitmq-trust-all", defaultValue = "false"), @ConnectorAttribute(name = "trust-store-path", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The path to a JKS trust store", type = "string", alias = "rabbitmq-trust-store-path"), @ConnectorAttribute(name = "trust-store-password", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The password of the JKS trust store", type = "string", alias = "rabbitmq-trust-store-password"), @ConnectorAttribute(name = "connection-timeout", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The TCP connection timeout (ms); 0 is interpreted as no timeout", type = "int", defaultValue = "60000"), 
@ConnectorAttribute(name = "handshake-timeout", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The AMQP 0-9-1 protocol handshake timeout (ms)", type = "int", defaultValue = "10000"), @ConnectorAttribute(name = "automatic-recovery-enabled", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether automatic connection recovery is enabled", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "automatic-recovery-on-initial-connection", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether automatic recovery on initial connections is enabled", type = "boolean", defaultValue = "true"), @ConnectorAttribute(name = "reconnect-attempts", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The number of reconnection attempts", type = "int", alias = "rabbitmq-reconnect-attempts", defaultValue = "100"), @ConnectorAttribute(name = "reconnect-interval", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The interval (in seconds) between two reconnection attempts", type = "int", alias = "rabbitmq-reconnect-interval", defaultValue = "10"), @ConnectorAttribute(name = "network-recovery-interval", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "How long (ms) will automatic recovery wait before attempting to reconnect", type = "int", defaultValue = "5000"), @ConnectorAttribute(name = "user", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The user name to use when connecting to the broker", type = "string", defaultValue = "guest"), @ConnectorAttribute(name = "include-properties", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether to include properties when a broker message is passed on the event bus", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "requested-channel-max", direction = 
ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The initially requested maximum channel number", type = "int", defaultValue = "2047"), @ConnectorAttribute(name = "requested-heartbeat", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The initially requested heartbeat interval (seconds), zero for none", type = "int", defaultValue = "60"), @ConnectorAttribute(name = "use-nio", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether usage of NIO Sockets is enabled", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "virtual-host", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The virtual host to use when connecting to the broker", type = "string", defaultValue = "/", alias = "rabbitmq-virtual-host"), @ConnectorAttribute(name = "client-options-name", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The name of the RabbitMQ Client Option bean used to customize the RabbitMQ client configuration", type = "string", alias = "rabbitmq-client-options-name"), @ConnectorAttribute(name = "credentials-provider-name", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client", type = "string", alias = "rabbitmq-credentials-provider-name"), @ConnectorAttribute(name = "exchange.name", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The exchange that messages are published to or consumed from. If not set, the channel name is used. 
If set to \"\", the default exchange is used.", type = "string"), @ConnectorAttribute(name = "exchange.durable", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether the exchange is durable", type = "boolean", defaultValue = "true"), @ConnectorAttribute(name = "exchange.auto-delete", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether the exchange should be deleted after use", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "exchange.type", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The exchange type: direct, fanout, headers or topic (default)", type = "string", defaultValue = "topic"), @ConnectorAttribute(name = "exchange.declare", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether to declare the exchange; set to false if the exchange is expected to be set up independently", type = "boolean", defaultValue = "true"), @ConnectorAttribute(name = "queue.name", direction = ConnectorAttribute.Direction.INCOMING, description = "The queue from which messages are consumed.", type = "string", mandatory = true), @ConnectorAttribute(name = "queue.durable", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the queue is durable", type = "boolean", defaultValue = "true"), @ConnectorAttribute(name = "queue.exclusive", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the queue is for exclusive use", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "queue.auto-delete", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the queue should be deleted after use", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "queue.declare", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether to declare the queue and binding; set to false if these are expected to be set up independently", type = "boolean", 
defaultValue = "true"), @ConnectorAttribute(name = "queue.ttl", direction = ConnectorAttribute.Direction.INCOMING, description = "If specified, the time (ms) for which a message can remain in the queue undelivered before it is dead", type = "long"), @ConnectorAttribute(name = "queue.single-active-consumer", direction = ConnectorAttribute.Direction.INCOMING, description = "If set to true, only one consumer can actively consume messages", type = "boolean"), @ConnectorAttribute(name = "queue.x-queue-type", direction = ConnectorAttribute.Direction.INCOMING, description = "If automatically declare queue, we can choose different types of queue [quorum, classic, stream]", type = "string"), @ConnectorAttribute(name = "queue.x-queue-mode", direction = ConnectorAttribute.Direction.INCOMING, description = "If automatically declare queue, we can choose different modes of queue [lazy, default]", type = "string"), @ConnectorAttribute(name = "max-outgoing-internal-queue-size", direction = ConnectorAttribute.Direction.OUTGOING, description = "The maximum size of the outgoing internal queue", type = "int"), @ConnectorAttribute(name = "max-incoming-internal-queue-size", direction = ConnectorAttribute.Direction.INCOMING, description = "The maximum size of the incoming internal queue", type = "int", defaultValue = "500000"), @ConnectorAttribute(name = "connection-count", direction = ConnectorAttribute.Direction.INCOMING, description = "The number of RabbitMQ connections to create for consuming from this queue. 
This might be necessary to consume from a sharded queue with a single client.", type = "int", defaultValue = "1"), @ConnectorAttribute(name = "queue.x-max-priority", direction = ConnectorAttribute.Direction.INCOMING, description = "Define priority level queue consumer", type = "int"), @ConnectorAttribute(name = "queue.x-delivery-limit", direction = ConnectorAttribute.Direction.INCOMING, description = "If queue.x-queue-type is quorum, when a message has been returned more times than the limit the message will be dropped or dead-lettered", type = "long"), @ConnectorAttribute(name = "auto-bind-dlq", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether to automatically declare the DLQ and bind it to the binder DLX", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "dead-letter-queue-name", direction = ConnectorAttribute.Direction.INCOMING, description = "The name of the DLQ; if not supplied will default to the queue name with '.dlq' appended", type = "string"), @ConnectorAttribute(name = "dead-letter-exchange", direction = ConnectorAttribute.Direction.INCOMING, description = "A DLX to assign to the queue. Relevant only if auto-bind-dlq is true", type = "string", defaultValue = "DLX"), @ConnectorAttribute(name = "dead-letter-exchange-type", direction = ConnectorAttribute.Direction.INCOMING, description = "The type of the DLX to assign to the queue. Relevant only if auto-bind-dlq is true", type = "string", defaultValue = "direct"), @ConnectorAttribute(name = "dead-letter-routing-key", direction = ConnectorAttribute.Direction.INCOMING, description = "A dead letter routing key to assign to the queue; if not supplied will default to the queue name", type = "string"), @ConnectorAttribute(name = "dlx.declare", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether to declare the dead letter exchange binding. 
Relevant only if auto-bind-dlq is true; set to false if these are expected to be set up independently", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "dead-letter-queue-type", direction = ConnectorAttribute.Direction.INCOMING, description = "If automatically declare DLQ, we can choose different types of DLQ [quorum, classic, stream]", type = "string"), @ConnectorAttribute(name = "dead-letter-queue-mode", direction = ConnectorAttribute.Direction.INCOMING, description = "If automatically declare DLQ, we can choose different modes of DLQ [lazy, default]", type = "string"), @ConnectorAttribute(name = "dead-letter-ttl", direction = ConnectorAttribute.Direction.INCOMING, description = "If specified, the time (ms) for which a message can remain in DLQ undelivered before it is dead. Relevant only if auto-bind-dlq is true", type = "long"), @ConnectorAttribute(name = "dead-letter-dlx", direction = ConnectorAttribute.Direction.INCOMING, description = "If specified, a DLX to assign to the DLQ. Relevant only if auto-bind-dlq is true", type = "string"), @ConnectorAttribute(name = "dead-letter-dlx-routing-key", direction = ConnectorAttribute.Direction.INCOMING, description = "If specified, a dead letter routing key to assign to the DLQ. Relevant only if auto-bind-dlq is true", type = "string"), @ConnectorAttribute(name = "failure-strategy", direction = ConnectorAttribute.Direction.INCOMING, description = "The failure strategy to apply when a RabbitMQ message is nacked. 
Accepted values are `fail`, `accept`, `reject` (default) or name of a bean", type = "string", defaultValue = RabbitMQFailureHandler.Strategy.REJECT), @ConnectorAttribute(name = "broadcast", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the received RabbitMQ messages must be dispatched to multiple _subscribers_", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "auto-acknowledgement", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "keep-most-recent", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether to discard old messages instead of recent ones", type = "boolean", defaultValue = "false"), @ConnectorAttribute(name = "routing-keys", direction = ConnectorAttribute.Direction.INCOMING, description = "A comma-separated list of routing keys to bind the queue to the exchange. Relevant only if 'exchange.type' is topic or direct", type = "string", defaultValue = "#"), @ConnectorAttribute(name = "arguments", direction = ConnectorAttribute.Direction.INCOMING, description = "A comma-separated list of arguments [key1:value1,key2:value2,...] to bind the queue to the exchange. 
Relevant only if 'exchange.type' is headers", type = "string"), @ConnectorAttribute(name = "content-type-override", direction = ConnectorAttribute.Direction.INCOMING, description = "Override the content_type attribute of the incoming message, should be a valid MINE type", type = "string"), @ConnectorAttribute(name = "max-outstanding-messages", direction = ConnectorAttribute.Direction.INCOMING, description = "The maximum number of outstanding/unacknowledged messages being processed by the connector at a time; must be a positive number", type = "int"), @ConnectorAttribute(name = "max-inflight-messages", direction = ConnectorAttribute.Direction.OUTGOING, description = "The maximum number of messages to be written to RabbitMQ concurrently; must be a positive number", type = "long", defaultValue = "1024"), @ConnectorAttribute(name = "default-routing-key", direction = ConnectorAttribute.Direction.OUTGOING, description = "The default routing key to use when sending messages to the exchange", type = "string", defaultValue = ""), @ConnectorAttribute(name = "default-ttl", direction = ConnectorAttribute.Direction.OUTGOING, description = "If specified, the time (ms) sent messages can remain in queues undelivered before they are dead", type = "long"), @ConnectorAttribute(name = "tracing.enabled", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", type = "boolean", defaultValue = "true"), @ConnectorAttribute(name = "tracing.attribute-headers", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true", type = "string", defaultValue = "")})
/* loaded from: input_file:io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.class */
public class RabbitMQConnector implements InboundConnector, OutboundConnector, HealthReporter {
    /** Connector identifier; referenced by the {@code @Connector} annotation on this class. */
    public static final String CONNECTOR_NAME = "smallrye-rabbitmq";

    /** Clients registered through {@code addClient}; stopped and cleared by {@code terminate}. */
    private final Map<String, RabbitMQClient> clients = new ConcurrentHashMap<>();

    /** Consumers created by {@code createConsumer}; cancelled and cleared by {@code terminate}. */
    private final List<RabbitMQConsumer> consumers = new CopyOnWriteArrayList<>();

    /** Outgoing senders keyed by channel name; cancelled by {@code terminate}. */
    private final Map<String, Flow.Subscription> subscriptions = new ConcurrentHashMap<>();

    /** Status of each incoming channel, keyed by channel name; read by {@code getHealth}. */
    private final Map<String, ChannelStatus> incomingChannelStatus = new ConcurrentHashMap<>();

    /** Status of each outgoing channel, keyed by channel name; read by {@code getHealth}. */
    private final Map<String, ChannelStatus> outgoingChannelStatus = new ConcurrentHashMap<>();

    /** Supplies the Vert.x instance returned by {@code getVertx}. */
    @Inject
    ExecutionHolder executionHolder;

    /** Candidate client option beans, handed to {@code RabbitMQClientHelper.createClient}. */
    @Inject
    @Any
    Instance<RabbitMQOptions> clientOptions;

    /** Candidate credentials provider beans, handed to {@code RabbitMQClientHelper.createClient}. */
    @Inject
    @Any
    Instance<CredentialsProvider> credentialsProviders;

    /** Created on first use by {@code getPublisher} when tracing is enabled for an incoming channel. */
    private volatile RabbitMQOpenTelemetryInstrumenter instrumenter;

    /** Candidate failure handler factories. Not referenced by the code visible in this part of the file. */
    @Inject
    @Any
    Instance<RabbitMQFailureHandler.Factory> failureHandlerFactories;

    /**
     * Connection state recorded per channel in {@code incomingChannelStatus} and
     * {@code outgoingChannelStatus}, and evaluated by {@code getHealth(boolean)}.
     *
     * <p>NOTE(review): the decompiler reported "Access modifiers changed from: private" for this
     * enum (class file {@code RabbitMQConnector$ChannelStatus.class}), so the original
     * declaration was presumably {@code private} - confirm before using it outside this class.
     */
    public enum ChannelStatus {
        /** Set once the channel's connection setup has completed. */
        CONNECTED,
        /** Set, in the code visible here, when an outgoing channel fails or is cancelled. */
        NOT_CONNECTED,
        /** Set when channel setup begins, before any connection has been established. */
        INITIALISING
    }

    /**
     * Package-private no-argument constructor.
     *
     * <p>Collaborators are supplied through the {@code @Inject} fields rather than constructor
     * arguments. NOTE(review): presumably kept non-private so the CDI container can instantiate
     * or proxy this {@code @ApplicationScoped} bean - confirm.
     */
    RabbitMQConnector() {
        // Intentionally empty: all state is initialised by field initialisers or injection.
    }

    /**
     * Resolves the exchange name for a channel configuration.
     *
     * <p>The configured exchange name wins when present; the two-character value {@code ""}
     * (a pair of double quotes) is translated to the empty string. When no exchange name is
     * configured, the channel name is used instead.
     *
     * @param rabbitMQConnectorCommonConfiguration the channel configuration to read from
     * @return the configured exchange name, the empty string, or the channel name
     */
    public static String getExchangeName(RabbitMQConnectorCommonConfiguration rabbitMQConnectorCommonConfiguration) {
        return (String) rabbitMQConnectorCommonConfiguration.getExchangeName()
                // A literal pair of double quotes stands for the empty exchange name.
                .map(configured -> "\"\"".equals(configured) ? "" : configured)
                .orElse(rabbitMQConnectorCommonConfiguration.getChannel());
    }

    /**
     * Turns a RabbitMQ consumer into a stream of incoming messages.
     *
     * <p>Each broker message is wrapped in an {@code IncomingRabbitMQMessage}. When tracing is
     * enabled for the channel, every wrapped message is additionally passed through the
     * instrumenter together with a trace built from the queue name, the routing key and the
     * message headers.
     *
     * @param rabbitMQConsumer the consumer delivering broker messages
     * @param connectionHolder the connection holder attached to each wrapped message
     * @param rabbitMQConnectorIncomingConfiguration the incoming channel configuration
     * @param rabbitMQFailureHandler the failure handler attached to each wrapped message
     * @param rabbitMQAckHandler the acknowledgement handler attached to each wrapped message
     * @return the stream of wrapped (and optionally traced) messages
     */
    private Multi<? extends Message<?>> getStreamOfMessages(RabbitMQConsumer rabbitMQConsumer, ConnectionHolder connectionHolder, RabbitMQConnectorIncomingConfiguration rabbitMQConnectorIncomingConfiguration, RabbitMQFailureHandler rabbitMQFailureHandler, RabbitMQAckHandler rabbitMQAckHandler) {
        final String queue = rabbitMQConnectorIncomingConfiguration.getQueueName();
        final boolean tracingEnabled = rabbitMQConnectorIncomingConfiguration.getTracingEnabled().booleanValue();
        // null means "keep the content type carried by the broker message".
        final String contentTypeOverride = rabbitMQConnectorIncomingConfiguration.getContentTypeOverride().orElse(null);
        RabbitMQLogging.log.receiverListeningAddress(queue);

        if (!tracingEnabled) {
            return rabbitMQConsumer.toMulti()
                    .map(brokerMessage -> new IncomingRabbitMQMessage(brokerMessage, connectionHolder, rabbitMQFailureHandler, rabbitMQAckHandler, contentTypeOverride));
        }

        return rabbitMQConsumer.toMulti()
                .map(brokerMessage -> new IncomingRabbitMQMessage(brokerMessage, connectionHolder, rabbitMQFailureHandler, rabbitMQAckHandler, contentTypeOverride))
                .map(incoming -> this.instrumenter.traceIncoming(incoming, RabbitMQTrace.traceQueue(queue, incoming.message.envelope().getRoutingKey(), incoming.getHeaders())));
    }

    /**
     * Builds the publisher of incoming messages for one channel.
     *
     * <p>One client, connection holder and consumer is created per configured connection
     * ({@code connection-count}). Once every consumer has been created the channel is marked
     * {@code CONNECTED} and the consumers' message streams are combined into a single stream.
     * If {@code broadcast} is enabled the stream is dispatched to all subscribers.
     *
     * @param config the channel configuration
     * @return the publisher of messages consumed from the configured queue
     */
    public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
        RabbitMQConnectorIncomingConfiguration rabbitMQConnectorIncomingConfiguration = new RabbitMQConnectorIncomingConfiguration(config);
        // Create the shared instrumenter on first use; it is only needed when tracing is enabled.
        if (rabbitMQConnectorIncomingConfiguration.getTracingEnabled().booleanValue() && this.instrumenter == null) {
            this.instrumenter = RabbitMQOpenTelemetryInstrumenter.createForConnector();
        }
        // Until all connections are up, health checks see this channel as initialising.
        this.incomingChannelStatus.put(rabbitMQConnectorIncomingConfiguration.getChannel(), ChannelStatus.INITIALISING);
        RabbitMQFailureHandler createFailureHandler = createFailureHandler(rabbitMQConnectorIncomingConfiguration);
        RabbitMQAckHandler createAckHandler = createAckHandler(rabbitMQConnectorIncomingConfiguration);
        // One Uni per connection index in [0, connection-count), merged into a single stream.
        Multi flatMap = Multi.createFrom().range(0, rabbitMQConnectorIncomingConfiguration.getConnectionCount().intValue()).onItem().transformToUniAndMerge(num -> {
            RabbitMQClient createClient = RabbitMQClientHelper.createClient(this, rabbitMQConnectorIncomingConfiguration, this.clientOptions, this.credentialsProviders);
            // Registered before the connection is requested below.
            // NOTE(review): presumably invoked on every (re)connection of the client - confirm.
            createClient.getDelegate().addConnectionEstablishedCallback(promise -> {
                // In order: apply the QoS limit (only if max-outstanding-messages is set),
                // then set up the queue, then set up the dead letter queue.
                UniSubscribe subscribe = Uni.createFrom().nullItem().onItem().call(obj -> {
                    return rabbitMQConnectorIncomingConfiguration.getMaxOutstandingMessages().isPresent() ? createClient.basicQos(rabbitMQConnectorIncomingConfiguration.getMaxOutstandingMessages().get().intValue(), false) : Uni.createFrom().nullItem();
                }).onItem().call(() -> {
                    return establishQueue(createClient, rabbitMQConnectorIncomingConfiguration);
                }).onItem().call(() -> {
                    return establishDLQ(createClient, rabbitMQConnectorIncomingConfiguration);
                }).subscribe();
                // Report the outcome of the setup chain back through the callback's promise.
                Consumer consumer = obj2 -> {
                    promise.complete();
                };
                Objects.requireNonNull(promise);
                subscribe.with(consumer, promise::fail);
            });
            ConnectionHolder connectionHolder = new ConnectionHolder(createClient, rabbitMQConnectorIncomingConfiguration, getVertx());
            // Connect, log, create the consumer, and keep holder and consumer paired.
            return connectionHolder.getOrEstablishConnection().invoke(() -> {
                RabbitMQLogging.log.connectionEstablished(num.intValue(), rabbitMQConnectorIncomingConfiguration.getChannel());
            }).flatMap(rabbitMQClient -> {
                return createConsumer(rabbitMQConnectorIncomingConfiguration, rabbitMQClient).map(rabbitMQConsumer -> {
                    return Tuple2.of(connectionHolder, rabbitMQConsumer);
                });
            });
        }).collect().asList().onItem().invoke(() -> {
            // The list is only emitted after every per-connection Uni has produced its pair.
            this.incomingChannelStatus.put(rabbitMQConnectorIncomingConfiguration.getChannel(), ChannelStatus.CONNECTED);
        }).onItem().transformToMulti(list -> {
            return Multi.createFrom().iterable(list);
        }).flatMap(tuple2 -> {
            // Item1 is the connection holder, item2 the consumer (see Tuple2.of above).
            return getStreamOfMessages((RabbitMQConsumer) tuple2.getItem2(), (ConnectionHolder) tuple2.getItem1(), rabbitMQConnectorIncomingConfiguration, createFailureHandler, createAckHandler);
        });
        if (Boolean.TRUE.equals(rabbitMQConnectorIncomingConfiguration.getBroadcast())) {
            flatMap = flatMap.broadcast().toAllSubscribers();
        }
        return flatMap;
    }

    /**
     * Creates a consumer on the configured queue and records it for later shutdown.
     *
     * <p>The consumer options (auto-ack, internal queue size, keep-most-recent) are taken from
     * the incoming configuration. Each consumer that is successfully created is added to
     * {@code consumers}.
     *
     * @param rabbitMQConnectorIncomingConfiguration the incoming channel configuration
     * @param rabbitMQClient the client on which the consumer is created
     * @return a Uni emitting the created consumer
     */
    private Uni<RabbitMQConsumer> createConsumer(RabbitMQConnectorIncomingConfiguration rabbitMQConnectorIncomingConfiguration, RabbitMQClient rabbitMQClient) {
        final String queue = serverQueueName(rabbitMQConnectorIncomingConfiguration.getQueueName());
        final QueueOptions options = new QueueOptions()
                .setAutoAck(rabbitMQConnectorIncomingConfiguration.getAutoAcknowledgement().booleanValue())
                .setMaxInternalQueueSize(rabbitMQConnectorIncomingConfiguration.getMaxIncomingInternalQueueSize().intValue())
                .setKeepMostRecent(rabbitMQConnectorIncomingConfiguration.getKeepMostRecent().booleanValue());
        return rabbitMQClient.basicConsumer(queue, options)
                .onItem().invoke(created -> {
                    this.consumers.add(created);
                });
    }

    /**
     * Sets up the dead letter exchange and dead letter queue for an incoming channel.
     *
     * <p>The dead letter exchange is declared only when both {@code auto-bind-dlq} and
     * {@code dlx.declare} are true. The dead letter queue is declared, and bound to the dead
     * letter exchange, only when {@code auto-bind-dlq} is true. In both steps a {@code null}
     * item is used internally as the "needs declaring" signal.
     *
     * @param rabbitMQClient the client used to declare the exchange, the queue and the binding
     * @param rabbitMQConnectorIncomingConfiguration the incoming channel configuration
     * @return a Uni that completes once the requested declarations have been made
     * @throws RuntimeException the exception built by
     *         {@code RabbitMQExceptions.ex.illegalArgumentInvalidQueueTtl()} when the configured
     *         {@code dead-letter-ttl} is negative
     */
    private Uni<?> establishDLQ(RabbitMQClient rabbitMQClient, RabbitMQConnectorIncomingConfiguration rabbitMQConnectorIncomingConfiguration) {
        // The DLQ name defaults to "<queue>.dlq"; the routing key defaults to the queue name.
        final String dlqName = rabbitMQConnectorIncomingConfiguration.getDeadLetterQueueName().orElse(String.format("%s.dlq", rabbitMQConnectorIncomingConfiguration.getQueueName()));
        final String dlxName = rabbitMQConnectorIncomingConfiguration.getDeadLetterExchange();
        final String dlqRoutingKey = rabbitMQConnectorIncomingConfiguration.getDeadLetterRoutingKey().orElse(rabbitMQConnectorIncomingConfiguration.getQueueName());

        // Step 1: emit null (which triggers the declaration) only when the DLX must be declared.
        final Uni<String> dlxFlow = Uni.createFrom()
                .item(() -> rabbitMQConnectorIncomingConfiguration.getAutoBindDlq() && rabbitMQConnectorIncomingConfiguration.getDlxDeclare() ? null : dlxName)
                .onItem().ifNull().switchTo(() -> rabbitMQClient
                        .exchangeDeclare(dlxName, rabbitMQConnectorIncomingConfiguration.getDeadLetterExchangeType(), true, false)
                        .onItem().invoke(() -> RabbitMQLogging.log.dlxEstablished(dlxName))
                        .onFailure().invoke(failure -> RabbitMQLogging.log.unableToEstablishDlx(dlxName, failure))
                        .onItem().transform(ignored -> dlxName));

        // Optional arguments applied to the dead letter queue when it is declared.
        final JsonObject dlqArguments = new JsonObject();
        rabbitMQConnectorIncomingConfiguration.getDeadLetterDlx()
                .ifPresent(value -> dlqArguments.put("x-dead-letter-exchange", value));
        rabbitMQConnectorIncomingConfiguration.getDeadLetterDlxRoutingKey()
                .ifPresent(value -> dlqArguments.put("x-dead-letter-routing-key", value));
        rabbitMQConnectorIncomingConfiguration.getDeadLetterQueueType()
                .ifPresent(value -> dlqArguments.put("x-queue-type", value));
        rabbitMQConnectorIncomingConfiguration.getDeadLetterQueueMode()
                .ifPresent(value -> dlqArguments.put("x-queue-mode", value));
        rabbitMQConnectorIncomingConfiguration.getDeadLetterTtl().ifPresent(ttl -> {
            if (ttl < 0) {
                throw RabbitMQExceptions.ex.illegalArgumentInvalidQueueTtl();
            }
            dlqArguments.put("x-message-ttl", ttl);
        });

        // Step 2: emit null (which triggers declaration and binding) only when auto-bind-dlq is true.
        return dlxFlow
                .onItem().transform(ignored -> Boolean.TRUE.equals(rabbitMQConnectorIncomingConfiguration.getAutoBindDlq()) ? null : dlqName)
                .onItem().ifNull().switchTo(() -> rabbitMQClient
                        .queueDeclare(dlqName, true, false, false, dlqArguments)
                        .onItem().invoke(() -> RabbitMQLogging.log.queueEstablished(dlqName))
                        .onFailure().invoke(failure -> RabbitMQLogging.log.unableToEstablishQueue(dlqName, failure))
                        .onItem().call(declared -> rabbitMQClient.queueBind(dlqName, dlxName, dlqRoutingKey))
                        .onItem().invoke(() -> RabbitMQLogging.log.deadLetterBindingEstablished(dlqName, dlxName, dlqRoutingKey))
                        .onFailure().invoke(failure -> RabbitMQLogging.log.unableToEstablishBinding(dlqName, dlxName, failure))
                        .onItem().transform(declared -> dlqName));
    }

    /**
     * Builds the subscriber that publishes outgoing messages for one channel.
     *
     * <p>A client is created for the channel and a {@code RabbitMQMessageSender} is handed a
     * memoized Uni that obtains the connection, then creates and starts a
     * {@code RabbitMQPublisher}. The channel status is updated as that Uni succeeds, fails or
     * is cancelled. The sender is also recorded in {@code subscriptions} so that
     * {@code terminate} can cancel it.
     *
     * @param config the channel configuration
     * @return the subscriber consuming the messages to publish
     */
    public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
        RabbitMQConnectorOutgoingConfiguration rabbitMQConnectorOutgoingConfiguration = new RabbitMQConnectorOutgoingConfiguration(config);
        // Until the publisher has started, health checks see this channel as initialising.
        this.outgoingChannelStatus.put(rabbitMQConnectorOutgoingConfiguration.getChannel(), ChannelStatus.INITIALISING);
        RabbitMQClient createClient = RabbitMQClientHelper.createClient(this, rabbitMQConnectorOutgoingConfiguration, this.clientOptions, this.credentialsProviders);
        // Registered before any connection is requested; sets up the exchange and reports the
        // outcome through the callback's promise.
        // NOTE(review): presumably invoked on every (re)connection of the client - confirm.
        createClient.getDelegate().addConnectionEstablishedCallback(promise -> {
            UniSubscribe subscribe = establishExchange(createClient, rabbitMQConnectorOutgoingConfiguration).subscribe();
            Consumer consumer = str -> {
                promise.complete();
            };
            Objects.requireNonNull(promise);
            subscribe.with(consumer, promise::fail);
        });
        // The Uni passed to the sender: connection -> publisher creation -> publisher start.
        RabbitMQMessageSender rabbitMQMessageSender = new RabbitMQMessageSender(rabbitMQConnectorOutgoingConfiguration, new ConnectionHolder(createClient, rabbitMQConnectorOutgoingConfiguration, getVertx()).getOrEstablishConnection().onItem().transformToUni(rabbitMQClient -> {
            // reconnect-interval is configured in seconds and converted to milliseconds here;
            // the internal queue size falls back to Integer.MAX_VALUE when not configured.
            return Uni.createFrom().item(RabbitMQPublisher.create(getVertx(), rabbitMQClient, new RabbitMQPublisherOptions().setReconnectAttempts(rabbitMQConnectorOutgoingConfiguration.getReconnectAttempts()).setReconnectInterval(Duration.ofSeconds(rabbitMQConnectorOutgoingConfiguration.getReconnectInterval().intValue()).toMillis()).setMaxInternalQueueSize(rabbitMQConnectorOutgoingConfiguration.getMaxOutgoingInternalQueueSize().orElse(Integer.MAX_VALUE).intValue())));
        }).onItem().call((v0) -> {
            return v0.start();
        }).invoke(rabbitMQPublisher -> {
            this.outgoingChannelStatus.put(rabbitMQConnectorOutgoingConfiguration.getChannel(), ChannelStatus.CONNECTED);
        }).onFailure().invoke(th -> {
            this.outgoingChannelStatus.put(rabbitMQConnectorOutgoingConfiguration.getChannel(), ChannelStatus.NOT_CONNECTED);
        // A failure is recorded above and then replaced by a null item; the result is cached.
        }).onFailure().recoverWithNull().memoize().indefinitely().onCancellation().invoke(() -> {
            this.outgoingChannelStatus.put(rabbitMQConnectorOutgoingConfiguration.getChannel(), ChannelStatus.NOT_CONNECTED);
        }));
        // The sender is stored as a Flow.Subscription so terminate() can cancel it.
        this.subscriptions.put(rabbitMQConnectorOutgoingConfiguration.getChannel(), rabbitMQMessageSender);
        // Failures in the stream are logged and mark the channel as not connected.
        return MultiUtils.via(rabbitMQMessageSender, multi -> {
            return multi.onFailure().invoke(th2 -> {
                RabbitMQLogging.log.error(rabbitMQConnectorOutgoingConfiguration.getChannel(), th2);
                this.outgoingChannelStatus.put(rabbitMQConnectorOutgoingConfiguration.getChannel(), ChannelStatus.NOT_CONNECTED);
            });
        });
    }

    /**
     * Reports readiness using the non-strict health evaluation.
     *
     * @return the report produced by {@code getHealth(false)}
     */
    public HealthReport getReadiness() {
        final boolean strict = false;
        return getHealth(strict);
    }

    /**
     * Reports liveness using the non-strict health evaluation.
     *
     * @return the report produced by {@code getHealth(false)}
     */
    public HealthReport getLiveness() {
        final boolean strict = false;
        return getHealth(strict);
    }

    /**
     * Builds a health report with one entry per tracked incoming and outgoing channel.
     *
     * <p>An incoming channel is reported healthy only when its status is
     * {@code ChannelStatus.CONNECTED}. For an outgoing channel the rule depends on {@code z}:
     * when {@code z} is {@code true} the status must be {@code CONNECTED}; otherwise any
     * status other than {@code NOT_CONNECTED} counts as healthy.
     *
     * @param z whether outgoing channels must be strictly connected to be healthy
     * @return the assembled health report
     */
    public HealthReport getHealth(boolean z) {
        final HealthReport.HealthReportBuilder report = HealthReport.builder();

        this.incomingChannelStatus.forEach(
                (channel, status) -> report.add(channel, status == ChannelStatus.CONNECTED));

        this.outgoingChannelStatus.forEach((channel, status) -> {
            final boolean healthy;
            if (z) {
                healthy = status == ChannelStatus.CONNECTED;
            } else {
                healthy = status != ChannelStatus.NOT_CONNECTED;
            }
            report.add(channel, healthy);
        });

        return report.build();
    }

    /**
     * Releases the connector's resources when the application scope is about to be destroyed.
     *
     * <p>In order: cancels every registered subscription, cancels every consumer (waiting for
     * each cancellation to finish) and forgets them, then stops every client (waiting for each
     * stop to finish) and forgets them.
     *
     * @param obj the observed {@code @BeforeDestroyed(ApplicationScoped.class)} event payload; unused
     */
    public void terminate(@Priority(50) @Observes(notifyObserver = Reception.IF_EXISTS) @BeforeDestroyed(ApplicationScoped.class) Object obj) {
        this.subscriptions.forEach((channel, subscription) -> subscription.cancel());

        this.consumers.forEach(consumer -> {
            try {
                consumer.cancelAndAwait();
            } catch (AlreadyClosedException ignored) {
                // Deliberately best-effort: the consumer is already closed, so there is
                // nothing left to cancel and shutdown should simply carry on.
            }
        });
        this.consumers.clear();

        this.clients.forEach((channel, client) -> client.stopAndAwait());
        this.clients.clear();
    }

    /**
     * Gives access to the Vert.x instance held by this connector's execution holder.
     *
     * @return the value of {@code executionHolder.vertx()}
     */
    public Vertx getVertx() {
        final Vertx vertx = this.executionHolder.vertx();
        return vertx;
    }

    /**
     * Registers a client under the given key, replacing any client previously stored for it.
     *
     * @param str the key to register the client under (other methods in this class look
     *        clients up by channel name)
     * @param rabbitMQClient the client to remember
     */
    public void addClient(String str, RabbitMQClient rabbitMQClient) {
        clients.put(str, rabbitMQClient);
    }

    /**
     * Arranges for the queue described by the configuration to be (re-)established every time
     * the channel's client establishes a connection.
     *
     * <p>A connection-established callback is added to the client's delegate. The callback runs
     * the private queue-establishment pipeline and completes the supplied promise when it
     * succeeds, or fails the promise with the pipeline's failure.
     *
     * @param str the channel name whose registered client should get the callback
     * @param rabbitMQConnectorIncomingConfiguration the incoming channel configuration
     * @throws NullPointerException if no client is registered for {@code str}
     */
    public void establishQueue(String str, RabbitMQConnectorIncomingConfiguration rabbitMQConnectorIncomingConfiguration) {
        // Same exception type as a bare dereference would raise, but with a usable message.
        final RabbitMQClient client = Objects.requireNonNull(this.clients.get(str),
                "No RabbitMQ client registered for channel " + str);

        // Kept as one typed fluent chain: raw UniSubscribe/Consumer locals would erase the
        // generic signatures and prevent promise::fail from resolving to fail(Throwable).
        client.getDelegate().addConnectionEstablishedCallback(promise ->
                establishQueue(client, rabbitMQConnectorIncomingConfiguration)
                        .subscribe()
                        .with(ignored -> promise.complete(), promise::fail));
    }

    /**
     * Declares the configured exchange when declaration is enabled and the exchange has a
     * non-empty name; otherwise does nothing.
     *
     * <p>Success and failure of the declaration are both logged.
     *
     * @param rabbitMQClient the client used to declare the exchange
     * @param rabbitMQConnectorCommonConfiguration the channel configuration
     * @return a {@code Uni} emitting the exchange name, either immediately (nothing to declare)
     *         or once the declaration has completed
     */
    private Uni<String> establishExchange(RabbitMQClient rabbitMQClient, RabbitMQConnectorCommonConfiguration rabbitMQConnectorCommonConfiguration) {
        final String exchangeName = getExchangeName(rabbitMQConnectorCommonConfiguration);

        final boolean mustDeclare = Boolean.TRUE.equals(rabbitMQConnectorCommonConfiguration.getExchangeDeclare())
                && exchangeName.length() != 0;
        if (!mustDeclare) {
            return Uni.createFrom().item(exchangeName);
        }

        return rabbitMQClient
                .exchangeDeclare(
                        exchangeName,
                        rabbitMQConnectorCommonConfiguration.getExchangeType(),
                        rabbitMQConnectorCommonConfiguration.getExchangeDurable().booleanValue(),
                        rabbitMQConnectorCommonConfiguration.getExchangeAutoDelete().booleanValue())
                .onItem().invoke(() -> RabbitMQLogging.log.exchangeEstablished(exchangeName))
                .onFailure().invoke(failure -> RabbitMQLogging.log.unableToEstablishExchange(exchangeName, failure))
                .onItem().transform(declared -> exchangeName);
    }

    /**
     * Establishes the exchange, then either verifies or declares the configured queue.
     *
     * <p>Queue arguments are assembled first from the configuration: dead-letter settings when
     * DLQ auto-binding is on, plus whichever optional settings are present.
     *
     * <p>After the exchange step, the pipeline branches on the queue-declare setting:
     * <ul>
     * <li>When it is {@code TRUE}, the queue is declared, its bindings are established and
     * the outcome is logged. An empty server-side name is declared non-durable, exclusive
     * and auto-delete, without the assembled arguments.</li>
     * <li>Otherwise the queue is not declared; its message count is requested instead, and
     * a failure of that request is logged.</li>
     * </ul>
     *
     * @param rabbitMQClient the client used for all broker operations
     * @param rabbitMQConnectorIncomingConfiguration the incoming channel configuration
     * @return a {@code Uni} emitting the configured queue name once the queue is established
     * @throws IllegalArgumentException presumably, via {@code illegalArgumentInvalidQueueTtl()},
     *         when a negative queue TTL is configured
     */
    private Uni<String> establishQueue(RabbitMQClient rabbitMQClient, RabbitMQConnectorIncomingConfiguration rabbitMQConnectorIncomingConfiguration) {
        final String queueName = rabbitMQConnectorIncomingConfiguration.getQueueName();

        final JsonObject queueArguments = new JsonObject();
        if (rabbitMQConnectorIncomingConfiguration.getAutoBindDlq().booleanValue()) {
            queueArguments.put("x-dead-letter-exchange", rabbitMQConnectorIncomingConfiguration.getDeadLetterExchange());
            // The dead-letter routing key falls back to the queue name when not configured.
            queueArguments.put("x-dead-letter-routing-key",
                    rabbitMQConnectorIncomingConfiguration.getDeadLetterRoutingKey().orElse(queueName));
        }
        rabbitMQConnectorIncomingConfiguration.getQueueSingleActiveConsumer()
                .ifPresent(singleActive -> queueArguments.put("x-single-active-consumer", singleActive));
        rabbitMQConnectorIncomingConfiguration.getQueueXQueueType()
                .ifPresent(queueType -> queueArguments.put("x-queue-type", queueType));
        rabbitMQConnectorIncomingConfiguration.getQueueXQueueMode()
                .ifPresent(queueMode -> queueArguments.put("x-queue-mode", queueMode));
        rabbitMQConnectorIncomingConfiguration.getQueueTtl().ifPresent(ttl -> {
            if (ttl.longValue() < 0) {
                throw RabbitMQExceptions.ex.illegalArgumentInvalidQueueTtl();
            }
            queueArguments.put("x-message-ttl", ttl);
        });
        rabbitMQConnectorIncomingConfiguration.getQueueXMaxPriority()
                .ifPresent(maxPriority -> queueArguments.put("x-max-priority", maxPriority));
        rabbitMQConnectorIncomingConfiguration.getQueueXDeliveryLimit()
                .ifPresent(deliveryLimit -> queueArguments.put("x-delivery-limit", deliveryLimit));

        return establishExchange(rabbitMQClient, rabbitMQConnectorIncomingConfiguration)
                // null means "declare the queue"; a non-null name means "only check it".
                .onItem().transform(exchange ->
                        Boolean.TRUE.equals(rabbitMQConnectorIncomingConfiguration.getQueueDeclare()) ? null : queueName)
                // Not declaring: touch the queue so that a problem surfaces (and is logged) now.
                // The chain stays typed so the method reference resolves against Consumer<Throwable>.
                .onItem().ifNotNull().call(existingQueue ->
                        rabbitMQClient.messageCount(existingQueue)
                                .onFailure().invoke(RabbitMQLogging.log::unableToConnectToBroker))
                // Declaring: create the queue, then its bindings, and finally emit the queue name.
                .onItem().ifNull().switchTo(() -> {
                    final String serverQueueName = serverQueueName(queueName);
                    return (serverQueueName.isEmpty()
                            ? rabbitMQClient.queueDeclare(serverQueueName, false, true, true)
                            : rabbitMQClient.queueDeclare(
                                    serverQueueName,
                                    rabbitMQConnectorIncomingConfiguration.getQueueDurable().booleanValue(),
                                    rabbitMQConnectorIncomingConfiguration.getQueueExclusive().booleanValue(),
                                    rabbitMQConnectorIncomingConfiguration.getQueueAutoDelete().booleanValue(),
                                    queueArguments))
                            .onItem().invoke(() -> RabbitMQLogging.log.queueEstablished(queueName))
                            .onFailure().invoke(failure -> RabbitMQLogging.log.unableToEstablishQueue(queueName, failure))
                            .onItem().transformToMulti(declareOk ->
                                    establishBindings(rabbitMQClient, rabbitMQConnectorIncomingConfiguration))
                            .onItem().ignoreAsUni()
                            .onItem().transform(done -> queueName);
                });
    }

    /**
     * Binds the configured queue to the configured exchange, once per routing key.
     *
     * <p>The routing keys come from the comma-separated routing-keys setting, each one trimmed.
     * Every binding is made with the arguments parsed from the configuration's arguments setting.
     * Each successful binding and any failure is logged.
     *
     * @param rabbitMQClient the client used to create the bindings
     * @param rabbitMQConnectorIncomingConfiguration the incoming channel configuration
     * @return a {@code Multi} emitting each routing key after its binding has been created, or
     *         an empty {@code Multi} when the exchange name is empty
     */
    private Multi<String> establishBindings(RabbitMQClient rabbitMQClient, RabbitMQConnectorIncomingConfiguration rabbitMQConnectorIncomingConfiguration) {
        final String exchangeName = getExchangeName(rabbitMQConnectorIncomingConfiguration);
        final String queueName = rabbitMQConnectorIncomingConfiguration.getQueueName();
        // Typed list: a raw List would make the whole Multi pipeline raw and hand the routing
        // key to queueBind as Object.
        final List<String> routingKeys = Arrays.stream(rabbitMQConnectorIncomingConfiguration.getRoutingKeys().split(","))
                .map(String::trim)
                .collect(Collectors.toList());
        final Map<String, Object> bindingArguments = parseArguments(rabbitMQConnectorIncomingConfiguration.getArguments());

        // No exchange name, nothing to bind to.
        if (exchangeName.isEmpty()) {
            return Multi.createFrom().empty();
        }

        return Multi.createFrom().iterable(routingKeys)
                .onItem().call(routingKey ->
                        rabbitMQClient.queueBind(serverQueueName(queueName), exchangeName, routingKey, bindingArguments))
                .onItem().invoke(routingKey ->
                        RabbitMQLogging.log.bindingEstablished(queueName, exchangeName, routingKey, bindingArguments.toString()))
                .onFailure().invoke(failure ->
                        RabbitMQLogging.log.unableToEstablishBinding(queueName, exchangeName, failure));
    }

    /**
     * Parses a comma-separated list of {@code key:value} pairs into a map.
     *
     * <p>Each comma-separated segment is trimmed and then split on {@code ':'}. Only segments
     * that split into exactly two parts are kept; anything else (no colon, more than one colon,
     * or an empty value) is silently skipped. Values are stored as strings, exactly as written
     * after the colon.
     *
     * @param optional the raw arguments setting, possibly absent
     * @return a new mutable map of the parsed pairs; empty when the setting is absent or
     *         contains no well-formed pair
     */
    private Map<String, Object> parseArguments(Optional<String> optional) {
        final Map<String, Object> arguments = new HashMap<>();
        optional.ifPresent(rawArguments -> {
            for (String segment : rawArguments.split(",")) {
                final String[] keyValue = segment.trim().split(":");
                if (keyValue.length == 2) {
                    arguments.put(keyValue[0], keyValue[1]);
                }
            }
        });
        return arguments;
    }

    /**
     * Creates the failure handler named by the configuration's failure strategy.
     *
     * <p>The strategy name is used to look up a handler factory among
     * {@code failureHandlerFactories}; the resolved factory then creates the handler for this
     * connector.
     *
     * @param rabbitMQConnectorIncomingConfiguration the incoming channel configuration
     * @return the handler created by the resolved factory
     * @throws IllegalArgumentException presumably, via
     *         {@code illegalArgumentInvalidFailureStrategy}, when no factory is resolvable for
     *         the configured strategy
     */
    private RabbitMQFailureHandler createFailureHandler(RabbitMQConnectorIncomingConfiguration rabbitMQConnectorIncomingConfiguration) {
        final String strategy = rabbitMQConnectorIncomingConfiguration.getFailureStrategy();
        final Instance<RabbitMQFailureHandler.Factory> factory =
                CDIUtils.getInstanceById(this.failureHandlerFactories, strategy);

        if (!factory.isResolvable()) {
            throw RabbitMQExceptions.ex.illegalArgumentInvalidFailureStrategy(strategy);
        }
        return factory.get().create(rabbitMQConnectorIncomingConfiguration, this);
    }

    /**
     * Maps the configured queue name to the name actually sent to the broker.
     *
     * <p>The placeholder {@code "(server.auto)"} is translated to the empty string; every other
     * name is passed through untouched.
     *
     * @param str the configured queue name; must not be {@code null}
     * @return the empty string for the placeholder, otherwise {@code str} itself
     */
    private String serverQueueName(String str) {
        if (str.equals("(server.auto)")) {
            return "";
        }
        return str;
    }

    /**
     * Records a failure on an incoming channel and tears down what is registered for it.
     *
     * <p>The failure is logged and the channel is marked {@code NOT_CONNECTED}. The channel's
     * subscription, if any, is then removed and cancelled. Finally its client, if any, is
     * removed and stopped without waiting for the stop to complete.
     *
     * @param str the name of the failing channel
     * @param th the failure being reported
     */
    public void reportIncomingFailure(String str, Throwable th) {
        RabbitMQLogging.log.failureReported(str, th);
        this.incomingChannelStatus.put(str, ChannelStatus.NOT_CONNECTED);

        final Flow.Subscription subscription = this.subscriptions.remove(str);
        if (subscription != null) {
            subscription.cancel();
        }

        final RabbitMQClient client = this.clients.remove(str);
        if (client != null) {
            client.stopAndForget();
        }
    }

    /**
     * Chooses the acknowledgement handler for an incoming channel.
     *
     * @param rabbitMQConnectorIncomingConfiguration the incoming channel configuration
     * @return a {@link RabbitMQAutoAck} when auto-acknowledgement is {@code TRUE}, otherwise a
     *         {@link RabbitMQAck}; either one is created for the configuration's channel
     */
    public RabbitMQAckHandler createAckHandler(RabbitMQConnectorIncomingConfiguration rabbitMQConnectorIncomingConfiguration) {
        if (Boolean.TRUE.equals(rabbitMQConnectorIncomingConfiguration.getAutoAcknowledgement())) {
            return new RabbitMQAutoAck(rabbitMQConnectorIncomingConfiguration.getChannel());
        }
        return new RabbitMQAck(rabbitMQConnectorIncomingConfiguration.getChannel());
    }
}
