package io.asyncer.r2dbc.mysql;

import io.asyncer.r2dbc.mysql.client.Client;
import io.asyncer.r2dbc.mysql.client.FluxExchangeable;
import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
import io.asyncer.r2dbc.mysql.message.client.PrepareQueryMessage;
import io.asyncer.r2dbc.mysql.message.client.PreparedCloseMessage;
import io.asyncer.r2dbc.mysql.message.client.PreparedExecuteMessage;
import io.asyncer.r2dbc.mysql.message.client.PreparedFetchMessage;
import io.asyncer.r2dbc.mysql.message.client.PreparedResetMessage;
import io.asyncer.r2dbc.mysql.message.server.CompleteMessage;
import io.asyncer.r2dbc.mysql.message.server.EofMessage;
import io.asyncer.r2dbc.mysql.message.server.ErrorMessage;
import io.asyncer.r2dbc.mysql.message.server.OkMessage;
import io.asyncer.r2dbc.mysql.message.server.PreparedOkMessage;
import io.asyncer.r2dbc.mysql.message.server.ServerMessage;
import io.asyncer.r2dbc.mysql.message.server.ServerStatusMessage;
import io.asyncer.r2dbc.mysql.message.server.SyntheticMetadataMessage;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jetbrains.annotations.Nullable;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;
import reactor.util.concurrent.Queues;

/* JADX INFO: Access modifiers changed from: package-private */
/* compiled from: QueryFlow.java */
/* loaded from: input_file:io/asyncer/r2dbc/mysql/PrepareExchangeable.class */
/**
 * Exchange flow that runs a single SQL text as a server-side prepared statement, once for every
 * {@link Binding} supplied by the iterator.
 * <p>
 * Outgoing {@link ClientMessage}s are pushed through an internal unicast sink that is handed to the
 * subscriber in {@link #subscribe(CoreSubscriber)}; incoming {@link ServerMessage}s are processed by
 * {@link #accept(ServerMessage, SynchronousSink)}. The flow moves through three modes:
 * <ol>
 * <li>{@code PREPARE_OR_RESET}: a prepare request (prepare cache miss) or a reset request (prepare
 * cache hit) has been sent and its response is awaited;</li>
 * <li>{@code EXECUTE}: an execute request built from the current binding has been sent;</li>
 * <li>{@code FETCH}: rows of the current execution are being received, either directly or through
 * explicit fetch requests.</li>
 * </ol>
 * NOTE(review): {@code mode}, {@code statementId} and {@code shouldClose} are plain fields without
 * synchronization; presumably all callbacks run on a single thread. Confirm against the caller.
 */
