package io.trino.dispatcher;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.inject.Inject;
import io.airlift.concurrent.Threads;
import io.airlift.jaxrs.AsyncResponseHandler;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.trino.client.QueryError;
import io.trino.client.QueryResults;
import io.trino.client.StatementStats;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.QueryState;
import io.trino.server.HttpRequestSessionContextFactory;
import io.trino.server.ProtocolConfig;
import io.trino.server.ServerConfig;
import io.trino.server.SessionContext;
import io.trino.server.protocol.QueryInfoUrlFactory;
import io.trino.server.protocol.Slug;
import io.trino.server.security.InternalPrincipal;
import io.trino.server.security.ResourceSecurity;
import io.trino.spi.ErrorCode;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.security.Identity;
import io.trino.tracing.TrinoAttributes;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.container.Suspended;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;
import jakarta.ws.rs.core.UriInfo;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

@Path("/v1/statement")
/* loaded from: input_file:io/trino/dispatcher/QueuedStatementResource.class */
public class QueuedStatementResource {
    private static final Logger log = Logger.get(QueuedStatementResource.class);
    private static final Duration MAX_WAIT_TIME = new Duration(1.0d, TimeUnit.SECONDS);
    private static final Ordering<Comparable<Duration>> WAIT_ORDERING = Ordering.natural().nullsLast();
    private static final Duration NO_DURATION = new Duration(0.0d, TimeUnit.MILLISECONDS);
    private final HttpRequestSessionContextFactory sessionContextFactory;
    private final DispatchManager dispatchManager;
    private final Tracer tracer;
    private final QueryInfoUrlFactory queryInfoUrlFactory;
    private final Executor responseExecutor;
    private final ScheduledExecutorService timeoutExecutor;
    private final boolean compressionEnabled;
    private final Optional<String> alternateHeaderName;
    private final QueryManager queryManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/dispatcher/QueuedStatementResource$Query.class */
    public static final class Query {
        private final String query;
        private final SessionContext sessionContext;
        private final DispatchManager dispatchManager;
        private final QueryId queryId;
        private final Optional<URI> queryInfoUrl;
        private final Span querySpan;
        private final Slug slug = Slug.createNew();
        private final AtomicLong lastToken = new AtomicLong();
        private final long initTime = System.nanoTime();
        private final AtomicReference<Boolean> submissionGate = new AtomicReference<>();
        private final SettableFuture<Void> creationFuture = SettableFuture.create();

        public Query(String str, SessionContext sessionContext, DispatchManager dispatchManager, QueryInfoUrlFactory queryInfoUrlFactory, Tracer tracer) {
            this.query = (String) Objects.requireNonNull(str, "query is null");
            this.sessionContext = (SessionContext) Objects.requireNonNull(sessionContext, "sessionContext is null");
            this.dispatchManager = (DispatchManager) Objects.requireNonNull(dispatchManager, "dispatchManager is null");
            this.queryId = dispatchManager.createQueryId();
            Objects.requireNonNull(queryInfoUrlFactory, "queryInfoUrlFactory is null");
            this.queryInfoUrl = queryInfoUrlFactory.getQueryInfoUrl(this.queryId);
            Objects.requireNonNull(tracer, "tracer is null");
            this.querySpan = tracer.spanBuilder("query").addLink(Span.current().getSpanContext()).setNoParent().setAttribute(TrinoAttributes.QUERY_ID, this.queryId.toString()).startSpan();
        }

        public QueryId getQueryId() {
            return this.queryId;
        }

        public Slug getSlug() {
            return this.slug;
        }

        public long getLastToken() {
            return this.lastToken.get();
        }

        public boolean tryAbandonSubmissionWithTimeout(Duration duration) {
            return Duration.nanosSince(this.initTime).compareTo(duration) >= 0 && this.submissionGate.compareAndSet(null, false);
        }

        public boolean isSubmissionAbandoned() {
            return Boolean.FALSE.equals(this.submissionGate.get());
        }

        public boolean isCreated() {
            return this.creationFuture.isDone();
        }

        private ListenableFuture<Void> waitForDispatched() {
            submitIfNeeded();
            return !this.creationFuture.isDone() ? Futures.nonCancellationPropagating(this.creationFuture) : this.dispatchManager.waitForDispatched(this.queryId);
        }

