package io.confluent.ksql.rest.server.resources.streaming;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.confluent.ksql.api.server.StreamingOutput;
import io.confluent.ksql.execution.pull.PullQueryResult;
import io.confluent.ksql.execution.pull.PullQueryRow;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.query.PullQueryWriteStream;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.ConsistencyToken;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.ConsistencyOffsetVector;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/PullQueryStreamWriter.class */
public class PullQueryStreamWriter implements StreamingOutput {
    private static final Logger LOG = LoggerFactory.getLogger(PullQueryStreamWriter.class);
    private static final int WRITE_TIMEOUT_MS = 3000;
    private static final int FLUSH_SIZE_BYTES = 51200;
    private static final long MAX_FLUSH_MS = 1000;
    private final long disconnectCheckInterval;
    private final PullQueryWriteStream pullQueryQueue;
    private final Clock clock;
    private final PullQueryResult result;
    private final ObjectMapper objectMapper;
    private AtomicBoolean completed = new AtomicBoolean(false);
    private AtomicBoolean connectionClosed = new AtomicBoolean(false);
    private AtomicReference<Throwable> pullQueryException = new AtomicReference<>(null);
    private AtomicBoolean closed = new AtomicBoolean(false);
    private boolean sentAtLeastOneRow = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/PullQueryStreamWriter$QueueWrapper.class */
    public static final class QueueWrapper {
        public static final PullQueryRow END_ROW = new PullQueryRow((List) null, (LogicalSchema) null, (Optional) null, (Optional) null);
        private final PullQueryWriteStream pullQueryQueue;
        private final long disconnectCheckInterval;
        private PullQueryRow head = null;

        QueueWrapper(PullQueryWriteStream pullQueryWriteStream, long j) {
            this.pullQueryQueue = pullQueryWriteStream;
            this.disconnectCheckInterval = j;
        }

        public boolean hasAnotherRow() {
            return this.head != null;
        }

        public PullQueryRow pollNextRow() throws InterruptedException {
            PullQueryRow pollRow = this.pullQueryQueue.pollRow(this.disconnectCheckInterval, TimeUnit.MILLISECONDS);
            if (pollRow == END_ROW) {
                return END_ROW;
            }
            if (pollRow == null) {
                return null;
            }
            PullQueryRow pullQueryRow = this.head;
            this.head = pollRow;
            return pullQueryRow;
        }

