package com.couchbase.client.core.transaction;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.api.query.CoreQueryContext;
import com.couchbase.client.core.api.query.CoreQueryMetaData;
import com.couchbase.client.core.api.query.CoreQueryOptions;
import com.couchbase.client.core.api.query.CoreQueryOptionsTransactions;
import com.couchbase.client.core.api.query.CoreQueryResult;
import com.couchbase.client.core.api.query.CoreReactiveQueryResult;
import com.couchbase.client.core.classic.query.ClassicCoreReactiveQueryResult;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.cnc.TracingIdentifiers;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.TextNode;
import com.couchbase.client.core.error.transaction.RetryTransactionException;
import com.couchbase.client.core.error.transaction.TransactionOperationFailedException;
import com.couchbase.client.core.error.transaction.internal.CoreTransactionCommitAmbiguousException;
import com.couchbase.client.core.error.transaction.internal.CoreTransactionExpiredException;
import com.couchbase.client.core.error.transaction.internal.CoreTransactionFailedException;
import com.couchbase.client.core.msg.query.QueryChunkRow;
import com.couchbase.client.core.retry.RetryReason;
import com.couchbase.client.core.retry.reactor.DefaultRetry;
import com.couchbase.client.core.retry.reactor.Jitter;
import com.couchbase.client.core.topology.NodeIdentifier;
import com.couchbase.client.core.transaction.config.CoreMergedTransactionConfig;
import com.couchbase.client.core.transaction.config.CoreTransactionOptions;
import com.couchbase.client.core.transaction.config.CoreTransactionsConfig;
import com.couchbase.client.core.transaction.support.SpanWrapper;
import com.couchbase.client.core.transaction.threadlocal.TransactionMarker;
import com.couchbase.client.core.transaction.util.DebugUtil;
import com.couchbase.client.core.transaction.util.QueryUtil;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;

/**
 * Internal reactive driver for transactions.
 *
 * <p>It offers three entry points: {@link #run} executes caller-supplied logic against a
 * {@link CoreTransactionAttemptContext}, while {@link #query} and {@link #queryBlocking} execute one
 * query statement through the same machinery. All of them funnel into {@link #executeTransaction},
 * which re-runs an attempt whenever it fails with a {@link RetryTransactionException}.
 */
@Stability.Internal
public class CoreTransactionsReactive {
    /** Limit handed to the retry policy's {@code retryMax} in {@link #executeCreateRetryWhen}. */
    static final int MAX_ATTEMPTS = 100;

    private final Core core;
    private final CoreTransactionsConfig config;

    /**
     * @param core the core instance; must not be null
     * @param coreTransactionsConfig the transactions configuration; must not be null
     * @throws NullPointerException if either argument is null
     */
    public CoreTransactionsReactive(Core core, CoreTransactionsConfig coreTransactionsConfig) {
        this.core = Objects.requireNonNull(core);
        this.config = Objects.requireNonNull(coreTransactionsConfig);
    }