        private void submitIfNeeded() {
            if (this.submissionGate.compareAndSet(null, true)) {
                this.querySpan.addEvent("submit");
                this.creationFuture.setFuture(this.dispatchManager.createQuery(this.queryId, this.querySpan, this.slug, this.sessionContext, this.query));
            }
        }

        public QueryResults getQueryResults(long j, UriInfo uriInfo) {
            long j2 = this.lastToken.get();
            if (j != j2 && j != j2 + 1) {
                throw new WebApplicationException(Response.Status.GONE);
            }
            this.lastToken.compareAndSet(j2, j);
            if (!this.creationFuture.isDone()) {
                return createQueryResults(j + 1, uriInfo, DispatchInfo.queued(QueuedStatementResource.NO_DURATION, QueuedStatementResource.NO_DURATION));
            }
            return createQueryResults(j + 1, uriInfo, this.dispatchManager.getDispatchInfo(this.queryId).orElseThrow(() -> {
                return new WebApplicationException(Response.status(Response.Status.NOT_FOUND).build());
            }));
        }

        public void cancel() {
            this.creationFuture.addListener(() -> {
                this.dispatchManager.cancelQuery(this.queryId);
            }, MoreExecutors.directExecutor());
        }

        public void destroy() {
            this.querySpan.setStatus(StatusCode.ERROR).end();
            this.sessionContext.getIdentity().destroy();
        }

        private QueryResults createQueryResults(long j, UriInfo uriInfo, DispatchInfo dispatchInfo) {
            return QueuedStatementResource.createQueryResults(this.queryId, getNextUri(j, uriInfo, dispatchInfo), dispatchInfo.getFailureInfo().map(this::toQueryError), uriInfo, this.queryInfoUrl, dispatchInfo.getElapsedTime(), dispatchInfo.getQueuedTime());
        }

        private URI getNextUri(long j, UriInfo uriInfo, DispatchInfo dispatchInfo) {
            if (dispatchInfo.getFailureInfo().isPresent()) {
                return null;
            }
            return (URI) dispatchInfo.getCoordinatorLocation().map(coordinatorLocation -> {
                return getRedirectUri(coordinatorLocation, uriInfo);
            }).orElseGet(() -> {
                return QueuedStatementResource.getQueuedUri(this.queryId, this.slug, j, uriInfo);
            });
        }

        private URI getRedirectUri(CoordinatorLocation coordinatorLocation, UriInfo uriInfo) {
            return UriBuilder.fromUri(coordinatorLocation.getUri(uriInfo)).replacePath("/v1/statement/executing").path(this.queryId.toString()).path(this.slug.makeSlug(Slug.Context.EXECUTING_QUERY, 0L)).path("0").build(new Object[0]);
        }

        private QueryError toQueryError(ExecutionFailureInfo executionFailureInfo) {
            ErrorCode errorCode;
            if (executionFailureInfo.getErrorCode() != null) {
                errorCode = executionFailureInfo.getErrorCode();
            } else {
                errorCode = StandardErrorCode.GENERIC_INTERNAL_ERROR.toErrorCode();
                QueuedStatementResource.log.warn("Failed query %s has no error code", new Object[]{this.queryId});
            }
            return new QueryError((String) MoreObjects.firstNonNull(executionFailureInfo.getMessage(), "Internal error"), (String) null, errorCode.getCode(), errorCode.getName(), errorCode.getType().toString(), executionFailureInfo.getErrorLocation(), executionFailureInfo.toFailureInfo());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:io/trino/dispatcher/QueuedStatementResource$QueryManager.class */
    public static class QueryManager {
        private final ConcurrentMap<QueryId, Query> queries = new ConcurrentHashMap();
        private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(Threads.daemonThreadsNamed("drain-state-query-manager"));
        private final Duration querySubmissionTimeout;

        public QueryManager(Duration duration) {
            this.querySubmissionTimeout = (Duration) Objects.requireNonNull(duration, "querySubmissionTimeout is null");
        }

        public void initialize(DispatchManager dispatchManager) {
            this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
                try {
                    syncWith(dispatchManager);
                } catch (Throwable th) {
                    QueuedStatementResource.log.error(th, "Unexpected error synchronizing with dispatch manager");
                }
            }, 200L, 200L, TimeUnit.MILLISECONDS);
        }

