package io.micronaut.data.connection.support;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.propagation.ReactorPropagation;
import io.micronaut.core.propagation.PropagatedContextElement;
import io.micronaut.data.connection.ConnectionDefinition;
import io.micronaut.data.connection.ConnectionStatus;
import io.micronaut.data.connection.exceptions.NoConnectionException;
import io.micronaut.data.connection.reactive.DefaultReactiveConnectionStatus;
import io.micronaut.data.connection.reactive.ReactiveStreamsConnectionOperations;
import io.micronaut.data.connection.reactive.ReactorConnectionOperations;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

@Internal
/* loaded from: input_file:io/micronaut/data/connection/support/AbstractReactorConnectionOperations.class */
public abstract class AbstractReactorConnectionOperations<C> implements ReactorConnectionOperations<C> {

    /**
     * Propagated-context element pairing a connection status with the operations instance
     * that registered it.
     *
     * <p>The enclosing class locates its own element by comparing {@code connectionOperations}
     * with {@code this} by identity, so elements registered by other operations instances
     * are ignored.
     *
     * <p>Declared as a real {@code record}: the decompiled form ({@code extends Record} plus
     * hand-invoked {@code ObjectMethods.bootstrap}) is not valid source, and the compiler
     * generates equivalent {@code toString}/{@code hashCode}/{@code equals} implementations.
     *
     * <p>NOTE(review): the decompiler reported that the compiled class was originally
     * {@code private}; the visibility declared in this file is kept unchanged here.
     *
     * @param connectionOperations the operations instance that registered this element
     * @param status               the connection status carried through the Reactor context
     * @param <C>                  the connection type
     */
    public record ClientSessionPropagatedContext<C>(
            ReactiveStreamsConnectionOperations<?> connectionOperations,
            ConnectionStatus<C> status) implements PropagatedContextElement {
    }

    /**
     * Opens a connection for the given definition.
     *
     * <p>This class converts the returned publisher with {@code Mono.from(...)} and wraps
     * the emitted value in a {@code DefaultReactiveConnectionStatus} before handing it to
     * the caller's callback.
     *
     * @param connectionDefinition the definition describing the requested connection
     * @return a publisher emitting the opened connection
     */
    @NonNull
    protected abstract Publisher<C> openConnection(@NonNull ConnectionDefinition connectionDefinition);

    /**
     * Closes a connection previously obtained from {@link #openConnection(ConnectionDefinition)}.
     *
     * <p>This class supplies a call to this method to the connection status'
     * {@code onComplete}, {@code onError} and {@code onCancel} hooks for every connection
     * it opens itself.
     *
     * @param c                    the connection to close
     * @param connectionDefinition the definition the connection was opened with
     * @return a publisher representing the close operation
     */
    @NonNull
    protected abstract Publisher<Void> closeConnection(@NonNull C c, @NonNull ConnectionDefinition connectionDefinition);

    /**
     * Looks up the connection status that this operations instance registered in the
     * given Reactor context.
     *
     * @param contextView the Reactor context view to search
     * @return the registered status, or an empty optional when this instance has no
     *         element in the context
     */
    @Override // io.micronaut.data.connection.reactive.ReactorConnectionOperations
    @SuppressWarnings("unchecked")
    public final Optional<ConnectionStatus<C>> findConnectionStatus(@NonNull ContextView contextView) {
        Optional<ClientSessionPropagatedContext> registered = findPropagateContextElement(contextView);
        // The element is looked up by raw class, so the status type argument is restored here.
        return registered.map(element -> (ConnectionStatus<C>) element.status());
    }

    /**
     * Finds the first propagated element in the context whose operations reference is
     * this very instance (identity comparison).
     *
     * @param contextView the Reactor context view to search
     * @return the matching element, if any
     */
    private Optional<ClientSessionPropagatedContext> findPropagateContextElement(ContextView contextView) {
        return ReactorPropagation.findAllContextElements(contextView, ClientSessionPropagatedContext.class)
            .filter(element -> this == element.connectionOperations())
            .findFirst();
    }