    /**
     * Builds the reactive pipeline that runs a transaction, retrying attempts as required.
     *
     * @param mono produces a fresh attempt context; it is re-subscribed for every retry
     * @param coreMergedTransactionConfig accepted for interface compatibility; not read by this method
     * @param coreTransactionContext the context shared by all attempts of this transaction
     * @param function the logic to run against each attempt context
     * @param z forwarded unchanged to the attempt context's {@code convertToOperationFailedIfNeeded},
     *          {@code implicitCommit}, {@code lambdaEnd} and {@code transactionEnd}; {@link #run} passes
     *          {@code false}, {@link #query} and {@link #queryBlocking} pass {@code true}
     * @return a Mono emitting the transaction result, or failing with the final error
     */
    public Mono<CoreTransactionResult> executeTransaction(Mono<CoreTransactionAttemptContext> mono, CoreMergedTransactionConfig coreMergedTransactionConfig, CoreTransactionContext coreTransactionContext, Function<CoreTransactionAttemptContext, Mono<Void>> function, boolean z) {
        // System.nanoTime() of the first subscription. Retries re-subscribe upstream, so the value is
        // only written while it is still null: elapsed time always counts from the first attempt.
        AtomicReference<Long> startNanos = new AtomicReference<>();

        return mono
                .publishOn(this.core.context().environment().transactionsSchedulers().schedulerBlocking())
                .doOnSubscribe(subscription -> startNanos.compareAndSet(null, System.nanoTime()))
                .doOnNext(attempt -> {
                    coreTransactionContext.incAttempts();
                    attempt.LOGGER.info(attempt.attemptId(), "starting attempt {}/{}/{}", coreTransactionContext.numAttempts(), attempt.transactionId(), attempt.attemptId());
                })
                .flatMap(attempt -> runAttempt(attempt, function, z))
                .retryWhen(executeCreateRetryWhen(coreTransactionContext, startNanos))
                .doOnNext(result -> coreTransactionContext.finish(null))
                .doOnError(err -> coreTransactionContext.finish(err))
                .doOnTerminate(() -> {
                    long elapsedNanos = System.nanoTime() - startNanos.get();
                    coreTransactionContext.LOGGER.info("finished txn in {}us", TimeUnit.NANOSECONDS.toMicros(elapsedNanos));
                });
    }

    /**
     * Runs the logic for a single attempt and then drives the attempt context through
     * {@code implicitCommit}, {@code lambdaEnd} and {@code transactionEnd}, in that order.
     *
     * @param attempt the attempt context for this try
     * @param logic the logic to apply to {@code attempt}
     * @param singleQueryMode the {@code z} flag of {@link #executeTransaction}, passed through as-is
     */
    private Mono<CoreTransactionResult> runAttempt(CoreTransactionAttemptContext attempt, Function<CoreTransactionAttemptContext, Mono<Void>> logic, boolean singleQueryMode) {
        return logic.apply(attempt)
                // Expose the attempt to the logic through the Reactor context.
                .contextWrite(reactorContext -> reactorContext.put(TransactionMarker.class, new TransactionMarker(attempt)))
                .onErrorResume(err -> Mono.error(attempt.convertToOperationFailedIfNeeded(err, singleQueryMode)))
                .then(attempt.implicitCommit(singleQueryMode))
                // Any failure so far (logic or implicit commit) is handed to lambdaEnd together with the error ...
                .onErrorResume(err -> attempt.lambdaEnd(core().transactionsCleanup(), err, singleQueryMode))
                // ... and lambdaEnd is also chained with a null error for when the stage above completes.
                .then(attempt.lambdaEnd(core().transactionsCleanup(), null, singleQueryMode))
                .then(attempt.transactionEnd(null, singleQueryMode))
                .onErrorResume(err -> {
                    // Retry requests and already-final transaction failures propagate untouched;
                    // every other error is passed to transactionEnd.
                    if (err instanceof RetryTransactionException || err instanceof CoreTransactionFailedException) {
                        return Mono.error(err);
                    }
                    return attempt.transactionEnd(err, singleQueryMode);
                });
    }

    /**
     * Creates the retry policy: only {@link RetryTransactionException} is retried, with jittered
     * exponential backoff between 1ms and 100ms and a limit of {@link #MAX_ATTEMPTS}.
     *
     * @param overall the transaction context, used for logging and retry bookkeeping
     * @param startNanos the transaction start time as captured by {@link #executeTransaction}
     */
    private Retry executeCreateRetryWhen(CoreTransactionContext overall, AtomicReference<Long> startNanos) {
        return DefaultRetry.create(retryContext -> retryContext.exception() instanceof RetryTransactionException)
                .exponentialBackoff(Duration.ofMillis(1L), Duration.ofMillis(100L))
                .doOnRetry(retryContext -> {
                    Duration sinceStart = Duration.ofNanos(System.nanoTime() - startNanos.get());
                    overall.LOGGER.info("<>", "retrying transaction after backoff {}millis", retryContext.backoff().toMillis());
                    overall.incrementRetryAttempts(sinceStart, RetryReason.UNKNOWN);
                })
                .jitter(Jitter.random())
                .retryMax(MAX_ATTEMPTS)
                .toReactorRetry();
    }

