package com.azure.spring.integration.eventhub.support;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.spring.integration.core.api.PartitionSupplier;
import com.azure.spring.integration.core.api.StartPosition;
import com.azure.spring.integration.eventhub.api.EventHubClientFactory;
import com.azure.spring.integration.eventhub.impl.EventHubProcessor;
import com.azure.spring.integration.eventhub.impl.EventHubTemplate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/spring/integration/eventhub/support/EventHubTestOperation.class */
/**
 * An {@link EventHubTemplate} that keeps everything in memory instead of talking to a real event hub.
 *
 * <p>Sent events are appended to a per-event-hub list and handed straight to the processors that have been
 * registered for that event hub. Processors are tracked per event hub name and, within it, per consumer group.
 * Every {@link EventContext} passed to a processor is obtained from the supplier given at construction time.
 *
 * <p>NOTE(review): the processor registry is a {@link ConcurrentHashMap}, but the store of sent events is a plain
 * {@link HashMap} holding {@link ArrayList}s, so concurrent sends are not protected here; presumably this class is
 * only driven from single-threaded tests - confirm before using it from several threads.
 */
public class EventHubTestOperation extends EventHubTemplate {
    /** Events sent so far, keyed by event hub name, in send order. */
    private final Map<String, List<EventData>> eventHubsByName = new HashMap<>();

    /** Registered processors: event hub name -> consumer group -> processor. */
    private final Map<String, Map<String, EventHubProcessor>> processorsByNameAndGroup = new ConcurrentHashMap<>();

    /** Source of the {@link EventContext} handed to a processor with each delivered event. */
    private final Supplier<EventContext> eventContextSupplier;

    /**
     * Creates the in-memory operation.
     *
     * @param eventHubClientFactory factory forwarded unchanged to the {@link EventHubTemplate} constructor
     * @param supplier provides the {@link EventContext} used for every event delivered to a processor
     */
    public EventHubTestOperation(EventHubClientFactory eventHubClientFactory, Supplier<EventContext> supplier) {
        super(eventHubClientFactory);
        this.eventContextSupplier = supplier;
    }

    /**
     * Converts the message to an {@link EventData}, records it for the given event hub and immediately delivers
     * it to every processor currently registered for that event hub (regardless of consumer group).
     *
     * @param str name of the event hub the message is sent to
     * @param message the message to convert and store
     * @param partitionSupplier not used by this implementation
     * @param <U> payload type of the message
     * @return an already-completed empty {@link Mono}; all work happens synchronously before it is returned
     */
    @Override // com.azure.spring.integration.eventhub.impl.AbstractEventHubTemplate
    public <U> Mono<Void> sendAsync(String str, @NonNull Message<U> message, PartitionSupplier partitionSupplier) {
        EventData eventData = (EventData) getMessageConverter().fromMessage(message, EventData.class);
        this.eventHubsByName.computeIfAbsent(str, hubName -> new ArrayList<>()).add(eventData);

        // The (possibly empty) processor map is created on first send, exactly as before, so later
        // lookups for this event hub find an entry.
        Map<String, EventHubProcessor> processorsByGroup =
            this.processorsByNameAndGroup.computeIfAbsent(str, hubName -> new ConcurrentHashMap<>());
        for (EventHubProcessor processor : processorsByGroup.values()) {
            ((EventHubProcessorSupport) processor).onEvent(this.eventContextSupplier.get(), eventData);
        }
        return Mono.empty();
    }

    /**
     * Registers the processor for the given event hub and consumer group. An existing registration for the
     * same pair is kept and the new processor is ignored.
     *
     * @param str name of the event hub
     * @param str2 name of the consumer group
     * @param eventHubProcessor processor to register
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.azure.spring.integration.eventhub.impl.AbstractEventHubTemplate
    public synchronized void createEventProcessorClient(String str, String str2, EventHubProcessor eventHubProcessor) {
        this.processorsByNameAndGroup
            .computeIfAbsent(str, hubName -> new ConcurrentHashMap<>())
            .putIfAbsent(str2, eventHubProcessor);
    }

    /**
     * When the start position is {@link StartPosition#EARLIEST}, replays every event stored for the event hub
     * to the processors registered for it. For any other start position this is a no-op.
     *
     * <p>Does nothing (instead of throwing {@link NullPointerException}) when no processors or no events are
     * known for the event hub.
     *
     * <p>NOTE(review): the replay goes to every processor of the event hub, not only to the one registered for
     * {@code str2}; this is kept as-is - confirm whether processors of other consumer groups are really meant to
     * receive the stored events again.
     *
     * @param str name of the event hub
     * @param str2 name of the consumer group being started; not used to select the processors
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.azure.spring.integration.eventhub.impl.AbstractEventHubTemplate
    public void startEventProcessorClient(String str, String str2) {
        if (getStartPosition() != StartPosition.EARLIEST) {
            return;
        }
        Map<String, EventHubProcessor> processorsByGroup = this.processorsByNameAndGroup.get(str);
        if (processorsByGroup == null) {
            return;
        }
        for (EventHubProcessor processor : processorsByGroup.values()) {
            EventHubProcessorSupport processorSupport = (EventHubProcessorSupport) processor;
            // Looked up per processor, as in the original lambda, so each processor sees the list as it is
            // at the moment its own replay begins.
            List<EventData> storedEvents = this.eventHubsByName.get(str);
            if (storedEvents != null) {
                storedEvents.forEach(storedEvent ->
                    processorSupport.onEvent(this.eventContextSupplier.get(), storedEvent));
            }
        }
    }

    /**
     * Removes the processor registered for the given event hub and consumer group, if there is one.
     *
     * <p>Does nothing (instead of throwing {@link NullPointerException}) when nothing has ever been registered
     * or sent for the event hub.
     *
     * @param str name of the event hub
     * @param str2 name of the consumer group whose processor is removed
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.azure.spring.integration.eventhub.impl.AbstractEventHubTemplate
    public void stopEventProcessorClient(String str, String str2) {
        Map<String, EventHubProcessor> processorsByGroup = this.processorsByNameAndGroup.get(str);
        if (processorsByGroup != null) {
            processorsByGroup.remove(str2);
        }
    }

    /**
     * Builds the processor used by this in-memory operation: an {@link EventHubProcessorSupport} configured
     * with this template's checkpoint configuration and message converter.
     *
     * @param consumer callback passed to the new processor
     * @param cls payload type passed to the new processor
     * @return a new {@link EventHubProcessorSupport}
     */
    @Override // com.azure.spring.integration.eventhub.impl.EventHubTemplate
    public EventHubProcessor createEventProcessor(Consumer<Message<?>> consumer, Class<?> cls) {
        return new EventHubProcessorSupport(consumer, cls, getCheckpointConfig(), getMessageConverter());
    }
}
