package io.confluent.ksql.util;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.execution.scalablepush.PushQueryPreparer;
import io.confluent.ksql.execution.scalablepush.PushQueryQueuePopulator;
import io.confluent.ksql.execution.scalablepush.PushRouting;
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.query.CompletionHandler;
import io.confluent.ksql.query.LimitHandler;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.PushQueryMetadata;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

/* loaded from: input_file:io/confluent/ksql/util/ScalablePushQueryMetadata.class */
public class ScalablePushQueryMetadata implements PushQueryMetadata {
    private final LogicalSchema logicalSchema;
    private final QueryId queryId;
    private final TransientQueryQueue transientQueryQueue;
    private final Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics;
    private final PushQueryMetadata.ResultType resultType;
    private final PushQueryQueuePopulator pushQueryQueuePopulator;
    private final PushQueryPreparer pushQueryPreparer;
    private final KsqlConstants.QuerySourceType sourceType;
    private final KsqlConstants.RoutingNodeType routingNodeType;
    private final Supplier<Long> rowsProcessedSupplier;
    private volatile boolean closed = false;
    private CompletableFuture<PushRouting.PushConnectionsHandle> startFuture = new CompletableFuture<>();
    private CompletableFuture<Void> runningFuture = new CompletableFuture<>();

    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public ScalablePushQueryMetadata(LogicalSchema logicalSchema, QueryId queryId, TransientQueryQueue transientQueryQueue, Optional<ScalablePushQueryMetrics> optional, PushQueryMetadata.ResultType resultType, PushQueryQueuePopulator pushQueryQueuePopulator, PushQueryPreparer pushQueryPreparer, KsqlConstants.QuerySourceType querySourceType, KsqlConstants.RoutingNodeType routingNodeType, Supplier<Long> supplier) {
        this.logicalSchema = logicalSchema;
        this.queryId = queryId;
        this.transientQueryQueue = transientQueryQueue;
        this.scalablePushQueryMetrics = optional;
        this.resultType = resultType;
        this.pushQueryQueuePopulator = pushQueryQueuePopulator;
        this.pushQueryPreparer = pushQueryPreparer;
        this.sourceType = querySourceType;
        this.routingNodeType = routingNodeType;
        this.rowsProcessedSupplier = supplier;
    }

    public void prepare() {
        this.pushQueryPreparer.prepare();
    }

    @Override // io.confluent.ksql.util.PushQueryMetadata
    public void start() {
        CompletableFuture.completedFuture(null).thenCompose(obj -> {
            return this.pushQueryQueuePopulator.run();
        }).thenApply(pushConnectionsHandle -> {
            this.startFuture.complete(pushConnectionsHandle);
            CompletableFuture<Void> completableFuture = this.runningFuture;
            completableFuture.getClass();
            pushConnectionsHandle.onException(completableFuture::completeExceptionally);
            return null;
        }).exceptionally(th -> {
            this.startFuture.completeExceptionally(th);
            this.runningFuture.completeExceptionally(th);
            return null;
        });
    }

    @Override // io.confluent.ksql.util.PushQueryMetadata
    public void close() {
        this.transientQueryQueue.close();
        this.startFuture.thenApply(pushConnectionsHandle -> {
            pushConnectionsHandle.close();
            return null;
        });
        this.closed = true;
    }

    @Override // io.confluent.ksql.util.PushQueryMetadata
    public boolean isRunning() {
        return !this.closed;
    }

    @Override // io.confluent.ksql.util.PushQueryMetadata
    @SuppressFBWarnings({"EI_EXPOSE_REP"})
    public TransientQueryQueue getRowQueue() {
        return this.transientQueryQueue;
    }

    @Override // io.confluent.ksql.util.PushQueryMetadata
    public void setLimitHandler(LimitHandler limitHandler) {
        this.transientQueryQueue.setLimitHandler(limitHandler);
    }

    @Override // io.confluent.ksql.util.PushQueryMetadata
    public void setCompletionHandler(CompletionHandler completionHandler) {
        this.transientQueryQueue.setCompletionHandler(completionHandler);
    }

    @Override // io.confluent.ksql.util.PushQueryMetadata
    public void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
        streamsUncaughtExceptionHandler.getClass();
        onException(streamsUncaughtExceptionHandler::handle);
    }

    @Override // io.confluent.ksql.util.PushQueryMetadata
    public LogicalSchema getLogicalSchema() {
        return this.logicalSchema;
    }

    @Override // io.confluent.ksql.util.PushQueryMetadata
    public QueryId getQueryId() {
        return this.queryId;
    }

    @Override // io.confluent.ksql.util.PushQueryMetadata
    public PushQueryMetadata.ResultType getResultType() {
        return this.resultType;
    }

    public void onException(Consumer<Throwable> consumer) {
        this.runningFuture.exceptionally(th -> {
            this.scalablePushQueryMetrics.ifPresent(scalablePushQueryMetrics -> {
                scalablePushQueryMetrics.recordErrorRate(1.0d, this.sourceType, this.routingNodeType);
            });
            consumer.accept(th);
            return null;
        });
    }

    public void onCompletion(Consumer<Void> consumer) {
        this.runningFuture.thenAccept((Consumer<? super Void>) consumer);
    }

    public void onCompletionOrException(BiConsumer<Void, Throwable> biConsumer) {
        this.runningFuture.handle((r5, th) -> {
            biConsumer.accept(r5, th);
            return null;
        });
    }

    public KsqlConstants.QuerySourceType getSourceType() {
        return this.sourceType;
    }

    public KsqlConstants.RoutingNodeType getRoutingNodeType() {
        return this.routingNodeType;
    }

    public long getTotalRowsReturned() {
        return this.transientQueryQueue.getTotalRowsQueued();
    }

    public long getTotalRowsProcessed() {
        return this.rowsProcessedSupplier.get().longValue();
    }
}