        public void destroy() {
            this.scheduledExecutorService.shutdownNow();
        }

        private void syncWith(DispatchManager dispatchManager) {
            this.queries.forEach((queryId, query) -> {
                if (shouldBePurged(dispatchManager, query)) {
                    removeQuery(queryId);
                }
            });
        }

        private boolean shouldBePurged(DispatchManager dispatchManager, Query query) {
            if (query.isSubmissionAbandoned() || query.tryAbandonSubmissionWithTimeout(this.querySubmissionTimeout)) {
                return true;
            }
            return query.isCreated() && !dispatchManager.isQueryRegistered(query.getQueryId());
        }

        private void removeQuery(QueryId queryId) {
            Optional.ofNullable(this.queries.remove(queryId)).ifPresent(QueryManager::destroyQuietly);
        }

        private static void destroyQuietly(Query query) {
            try {
                query.destroy();
            } catch (Throwable th) {
                QueuedStatementResource.log.error(th, "Error destroying query");
            }
        }

        public void registerQuery(Query query) {
            Preconditions.checkState(this.queries.putIfAbsent(query.getQueryId(), query) == null, "Query already registered");
        }

        @Nullable
        public Query getQuery(QueryId queryId) {
            return this.queries.get(queryId);
        }
    }

    @Inject
    public QueuedStatementResource(HttpRequestSessionContextFactory httpRequestSessionContextFactory, DispatchManager dispatchManager, Tracer tracer, DispatchExecutor dispatchExecutor, QueryInfoUrlFactory queryInfoUrlFactory, ServerConfig serverConfig, ProtocolConfig protocolConfig, QueryManagerConfig queryManagerConfig) {
        this.sessionContextFactory = (HttpRequestSessionContextFactory) Objects.requireNonNull(httpRequestSessionContextFactory, "sessionContextFactory is null");
        this.dispatchManager = (DispatchManager) Objects.requireNonNull(dispatchManager, "dispatchManager is null");
        this.tracer = (Tracer) Objects.requireNonNull(tracer, "tracer is null");
        this.responseExecutor = dispatchExecutor.getExecutor();
        this.timeoutExecutor = dispatchExecutor.getScheduledExecutor();
        this.queryInfoUrlFactory = (QueryInfoUrlFactory) Objects.requireNonNull(queryInfoUrlFactory, "queryInfoUrlTemplate is null");
        this.compressionEnabled = serverConfig.isQueryResultsCompressionEnabled();
        this.alternateHeaderName = protocolConfig.getAlternateHeaderName();
        this.queryManager = new QueryManager(queryManagerConfig.getClientTimeout());
    }

    @PostConstruct
    public void start() {
        this.queryManager.initialize(this.dispatchManager);
    }

    @PreDestroy
    public void stop() {
        this.queryManager.destroy();
    }

    @ResourceSecurity(ResourceSecurity.AccessType.AUTHENTICATED_USER)
    @POST
    @Produces({"application/json"})
    public Response postStatement(String str, @Context HttpServletRequest httpServletRequest, @Context HttpHeaders httpHeaders, @Context UriInfo uriInfo) {
        if (Strings.isNullOrEmpty(str)) {
            throw badRequest(Response.Status.BAD_REQUEST, "SQL statement is empty");
        }
        Query registerQuery = registerQuery(str, httpServletRequest, httpHeaders);
        return createQueryResultsResponse(registerQuery.getQueryResults(registerQuery.getLastToken(), uriInfo));
    }

    private Query registerQuery(String str, HttpServletRequest httpServletRequest, HttpHeaders httpHeaders) {
        Optional<String> ofNullable = Optional.ofNullable(httpServletRequest.getRemoteAddr());
        Optional<Identity> ofNullable2 = Optional.ofNullable((Identity) httpServletRequest.getAttribute(HttpRequestSessionContextFactory.AUTHENTICATED_IDENTITY));
        Optional flatMap = ofNullable2.flatMap((v0) -> {
            return v0.getPrincipal();
        });
        Class<InternalPrincipal> cls = InternalPrincipal.class;
        Objects.requireNonNull(InternalPrincipal.class);
        if (((Boolean) flatMap.map((v1) -> {
            return r1.isInstance(v1);
        }).orElse(false)).booleanValue()) {
            throw badRequest(Response.Status.FORBIDDEN, "Internal communication can not be used to start a query");
        }
        Query query = new Query(str, this.sessionContextFactory.createSessionContext(httpHeaders.getRequestHeaders(), this.alternateHeaderName, ofNullable, ofNullable2), this.dispatchManager, this.queryInfoUrlFactory, this.tracer);
        this.queryManager.registerQuery(query);
        httpServletRequest.setAttribute(HttpRequestSessionContextFactory.AUTHENTICATED_IDENTITY, (Object) null);
        return query;
    }

