/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.proxy.backend.communication.vertx;

import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.sqlclient.SqlClient;
import io.vertx.sqlclient.SqlConnection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxConnectionManager;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.proxy.backend.communication.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.ConnectionPostProcessor;
import org.apache.shardingsphere.proxy.backend.communication.vertx.transaction.VertxLocalTransactionManager;
import org.apache.shardingsphere.proxy.backend.reactive.context.ReactiveProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.transaction.core.TransactionType;

public final class VertxBackendConnection
implements BackendConnection<Future<Void>>,
ExecutorVertxConnectionManager {
    private final ConnectionSession connectionSession;
    private final List<ConnectionPostProcessor<Future<SqlConnection>>> connectionPostProcessors = new LinkedList<ConnectionPostProcessor<Future<SqlConnection>>>();
    private final Multimap<String, Future<SqlConnection>> cachedConnections = LinkedHashMultimap.create();
    private final AtomicBoolean closed;

    public VertxBackendConnection(ConnectionSession connectionSession) {
        ShardingSpherePreconditions.checkState((TransactionType.LOCAL == connectionSession.getTransactionStatus().getTransactionType() ? 1 : 0) != 0, () -> new UnsupportedSQLOperationException("Vert.x backend supports LOCAL transaction only for now"));
        this.closed = new AtomicBoolean(false);
        this.connectionSession = connectionSession;
    }

    public List<Future<? extends SqlClient>> getConnections(String dataSourceName, int connectionSize, ConnectionMode connectionMode) {
        return this.connectionSession.getTransactionStatus().isInTransaction() ? this.getConnectionsWithTransaction(dataSourceName, connectionSize) : this.getConnectionsWithoutTransaction(dataSourceName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<Future<? extends SqlClient>> getConnectionsWithTransaction(String dataSourceName, int connectionSize) {
        List<Object> result;
        Collection connections;
        Multimap<String, Future<SqlConnection>> multimap = this.cachedConnections;
        synchronized (multimap) {
            connections = this.cachedConnections.get((Object)(this.connectionSession.getDatabaseName() + "." + dataSourceName));
        }
        if (connections.size() >= connectionSize) {
            result = new ArrayList(connections).subList(0, connectionSize);
        } else {
            if (!connections.isEmpty()) {
                result = new ArrayList(connectionSize);
                result.addAll(connections);
                List<Future<SqlConnection>> newConnections = this.createNewConnections(dataSourceName, connectionSize - connections.size());
                result.addAll(newConnections);
                Multimap<String, Future<SqlConnection>> multimap2 = this.cachedConnections;
                synchronized (multimap2) {
                    this.cachedConnections.putAll((Object)(this.connectionSession.getDatabaseName() + "." + dataSourceName), newConnections);
                }
            }
            result = this.createNewConnections(dataSourceName, connectionSize);
            Multimap<String, Future<SqlConnection>> multimap3 = this.cachedConnections;
            synchronized (multimap3) {
                this.cachedConnections.putAll((Object)(this.connectionSession.getDatabaseName() + "." + dataSourceName), result);
            }
        }
        return new ArrayList<Object>(result);
    }

    private List<Future<SqlConnection>> createNewConnections(String dataSourceName, int connectionSize) {
        Preconditions.checkNotNull((Object)this.connectionSession.getDatabaseName(), (Object)"Current database is null.");
        List<Future<SqlConnection>> result = ReactiveProxyContext.getInstance().getVertxBackendDataSource().getConnections(this.connectionSession.getDatabaseName(), dataSourceName, connectionSize);
        for (Future<SqlConnection> each : result) {
            this.replayMethodsInvocation(each);
        }
        return result;
    }

    private void replayMethodsInvocation(Future<SqlConnection> target) {
        for (ConnectionPostProcessor<Future<SqlConnection>> each : this.connectionPostProcessors) {
            each.process(target);
        }
    }

    private List<Future<? extends SqlClient>> getConnectionsWithoutTransaction(String dataSourceName) {
        Preconditions.checkNotNull((Object)this.connectionSession.getDatabaseName(), (Object)"Current database is null.");
        Future poolFuture = Future.succeededFuture((Object)ReactiveProxyContext.getInstance().getVertxBackendDataSource().getPool(this.connectionSession.getDatabaseName(), dataSourceName));
        return Collections.singletonList(poolFuture);
    }

    @Override
    public Future<Void> prepareForTaskExecution() {
        return Future.succeededFuture();
    }

    @Override
    public Future<Void> handleAutoCommit() {
        if (!this.connectionSession.isAutoCommit() && !this.connectionSession.getTransactionStatus().isInTransaction()) {
            VertxLocalTransactionManager transactionManager = new VertxLocalTransactionManager(this);
            return transactionManager.begin();
        }
        return Future.succeededFuture();
    }

    @Override
    public Future<Void> closeExecutionResources() {
        if (!this.connectionSession.getTransactionStatus().isInTransaction()) {
            return this.closeAllConnections(false);
        }
        if (this.closed.get()) {
            return this.closeAllConnections(true);
        }
        return Future.succeededFuture();
    }

    @Override
    public Future<Void> closeAllResources() {
        this.closed.set(true);
        return this.closeAllConnections(true);
    }

    private Future<Void> closeAllConnections(boolean rollbackBeforeClosing) {
        Collection connections = this.cachedConnections.values();
        if (connections.isEmpty()) {
            return Future.succeededFuture();
        }
        ArrayList<Future> closeFutures = new ArrayList<Future>(connections.size());
        for (Future each : connections) {
            closeFutures.add(rollbackBeforeClosing ? each.compose(connection -> connection.query("rollback").execute().compose(unused -> connection.close())) : each.compose(SqlClient::close));
        }
        return CompositeFuture.join(closeFutures).onComplete(unused -> this.cachedConnections.clear()).compose(unused -> Future.succeededFuture());
    }

    public Future<Void> executeInAllCachedConnections(String sql) {
        ArrayList<Future> futures = new ArrayList<Future>(this.cachedConnections.size());
        for (Future each : this.cachedConnections.values()) {
            futures.add(each.compose(connection -> connection.query(sql).execute()));
        }
        return CompositeFuture.join(futures).compose(result -> Future.succeededFuture());
    }

    @Override
    @Generated
    public ConnectionSession getConnectionSession() {
        return this.connectionSession;
    }

    @Generated
    public List<ConnectionPostProcessor<Future<SqlConnection>>> getConnectionPostProcessors() {
        return this.connectionPostProcessors;
    }

    @Generated
    public Multimap<String, Future<SqlConnection>> getCachedConnections() {
        return this.cachedConnections;
    }

    @Generated
    public AtomicBoolean getClosed() {
        return this.closed;
    }
}

