package dev.miku.r2dbc.mysql;

import dev.miku.r2dbc.mysql.client.Client;
import dev.miku.r2dbc.mysql.message.client.PrepareQueryMessage;
import dev.miku.r2dbc.mysql.message.client.PreparedCloseMessage;
import dev.miku.r2dbc.mysql.message.client.PreparedFetchMessage;
import dev.miku.r2dbc.mysql.message.client.SimpleQueryMessage;
import dev.miku.r2dbc.mysql.message.server.CompleteMessage;
import dev.miku.r2dbc.mysql.message.server.ErrorMessage;
import dev.miku.r2dbc.mysql.message.server.PreparedOkMessage;
import dev.miku.r2dbc.mysql.message.server.ServerMessage;
import dev.miku.r2dbc.mysql.message.server.ServerStatusMessage;
import dev.miku.r2dbc.mysql.message.server.SyntheticMetadataMessage;
import dev.miku.r2dbc.mysql.util.ConnectionContext;
import dev.miku.r2dbc.mysql.util.InternalArrays;
import dev.miku.r2dbc.mysql.util.OperatorUtils;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:dev/miku/r2dbc/mysql/QueryFlow.class */
public final class QueryFlow {
    /** Window boundary used with {@code windowUntil}: true for every {@link CompleteMessage}. */
    private static final Predicate<ServerMessage> RESULT_DONE = message -> message instanceof CompleteMessage;

    /**
     * Completion test for a prepare exchange: true for an {@link ErrorMessage}, or for a
     * {@link SyntheticMetadataMessage} whose {@code isCompleted()} flag is set.
     */
    private static final Predicate<ServerMessage> PREPARE_DONE = message ->
        message instanceof ErrorMessage
            || (message instanceof SyntheticMetadataMessage && ((SyntheticMetadataMessage) message).isCompleted());

    /**
     * Completion test that stops at the first metadata: true for an {@link ErrorMessage}, for any
     * {@link SyntheticMetadataMessage}, or for a {@link CompleteMessage} whose {@code isDone()} flag is set.
     */
    private static final Predicate<ServerMessage> METADATA_DONE = message ->
        message instanceof ErrorMessage
            || message instanceof SyntheticMetadataMessage
            || (message instanceof CompleteMessage && ((CompleteMessage) message).isDone());

    /**
     * Completion test for a full response: true for an {@link ErrorMessage}, or for a
     * {@link CompleteMessage} whose {@code isDone()} flag is set.
     */
    private static final Predicate<ServerMessage> FETCH_DONE = message ->
        message instanceof ErrorMessage
            || (message instanceof CompleteMessage && ((CompleteMessage) message).isDone());

    /** Discard hook that calls {@code release()} on a reference-counted message. */
    private static final Consumer<ReferenceCounted> RELEASE = ReferenceCounted::release;

    /** Consumer that hands any value to {@link ReferenceCountUtil#safeRelease(Object)}. */
    private static final Consumer<Object> SAFE_RELEASE = ReferenceCountUtil::safeRelease;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/miku/r2dbc/mysql/QueryFlow$Handler.class */
    public static final class Handler implements BiConsumer<ServerMessage, SynchronousSink<ServerMessage>> {
        private final String sql;

        private Handler(String str) {
            this.sql = str;
        }