    @Produces({"application/json"})
    @ResourceSecurity(ResourceSecurity.AccessType.PUBLIC)
    @GET
    @Path("queued/{queryId}/{slug}/{token}")
    public void getStatus(@PathParam("queryId") QueryId queryId, @PathParam("slug") String str, @PathParam("token") long j, @QueryParam("maxWait") Duration duration, @Context UriInfo uriInfo, @Suspended AsyncResponse asyncResponse) {
        AsyncResponseHandler.bindAsyncResponse(asyncResponse, getStatus(getQuery(queryId, str, j), j, duration, uriInfo), this.responseExecutor);
    }

    private ListenableFuture<Response> getStatus(Query query, long j, Duration duration, UriInfo uriInfo) {
        return FluentFuture.from(query.waitForDispatched()).withTimeout(((Duration) WAIT_ORDERING.min(MAX_WAIT_TIME, duration)).toMillis(), TimeUnit.MILLISECONDS, this.timeoutExecutor).catching(TimeoutException.class, timeoutException -> {
            return null;
        }, MoreExecutors.directExecutor()).transform(r9 -> {
            return query.getQueryResults(j, uriInfo);
        }, this.responseExecutor).transform(this::createQueryResultsResponse, MoreExecutors.directExecutor());
    }

    @Produces({"application/json"})
    @ResourceSecurity(ResourceSecurity.AccessType.PUBLIC)
    @DELETE
    @Path("queued/{queryId}/{slug}/{token}")
    public Response cancelQuery(@PathParam("queryId") QueryId queryId, @PathParam("slug") String str, @PathParam("token") long j) {
        getQuery(queryId, str, j).cancel();
        return Response.noContent().build();
    }

    private Query getQuery(QueryId queryId, String str, long j) {
        Query query = this.queryManager.getQuery(queryId);
        if (query == null || !query.getSlug().isValid(Slug.Context.QUEUED_QUERY, str, j)) {
            throw badRequest(Response.Status.NOT_FOUND, "Query not found");
        }
        return query;
    }

    private Response createQueryResultsResponse(QueryResults queryResults) {
        Response.ResponseBuilder ok = Response.ok(queryResults);
        if (!this.compressionEnabled) {
            ok.encoding("identity");
        }
        return ok.build();
    }

    private static URI getQueuedUri(QueryId queryId, Slug slug, long j, UriInfo uriInfo) {
        return uriInfo.getBaseUriBuilder().replacePath("/v1/statement/queued/").path(queryId.toString()).path(slug.makeSlug(Slug.Context.QUEUED_QUERY, j)).path(String.valueOf(j)).replaceQuery("").build(new Object[0]);
    }

    private static QueryResults createQueryResults(QueryId queryId, URI uri, Optional<QueryError> optional, UriInfo uriInfo, Optional<URI> optional2, Duration duration, Duration duration2) {
        QueryState queryState = (QueryState) optional.map(queryError -> {
            return QueryState.FAILED;
        }).orElse(QueryState.QUEUED);
        return new QueryResults(queryId.toString(), QueryInfoUrlFactory.getQueryInfoUri(optional2, queryId, uriInfo), (URI) null, uri, (List) null, (List) null, StatementStats.builder().setState(queryState.toString()).setQueued(queryState == QueryState.QUEUED).setProgressPercentage(OptionalDouble.empty()).setRunningPercentage(OptionalDouble.empty()).setElapsedTimeMillis(duration.toMillis()).setQueuedTimeMillis(duration2.toMillis()).build(), optional.orElse(null), ImmutableList.of(), (String) null, (Long) null);
    }

    private static WebApplicationException badRequest(Response.Status status, String str) {
        throw new WebApplicationException(Response.status(status).type(MediaType.TEXT_PLAIN_TYPE).entity(str).build());
    }
}
