package com.azure.spring.integration.eventhub.impl;

import com.azure.spring.cloud.context.core.util.Tuple;
import com.azure.spring.integration.core.api.PartitionSupplier;
import com.azure.spring.integration.eventhub.api.EventHubClientFactory;
import com.azure.spring.integration.eventhub.api.EventHubRxOperation;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.messaging.Message;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.subscriptions.Subscriptions;

@Deprecated
/* loaded from: input_file:com/azure/spring/integration/eventhub/impl/EventHubRxTemplate.class */
public class EventHubRxTemplate extends AbstractEventHubTemplate implements EventHubRxOperation {
    private final ConcurrentHashMap<Tuple<String, String>, Observable<Message<?>>> subjectByNameAndGroup;

    public EventHubRxTemplate(EventHubClientFactory eventHubClientFactory) {
        super(eventHubClientFactory);
        this.subjectByNameAndGroup = new ConcurrentHashMap<>();
    }

    private static <T> Observable<T> toObservable(Mono<T> mono) {
        return Observable.create(subscriber -> {
            mono.toFuture().whenComplete((obj, th) -> {
                if (th != null) {
                    subscriber.onError(th);
                } else {
                    subscriber.onNext(obj);
                    subscriber.onCompleted();
                }
            });
        });
    }

    public <T> Observable<Void> sendRx(String str, Message<T> message, PartitionSupplier partitionSupplier) {
        return toObservable(sendAsync(str, message, partitionSupplier));
    }

    public Observable<Message<?>> subscribe(String str, String str2, Class<?> cls) {
        Tuple<String, String> of = Tuple.of(str, str2);
        this.subjectByNameAndGroup.computeIfAbsent(of, tuple -> {
            return Observable.create(subscriber -> {
                Objects.requireNonNull(subscriber);
                createEventProcessorClient(str, str2, new EventHubProcessor((v1) -> {
                    r2.onNext(v1);
                }, cls, getCheckpointConfig(), getMessageConverter()));
                startEventProcessorClient(str, str2);
                subscriber.add(Subscriptions.create(() -> {
                    stopEventProcessorClient(str, str2);
                }));
            }).share();
        });
        return this.subjectByNameAndGroup.get(of);
    }
}