    /**
     * Runs the callback with a connection chosen according to the definition's
     * propagation behavior and returns the resulting {@link Flux}.
     *
     * <p>The decision is deferred until subscription, when the Reactor context is
     * available. With a connection already in the context, {@code REQUIRED} and
     * {@code MANDATORY} reuse it and {@code REQUIRES_NEW} opens a new one. Without one,
     * {@code REQUIRED} and {@code REQUIRES_NEW} open a new one and {@code MANDATORY}
     * fails with a {@link NoConnectionException}.
     *
     * @param connectionDefinition the definition of the requested connection
     * @param function             the callback receiving the connection status
     * @param <T>                  the element type of the resulting flux
     * @return the flux produced by the callback
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override // io.micronaut.data.connection.reactive.ReactorConnectionOperations
    @NonNull
    public <T> Flux<T> withConnectionFlux(@NonNull ConnectionDefinition connectionDefinition, @NonNull Function<ConnectionStatus<C>, Flux<T>> function) {
        Objects.requireNonNull(function, "Callback cannot be null");
        return Flux.deferContextual(contextView -> {
            C existing = findConnection(contextView);
            if (existing == null) {
                // Nothing propagated yet: either open a connection or reject the call.
                return switch (connectionDefinition.getPropagationBehavior()) {
                    case REQUIRED, REQUIRES_NEW -> openConnectionFlux(connectionDefinition, function);
                    case MANDATORY -> throw noConnectionFound();
                    default -> throw new IncompatibleClassChangeError();
                };
            }
            return switch (connectionDefinition.getPropagationBehavior()) {
                case REQUIRED, MANDATORY -> existingConnectionFlux(connectionDefinition, function, existing);
                case REQUIRES_NEW -> openConnectionFlux(connectionDefinition, function);
                default -> throw new IncompatibleClassChangeError();
            };
        });
    }

    /**
     * Invokes the callback with a status wrapping an already propagated connection.
     *
     * <p>The status is created with its boolean flag set to {@code false}
     * (presumably marking the connection as not newly opened; confirm against
     * {@code DefaultReactiveConnectionStatus}).
     *
     * @param connectionDefinition the definition of the requested connection
     * @param function             the callback receiving the connection status
     * @param c                    the connection found in the context
     * @param <T>                  the element type of the resulting flux
     * @return the flux produced by the callback
     */
    private <T> Flux<T> existingConnectionFlux(ConnectionDefinition connectionDefinition, Function<ConnectionStatus<C>, Flux<T>> function, C c) {
        ConnectionStatus<C> reusedStatus = new DefaultReactiveConnectionStatus<>(c, connectionDefinition, false);
        return function.apply(reusedStatus);
    }

    /**
     * Opens a new connection, runs the callback with it and wires up the close hooks.
     *
     * <p>The callback's flux is given a context containing the new connection status,
     * so nested calls on this instance can find it. The status'
     * {@code onComplete}, {@code onError} and {@code onCancel} hooks each receive a
     * supplier that closes the connection.
     *
     * @param connectionDefinition the definition of the requested connection
     * @param function             the callback receiving the connection status
     * @param <T>                  the element type of the resulting flux
     * @return the flux produced by the callback, bound to the connection's lifecycle
     */
    private <T> Flux<T> openConnectionFlux(ConnectionDefinition connectionDefinition, Function<ConnectionStatus<C>, Flux<T>> function) {
        Mono<DefaultReactiveConnectionStatus<C>> newStatus = Mono.from(openConnection(connectionDefinition))
            .map(connection -> new DefaultReactiveConnectionStatus<>(connection, connectionDefinition, true));
        return Flux.usingWhen(
            newStatus,
            status -> function.apply(status).contextWrite(context -> addClientSession(context, status)),
            status -> status.onComplete(() -> closeConnection(status.getConnection(), connectionDefinition)),
            (status, failure) -> status.onError(failure, () -> closeConnection(status.getConnection(), connectionDefinition)),
            status -> status.onCancel(() -> closeConnection(status.getConnection(), connectionDefinition))
        );
    }