    /**
     * Creates an attempt context through the factory configured on {@code coreMergedTransactionConfig}.
     *
     * @param coreTransactionContext the owning transaction context; its span is passed to the factory
     * @param coreMergedTransactionConfig the merged configuration supplying the factory
     * @param str the id for the new attempt
     */
    public CoreTransactionAttemptContext createAttemptContext(CoreTransactionContext coreTransactionContext, CoreMergedTransactionConfig coreMergedTransactionConfig, String str) {
        return coreMergedTransactionConfig.attemptContextFactory().create(this.core, coreTransactionContext, coreMergedTransactionConfig, str, this, Optional.of(coreTransactionContext.span()));
    }

    /**
     * Runs the supplied logic as a transaction.
     *
     * @param function the transaction logic; invoked lazily once per attempt, its emitted value is discarded
     * @param coreTransactionOptions optional per-transaction overrides, may be null
     * @return a Mono that starts a new transaction on every subscription
     */
    public Mono<CoreTransactionResult> run(Function<CoreTransactionAttemptContext, Mono<?>> function, @Nullable CoreTransactionOptions coreTransactionOptions) {
        return Mono.defer(() -> {
            CoreMergedTransactionConfig merged = new CoreMergedTransactionConfig(this.config, Optional.ofNullable(coreTransactionOptions));
            CoreTransactionContext overall = newTransactionContext(merged);
            overall.LOGGER.info(configDebug(this.config, coreTransactionOptions, this.core));
            Function<CoreTransactionAttemptContext, Mono<Void>> logic =
                    attempt -> Mono.defer(() -> function.apply(attempt)).then();
            return executeTransaction(newAttemptPerSubscription(overall, merged), merged, overall, logic, false);
        });
    }

    private void logElidedStacktrace(CoreTransactionAttemptContext coreTransactionAttemptContext, Throwable th) {
        coreTransactionAttemptContext.LOGGER.info(coreTransactionAttemptContext.attemptId(), DebugUtil.createElidedStacktrace(th));
    }

    /**
     * Renders a one-line summary of the effective configuration for the transaction log.
     *
     * @param globalConfig the global transactions configuration
     * @param perTxnOptions per-transaction options, or null when none were supplied
     * @param core used to look up the client version
     */
    private static String configDebug(CoreTransactionsConfig globalConfig, @Nullable CoreTransactionOptions perTxnOptions, Core core) {
        StringBuilder out = new StringBuilder();
        out.append("SDK version: ").append(core.context().environment().clientVersion().orElse("-"));
        out.append(" config: ");
        out.append("atrs=").append(globalConfig.numAtrs());
        out.append(", metadataCollection=").append(globalConfig.metadataCollection());
        out.append(", expiry=");
        if (perTxnOptions == null) {
            out.append(globalConfig.transactionExpirationTime().toMillis());
        } else {
            // A per-transaction timeout takes precedence over the global expiration time.
            out.append(perTxnOptions.timeout().orElse(globalConfig.transactionExpirationTime()).toMillis());
        }
        out.append("ms durability=").append(globalConfig.durabilityLevel());
        if (perTxnOptions != null) {
            out.append(" per-txn config=");
            out.append(" durability=").append(perTxnOptions.durabilityLevel());
        }
        out.append(", supported=").append(globalConfig.supported());
        return out.toString();
    }

    /** @return the transactions configuration supplied at construction */
    public CoreTransactionsConfig config() {
        return this.config;
    }

    /** @return the core instance supplied at construction */
    public Core core() {
        return this.core;
    }

