/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.exception.KsqlException;
import io.confluent.ksql.api.client.impl.ClientImpl;
import io.confluent.ksql.api.client.impl.QueryResponseHandler;
import io.confluent.ksql.api.client.impl.RowImpl;
import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl;
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.rest.entity.QueryResponseMetadata;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.parsetools.RecordParser;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link QueryResponseHandler} that turns the metadata and row buffers of a query response
 * into a {@link StreamedQueryResultImpl}.
 *
 * <p>The future handed to the constructor is completed with the result object as soon as the
 * response metadata has been handled; subsequent buffers are decoded as JSON objects and either
 * forwarded to the result as rows or used to update the consistency / continuation tokens.
 */
public class StreamQueryResponseHandler
extends QueryResponseHandler<CompletableFuture<StreamedQueryResult>> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamQueryResponseHandler.class);

    /** Result object; {@code null} until {@link #handleMetadata} has run. */
    private StreamedQueryResultImpl queryResult;
    /** Column-name lookup built from the metadata; {@code null} until {@link #handleMetadata} has run. */
    private Map<String, Integer> columnNameToIndex;
    /** {@code true} while the record parser is paused because the result reported it was full. */
    private boolean paused;
    /** Holder updated with the {@code consistencyToken} value found in a response message. */
    private final AtomicReference<String> serializedConsistencyVector;
    /** Holder updated with the {@code continuationToken} value found in a response message. */
    private final AtomicReference<String> continuationToken;
    private final String sql;
    private final Map<String, Object> properties;
    private final ClientImpl client;

    /**
     * Creates a handler. {@code context}, {@code recordParser} and {@code cf} are passed to the
     * superclass unchanged.
     *
     * @param serializedCV holder that receives the consistency token, must not be {@code null}
     * @param continuationToken holder that receives the continuation token, must not be {@code null}
     * @param sql query text handed on to the result object, must not be {@code null}
     * @param properties query properties handed on to the result object, must not be {@code null}
     * @param client client handed on to the result object, must not be {@code null}
     * @throws NullPointerException if any of the five arguments above is {@code null}
     */
    StreamQueryResponseHandler(Context context, RecordParser recordParser, CompletableFuture<StreamedQueryResult> cf, AtomicReference<String> serializedCV, AtomicReference<String> continuationToken, String sql, Map<String, Object> properties, ClientImpl client) {
        super(context, recordParser, cf);
        this.serializedConsistencyVector = Objects.requireNonNull(serializedCV, "serializedCV");
        this.continuationToken = Objects.requireNonNull(continuationToken, "continuationToken");
        this.sql = Objects.requireNonNull(sql, "sql");
        this.properties = Objects.requireNonNull(properties, "properties");
        this.client = Objects.requireNonNull(client, "client");
    }

    /**
     * Builds the result object and the column index from the metadata, then completes the
     * future with that result.
     */
    @Override
    protected void handleMetadata(QueryResponseMetadata queryResponseMetadata) {
        this.queryResult = new StreamedQueryResultImpl(this.context, queryResponseMetadata.queryId, queryResponseMetadata.columnNames, RowUtil.columnTypesFromStrings(queryResponseMetadata.columnTypes), this.continuationToken, this.sql, this.properties, this.client);
        this.columnNameToIndex = RowUtil.valueToIndexMap(queryResponseMetadata.columnNames);
        this.cf.complete(this.queryResult);
    }

    /**
     * Decodes one buffer as a JSON object and dispatches on its keys: a {@code finalMessage}
     * is ignored, a {@code row} is delivered to the result, anything else is inspected for
     * tokens and an error message.
     *
     * @throws IllegalStateException if called before {@link #handleMetadata}
     */
    @Override
    protected void handleRow(Buffer buff) {
        if (this.queryResult == null) {
            throw new IllegalStateException("handleRow called before metadata processed");
        }
        final JsonObject jsonObject = buff.toJsonObject();
        if (jsonObject.containsKey("finalMessage")) {
            return;
        }
        if (jsonObject.containsKey("row")) {
            this.handleDataRow(jsonObject);
            return;
        }
        final Map<String, Object> fields = jsonObject.getMap();
        if (fields == null) {
            // NOTE(review): containsKey above would presumably already have failed on a null
            // map, so this branch looks unreachable - kept to preserve the original behavior.
            throw new RuntimeException("Could not decode JSON: " + jsonObject);
        }
        this.handleControlMessage(jsonObject, fields);
    }

    /** Converts a {@code row} message into a {@link RowImpl} and applies back-pressure if needed. */
    private void handleDataRow(JsonObject jsonObject) {
        final Map<?, ?> rowFields = (Map<?, ?>) jsonObject.getMap().get("row");
        final List<?> columns = (List<?>) rowFields.get("columns");
        final RowImpl row = new RowImpl(this.queryResult.columnNames(), this.queryResult.columnTypes(), new JsonArray(columns), this.columnNameToIndex);
        final boolean full = this.queryResult.accept(row);
        if (full && !this.paused) {
            // Stop parsing further records until the result invokes the drain handler.
            this.recordParser.pause();
            this.queryResult.drainHandler(this::publisherReceptive);
            this.paused = true;
        }
    }

    /** Applies the consistency token, continuation token and error message found in a non-row message. */
    private void handleControlMessage(JsonObject jsonObject, Map<String, Object> fields) {
        if (fields.containsKey("consistencyToken")) {
            // Placeholder logging avoids serializing the JSON object when INFO is disabled.
            LOG.info("Response contains consistency vector {}", jsonObject);
            this.serializedConsistencyVector.set((String) fields.get("consistencyToken"));
        }
        if (fields.containsKey("continuationToken")) {
            LOG.info("Response contains continuation token {}", jsonObject);
            final Map<?, ?> tokenFields = (Map<?, ?>) fields.get("continuationToken");
            this.continuationToken.set((String) tokenFields.get("continuationToken"));
        }
        if (fields.containsKey("errorMessage")) {
            final Map<?, ?> errorFields = (Map<?, ?>) fields.get("errorMessage");
            this.queryResult.handleError(new KsqlException((String) errorFields.get("message")));
        }
    }

    /**
     * Marks the result as complete. If no metadata was handled (so no result exists), the
     * future is completed exceptionally instead of failing with a {@code NullPointerException}.
     */
    @Override
    protected void doHandleBodyEnd() {
        if (this.queryResult == null) {
            // completeExceptionally is a no-op when the future has already been completed.
            this.cf.completeExceptionally(new KsqlException("Response body ended before query metadata was received"));
            return;
        }
        this.queryResult.complete();
    }

    /**
     * Forwards {@code t}, wrapped in an {@link Exception}, to the result. If no result exists
     * the error is logged instead.
     */
    @Override
    public void handleExceptionAfterFutureCompleted(Throwable t) {
        if (this.queryResult == null) {
            // NOTE(review): presumably only reachable if the future was completed by something
            // other than handleMetadata - confirm against QueryResponseHandler.
            LOG.error("Exception received but no query result is available to report it to", t);
            return;
        }
        this.queryResult.handleError(new Exception(t));
    }

    /** Drain-handler callback: clears the paused flag and resumes the record parser. */
    private void publisherReceptive() {
        this.checkContext();
        this.paused = false;
        this.recordParser.resume();
    }
}

