/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spanner.r2dbc.v2;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.AsyncResultSet;
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.ReadContext;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.r2dbc.SpannerConnectionConfiguration;
import com.google.cloud.spanner.r2dbc.v2.DatabaseClientTransactionManager;
import com.google.cloud.spanner.r2dbc.v2.ReactiveResultSetCallback;
import com.google.cloud.spanner.r2dbc.v2.SpannerClientLibraryResult;
import com.google.cloud.spanner.r2dbc.v2.SpannerClientLibraryRow;
import com.google.common.annotations.VisibleForTesting;
import com.google.spanner.v1.ExecuteSqlRequest;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Schedulers;

class DatabaseClientReactiveAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseClientReactiveAdapter.class);
    public static final Executor REACTOR_EXECUTOR = runnable -> Schedulers.parallel().schedule(runnable);
    private final SpannerConnectionConfiguration config;
    private final Spanner spannerClient;
    private final DatabaseClient dbClient;
    private final DatabaseAdminClient dbAdminClient;
    private DatabaseClientTransactionManager txnManager;
    private boolean autoCommit = true;
    private boolean active = true;
    private ExecuteSqlRequest.QueryOptions queryOptions;

    DatabaseClientReactiveAdapter(Spanner spannerClient, SpannerConnectionConfiguration config) {
        this.spannerClient = spannerClient;
        this.dbClient = spannerClient.getDatabaseClient(DatabaseId.of((String)config.getProjectId(), (String)config.getInstanceName(), (String)config.getDatabaseName()));
        this.dbAdminClient = spannerClient.getDatabaseAdminClient();
        this.config = config;
        this.txnManager = new DatabaseClientTransactionManager(this.dbClient);
        ExecuteSqlRequest.QueryOptions.Builder builder = ExecuteSqlRequest.QueryOptions.newBuilder();
        if (config.getOptimizerVersion() != null) {
            builder.setOptimizerVersion(config.getOptimizerVersion());
        }
        this.queryOptions = builder.build();
    }

    Mono<Void> beginTransaction() {
        return this.convertFutureToMono(() -> this.txnManager.beginTransaction()).then();
    }

    Mono<Void> beginReadonlyTransaction(TimestampBound timestampBound) {
        return Mono.defer(() -> {
            this.txnManager.beginReadonlyTransaction(timestampBound);
            return Mono.empty();
        });
    }

    Mono<Void> commitTransaction() {
        return this.convertFutureToMono(() -> this.txnManager.commitTransaction()).doOnTerminate(this.txnManager::clearTransactionManager).then();
    }

    Publisher<Void> rollback() {
        return this.convertFutureToMono(() -> this.txnManager.rollbackTransaction()).doOnTerminate(this.txnManager::clearTransactionManager);
    }

    Mono<Void> close() {
        return Mono.defer(() -> {
            if (!this.active) {
                return Mono.empty();
            }
            this.active = false;
            return this.convertFutureToMono(() -> this.txnManager.clearTransactionManager());
        });
    }

    Mono<Boolean> healthCheck() {
        return Mono.defer(() -> {
            if (!this.active || this.spannerClient.isClosed()) {
                return Mono.just((Object)false);
            }
            return Flux.create(sink -> {
                Statement statement = Statement.newBuilder((String)"SELECT 1").build();
                this.runSelectStatementAsFlux(this.dbClient.singleUse(), statement, (FluxSink<SpannerClientLibraryRow>)sink);
            }).then(Mono.just((Object)true)).onErrorResume(error -> {
                LOGGER.warn("Cloud Spanner healthcheck failed", error);
                return Mono.just((Object)false);
            });
        });
    }

    Mono<Boolean> localHealthcheck() {
        return Mono.fromSupplier(() -> this.active);
    }

    boolean isAutoCommit() {
        return this.autoCommit;
    }

    Publisher<Void> setAutoCommit(boolean autoCommit) {
        return Mono.defer(() -> {
            Mono<Void> result = Mono.empty();
            if (this.autoCommit != autoCommit && this.txnManager.isInTransaction()) {
                result = this.commitTransaction();
            }
            return result.doOnSuccess(empty -> {
                this.autoCommit = autoCommit;
            });
        });
    }

    Mono<SpannerClientLibraryResult> runDmlStatement(Statement statement) {
        return this.runBatchDmlInternal(ctx -> ctx.executeUpdateAsync(statement, new Options.UpdateOption[0])).map(numRowsUpdated -> new SpannerClientLibraryResult((Flux<SpannerClientLibraryRow>)Flux.empty(), this.longToInt((Long)numRowsUpdated)));
    }

    Flux<SpannerClientLibraryResult> runBatchDml(List<Statement> statements) {
        return this.runBatchDmlInternal(ctx -> ctx.batchUpdateAsync((Iterable)statements, new Options.UpdateOption[0])).flatMapIterable(numRowsArray -> LongStream.of(numRowsArray).boxed().collect(Collectors.toList())).map(numRows -> new SpannerClientLibraryResult((Flux<SpannerClientLibraryRow>)Flux.empty(), this.longToInt((Long)numRows)));
    }

    private int longToInt(Long numRows) {
        if (numRows > Integer.MAX_VALUE) {
            LOGGER.warn("Number of updated rows exceeds maximum integer value; actual rows updated = {}; returning max int value", (Object)numRows);
            return Integer.MAX_VALUE;
        }
        return numRows.intValue();
    }

    private <T> Mono<T> runBatchDmlInternal(Function<TransactionContext, ApiFuture<T>> asyncOperation) {
        return Mono.defer(() -> {
            if (this.txnManager.isInReadonlyTransaction()) {
                return Mono.error((Throwable)new IllegalAccessException("Cannot run DML statements in a readonly transaction."));
            }
            if (!this.autoCommit && !this.txnManager.isInReadWriteTransaction()) {
                return Mono.error((Throwable)new IllegalAccessException("Cannot run DML statements outside of a transaction when autocommit is set to false."));
            }
            return this.convertFutureToMono(() -> {
                if (this.txnManager.isInReadWriteTransaction()) {
                    return this.txnManager.runInTransaction(asyncOperation);
                }
                ApiFuture rowCountFuture = this.dbClient.runAsync(new Options.TransactionOption[0]).runAsync(asyncOperation::apply, REACTOR_EXECUTOR);
                return rowCountFuture;
            });
        });
    }

    Flux<SpannerClientLibraryRow> runSelectStatement(Statement statement) {
        return Flux.create(sink -> {
            if (this.txnManager.isInReadWriteTransaction()) {
                this.txnManager.runInTransaction(ctx -> this.runSelectStatementAsFlux((ReadContext)ctx, statement, (FluxSink<SpannerClientLibraryRow>)sink));
            } else {
                this.runSelectStatementAsFlux(this.txnManager.getReadContext(), statement, (FluxSink<SpannerClientLibraryRow>)sink);
            }
        });
    }

    Mono<Void> runDdlStatement(String query) {
        return this.convertFutureToMono(() -> this.dbAdminClient.updateDatabaseDdl(this.config.getInstanceName(), this.config.getDatabaseName(), Collections.singletonList(query), null));
    }

    private ApiFuture<Void> runSelectStatementAsFlux(ReadContext readContext, Statement statement, FluxSink<SpannerClientLibraryRow> sink) {
        AsyncResultSet ars = readContext.executeQueryAsync(statement, new Options.QueryOption[0]);
        return ars.setCallback(REACTOR_EXECUTOR, (AsyncResultSet.ReadyCallback)new ReactiveResultSetCallback(sink, ars));
    }

    private <T> Mono<T> convertFutureToMono(Supplier<ApiFuture<T>> futureSupplier) {
        return Mono.create(sink -> {
            ApiFuture future = (ApiFuture)futureSupplier.get();
            sink.onCancel(() -> future.cancel(true));
            ApiFutures.addCallback((ApiFuture)future, (ApiFutureCallback)new ApiFutureCallback<T>((MonoSink)sink){
                final /* synthetic */ MonoSink val$sink;
                {
                    this.val$sink = monoSink;
                }

                public void onFailure(Throwable t) {
                    this.val$sink.error(t);
                }

                public void onSuccess(T result) {
                    this.val$sink.success(result);
                }
            }, (Executor)REACTOR_EXECUTOR);
        });
    }

    ExecuteSqlRequest.QueryOptions getQueryOptions() {
        return this.queryOptions;
    }

    boolean isInReadonlyTransaction() {
        return this.txnManager.isInReadonlyTransaction();
    }

    @VisibleForTesting
    void setTxnManager(DatabaseClientTransactionManager txnManager) {
        this.txnManager = txnManager;
    }
}