        public List<PullQueryRow> drain() {
            ArrayList newArrayList = Lists.newArrayList();
            if (this.head != null) {
                newArrayList.add(this.head);
            }
            this.head = null;
            this.pullQueryQueue.drainRowsTo(newArrayList);
            newArrayList.remove(END_ROW);
            return newArrayList;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/PullQueryStreamWriter$WriterState.class */
    public static class WriterState {
        private final Clock clock;
        private StringBuilder sb = new StringBuilder();
        private long lastFlushMs;

        WriterState(Clock clock) {
            this.clock = clock;
        }

        public WriterState append(String str) {
            this.sb.append(str);
            return this;
        }

        public int length() {
            return this.sb.length();
        }

        public long getLastFlushMs() {
            return this.lastFlushMs;
        }

        public String getStringToFlush() {
            String sb = this.sb.toString();
            this.sb = new StringBuilder();
            this.lastFlushMs = this.clock.millis();
            return sb;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PullQueryStreamWriter(PullQueryResult pullQueryResult, long j, ObjectMapper objectMapper, PullQueryWriteStream pullQueryWriteStream, Clock clock, CompletableFuture<Void> completableFuture, KsqlParser.PreparedStatement<Query> preparedStatement) {
        this.result = (PullQueryResult) Objects.requireNonNull(pullQueryResult, "result");
        this.objectMapper = (ObjectMapper) Objects.requireNonNull(objectMapper, "objectMapper");
        this.disconnectCheckInterval = j;
        this.pullQueryQueue = (PullQueryWriteStream) Objects.requireNonNull(pullQueryWriteStream, "pullQueryQueue");
        this.clock = (Clock) Objects.requireNonNull(clock, "clock");
        completableFuture.thenAccept(r4 -> {
            this.connectionClosed.set(true);
        });
        pullQueryResult.onException(th -> {
            if (this.pullQueryException.getAndSet(th) == null) {
                interruptWriterThread();
            }
        });
        pullQueryResult.onCompletion(r7 -> {
            if (this.completed.getAndSet(true)) {
                return;
            }
            Optional consistencyOffsetVector = pullQueryResult.getConsistencyOffsetVector();
            pullQueryWriteStream.getClass();
            consistencyOffsetVector.ifPresent(pullQueryWriteStream::putConsistencyVector);
            interruptWriterThread();
        });
        try {
            pullQueryResult.start();
        } catch (Exception e) {
            throw new KsqlStatementException(e.getMessage() == null ? "Server Error" : e.getMessage(), preparedStatement.getMaskedStatementText(), e);
        }
    }

    @Override // io.confluent.ksql.api.server.StreamingOutput
    public void write(OutputStream outputStream) {
        try {
            try {
                WriterState writerState = new WriterState(this.clock);
                QueueWrapper queueWrapper = new QueueWrapper(this.pullQueryQueue, this.disconnectCheckInterval);
                writerState.append("[").append(writeValueAsString(StreamedRow.header(this.result.getQueryId(), this.result.getSchema())));
                while (!this.connectionClosed.get() && !isCompletedOrHasException()) {
                    processRow(outputStream, writerState, queueWrapper);
                }
                if (this.connectionClosed.get()) {
                    close();
                    return;
                }
                drainAndThrowOnError(outputStream, writerState, queueWrapper);
                drainAndWrite(writerState, queueWrapper);
                writerState.append("]");
                if (writerState.length() > 0) {
                    outputStream.write(writerState.getStringToFlush().getBytes(StandardCharsets.UTF_8));
                    outputStream.flush();
                }
                close();
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while writing to connection stream");
                close();
            } catch (Throwable th) {
                LOG.error("Exception occurred while writing to connection stream: ", th);
                outputException(outputStream, th);
                close();
            }
        } catch (Throwable th2) {
            close();
            throw th2;
        }
    }

    private void processRow(OutputStream outputStream, WriterState writerState, QueueWrapper queueWrapper) throws Throwable {
        PullQueryRow pollNextRow = queueWrapper.pollNextRow();
        if (pollNextRow == QueueWrapper.END_ROW) {
            return;
        }
        if (pollNextRow != null) {
            writeRow(pollNextRow, writerState, queueWrapper.hasAnotherRow());
            if (writerState.length() >= FLUSH_SIZE_BYTES || this.clock.millis() - writerState.getLastFlushMs() >= MAX_FLUSH_MS) {
                outputStream.write(writerState.getStringToFlush().getBytes(StandardCharsets.UTF_8));
                outputStream.flush();
            }
        }
        drainAndThrowOnError(outputStream, writerState, queueWrapper);
    }

    private void writeRow(PullQueryRow pullQueryRow, WriterState writerState, boolean z) {
        if (!this.sentAtLeastOneRow) {
            writerState.append(",").append(System.lineSeparator());
            this.sentAtLeastOneRow = true;
        }
        writerState.append(writeValueAsString(pullQueryRow.getConsistencyOffsetVector().isPresent() ? StreamedRow.consistencyToken(new ConsistencyToken(((ConsistencyOffsetVector) pullQueryRow.getConsistencyOffsetVector().get()).serialize())) : StreamedRow.pullRow(pullQueryRow.getGenericRow(), pullQueryRow.getSourceNode())));
        if (z) {
            writerState.append(",").append(System.lineSeparator());
        }
    }

    private void drainAndThrowOnError(OutputStream outputStream, WriterState writerState, QueueWrapper queueWrapper) throws Throwable {
        if (this.pullQueryException.get() != null) {
            drainAndWrite(writerState, queueWrapper);
            outputStream.write(writerState.getStringToFlush().getBytes(StandardCharsets.UTF_8));
            outputStream.flush();
            throw this.pullQueryException.get();
        }
    }

    private void drainAndWrite(WriterState writerState, QueueWrapper queueWrapper) {
        List<PullQueryRow> drain = queueWrapper.drain();
        int i = 0;
        Iterator<PullQueryRow> it = drain.iterator();
        while (it.hasNext()) {
            writeRow(it.next(), writerState, i + 1 < drain.size());
            i++;
        }
    }

    private void outputException(OutputStream outputStream, Throwable th) {
        if (this.connectionClosed.get()) {
            return;
        }
        try {
            outputStream.write(",\n".getBytes(StandardCharsets.UTF_8));
            if (th.getCause() instanceof KsqlException) {
                this.objectMapper.writeValue(outputStream, StreamedRow.error(th.getCause(), Errors.ERROR_CODE_SERVER_ERROR));
            } else {
                this.objectMapper.writeValue(outputStream, StreamedRow.error(th, Errors.ERROR_CODE_SERVER_ERROR));
            }
            outputStream.write("]\n".getBytes(StandardCharsets.UTF_8));
            outputStream.flush();
        } catch (IOException e) {
            LOG.debug("Client disconnected while attempting to write an error message");
        }
    }

    @Override // io.confluent.ksql.api.server.StreamingOutput, java.lang.AutoCloseable
    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        this.result.stop();
    }

    @Override // io.confluent.ksql.api.server.StreamingOutput
    public int getWriteTimeoutMs() {
        return 3000;
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    private boolean isCompletedOrHasException() {
        return this.completed.get() || this.pullQueryException.get() != null;
    }

    private void interruptWriterThread() {
        this.pullQueryQueue.end();
    }

    private String writeValueAsString(Object obj) {
        try {
            return this.objectMapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new RuntimeException((Throwable) e);
        }
    }
}