        @Override // java.util.function.BiConsumer
        public void accept(ServerMessage serverMessage, SynchronousSink<ServerMessage> synchronousSink) {
            if (serverMessage instanceof ErrorMessage) {
                synchronousSink.error(ExceptionFactory.createException((ErrorMessage) serverMessage, this.sql));
            } else {
                synchronousSink.next(serverMessage);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dev/miku/r2dbc/mysql/QueryFlow$TakeOne.class */
    public static final class TakeOne implements BiConsumer<ServerMessage, SynchronousSink<ServerMessage>> {
        private final String sql;
        private boolean next;

        private TakeOne(String str) {
            this.sql = str;
        }

        @Override // java.util.function.BiConsumer
        public void accept(ServerMessage serverMessage, SynchronousSink<ServerMessage> synchronousSink) {
            if (this.next) {
                return;
            }
            this.next = true;
            if (serverMessage instanceof ErrorMessage) {
                synchronousSink.error(ExceptionFactory.createException((ErrorMessage) serverMessage, this.sql));
            } else {
                synchronousSink.next(serverMessage);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends a {@link PrepareQueryMessage} for the given SQL and resolves to a {@link PreparedIdentifier}
     * wrapping the statement id reported by the server.
     * <p>
     * The exchange runs until {@code PREPARE_DONE} matches. An {@link ErrorMessage} is converted to an
     * error signal; a {@link PreparedOkMessage} yields the identifier; a completed
     * {@link SyntheticMetadataMessage} completes the sequence; any other message is released.
     *
     * @param client the client used to exchange messages with the server
     * @param str    the SQL text to prepare, also attached to any created exception
     * @return a {@link Mono} emitting the prepared identifier, or an error signal
     */
    public static Mono<PreparedIdentifier> prepare(Client client, String str) {
        return OperatorUtils.discardOnCancel(client.exchange(new PrepareQueryMessage(str), PREPARE_DONE))
            // A discarded OK message means nobody will own the statement, so ask the server to close it.
            .doOnDiscard(PreparedOkMessage.class, prepared -> close(client, prepared.getStatementId()).subscribe())
            // Explicit type witness: without it the element type degrades to Object and the
            // result of last() no longer matches the declared Mono<PreparedIdentifier>.
            .<PreparedIdentifier>handle((message, sink) -> {
                if (message instanceof ErrorMessage) {
                    sink.error(ExceptionFactory.createException((ErrorMessage) message, str));
                    return;
                }

                if (message instanceof SyntheticMetadataMessage) {
                    // Only the completed metadata ends the sequence; intermediate metadata is skipped.
                    if (((SyntheticMetadataMessage) message).isCompleted()) {
                        sink.complete();
                    }
                } else if (message instanceof PreparedOkMessage) {
                    sink.next(new PreparedIdentifier(client, ((PreparedOkMessage) message).getStatementId()));
                } else {
                    ReferenceCountUtil.release(message);
                }
            })
            // last() waits for completion before emitting, and signals an error if nothing was emitted.
            .last();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Executes a prepared statement once per binding and groups the responses into one inner
     * {@link Flux} per result, splitting after each message matched by {@code RESULT_DONE}.
     * <p>
     * With no bindings the result is empty. With one binding the execution is deferred until
     * subscription. With several bindings they are executed one after another: the next binding is
     * emitted only when the previous execution completes, and remaining bindings are handed to
     * {@code Binding.clearSubsequent} when the identifier is closed, on cancel, or on error.
     *
     * @param client             the client used to exchange messages with the server
     * @param connectionContext  the connection context, forwarded to the per-binding execution
     * @param str                the SQL text, attached to any created exception
     * @param preparedIdentifier the prepared statement identifier
     * @param z                  forwarded to the per-binding execution, where it selects the completion
     *                           predicate of a cursor exchange (NOTE(review): presumably the
     *                           "deprecate EOF" capability flag, confirm against callers)
     * @param i                  forwarded to the per-binding execution; values {@code <= 0} skip the
     *                           fetch phase (NOTE(review): presumably the fetch size, confirm)
     * @param list               the bindings to execute, in order
     * @return the responses, windowed per result
     */
    public static Flux<Flux<ServerMessage>> execute(Client client, ConnectionContext connectionContext, String str, PreparedIdentifier preparedIdentifier, boolean z, int i, List<Binding> list) {
        switch (list.size()) {
            case 0:
                return Flux.empty();
            case 1:
                return Flux.defer(() -> execute0(client, connectionContext, str, preparedIdentifier, z, list.get(0), i)
                    .windowUntil(RESULT_DONE));
            default:
                Iterator<Binding> it = list.iterator();
                // Typed processor: a raw EmitterProcessor would turn the concatMap argument into Object.
                EmitterProcessor<Binding> processor = EmitterProcessor.create(1, true);
                // Runs after each execution completes and decides whether to emit the next binding.
                Runnable emitNext = () -> {
                    if (processor.isCancelled() || processor.isTerminated()) {
                        return;
                    }

                    try {
                        if (!it.hasNext()) {
                            processor.onComplete();
                        } else if (preparedIdentifier.isClosed()) {
                            // Statement is gone: drop what is left instead of executing it.
                            Binding.clearSubsequent(it);
                            processor.onComplete();
                        } else {
                            processor.onNext(it.next());
                        }
                    } catch (Throwable e) {
                        processor.onError(e);
                    }
                };

                // Seed the processor with the first binding; the list has at least two elements here.
                processor.onNext(it.next());

                return processor
                    .concatMap(binding -> execute0(client, connectionContext, str, preparedIdentifier, z, binding, i)
                        .doOnComplete(emitNext))
                    .doOnCancel(() -> Binding.clearSubsequent(it))
                    .doOnError(ignored -> Binding.clearSubsequent(it))
                    .windowUntil(RESULT_DONE);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends a {@link PreparedCloseMessage} for the given statement id through {@code Client.sendOnly}.
     *
     * @param client the client used to send the message
     * @param i      the id of the prepared statement to close
     * @return the {@link Mono} returned by {@code sendOnly}
     */
    public static Mono<Void> close(Client client, int i) {
        PreparedCloseMessage closeRequest = new PreparedCloseMessage(i);

        return client.sendOnly(closeRequest);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Executes one simple-query statement and ignores its results: every response message is passed to
     * {@code SAFE_RELEASE} and only the termination signal is propagated. Execution is deferred until
     * subscription.
     *
     * @param client the client used to exchange messages with the server
     * @param str    the SQL text to execute
     * @return a {@link Mono} that completes, or errors, when the statement has finished
     */
    public static Mono<Void> executeVoid(Client client, String str) {
        return Mono.defer(() -> execute0(client, str)
            .doOnNext(SAFE_RELEASE)
            .then());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Executes several simple-query statements in order and ignores their results: every response message
     * is passed to {@code SAFE_RELEASE} and only the termination signal is propagated.
     *
     * @param client the client used to exchange messages with the server
     * @param strArr the SQL texts to execute, in order
     * @return a {@link Mono} that completes, or errors, when the statements have finished
     */
    public static Mono<Void> executeVoid(Client client, String... strArr) {
        List<String> statements = InternalArrays.asReadOnlyList(strArr);

        return selfEmitter(statements, client)
            .doOnNext(SAFE_RELEASE)
            .then();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Executes one simple-query statement and groups the responses into one inner {@link Flux} per
     * result, splitting after each message matched by {@code RESULT_DONE}. Execution is deferred until
     * subscription.
     *
     * @param client the client used to exchange messages with the server
     * @param str    the SQL text to execute
     * @return the responses, windowed per result
     */
    public static Flux<Flux<ServerMessage>> execute(Client client, String str) {
        return Flux.defer(() -> execute0(client, str).windowUntil(RESULT_DONE));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Executes a list of simple-query statements in order and groups the responses into one inner
     * {@link Flux} per result, splitting after each message matched by {@code RESULT_DONE}.
     * The list is inspected, and execution started, only at subscription time.
     *
     * @param client the client used to exchange messages with the server
     * @param list   the SQL texts to execute, in order; may be empty
     * @return the responses, windowed per result; empty when the list is empty
     */
    public static Flux<Flux<ServerMessage>> execute(Client client, List<String> list) {
        return Flux.defer(() -> {
            int count = list.size();

            if (count == 0) {
                return Flux.empty();
            }

            // A single statement needs no sequencing; several statements are chained one after another.
            Flux<ServerMessage> responses = count == 1 ? execute0(client, list.get(0)) : selfEmitter(list, client);

            return responses.windowUntil(RESULT_DONE);
        });
    }

    /**
     * Exchanges a {@link SimpleQueryMessage} for the given SQL until {@code FETCH_DONE} matches.
     * Discarded reference-counted messages are released, and error messages are converted to error
     * signals by {@link Handler}.
     *
     * @param client the client used to exchange messages with the server
     * @param str    the SQL text to execute
     * @return the response messages of the statement
     */
    private static Flux<ServerMessage> execute0(Client client, String str) {
        SimpleQueryMessage query = new SimpleQueryMessage(str);

        return OperatorUtils.discardOnCancel(client.exchange(query, FETCH_DONE))
            .doOnDiscard(ReferenceCounted.class, RELEASE)
            .handle(new Handler(str));
    }

    /**
     * Executes a prepared statement with a single binding.
     * <p>
     * When {@code i <= 0} the binding is sent with {@code toMessage(id, true)} and the whole response is
     * read until {@code FETCH_DONE}. Otherwise the binding is sent with {@code toMessage(id, false)}, only
     * the first response message is kept (see {@link TakeOne}), and the rows are then obtained through
     * {@code fetch} with a {@link PreparedFetchMessage} built from the statement id and {@code i}.
     *
     * @param client             the client used to exchange messages with the server
     * @param connectionContext  the connection context, consulted by the fetch phase
     * @param str                the SQL text, attached to any created exception
     * @param preparedIdentifier the prepared statement identifier
     * @param z                  when true the cursor exchange completes on {@code FETCH_DONE}, otherwise on
     *                           {@code METADATA_DONE}
     * @param binding            the parameter binding to send
     * @param i                  passed to {@link PreparedFetchMessage}; values {@code <= 0} disable the
     *                           fetch phase (NOTE(review): presumably the fetch size, confirm)
     * @return the response messages of this execution
     */
    private static Flux<ServerMessage> execute0(Client client, ConnectionContext connectionContext, String str, PreparedIdentifier preparedIdentifier, boolean z, Binding binding, int i) {
        int statementId = preparedIdentifier.getId();

        if (i > 0) {
            Predicate<ServerMessage> openDone = z ? FETCH_DONE : METADATA_DONE;

            return OperatorUtils.discardOnCancel(client.exchange(binding.toMessage(statementId, false), openDone))
                .doOnDiscard(ReferenceCounted.class, RELEASE)
                .handle(new TakeOne(str))
                // The fetch phase is built lazily, after the first exchange has finished.
                .concatWith(Flux.defer(() ->
                    fetch(client, connectionContext, preparedIdentifier, new PreparedFetchMessage(statementId, i), str)));
        }

        return OperatorUtils.discardOnCancel(client.exchange(binding.toMessage(statementId, true), FETCH_DONE))
            .doOnDiscard(ReferenceCounted.class, RELEASE)
            .handle(new Handler(str));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Repeatedly sends the given fetch request and forwards the returned messages until the server
     * reports that the last row was sent.
     * <p>
     * Nothing is fetched when the connection's server statuses do not carry bit 64. Each exchange runs
     * until {@code FETCH_DONE} matches. Within an exchange:
     * <ul>
     *   <li>an {@link ErrorMessage} becomes an error signal and stops further fetching;</li>
     *   <li>a {@link ServerStatusMessage} with bit 128 set is forwarded and stops further fetching;</li>
     *   <li>a {@link ServerStatusMessage} without bit 128 is not forwarded; it triggers another fetch
     *       unless the request stream is already cancelled or terminated, or the statement is closed;</li>
     *   <li>any other message is forwarded.</li>
     * </ul>
     *
     * @param client               the client used to exchange messages with the server
     * @param connectionContext    the connection context providing the current server statuses
     * @param preparedIdentifier   the prepared statement identifier, checked for closure before refetching
     * @param preparedFetchMessage the fetch request, re-sent for every round
     * @param str                  the SQL text, attached to any created exception
     * @return the fetched messages, or an empty sequence when the status bit 64 is not set
     */
    public static Flux<ServerMessage> fetch(Client client, ConnectionContext connectionContext, PreparedIdentifier preparedIdentifier, PreparedFetchMessage preparedFetchMessage, String str) {
        // 64 == 0x40, SERVER_STATUS_CURSOR_EXISTS in the MySQL protocol: without a cursor there is nothing to fetch.
        if ((connectionContext.getServerStatuses() & 64) == 0) {
            return Flux.empty();
        }

        // Typed processor: a raw EmitterProcessor would turn the concatMap argument into Object,
        // which cannot be passed to client.exchange. autoCancel is disabled (second argument).
        EmitterProcessor<PreparedFetchMessage> requests = EmitterProcessor.create(1, false);

        requests.onNext(preparedFetchMessage);

        return requests.concatMap(request -> OperatorUtils.discardOnCancel(client.exchange(request, FETCH_DONE))
            .doOnDiscard(ReferenceCounted.class, RELEASE)
            .<ServerMessage>handle((message, sink) -> {
                if (message instanceof ErrorMessage) {
                    sink.error(ExceptionFactory.createException((ErrorMessage) message, str));
                    requests.onComplete();
                    return;
                }

                if (!(message instanceof ServerStatusMessage)) {
                    sink.next(message);
                    return;
                }

                // 128 == 0x80, SERVER_STATUS_LAST_ROW_SENT in the MySQL protocol.
                if ((((ServerStatusMessage) message).getServerStatuses() & 128) != 0) {
                    // Last row sent: pass the final status downstream and stop requesting.
                    sink.next(message);
                    requests.onComplete();
                } else {
                    if (requests.isCancelled() || requests.isTerminated()) {
                        return;
                    }

                    if (preparedIdentifier.isClosed()) {
                        // The statement was closed meanwhile; do not fetch from it again.
                        requests.onComplete();
                    } else {
                        // More rows remain: queue the same fetch request for the next round.
                        requests.onNext(preparedFetchMessage);
                    }
                }
            }));
    }

    /**
     * Executes the given statements one after another as simple queries and concatenates their response
     * messages. The first statement is queued immediately; each following statement is emitted through
     * {@code OperatorUtils.emitIterator} when the previous execution completes.
     *
     * @param list   the SQL texts to execute, in order; may be empty
     * @param client the client used to exchange messages with the server
     * @return the concatenated response messages, or an empty sequence when the list is empty
     */
    private static Flux<ServerMessage> selfEmitter(List<String> list, Client client) {
        if (list.isEmpty()) {
            return Flux.empty();
        }

        Iterator<String> it = list.iterator();
        // Typed processor: a raw EmitterProcessor would turn the concatMap argument into Object,
        // which cannot be passed to execute0(Client, String).
        EmitterProcessor<String> processor = EmitterProcessor.create(1, true);
        Runnable emitNext = () -> OperatorUtils.emitIterator(processor, it);

        // Safe: the list was checked to be non-empty above.
        processor.onNext(it.next());

        return processor.concatMap(sql -> execute0(client, sql).doOnComplete(emitNext));
    }

    /** Not instantiable: this class only exposes static members. */
    private QueryFlow() {
    }
}
