package org.axonframework.extensions.multitenancy.components.eventstore;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.MultiStreamableMessageSource;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.extensions.multitenancy.components.MultiTenantAwareComponent;
import org.axonframework.extensions.multitenancy.components.MultiTenantDispatchInterceptorSupport;
import org.axonframework.extensions.multitenancy.components.NoSuchTenantException;
import org.axonframework.extensions.multitenancy.components.TargetTenantResolver;
import org.axonframework.extensions.multitenancy.components.TenantDescriptor;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;

/* loaded from: input_file:org/axonframework/extensions/multitenancy/components/eventstore/MultiTenantEventStore.class */
public class MultiTenantEventStore implements EventStore, MultiTenantAwareComponent, MultiTenantDispatchInterceptorSupport<EventMessage<?>, EventStore> {
    private final Map<TenantDescriptor, EventStore> tenantSegments = new ConcurrentHashMap();
    private final List<Consumer<List<? extends EventMessage<?>>>> messageProcessors = new CopyOnWriteArrayList();
    private final List<MessageDispatchInterceptor<? super EventMessage<?>>> dispatchInterceptors = new CopyOnWriteArrayList();
    private final Map<TenantDescriptor, List<Registration>> dispatchInterceptorsRegistration = new ConcurrentHashMap();
    private final Map<TenantDescriptor, Registration> subscribeRegistrations = new ConcurrentHashMap();
    private final TenantEventSegmentFactory tenantSegmentFactory;
    private final TargetTenantResolver<Message<?>> targetTenantResolver;
    private MultiStreamableMessageSource multiSource;

    /* loaded from: input_file:org/axonframework/extensions/multitenancy/components/eventstore/MultiTenantEventStore$Builder.class */
    public static class Builder {
        public TenantEventSegmentFactory tenantSegmentFactory;
        public TargetTenantResolver<Message<?>> targetTenantResolver;

        public Builder tenantSegmentFactory(TenantEventSegmentFactory tenantEventSegmentFactory) {
            BuilderUtils.assertNonNull(tenantEventSegmentFactory, "The TenantEventSegmentFactory is a hard requirement");
            this.tenantSegmentFactory = tenantEventSegmentFactory;
            return this;
        }

        public Builder targetTenantResolver(TargetTenantResolver<Message<?>> targetTenantResolver) {
            BuilderUtils.assertNonNull(targetTenantResolver, "");
            this.targetTenantResolver = targetTenantResolver;
            return this;
        }

        public MultiTenantEventStore build() {
            return new MultiTenantEventStore(this);
        }

        protected void validate() {
            BuilderUtils.assertNonNull(this.targetTenantResolver, "The TargetTenantResolver is a hard requirement");
            BuilderUtils.assertNonNull(this.tenantSegmentFactory, "The TenantEventProcessorSegmentFactory is a hard requirement");
        }
    }

    public MultiTenantEventStore(Builder builder) {
        builder.validate();
        this.tenantSegmentFactory = builder.tenantSegmentFactory;
        this.targetTenantResolver = builder.targetTenantResolver;
    }

    public static Builder builder() {
        return new Builder();
    }

    public void publish(List<? extends EventMessage<?>> list) {
        ((EventStore) list.stream().findFirst().map((v1) -> {
            return resolveTenant(v1);
        }).orElseGet(this::resolveSegment)).publish(list);
    }

    public void publish(EventMessage<?>... eventMessageArr) {
        ((EventStore) Optional.ofNullable(eventMessageArr[0]).map((v1) -> {
            return resolveTenant(v1);
        }).orElseGet(this::resolveSegment)).publish(eventMessageArr);
    }

    public Registration subscribe(Consumer<List<? extends EventMessage<?>>> consumer) {
        this.messageProcessors.add(consumer);
        this.tenantSegments.forEach((tenantDescriptor, eventStore) -> {
            this.subscribeRegistrations.putIfAbsent(tenantDescriptor, eventStore.subscribe(consumer));
        });
        return () -> {
            return ((Boolean) this.subscribeRegistrations.values().stream().map((v0) -> {
                return v0.cancel();
            }).reduce((bool, bool2) -> {
                return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
            }).orElse(false)).booleanValue();
        };
    }

    @Override // org.axonframework.extensions.multitenancy.components.MultiTenantAwareComponent
    public Registration registerTenant(TenantDescriptor tenantDescriptor) {
        this.tenantSegments.putIfAbsent(tenantDescriptor, this.tenantSegmentFactory.apply(tenantDescriptor));
        return () -> {
            return unregisterTenant(tenantDescriptor) != null;
        };
    }

