package org.springframework.data.cassandra.observability;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.springframework.data.cassandra.ReactiveResultSet;
import org.springframework.data.cassandra.ReactiveSession;
import org.springframework.data.cassandra.config.CqlSessionFactoryBean;
import org.springframework.lang.Nullable;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;

/* loaded from: input_file:org/springframework/data/cassandra/observability/ObservableReactiveSession.class */
/**
 * {@link ReactiveSession} decorator that wraps statement execution and preparation in a
 * Micrometer {@link Observation}.
 *
 * <p>Metadata-style calls ({@link #isClosed()}, {@link #getContext()}, {@link #getKeyspace()},
 * {@link #getMetadata()}, {@link #close()}) are forwarded to the wrapped session unchanged.
 * Statements that are already observation proxies are forwarded without starting a second
 * observation.
 */
public class ObservableReactiveSession implements ReactiveSession {
    private final ReactiveSession delegate;
    private final String remoteServiceName;
    private final ObservationRegistry observationRegistry;
    private final CassandraObservationConvention convention = new DefaultCassandraObservationConvention();

    /**
     * Creates the decorator. Use one of the {@code create} factory methods from outside the package.
     *
     * @param session the session that performs the actual work
     * @param remoteServiceName the remote service name passed to every observation context
     * @param registry the registry observations are created against
     */
    ObservableReactiveSession(ReactiveSession session, String remoteServiceName, ObservationRegistry registry) {
        this.delegate = session;
        this.remoteServiceName = remoteServiceName;
        this.observationRegistry = registry;
    }

    /**
     * Wraps {@code reactiveSession} using the remote service name {@code "Cassandra"}.
     *
     * @param reactiveSession the session to decorate
     * @param observationRegistry the registry observations are created against
     * @return the observing session
     */
    public static ReactiveSession create(ReactiveSession reactiveSession, ObservationRegistry observationRegistry) {
        return new ObservableReactiveSession(reactiveSession, "Cassandra", observationRegistry);
    }

    /**
     * Wraps {@code reactiveSession} using the given remote service name.
     *
     * @param reactiveSession the session to decorate
     * @param str the remote service name passed to every observation context
     * @param observationRegistry the registry observations are created against
     * @return the observing session
     */
    public static ReactiveSession create(ReactiveSession reactiveSession, String str, ObservationRegistry observationRegistry) {
        return new ObservableReactiveSession(reactiveSession, str, observationRegistry);
    }

    /** Forwards to the wrapped session. */
    @Override
    public boolean isClosed() {
        return this.delegate.isClosed();
    }

    /** Forwards to the wrapped session. */
    @Override
    public DriverContext getContext() {
        return this.delegate.getContext();
    }

    /** Forwards to the wrapped session. */
    @Override
    public Optional<CqlIdentifier> getKeyspace() {
        return this.delegate.getKeyspace();
    }

    /** Forwards to the wrapped session. */
    @Override
    public Metadata getMetadata() {
        return this.delegate.getMetadata();
    }

    /** Turns the CQL string into a {@link SimpleStatement} and runs {@link #execute(Statement)}. */
    @Override
    public Mono<ReactiveResultSet> execute(String str) {
        Statement<?> simple = SimpleStatement.newInstance(str);
        return execute(simple);
    }

    /**
     * Turns the CQL string and positional values into a {@link SimpleStatement} and runs
     * {@link #execute(Statement)}.
     */
    @Override
    public Mono<ReactiveResultSet> execute(String str, Object... objArr) {
        Statement<?> simple = SimpleStatement.newInstance(str, objArr);
        return execute(simple);
    }

    /**
     * Turns the CQL string and named values into a {@link SimpleStatement} and runs
     * {@link #execute(Statement)}.
     */
    @Override
    public Mono<ReactiveResultSet> execute(String str, Map<String, Object> map) {
        Statement<?> simple = SimpleStatement.newInstance(str, map);
        return execute(simple);
    }

    /**
     * Executes the statement, starting an {@code "execute"} observation on subscription unless the
     * statement already is an observation proxy.
     *
     * <p>This method only starts the observation and hands it to the delegate through the proxied
     * statement; it does not stop it itself.
     * NOTE(review): presumably the observation is completed elsewhere (e.g. by a driver-side
     * tracker that reads it from the proxy) - confirm.
     *
     * @param statement the statement to run
     * @return the delegate's result publisher
     */
    @Override
    public Mono<ReactiveResultSet> execute(Statement<?> statement) {
        if (ObservationStatement.isObservationStatement(statement)) {
            // Already instrumented: do not start a nested observation.
            return this.delegate.execute(statement);
        }
        return Mono.deferContextual(context -> {
            Observation started = startObservation(getParentObservation(context), statement, false, "execute");
            Statement<?> proxied = (Statement<?>) ObservationStatement.createProxy(started, statement);
            return this.delegate.execute(proxied);
        });
    }

    /** Turns the CQL string into a {@link SimpleStatement} and runs {@link #prepare(SimpleStatement)}. */
    @Override
    public Mono<PreparedStatement> prepare(String str) {
        SimpleStatement simple = SimpleStatement.newInstance(str);
        return prepare(simple);
    }

    /**
     * Prepares the statement inside a {@code "prepare"} observation unless the statement already
     * is an observation proxy. Errors are recorded on the observation, and the observation is
     * stopped when the returned publisher terminates for any reason.
     *
     * @param simpleStatement the statement to prepare
     * @return the delegate's prepared-statement publisher
     */
    @Override
    public Mono<PreparedStatement> prepare(SimpleStatement simpleStatement) {
        if (ObservationStatement.isObservationStatement(simpleStatement)) {
            // Already instrumented: do not start a nested observation.
            return this.delegate.prepare(simpleStatement);
        }
        return Mono.deferContextual(context -> {
            Observation started = startObservation(getParentObservation(context), simpleStatement, true, "prepare");
            SimpleStatement proxied = (SimpleStatement) ObservationStatement.createProxy(started, simpleStatement);
            return this.delegate.prepare(proxied)
                    .doOnError(started::error)
                    .doFinally(signal -> started.stop());
        });
    }

    /** Closes the wrapped session. */
    @Override
    public void close() {
        this.delegate.close();
    }

    /**
     * Builds and starts an observation for the given statement.
     *
     * @param parent observation to register as parent, or {@code null} for none
     * @param statement the statement being observed
     * @param prepare flag passed through to the observation context
     * @param methodName observation name, also passed to the observation context
     * @return the started observation
     */
    private Observation startObservation(@Nullable Observation parent, Statement<?> statement, boolean prepare, String methodName) {
        Observation child = Observation.createNotStarted(methodName, () -> {
            String sessionName = this.delegate.getContext().getSessionName();
            // Sessions without a keyspace report the system-session constant instead.
            String keyspace = this.delegate.getKeyspace()
                    .map(CqlIdentifier::asInternal)
                    .orElse(CqlSessionFactoryBean.CASSANDRA_SYSTEM_SESSION);
            return new CassandraObservationContext(statement, this.remoteServiceName, prepare, methodName, sessionName, keyspace);
        }, this.observationRegistry).observationConvention(this.convention);

        if (parent != null) {
            child.parentObservation(parent);
        }
        return child.start();
    }

    /**
     * Looks up the observation stored in the Reactor context under the key
     * {@code "micrometer.observation"}.
     *
     * @param contextView the subscriber context to inspect
     * @return the stored observation, or {@code null} when the key is absent
     */
    @Nullable
    private static Observation getParentObservation(ContextView contextView) {
        Object candidate = contextView.getOrDefault("micrometer.observation", (Object) null);
        return (Observation) candidate;
    }
}