public final class PrepareExchangeable extends FluxExchangeable<ServerMessage> {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PrepareExchangeable.class);

    /** Waiting for the response to the initial prepare or reset request. */
    private static final int PREPARE_OR_RESET = 1;

    /** An execute request has been sent; waiting for its metadata or completion. */
    private static final int EXECUTE = 2;

    /** Receiving the rows of the current execution. */
    private static final int FETCH = 3;

    /** Server status bit {@code SERVER_STATUS_CURSOR_EXISTS} of the MySQL protocol. */
    private static final int STATUS_CURSOR_EXISTS = 0x40;

    /** Server status bit {@code SERVER_STATUS_LAST_ROW_SENT} of the MySQL protocol. */
    private static final int STATUS_LAST_ROW_SENT = 0x80;

    private final AtomicBoolean disposed = new AtomicBoolean();

    /** Outgoing requests; the backing queue is obtained from {@code Queues.one()}. */
    private final Sinks.Many<ClientMessage> requests = Sinks.many().unicast()
        .onBackpressureBuffer(Queues.<ClientMessage>one().get());

    private final Client client;

    private final String sql;

    private final Iterator<Binding> bindings;

    private final int fetchSize;

    private int mode = PREPARE_OR_RESET;

    /** Statement ID taken from the prepare cache or from the prepare response; null until known. */
    @Nullable
    private Integer statementId;

    /** Whether {@link #dispose()} has to send a close request for {@link #statementId}. */
    private boolean shouldClose;

    /**
     * Creates the exchange flow.
     *
     * @param client the client whose context provides the prepare cache
     * @param str    the SQL text to prepare and execute
     * @param it     the bindings; one execute request is sent per element
     * @param i      the fetch size; it is sent with every fetch request, and execute requests are
     *               built with the flag {@code fetchSize <= 0}
     */
    public PrepareExchangeable(Client client, String str, Iterator<Binding> it, int i) {
        this.client = client;
        this.sql = str;
        this.bindings = it;
        this.fetchSize = i;
    }

    /**
     * Connects the subscriber to the request stream and emits the first request: a reset request when
     * the prepare cache already holds a statement for the SQL, otherwise a prepare request.
     *
     * @param coreSubscriber the subscriber that receives the outgoing client messages
     */
    public void subscribe(CoreSubscriber<? super ClientMessage> coreSubscriber) {
        // Subscribe before emitting, so that the first request below already has a consumer.
        this.requests.asFlux().subscribe(coreSubscriber);

        Integer cachedId = this.client.getContext().getPrepareCache().getIfPresent(this.sql);

        if (cachedId == null) {
            logger.debug("Prepare cache mismatch, try to preparing");
            // Until the statement is successfully cached, this flow is responsible for closing it.
            this.shouldClose = true;
            QueryLogger.log(this.sql);

            Sinks.EmitResult result = this.requests.tryEmitNext(new PrepareQueryMessage(this.sql));

            if (result != Sinks.EmitResult.OK) {
                logger.error("Fail to emit prepare query message due to {}", result);
            }
        } else {
            logger.debug("Prepare cache matched statement {} when getting", cachedId);
            // The statement is held by the cache, so it must not be closed when this flow is disposed.
            this.shouldClose = false;
            this.statementId = cachedId;
            QueryLogger.log(cachedId.intValue(), this.sql);

            Sinks.EmitResult result = this.requests.tryEmitNext(new PreparedResetMessage(cachedId.intValue()));

            if (result != Sinks.EmitResult.OK) {
                logger.error("Fail to emit reset statement message due to {}", result);
            }
        }
    }

    /**
     * Handles one server message according to the current mode. An {@link ErrorMessage} is emitted
     * with the SQL attached and completes the sink, whatever the mode is.
     *
     * @param serverMessage   the message received from the server
     * @param synchronousSink the sink receiving the messages that are passed downstream
     */
    @Override
    public void accept(ServerMessage serverMessage, SynchronousSink<ServerMessage> synchronousSink) {
        if (serverMessage instanceof ErrorMessage) {
            synchronousSink.next(((ErrorMessage) serverMessage).offendedBy(this.sql));
            synchronousSink.complete();
            return;
        }

        switch (this.mode) {
            case PREPARE_OR_RESET:
                onPrepareOrResetResponse(serverMessage, synchronousSink);
                break;
            case EXECUTE:
                onExecuteResponse(serverMessage, synchronousSink);
                break;
            default:
                onFetchResponse(serverMessage, synchronousSink);
                break;
        }
    }

    /**
     * Closes the statement if this flow is still responsible for it, completes the request stream and
     * clears every binding that has not been consumed. Only the first call has an effect.
     */
    public void dispose() {
        if (!this.disposed.compareAndSet(false, true)) {
            return;
        }

        Integer id = this.statementId;

        if (this.shouldClose && id != null) {
            logger.debug("Closing statement {} after used", id);

            Sinks.EmitResult result = this.requests.tryEmitNext(new PreparedCloseMessage(id.intValue()));

            if (result != Sinks.EmitResult.OK) {
                logger.error("Fail to close statement {} due to {}", id, result);
            }
        }

        // The result of the complete signal is intentionally not checked.
        this.requests.tryEmitComplete();

        while (this.bindings.hasNext()) {
            this.bindings.next().clear();
        }
    }

    /**
     * @return {@code true} once {@link #dispose()} has been called
     */
    public boolean isDisposed() {
        return this.disposed.get();
    }

    /** Handles a message received while waiting for the prepare or reset response. */
    private void onPrepareOrResetResponse(ServerMessage message, SynchronousSink<ServerMessage> sink) {
        if (message instanceof OkMessage) {
            // Response of the reset request: the statement ID came from the prepare cache.
            Integer id = this.statementId;

            if (id == null) {
                logger.error("Reset succeed but statement ID was null");
                return;
            }

            doNextExecute(id.intValue(), sink);
        } else if (message instanceof PreparedOkMessage) {
            PreparedOkMessage ok = (PreparedOkMessage) message;
            int id = ok.getStatementId();
            int columns = ok.getTotalColumns();
            int parameters = ok.getTotalParameters();

            this.statementId = Integer.valueOf(id);
            QueryLogger.log(id, this.sql);

            // Execute right away only when there are neither columns nor parameters; otherwise the
            // execution starts when the completed SyntheticMetadataMessage arrives (branch below).
            if (columns <= -parameters) {
                putToCache(Integer.valueOf(id));
                doNextExecute(id, sink);
            }
        } else if (message instanceof SyntheticMetadataMessage
            && ((SyntheticMetadataMessage) message).isCompleted()) {
            Integer id = this.statementId;

            if (id == null) {
                logger.error("Prepared OK message not found");
                return;
            }

            putToCache(id);
            doNextExecute(id.intValue(), sink);
        } else {
            // Every other message of this phase is dropped; release it in case it is reference-counted.
            ReferenceCountUtil.safeRelease(message);
        }
    }

    /** Handles a message received after an execute request has been sent. */
    private void onExecuteResponse(ServerMessage message, SynchronousSink<ServerMessage> sink) {
        if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) {
            onCompleteMessage((CompleteMessage) message, sink);
            return;
        }

        if (!(message instanceof SyntheticMetadataMessage)) {
            sink.next(message);
            return;
        }

        EofMessage eof = ((SyntheticMetadataMessage) message).getEof();
        boolean cursorExists = eof instanceof ServerStatusMessage
            && (((ServerStatusMessage) eof).getServerStatuses() & STATUS_CURSOR_EXISTS) != 0;

        if (cursorExists) {
            // A cursor is reported: rows have to be requested explicitly. The metadata is emitted
            // only when the fetch request has been sent successfully.
            if (doNextFetch(sink)) {
                sink.next(message);
            }
            return;
        }

        // No cursor reported by the metadata: switch to row receiving without a fetch request.
        setMode(FETCH);
        sink.next(message);
    }

    /** Handles a message received while rows are being received. */
    private void onFetchResponse(ServerMessage message, SynchronousSink<ServerMessage> sink) {
        if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) {
            onCompleteMessage((CompleteMessage) message, sink);
        } else {
            sink.next(message);
        }
    }

    /**
     * Tries to register the statement in the prepare cache and updates {@link #shouldClose}
     * accordingly: the statement is closed on dispose unless the cache accepted it.
     *
     * @param num the ID of the statement that has just been prepared
     */
    private void putToCache(Integer num) {
        boolean succeeded;

        try {
            succeeded = this.client.getContext().getPrepareCache().putIfAbsent(this.sql, num.intValue(), evictedId -> {
                logger.debug("Prepare cache evicts statement {} when putting", evictedId);

                Sinks.EmitResult result = this.requests.tryEmitNext(new PreparedCloseMessage(evictedId));

                if (result != Sinks.EmitResult.OK) {
                    // Report the evicted statement: it is the one whose close request failed.
                    logger.error("Fail to close evicted statement {} due to {}", evictedId, result);
                }
            });
        } catch (Throwable th) {
            // Best effort: a failing cache must not break the query. The statement is treated as
            // not cached, so it gets closed on dispose.
            logger.error("Put statement {} to cache failed", num, th);
            succeeded = false;
        }

        this.shouldClose = !succeeded;
        logger.debug("Prepare cache put statement {} is {}", num, succeeded ? "succeed" : "fails");
    }

    /**
     * Switches to {@code EXECUTE} mode and sends the execute request of the next binding. When the
     * request cannot be emitted, the message is disposed and the sink is completed.
     *
     * @param id   the statement ID to execute
     * @param sink the downstream sink
     */
    private void doNextExecute(int id, SynchronousSink<ServerMessage> sink) {
        setMode(EXECUTE);

        // NOTE(review): the boolean is true when no positive fetch size is configured; presumably it
        // tells the binding not to open a cursor. Confirm against Binding.toExecuteMessage.
        PreparedExecuteMessage message = this.bindings.next().toExecuteMessage(id, this.fetchSize <= 0);
        Sinks.EmitResult result = this.requests.tryEmitNext(message);

        if (result != Sinks.EmitResult.OK) {
            logger.error("Fail to execute {} due to {}", id, result);
            message.dispose();
            sink.complete();
        }
    }

    /**
     * Switches to {@code FETCH} mode and sends a fetch request for the current statement.
     *
     * @param sink the downstream sink; it receives an error when the statement ID is unknown and is
     *             completed when the request cannot be emitted
     * @return {@code true} if the fetch request has been emitted
     */
    private boolean doNextFetch(SynchronousSink<ServerMessage> sink) {
        Integer id = this.statementId;

        if (id == null) {
            sink.error(new IllegalStateException("Statement ID must not be null when fetching"));
            return false;
        }

        setMode(FETCH);

        Sinks.EmitResult result = this.requests.tryEmitNext(new PreparedFetchMessage(id.intValue(), this.fetchSize));

        if (result == Sinks.EmitResult.OK) {
            return true;
        }

        logger.error("Fail to fetch {} due to {}", id, result);
        sink.complete();

        return false;
    }

    /**
     * Changes the mode. It is only called with {@code EXECUTE} or {@code FETCH}, which is why the
     * log message distinguishes just these two.
     */
    private void setMode(int newMode) {
        logger.debug("Mode is changed to {}", newMode == EXECUTE ? "EXECUTE" : "FETCH");
        this.mode = newMode;
    }

    /**
     * Handles a done {@link CompleteMessage}: requests more rows while the cursor still has some,
     * otherwise emits the message and either executes the next binding or completes the sink.
     *
     * @param message the complete message
     * @param sink    the downstream sink
     */
    private void onCompleteMessage(CompleteMessage message, SynchronousSink<ServerMessage> sink) {
        if (this.requests.scanOrDefault(Scannable.Attr.TERMINATED, Boolean.FALSE)) {
            // No further request can be sent, so finish with what has been received.
            logger.error("Unexpected terminated on requests");
            sink.next(message);
            sink.complete();
            return;
        }

        if (message instanceof ServerStatusMessage) {
            short statuses = ((ServerStatusMessage) message).getServerStatuses();

            if ((statuses & STATUS_CURSOR_EXISTS) != 0 && (statuses & STATUS_LAST_ROW_SENT) == 0) {
                // A cursor exists and the last row has not been sent: fetch again. This intermediate
                // complete message is not emitted.
                doNextFetch(sink);
                return;
            }
        }

        sink.next(message);

        if (!this.bindings.hasNext()) {
            sink.complete();
            return;
        }

        Integer id = this.statementId;

        if (id == null) {
            sink.error(new IllegalStateException("Statement ID must not be null when executing"));
        } else {
            doNextExecute(id.intValue(), sink);
        }
    }
}