    /**
     * Executes one query statement through the transaction machinery and returns its streaming result.
     *
     * @param str the query statement
     * @param coreQueryContext optional query context, may be null
     * @param coreQueryOptions the query options
     * @param optional optional parent span; when present it is wrapped in a {@link SpanWrapper}
     * @param function applied last to any error raised while rows are streamed, after this class has
     *                 converted it (see {@link #singleQueryHandleErrorDuringRowStreaming})
     * @return a Mono emitting the query result, or failing with {@link CoreTransactionFailedException}
     *         if no result was captured
     */
    public Mono<CoreReactiveQueryResult> query(String str, @Nullable CoreQueryContext coreQueryContext, CoreQueryOptions coreQueryOptions, Optional<RequestSpan> optional, Function<Throwable, RuntimeException> function) {
        return Mono.defer(() -> {
            CoreMergedTransactionConfig merged = new CoreMergedTransactionConfig(this.config, Optional.empty());
            CoreTransactionContext overall = newTransactionContext(merged);
            overall.LOGGER.info(configDebug(this.config, null, this.core));

            // executeTransaction emits a CoreTransactionResult, so the query result is carried out-of-band.
            AtomicReference<ClassicCoreReactiveQueryResult> captured = new AtomicReference<>();
            Function<CoreTransactionAttemptContext, Mono<Void>> logic = attempt -> Mono.defer(() ->
                    attempt.doQueryOperation("single query streaming", str, optional.map(SpanWrapper::new).orElse(null), (unusedNum, unusedRef, span) -> {
                        span.attribute(TracingIdentifiers.ATTR_TRANSACTION_SINGLE_QUERY, true);
                        return attempt.queryWrapperLocked(0, coreQueryContext, str, coreQueryOptions, "query", false, true, null, null, span, true, null, true)
                                .doOnNext(captured::set);
                    }).then());

            Function<Throwable, RuntimeException> rowErrorConverter = singleQueryHandleErrorDuringRowStreaming(overall, function);
            return executeTransaction(newAttemptPerSubscription(overall, merged), merged, overall, logic, true)
                    .then(Mono.defer(() -> streamingResultOrError(captured.get(), overall, rowErrorConverter)));
        });
    }

    /**
     * Wraps the captured query result so that errors on its row stream go through
     * {@code rowErrorConverter}; fails if nothing was captured.
     *
     * @param result the result captured while the transaction ran, or null if there was none
     * @param overall the transaction context, used to build the failure
     * @param rowErrorConverter maps an error on the row stream to the exception to emit
     */
    private static Mono<CoreReactiveQueryResult> streamingResultOrError(@Nullable ClassicCoreReactiveQueryResult result, CoreTransactionContext overall, Function<Throwable, RuntimeException> rowErrorConverter) {
        if (result == null) {
            return Mono.error(noQueryRunFailure(overall));
        }
        return Mono.just(new CoreReactiveQueryResult() {
            @Override
            public Flux<QueryChunkRow> rows() {
                return result.rows().onErrorResume(err -> Mono.error(rowErrorConverter.apply(err)));
            }

            @Override
            public Mono<CoreQueryMetaData> metaData() {
                return result.metaData();
            }

            @Override
            public NodeIdentifier lastDispatchedTo() {
                return result.lastDispatchedTo();
            }
        });
    }

    /**
     * Builds the converter applied to errors raised while the rows of a single query are streamed.
     *
     * <p>The error is first converted with {@link QueryUtil#convertQueryError}. If that yields a
     * {@link TransactionOperationFailedException}, it is mapped to a transaction-level exception
     * according to its {@code toRaise()} value. The outcome is finally passed through {@code userConverter}.
     *
     * @param overall the transaction context supplying logger, transaction id and expiration time
     * @param userConverter the caller-supplied final conversion
     */
    private static Function<Throwable, RuntimeException> singleQueryHandleErrorDuringRowStreaming(CoreTransactionContext overall, Function<Throwable, RuntimeException> userConverter) {
        return streamError -> {
            RuntimeException converted = QueryUtil.convertQueryError(streamError);
            overall.LOGGER.warn("", "got error on rows stream {}, converted from {}", DebugUtil.dbg(converted), DebugUtil.dbg(streamError));
            if (!(converted instanceof TransactionOperationFailedException)) {
                return userConverter.apply(converted);
            }

            TransactionOperationFailedException opFailed = (TransactionOperationFailedException) converted;
            RuntimeException toEmit;
            switch (opFailed.toRaise()) {
                case TRANSACTION_FAILED_POST_COMMIT:
                    // This case wraps the converted exception; the cases below wrap the original stream error.
                    toEmit = new CoreTransactionFailedException(opFailed, overall.LOGGER, overall.transactionId());
                    break;
                case TRANSACTION_EXPIRED:
                    toEmit = new CoreTransactionExpiredException(streamError, overall.LOGGER, overall.transactionId(), "Transaction has expired configured timeout of " + overall.expirationTime().toMillis() + "ms.  The transaction is not committed.");
                    break;
                case TRANSACTION_COMMIT_AMBIGUOUS:
                    toEmit = new CoreTransactionCommitAmbiguousException(streamError, overall.LOGGER, overall.transactionId(), "It is ambiguous whether the transaction committed");
                    break;
                default:
                    toEmit = new CoreTransactionFailedException(streamError, overall.LOGGER, overall.transactionId());
                    break;
            }
            return userConverter.apply(toEmit);
        };
    }

