package org.axonframework.eventsourcing.eventstore;

import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.eventhandling.AbstractEventBus;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.monitoring.MessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventsourcing/eventstore/AbstractEventStore.class */
/**
 * Base class for {@link EventStore} implementations that keep their events in an {@link EventStorageEngine}.
 * <p>
 * Events are appended to the storage engine in {@link #prepareCommit(List)}. Read operations combine what the
 * storage engine returns with the domain events currently reported by {@code queuedMessages()} (inherited from
 * {@link AbstractEventBus}), so callers also see events that are staged on this bus.
 */
public abstract class AbstractEventStore extends AbstractEventBus implements EventStore {
    private static final Logger logger = LoggerFactory.getLogger(AbstractEventStore.class);
    private final EventStorageEngine storageEngine;

    /**
     * Abstract builder for {@link AbstractEventStore} subclasses. Adds the {@link EventStorageEngine} setting on top
     * of the settings inherited from {@link AbstractEventBus.Builder}.
     */
    public abstract static class Builder extends AbstractEventBus.Builder {
        protected EventStorageEngine storageEngine;

        /**
         * Passes the given monitor to the parent builder and returns this builder, typed as
         * {@link AbstractEventStore.Builder}, so calls can be chained.
         *
         * @param messageMonitor the monitor handed to {@link AbstractEventBus.Builder}
         * @return this builder
         */
        public Builder messageMonitor(MessageMonitor<? super EventMessage<?>> messageMonitor) {
            super.messageMonitor(messageMonitor);
            return this;
        }

        /**
         * Sets the {@link EventStorageEngine} the event store will use.
         *
         * @param eventStorageEngine the storage engine; rejected by {@link BuilderUtils#assertNonNull} when
         *                           {@code null}
         * @return this builder
         */
        public Builder storageEngine(EventStorageEngine eventStorageEngine) {
            BuilderUtils.assertNonNull(eventStorageEngine, "EventStorageEngine may not be null");
            this.storageEngine = eventStorageEngine;
            return this;
        }

        /**
         * Runs the parent builder's validation and then asserts that a storage engine has been configured.
         * <p>
         * NOTE(review): the decompiler reported that this method's access modifier was widened from
         * {@code protected}; it is left {@code public} here so existing callers keep compiling.
         *
         * @throws AxonConfigurationException declared by this method; presumably raised by the non-null assertion
         *                                    when no storage engine was set - confirm against {@link BuilderUtils}
         */
        public void validate() throws AxonConfigurationException {
            super.validate();
            BuilderUtils.assertNonNull(this.storageEngine, "The EventStorageEngine is a hard requirement and should be provided");
        }

        /**
         * Decompiler rendering of the compiler-generated bridge for {@link #messageMonitor(MessageMonitor)}
         * (renamed by the decompiler to avoid a name collision). It only forwards to the typed method and is kept
         * so that any decompiled call site referring to this name still resolves.
         *
         * @param messageMonitor the monitor to forward
         * @return this builder, typed as the parent builder
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        public /* bridge */ /* synthetic */ AbstractEventBus.Builder mo26messageMonitor(MessageMonitor messageMonitor) {
            return messageMonitor((MessageMonitor<? super EventMessage<?>>) messageMonitor);
        }
    }

    /**
     * Creates the event store from the given builder, taking over the builder's {@link EventStorageEngine}.
     * <p>
     * NOTE(review): the decompiler reported that this constructor's access modifier was widened from
     * {@code protected}; it is left {@code public} here so existing callers keep compiling.
     *
     * @param builder the builder holding the configuration, passed on to the {@link AbstractEventBus} constructor
     */
    public AbstractEventStore(Builder builder) {
        super(builder);
        this.storageEngine = builder.storageEngine;
    }

    /**
     * Appends the given events to the storage engine and then hands them to the superclass implementation.
     *
     * @param list the events about to be committed
     */
    protected void prepareCommit(List<? extends EventMessage<?>> list) {
        // Storage comes first: the superclass only sees the events after the append call has returned.
        this.storageEngine.appendEvents(list);
        super.prepareCommit(list);
    }