    public EventStore unregisterTenant(TenantDescriptor tenantDescriptor) {
        Registration remove = this.subscribeRegistrations.remove(tenantDescriptor);
        if (remove != null) {
            remove.cancel();
        }
        return this.tenantSegments.remove(tenantDescriptor);
    }

    @Override // org.axonframework.extensions.multitenancy.components.MultiTenantAwareComponent
    public Registration registerAndStartTenant(TenantDescriptor tenantDescriptor) {
        this.tenantSegments.computeIfAbsent(tenantDescriptor, tenantDescriptor2 -> {
            EventStore apply = this.tenantSegmentFactory.apply(tenantDescriptor);
            this.dispatchInterceptors.forEach(messageDispatchInterceptor -> {
                this.dispatchInterceptorsRegistration.computeIfAbsent(tenantDescriptor, tenantDescriptor2 -> {
                    return new CopyOnWriteArrayList();
                }).add(apply.registerDispatchInterceptor(messageDispatchInterceptor));
            });
            this.messageProcessors.forEach(consumer -> {
                this.subscribeRegistrations.putIfAbsent(tenantDescriptor, apply.subscribe(consumer));
            });
            return apply;
        });
        return () -> {
            return unregisterTenant(tenantDescriptor) != null;
        };
    }

    private EventStore resolveTenantSilently(Message<?> message) {
        return this.tenantSegments.get(this.targetTenantResolver.resolveTenant(message, this.tenantSegments.keySet()));
    }

    private EventStore resolveTenant(Message<?> message) {
        TenantDescriptor resolveTenant = this.targetTenantResolver.resolveTenant(message, this.tenantSegments.keySet());
        EventStore eventStore = this.tenantSegments.get(resolveTenant);
        if (eventStore == null) {
            throw new NoSuchTenantException(resolveTenant.tenantId());
        }
        return eventStore;
    }

    private EventStore resolveSegment() {
        return resolveTenantSilently(CurrentUnitOfWork.get().getMessage());
    }

    public DomainEventStream readEvents(String str) {
        return resolveSegment().readEvents(str);
    }

    public DomainEventStream readEvents(String str, TenantDescriptor tenantDescriptor) {
        return this.tenantSegments.get(tenantDescriptor).readEvents(str);
    }

    public void storeSnapshot(DomainEventMessage<?> domainEventMessage) {
        resolveSegment().storeSnapshot(domainEventMessage);
    }

    public void storeSnapshot(DomainEventMessage<?> domainEventMessage, TenantDescriptor tenantDescriptor) {
        this.tenantSegments.get(tenantDescriptor).storeSnapshot(domainEventMessage);
    }

    public BlockingStream<TrackedEventMessage<?>> openStream(TrackingToken trackingToken) {
        return multiSource().openStream(trackingToken);
    }

    private MultiStreamableMessageSource multiSource() {
        if (Objects.isNull(this.multiSource)) {
            MultiStreamableMessageSource.Builder builder = MultiStreamableMessageSource.builder();
            this.tenantSegments.forEach((tenantDescriptor, eventStore) -> {
                builder.addMessageSource(tenantDescriptor.tenantId(), eventStore);
            });
            this.multiSource = builder.build();
        }
        return this.multiSource;
    }

    public TrackingToken createTailToken() {
        return multiSource().createTailToken();
    }

    public TrackingToken createHeadToken() {
        return multiSource().createHeadToken();
    }

    public TrackingToken createTokenAt(Instant instant) {
        return multiSource().createTokenAt(instant);
    }

    public TrackingToken createTokenSince(Duration duration) {
        return multiSource().createTokenSince(duration);
    }

    public EventStore tenantSegment(TenantDescriptor tenantDescriptor) {
        return this.tenantSegments.get(tenantDescriptor);
    }

    @Override // org.axonframework.extensions.multitenancy.components.MultiTenantDispatchInterceptorSupport, org.axonframework.extensions.multitenancy.components.MultiTenantHandlerInterceptorSupport
    public Map<TenantDescriptor, EventStore> tenantSegments() {
        return this.tenantSegments;
    }

    @Override // org.axonframework.extensions.multitenancy.components.MultiTenantDispatchInterceptorSupport
    public List<MessageDispatchInterceptor<? super EventMessage<?>>> getDispatchInterceptors() {
        return this.dispatchInterceptors;
    }

    @Override // org.axonframework.extensions.multitenancy.components.MultiTenantDispatchInterceptorSupport
    public Map<TenantDescriptor, List<Registration>> getDispatchInterceptorsRegistration() {
        return this.dispatchInterceptorsRegistration;
    }
}