    /**
     * Runs the callback with a connection chosen according to the definition's
     * propagation behavior and returns the resulting {@link Mono}.
     *
     * <p>The decision is deferred until subscription, when the Reactor context is
     * available. With a connection already in the context, {@code REQUIRED} and
     * {@code MANDATORY} reuse it and {@code REQUIRES_NEW} opens a new one. Without one,
     * {@code REQUIRED} and {@code REQUIRES_NEW} open a new one and {@code MANDATORY}
     * fails with a {@link NoConnectionException}.
     *
     * @param connectionDefinition the definition of the requested connection
     * @param function             the callback receiving the connection status
     * @param <T>                  the value type of the resulting mono
     * @return the mono produced by the callback
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override // io.micronaut.data.connection.reactive.ReactorConnectionOperations
    @NonNull
    public <T> Mono<T> withConnectionMono(@NonNull ConnectionDefinition connectionDefinition, @NonNull Function<ConnectionStatus<C>, Mono<T>> function) {
        Objects.requireNonNull(function, "Callback cannot be null");
        return Mono.deferContextual(contextView -> {
            C existing = findConnection(contextView);
            if (existing == null) {
                // Nothing propagated yet: either open a connection or reject the call.
                return switch (connectionDefinition.getPropagationBehavior()) {
                    case REQUIRED, REQUIRES_NEW -> openConnectionMono(connectionDefinition, function);
                    case MANDATORY -> throw noConnectionFound();
                    default -> throw new IncompatibleClassChangeError();
                };
            }
            return switch (connectionDefinition.getPropagationBehavior()) {
                case REQUIRED, MANDATORY -> existingConnectionMono(connectionDefinition, function, existing);
                case REQUIRES_NEW -> openConnectionMono(connectionDefinition, function);
                default -> throw new IncompatibleClassChangeError();
            };
        });
    }

    /**
     * Invokes the callback with a status wrapping an already propagated connection.
     *
     * <p>The status is created with its boolean flag set to {@code false}
     * (presumably marking the connection as not newly opened; confirm against
     * {@code DefaultReactiveConnectionStatus}).
     *
     * @param connectionDefinition the definition of the requested connection
     * @param function             the callback receiving the connection status
     * @param c                    the connection found in the context
     * @param <T>                  the value type of the resulting mono
     * @return the mono produced by the callback
     */
    private <T> Mono<T> existingConnectionMono(ConnectionDefinition connectionDefinition, Function<ConnectionStatus<C>, Mono<T>> function, C c) {
        ConnectionStatus<C> reusedStatus = new DefaultReactiveConnectionStatus<>(c, connectionDefinition, false);
        return function.apply(reusedStatus);
    }

    /**
     * Opens a new connection, runs the callback with it and wires up the close hooks.
     *
     * <p>The callback's mono is given a context containing the new connection status,
     * so nested calls on this instance can find it. The status'
     * {@code onComplete}, {@code onError} and {@code onCancel} hooks each receive a
     * supplier that closes the connection.
     *
     * @param connectionDefinition the definition of the requested connection
     * @param function             the callback receiving the connection status
     * @param <T>                  the value type of the resulting mono
     * @return the mono produced by the callback, bound to the connection's lifecycle
     */
    private <T> Mono<T> openConnectionMono(ConnectionDefinition connectionDefinition, Function<ConnectionStatus<C>, Mono<T>> function) {
        Mono<DefaultReactiveConnectionStatus<C>> newStatus = Mono.from(openConnection(connectionDefinition))
            .map(connection -> new DefaultReactiveConnectionStatus<>(connection, connectionDefinition, true));
        return Mono.usingWhen(
            newStatus,
            status -> function.apply(status).contextWrite(context -> addClientSession(context, status)),
            status -> status.onComplete(() -> closeConnection(status.getConnection(), connectionDefinition)),
            (status, failure) -> status.onError(failure, () -> closeConnection(status.getConnection(), connectionDefinition)),
            status -> status.onCancel(() -> closeConnection(status.getConnection(), connectionDefinition))
        );
    }

    /**
     * Builds the exception raised when {@code MANDATORY} propagation finds no connection.
     *
     * @return a new {@link NoConnectionException}; the caller is responsible for throwing it
     */
    private NoConnectionException noConnectionFound() {
        final String message = "No existing connection found for connection marked with propagation 'mandatory'";
        return new NoConnectionException(message);
    }

    /**
     * Returns a context that additionally carries the given status, tagged with this
     * operations instance so later lookups can tell it apart from other instances' elements.
     *
     * @param context          the Reactor context to extend
     * @param connectionStatus the status to propagate
     * @return the context returned by {@code ReactorPropagation.addContextElement}
     */
    @NonNull
    private Context addClientSession(@NonNull Context context, @NonNull ConnectionStatus<C> connectionStatus) {
        ClientSessionPropagatedContext<C> element = new ClientSessionPropagatedContext<>(this, connectionStatus);
        return ReactorPropagation.addContextElement(context, element);
    }

    /**
     * Returns the connection this instance propagated through the given context.
     *
     * @param contextView the Reactor context view to search
     * @return the propagated connection, or {@code null} when no status is registered
     */
    @Nullable
    private C findConnection(@NonNull ContextView contextView) {
        Optional<ConnectionStatus<C>> status = findConnectionStatus(contextView);
        return status.map(ConnectionStatus::getConnection).orElse(null);
    }
}
