/*
 * Decompiled with CFR 0.152.
 */
package co.com.sofka.infraestructure.asyn;

import co.com.sofka.business.generic.UnexpectedException;
import co.com.sofka.domain.generic.DomainEvent;
import co.com.sofka.infraestructure.bus.EventBus;
import co.com.sofka.infraestructure.event.ErrorEvent;
import co.com.sofka.infraestructure.repository.EventStoreRepository;
import co.com.sofka.infraestructure.store.StoredEvent;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * {@link Flow.Subscriber} of {@link DomainEvent}s that forwards every received event to an
 * optional {@link EventBus} and stores it through an optional {@link EventStoreRepository}.
 *
 * <p>Both collaborators may be {@code null}; a missing collaborator is reported with a warning
 * log entry and the corresponding step is skipped.
 */
public class SubscriberEvent
implements Flow.Subscriber<DomainEvent> {
    private static final Logger logger = Logger.getLogger(SubscriberEvent.class.getName());

    /**
     * Number of items requested on subscription and again after every received item.
     *
     * <p>NOTE(review): because demand is additive, asking for this many items after each single
     * item makes the outstanding demand grow without bound. Kept as-is to preserve the existing
     * behaviour; confirm whether {@code request(1)} per item was intended.
     */
    private static final long REQUEST_BATCH = 100L;

    private final EventStoreRepository repository;
    private final EventBus eventBus;
    private Flow.Subscription subscription;

    /**
     * Creates a subscriber that publishes and stores events.
     *
     * @param repository store used to save events; may be {@code null} to disable storing
     * @param eventBus   bus used to publish events and errors; may be {@code null} to disable publishing
     */
    public SubscriberEvent(EventStoreRepository repository, EventBus eventBus) {
        this.repository = repository;
        this.eventBus = eventBus;
    }

    /**
     * Creates a subscriber that only stores events (no event bus).
     *
     * @param repository store used to save events; may be {@code null}
     */
    public SubscriberEvent(EventStoreRepository repository) {
        this(repository, null);
    }

    /** Creates a subscriber with neither repository nor event bus; events are only logged as skipped. */
    public SubscriberEvent() {
        this(null, null);
    }

    /**
     * Remembers the subscription and requests the first batch of items.
     *
     * @param subscription the subscription handed over by the publisher
     * @throws NullPointerException if {@code subscription} is {@code null}
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = Objects.requireNonNull(subscription, "subscription");
        subscription.request(REQUEST_BATCH);
    }

    /**
     * Publishes the event on the bus (if any), stores it (if a repository is configured) and
     * then requests more items from the publisher.
     *
     * @param event the event delivered by the publisher
     */
    @Override
    public final void onNext(DomainEvent event) {
        publish(event);
        store(event);
        this.subscription.request(REQUEST_BATCH);
    }

    /**
     * Publishes the failure as an {@link ErrorEvent} on the bus (if any) and cancels the
     * subscription.
     *
     * <p>The subscription is cancelled even when publishing the error fails, and a throwable
     * that is not an {@link UnexpectedException} no longer causes a {@code ClassCastException}.
     *
     * @param throwable the failure signalled by the publisher
     */
    @Override
    public void onError(Throwable throwable) {
        try {
            if (this.eventBus == null) {
                // Mirror onNext: make the missing bus visible instead of dropping the error silently.
                logger.log(Level.WARNING, "No EVENT BUS configured, error was not published", throwable);
            } else {
                this.eventBus.publishError(new ErrorEvent(identifyOf(throwable), throwable));
            }
        } finally {
            // onError may arrive before onSubscribe stored a subscription; nothing to cancel then.
            if (this.subscription != null) {
                this.subscription.cancel();
            }
        }
    }

    /** Does nothing when the publisher completes. */
    @Override
    public void onComplete() {
    }

    /** Sends the event to the bus, or logs a warning when no bus is configured. */
    private void publish(DomainEvent event) {
        if (this.eventBus == null) {
            logger.warning("No EVENT BUS configured");
            return;
        }
        this.eventBus.publish(event);
    }

    /**
     * Saves the event when a repository is configured and the event carries both an aggregate
     * root id and a non-blank aggregate name; logs a warning when no repository is configured.
     */
    private void store(DomainEvent event) {
        if (this.repository == null) {
            logger.warning("No REPOSITORY configured");
            return;
        }
        // Wrapped before the checks below, in the same order as the original implementation.
        final StoredEvent storedEvent = StoredEvent.wrapEvent(event);
        // Cast retained from the decompiled source; it is redundant if aggregateRootId()
        // is already declared to return String.
        final String aggregateId = (String) event.aggregateRootId();
        if (aggregateId == null) {
            return;
        }
        final String aggregateName = event.getAggregateName();
        if (aggregateName != null && !aggregateName.isBlank()) {
            this.repository.saveEvent(aggregateName, aggregateId, storedEvent);
        }
    }

    /**
     * Picks the identifier used for the {@link ErrorEvent}: the one carried by an
     * {@link UnexpectedException}, otherwise the throwable's class name as a fallback.
     */
    private static String identifyOf(Throwable throwable) {
        if (throwable instanceof UnexpectedException) {
            return ((UnexpectedException) throwable).getIdentify();
        }
        return throwable.getClass().getName();
    }
}