    /**
     * Executes one query statement through the transaction machinery and returns its result.
     *
     * <p>The supplied options are wrapped in {@link CoreQueryOptionsTransactions} with
     * {@code "tximplicit"} set to the text value {@code "true"} before the query is issued.
     *
     * @param str the query statement
     * @param coreQueryContext optional query context, may be null
     * @param coreQueryOptions the query options to wrap
     * @param optional optional parent span; when present it is mapped through
     *                 {@code CoreTransactionOptions::create} to form the per-transaction options
     * @return a Mono emitting the query result, or failing with {@link CoreTransactionFailedException}
     *         if no result was captured
     */
    public Mono<CoreQueryResult> queryBlocking(String str, @Nullable CoreQueryContext coreQueryContext, CoreQueryOptions coreQueryOptions, Optional<RequestSpan> optional) {
        return Mono.defer(() -> {
            CoreQueryOptionsTransactions txOptions = new CoreQueryOptionsTransactions(coreQueryOptions);
            txOptions.put("tximplicit", TextNode.valueOf("true"));
            CoreMergedTransactionConfig merged = new CoreMergedTransactionConfig(this.config, optional.map(CoreTransactionOptions::create));
            CoreTransactionContext overall = newTransactionContext(merged);
            overall.LOGGER.info(configDebug(this.config, null, this.core));

            // executeTransaction emits a CoreTransactionResult, so the query result is carried out-of-band.
            AtomicReference<CoreQueryResult> captured = new AtomicReference<>();
            Function<CoreTransactionAttemptContext, Mono<Void>> logic = attempt -> Mono.defer(() ->
                    attempt.queryBlocking(str, coreQueryContext, txOptions, true)
                            .doOnNext(captured::set)
                            .then());

            return executeTransaction(newAttemptPerSubscription(overall, merged), merged, overall, logic, true)
                    .then(Mono.defer(() -> {
                        CoreQueryResult result = captured.get();
                        if (result != null) {
                            return Mono.just(result);
                        }
                        return Mono.<CoreQueryResult>error(noQueryRunFailure(overall));
                    }));
        });
    }

    /** Creates the context for a new transaction, identified by a random UUID. */
    private CoreTransactionContext newTransactionContext(CoreMergedTransactionConfig merged) {
        return new CoreTransactionContext(this.core.context(), UUID.randomUUID().toString(), merged, this.core.transactionsCleanup());
    }

    /** Returns a Mono that creates a new attempt context, with a random UUID as its id, on each subscription. */
    private Mono<CoreTransactionAttemptContext> newAttemptPerSubscription(CoreTransactionContext overall, CoreMergedTransactionConfig merged) {
        return Mono.fromCallable(() -> createAttemptContext(overall, merged, UUID.randomUUID().toString()));
    }

    /** Builds the failure emitted when a single-query transaction finished without capturing a result. */
    private static CoreTransactionFailedException noQueryRunFailure(CoreTransactionContext overall) {
        return new CoreTransactionFailedException(new IllegalStateException("No query has been run"), overall.LOGGER, overall.transactionId());
    }
}