    /**
     * Opens a stream of the events of one aggregate.
     * <p>
     * If the storage engine provides a snapshot, the stream starts with that snapshot followed by the stored events
     * whose sequence number is greater than the snapshot's. Otherwise all stored events are read. In both cases the
     * staged domain events of the aggregate (see {@link #stagedDomainEventMessages(String)}) are appended at the end.
     * <p>
     * A failure while reading the snapshot is passed to {@link #handleSnapshotReadingError(String, Throwable)}
     * instead of being propagated.
     *
     * @param str the aggregate identifier
     * @return the combined event stream
     */
    @Override
    public DomainEventStream readEvents(String str) {
        Optional<DomainEventMessage<?>> snapshot;
        try {
            snapshot = this.storageEngine.readSnapshot(str);
        } catch (Exception | LinkageError e) {
            // LinkageError is caught alongside Exception so that it, too, is routed to the error handler.
            snapshot = handleSnapshotReadingError(str, e);
        }

        DomainEventStream storedEvents;
        if (snapshot.isPresent()) {
            DomainEventMessage<?> snapshotEvent = snapshot.get();
            // Resume reading right after the snapshot's sequence number.
            storedEvents = DomainEventStream.concat(
                    DomainEventStream.of(snapshotEvent),
                    this.storageEngine.readEvents(str, snapshotEvent.getSequenceNumber() + 1));
        } else {
            storedEvents = this.storageEngine.readEvents(str);
        }
        return DomainEventStream.concat(storedEvents, DomainEventStream.of(stagedDomainEventMessages(str)));
    }

    /**
     * Called by {@link #readEvents(String)} when reading the snapshot of an aggregate fails. This implementation
     * logs a warning and reports that no snapshot is available, which makes {@link #readEvents(String)} read all
     * stored events of the aggregate instead.
     *
     * @param str the aggregate identifier
     * @param th  the failure raised while reading the snapshot
     * @return an empty {@link Optional}
     */
    protected Optional<DomainEventMessage<?>> handleSnapshotReadingError(String str, Throwable th) {
        logger.warn("Error reading snapshot for aggregate [{}]. Reconstructing from entire event stream.", str, th);
        return Optional.empty();
    }

    /**
     * Returns the messages reported by {@code queuedMessages()} that are {@link DomainEventMessage}s and whose
     * aggregate identifier equals the given one.
     *
     * @param str the aggregate identifier to match
     * @return a stream of the matching staged domain events
     */
    protected Stream<? extends DomainEventMessage<?>> stagedDomainEventMessages(String str) {
        return queuedMessages().stream()
                .filter(message -> message instanceof DomainEventMessage)
                // Cast to the wildcard type rather than the raw type so the element type keeps its generics.
                .map(message -> (DomainEventMessage<?>) message)
                .filter(domainEvent -> str.equals(domainEvent.getAggregateIdentifier()));
    }

    /**
     * Opens a stream of the events of one aggregate starting at a given sequence number: the events the storage
     * engine returns for that starting point, followed by the staged domain events of the aggregate whose sequence
     * number is at least {@code j}.
     *
     * @param str the aggregate identifier
     * @param j   the first sequence number to include
     * @return the combined event stream
     */
    @Override
    public DomainEventStream readEvents(String str, long j) {
        Stream<? extends DomainEventMessage<?>> stagedFromSequence = stagedDomainEventMessages(str)
                .filter(domainEvent -> domainEvent.getSequenceNumber() >= j);
        return DomainEventStream.concat(this.storageEngine.readEvents(str, j), DomainEventStream.of(stagedFromSequence));
    }

    /**
     * Stores the given snapshot by delegating to the storage engine.
     *
     * @param domainEventMessage the snapshot event to store
     */
    @Override
    public void storeSnapshot(DomainEventMessage<?> domainEventMessage) {
        this.storageEngine.storeSnapshot(domainEventMessage);
    }

    /**
     * Returns the storage engine this event store was built with.
     * <p>
     * NOTE(review): the decompiler reported that this method's access modifier was widened from
     * {@code protected}; it is left {@code public} here so existing callers keep compiling.
     *
     * @return the {@link EventStorageEngine} in use
     */
    public EventStorageEngine storageEngine() {
        return this.storageEngine;
    }

    /**
     * Returns the highest sequence number among the staged domain events of the given aggregate. When no such
     * staged event exists, the answer of the storage engine is returned instead.
     *
     * @param str the aggregate identifier
     * @return the last known sequence number, as found in the staged events or reported by the storage engine
     */
    @Override
    public Optional<Long> lastSequenceNumberFor(String str) {
        Optional<Long> highestStaged = stagedDomainEventMessages(str)
                .map(DomainEventMessage::getSequenceNumber)
                .max(Long::compareTo);
        return highestStaged.isPresent() ? highestStaged : this.storageEngine.lastSequenceNumberFor(str);
    }

    /**
     * Delegates to {@link EventStorageEngine#createTailToken()}.
     *
     * @return the token created by the storage engine
     */
    public TrackingToken createTailToken() {
        return this.storageEngine.createTailToken();
    }

    /**
     * Delegates to {@link EventStorageEngine#createHeadToken()}.
     *
     * @return the token created by the storage engine
     */
    public TrackingToken createHeadToken() {
        return this.storageEngine.createHeadToken();
    }

    /**
     * Delegates to {@link EventStorageEngine#createTokenAt(Instant)}.
     *
     * @param instant the point in time passed to the storage engine
     * @return the token created by the storage engine
     */
    public TrackingToken createTokenAt(Instant instant) {
        return this.storageEngine.createTokenAt(instant);
    }
}
