package io.trino.server.remotetask;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.net.MediaType;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.concurrent.SetThreadName;
import io.airlift.http.client.FullJsonResponseHandler;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpUriBuilder;
import io.airlift.http.client.Request;
import io.airlift.http.client.StaticBodyGenerator;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.trino.Session;
import io.trino.SystemSessionProperties;
import io.trino.execution.DynamicFiltersCollector;
import io.trino.execution.FutureStateChange;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.PartitionedSplitsInfo;
import io.trino.execution.RemoteTask;
import io.trino.execution.ScheduledSplit;
import io.trino.execution.SplitAssignment;
import io.trino.execution.StateMachine;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.TaskState;
import io.trino.execution.TaskStatus;
import io.trino.execution.buffer.OutputBuffers;
import io.trino.execution.buffer.PipelinedBufferInfo;
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.execution.buffer.SpoolingOutputStats;
import io.trino.metadata.Split;
import io.trino.operator.TaskStats;
import io.trino.server.DynamicFilterService;
import io.trino.server.FailTaskRequest;
import io.trino.server.TaskUpdateRequest;
import io.trino.spi.HostAddress;
import io.trino.spi.QueryId;
import io.trino.spi.SplitWeight;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.TrinoTransportException;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.DynamicFilterId;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.tracing.TrinoAttributes;
import io.trino.util.Failures;
import java.net.URI;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.joda.time.DateTime;

/* loaded from: input_file:io/trino/server/remotetask/HttpRemoteTask.class */
public final class HttpRemoteTask implements RemoteTask {
    private static final Logger log = Logger.get(HttpRemoteTask.class);
    private final TaskId taskId;
    private final Session session;
    private final Span stageSpan;
    private final String nodeId;
    private final AtomicBoolean speculative;
    private final PlanFragment planFragment;
    private final RemoteTaskStats stats;
    private final Tracer tracer;
    private final Span span;
    private final TaskInfoFetcher taskInfoFetcher;
    private final ContinuousTaskStatusFetcher taskStatusFetcher;
    private final DynamicFiltersFetcher dynamicFiltersFetcher;
    private final DynamicFiltersCollector outboundDynamicFiltersCollector;
    private final int maxUnacknowledgedSplits;

    // Number of partitioned-source splits held in pendingSplits. Incremented by addSplits and
    // decremented by processTaskUpdate while holding the monitor; volatile because the getters
    // read it without synchronizing.
    @GuardedBy("this")
    private volatile int pendingSourceSplitCount;

    // Raw weight sum of the splits counted by pendingSourceSplitCount; same locking discipline.
    @GuardedBy("this")
    private volatile long pendingSourceSplitsWeight;

    // Starts at maxUnacknowledgedSplits and is changed by adjustSplitBatchSize.
    @VisibleForTesting
    final AtomicInteger splitBatchSize;
    private final boolean summarizeTaskInfo;
    private final HttpClient httpClient;
    private final Executor executor;
    private final ScheduledExecutorService errorScheduledExecutor;
    private final Duration maxErrorDuration;
    private final Duration taskTerminationTimeout;
    private final JsonCodec<TaskInfo> taskInfoCodec;
    private final JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec;
    private final JsonCodec<FailTaskRequest> failTaskRequestCodec;
    private final RequestErrorTracker updateErrorTracker;
    private final NodeTaskMap.PartitionedSplitCountTracker partitionedSplitCountTracker;
    private final int guaranteedSplitsPerRequest;
    private final long maxRequestSizeInBytes;
    private final long requestSizeHeadroomInBytes;
    private final boolean adaptiveUpdateRequestSizeEnabled;
    private final AtomicLong nextSplitId = new AtomicLong();
    private final AtomicLong sentDynamicFiltersVersion = new AtomicLong(0);
    private final AtomicLong terminationStartedNanos = new AtomicLong();
    // The in-flight task update request, or null when none is running (see sendUpdateInternal).
    private final AtomicReference<Future<?>> currentRequest = new AtomicReference<>();

    // Splits added locally that have not yet been removed by processTaskUpdate.
    @GuardedBy("this")
    private final SetMultimap<PlanNodeId, ScheduledSplit> pendingSplits = HashMultimap.create();

    // Keyed by source: noMoreSplits(PlanNodeId) stores true; processTaskUpdate stores false once
    // a split assignment flagged isNoMoreSplits() has been processed.
    @GuardedBy("this")
    private final Map<PlanNodeId, Boolean> noMoreSplits = new HashMap<>();
    private final AtomicReference<OutputBuffers> outputBuffers = new AtomicReference<>();
    private final FutureStateChange<Void> whenSplitQueueHasSpace = new FutureStateChange<>();

    @GuardedBy("this")
    private boolean splitQueueHasSpace = true;

    @GuardedBy("this")
    private OptionalLong whenSplitQueueHasSpaceThreshold = OptionalLong.empty();
    // Incremented by triggerUpdate; the 0 -> 1 transition schedules an update, and a successful
    // response subtracts the value that was observed when its request was built.
    private final AtomicInteger pendingRequestsCounter = new AtomicInteger(0);
    // Whether the next update request carries the plan fragment; refreshed from
    // TaskInfo.isNeedsPlan() after every successful update.
    private final AtomicBoolean sendPlan = new AtomicBoolean(true);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean terminating = new AtomicBoolean(false);
    private final AtomicBoolean cleanedUp = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/server/remotetask/HttpRemoteTask$UpdateResponseHandler.class */
    public class UpdateResponseHandler implements SimpleHttpResponseCallback<TaskInfo> {
        private final List<SplitAssignment> splitAssignments;
        private final long currentRequestDynamicFiltersVersion;
        private final long currentRequestStartNanos;
        private final int currentPendingRequestsCounter;

        private UpdateResponseHandler(List<SplitAssignment> list, long j, long j2, int i) {
            this.splitAssignments = ImmutableList.copyOf((Collection) Objects.requireNonNull(list, "splitAssignments is null"));
            this.currentRequestDynamicFiltersVersion = j;
            this.currentRequestStartNanos = j2;
            this.currentPendingRequestsCounter = i;
        }

        @Override // io.trino.server.remotetask.SimpleHttpResponseCallback
        public void success(TaskInfo taskInfo) {
            SetThreadName setThreadName = new SetThreadName("UpdateResponseHandler-%s", new Object[]{HttpRemoteTask.this.taskId});
            try {
                HttpRemoteTask.this.sentDynamicFiltersVersion.set(this.currentRequestDynamicFiltersVersion);
                HttpRemoteTask.this.outboundDynamicFiltersCollector.acknowledge(this.currentRequestDynamicFiltersVersion);
                HttpRemoteTask.this.sendPlan.set(taskInfo.isNeedsPlan());
                HttpRemoteTask.this.currentRequest.set(null);
                updateStats();
                HttpRemoteTask.this.updateErrorTracker.requestSucceeded();
                HttpRemoteTask.this.processTaskUpdate(taskInfo, this.splitAssignments);
                if (HttpRemoteTask.this.pendingRequestsCounter.addAndGet(-this.currentPendingRequestsCounter) > 0) {
                    HttpRemoteTask.this.scheduleUpdate();
                }
                setThreadName.close();
            } catch (Throwable th) {
                try {
                    setThreadName.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }

        @Override // io.trino.server.remotetask.SimpleHttpResponseCallback
        public void failed(Throwable th) {
            SetThreadName setThreadName = new SetThreadName("UpdateResponseHandler-%s", new Object[]{HttpRemoteTask.this.taskId});
            try {
                try {
                    HttpRemoteTask.this.currentRequest.set(null);
                    updateStats();
                    if (!HttpRemoteTask.this.getTaskStatus().getState().isDone()) {
                        HttpRemoteTask.this.updateErrorTracker.requestFailed(th);
                    }
                    HttpRemoteTask.this.scheduleUpdate();
                } catch (Error e) {
                    HttpRemoteTask.this.fatalUnacknowledgedFailure(e);
                    throw e;
                } catch (RuntimeException e2) {
                    HttpRemoteTask.this.fatalUnacknowledgedFailure(e2);
                }
                setThreadName.close();
            } catch (Throwable th2) {
                try {
                    setThreadName.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
                throw th2;
            }
        }

        @Override // io.trino.server.remotetask.SimpleHttpResponseCallback
        public void fatal(Throwable th) {
            SetThreadName setThreadName = new SetThreadName("UpdateResponseHandler-%s", new Object[]{HttpRemoteTask.this.taskId});
            try {
                HttpRemoteTask.this.fatalUnacknowledgedFailure(th);
                setThreadName.close();
            } catch (Throwable th2) {
                try {
                    setThreadName.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
                throw th2;
            }
        }

        private void updateStats() {
            HttpRemoteTask.this.stats.updateRoundTripMillis(Duration.nanosSince(this.currentRequestStartNanos).toMillis());
        }
    }

    /**
     * Creates the remote task handle: stores collaborators, registers the initial splits as
     * pending, reads the split/request-size limits from session properties, and builds the
     * dynamic-filter, task-status and task-info fetchers. No fetcher is started here; see
     * {@link #start()}.
     *
     * <p>Parameter names are decompiler-generated. The mapping below is taken from the null-check
     * messages and field assignments in the body:
     * {@code span} is the stage span, {@code str} the node id, {@code z} the initial speculative
     * flag, {@code uri} the task location, {@code multimap} the initial splits by source,
     * {@code scheduledExecutorService2} the error scheduled executor, {@code duration} the max
     * error duration, {@code duration4} the task termination timeout, {@code z2} the
     * summarize-task-info flag, {@code jsonCodec} the task status codec, {@code jsonCodec3} the
     * task info codec, {@code jsonCodec4} the task update request codec, {@code jsonCodec5} the
     * fail task request codec, {@code set} the outbound dynamic filter ids and {@code optional}
     * the estimated memory.
     * NOTE(review): {@code duration2}, {@code duration3} and {@code scheduledExecutorService} are
     * only forwarded to the fetchers; their meaning is not visible here - confirm against the
     * fetcher constructors.
     *
     * @throws NullPointerException if any argument checked with {@code requireNonNull} is null
     */
    public HttpRemoteTask(Session session, Span span, TaskId taskId, String str, boolean z, URI uri, PlanFragment planFragment, Multimap<PlanNodeId, Split> multimap, OutputBuffers outputBuffers, HttpClient httpClient, Executor executor, ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2, Duration duration, Duration duration2, Duration duration3, Duration duration4, boolean z2, JsonCodec<TaskStatus> jsonCodec, JsonCodec<DynamicFiltersCollector.VersionedDynamicFilterDomains> jsonCodec2, JsonCodec<TaskInfo> jsonCodec3, JsonCodec<TaskUpdateRequest> jsonCodec4, JsonCodec<FailTaskRequest> jsonCodec5, NodeTaskMap.PartitionedSplitCountTracker partitionedSplitCountTracker, Tracer tracer, RemoteTaskStats remoteTaskStats, DynamicFilterService dynamicFilterService, Set<DynamicFilterId> set, Optional<DataSize> optional) {
        Objects.requireNonNull(session, "session is null");
        Objects.requireNonNull(span, "stageSpan is null");
        Objects.requireNonNull(taskId, "taskId is null");
        Objects.requireNonNull(str, "nodeId is null");
        Objects.requireNonNull(uri, "location is null");
        Objects.requireNonNull(planFragment, "planFragment is null");
        Objects.requireNonNull(outputBuffers, "outputBuffers is null");
        Objects.requireNonNull(httpClient, "httpClient is null");
        Objects.requireNonNull(executor, "executor is null");
        Objects.requireNonNull(jsonCodec, "taskStatusCodec is null");
        Objects.requireNonNull(jsonCodec3, "taskInfoCodec is null");
        Objects.requireNonNull(jsonCodec4, "taskUpdateRequestCodec is null");
        Objects.requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
        Objects.requireNonNull(remoteTaskStats, "stats is null");
        Objects.requireNonNull(set, "outboundDynamicFilterIds is null");
        Objects.requireNonNull(optional, "estimatedMemory is null");
        // The manual close()/addSuppressed() handling below is the desugared form of a
        // try-with-resources over SetThreadName.
        SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", new Object[]{taskId});
        try {
            this.taskId = taskId;
            this.session = session;
            this.stageSpan = span;
            this.nodeId = str;
            this.speculative = new AtomicBoolean(z);
            this.planFragment = planFragment;
            this.outputBuffers.set(outputBuffers);
            this.httpClient = httpClient;
            this.executor = executor;
            this.errorScheduledExecutor = scheduledExecutorService2;
            this.maxErrorDuration = (Duration) Objects.requireNonNull(duration, "maxErrorDuration is null");
            this.taskTerminationTimeout = (Duration) Objects.requireNonNull(duration4, "taskTerminationTimeout is null");
            this.summarizeTaskInfo = z2;
            this.taskInfoCodec = jsonCodec3;
            this.taskUpdateRequestCodec = jsonCodec4;
            this.failTaskRequestCodec = jsonCodec5;
            this.updateErrorTracker = new RequestErrorTracker(taskId, uri, duration, scheduledExecutorService2, "updating task");
            this.partitionedSplitCountTracker = (NodeTaskMap.PartitionedSplitCountTracker) Objects.requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
            this.tracer = (Tracer) Objects.requireNonNull(tracer, "tracer is null");
            // The task span is created with the stage span passed as second argument.
            this.span = createSpanBuilder("remote-task", span).startSpan();
            this.stats = remoteTaskStats;
            // Every initial split gets a fresh sequence id and is queued as pending.
            for (Map.Entry entry : multimap.entries()) {
                this.pendingSplits.put((PlanNodeId) entry.getKey(), new ScheduledSplit(this.nextSplitId.getAndIncrement(), (PlanNodeId) entry.getKey(), (Split) entry.getValue()));
            }
            this.maxUnacknowledgedSplits = SystemSessionProperties.getMaxUnacknowledgedSplitsPerTask(session);
            this.guaranteedSplitsPerRequest = SystemSessionProperties.getRemoteTaskGuaranteedSplitsPerRequest(session);
            this.maxRequestSizeInBytes = SystemSessionProperties.getMaxRemoteTaskRequestSize(session).toBytes();
            this.requestSizeHeadroomInBytes = SystemSessionProperties.getRemoteTaskRequestSizeHeadroom(session).toBytes();
            this.splitBatchSize = new AtomicInteger(this.maxUnacknowledgedSplits);
            long size = planFragment.getPartitionedSources().size();
            // Adaptive request sizing is enabled only for fragments with exactly one partitioned source.
            this.adaptiveUpdateRequestSizeEnabled = size == 1 && SystemSessionProperties.isRemoteTaskAdaptiveUpdateRequestSizeEnabled(session);
            if (size > 1) {
                log.debug("%s - There are more than one partitioned sources: numOfPartitionedSources=%s", new Object[]{taskId, Integer.valueOf(planFragment.getPartitionedSources().size())});
            }
            // Count and weigh the initial splits that belong to partitioned sources.
            int i = 0;
            long j = 0;
            Iterator<PlanNodeId> it = planFragment.getPartitionedSources().iterator();
            while (it.hasNext()) {
                // NOTE(review): raw Collection is a decompiler artifact; the lambda below needs
                // the element type (Split) to resolve getSplitWeight().
                Collection collection = multimap.get(it.next());
                if (!collection.isEmpty()) {
                    i += collection.size();
                    j = Math.addExact(j, SplitWeight.rawValueSum(collection, (v0) -> {
                        return v0.getSplitWeight();
                    }));
                }
            }
            this.pendingSourceSplitCount = i;
            this.pendingSourceSplitsWeight = j;
            // For pipelined output buffers the initial task info carries one zeroed buffer info per buffer id.
            TaskInfo createInitialTask = TaskInfo.createInitialTask(taskId, uri, str, this.speculative.get(), outputBuffers instanceof PipelinedOutputBuffers ? Optional.of((List) ((PipelinedOutputBuffers) outputBuffers).getBuffers().keySet().stream().map(outputBufferId -> {
                return new PipelinedBufferInfo(outputBufferId, 0L, 0L, 0, 0L, 0L, false);
            }).collect(ImmutableList.toImmutableList())) : Optional.empty(), new TaskStats(DateTime.now(), null));
            // All three fetchers report failures through fatalUnacknowledgedFailure and create
            // their spans with the task span passed as second argument.
            this.dynamicFiltersFetcher = new DynamicFiltersFetcher(this::fatalUnacknowledgedFailure, taskId, uri, duration2, jsonCodec2, executor, httpClient, () -> {
                return createSpanBuilder("task-dynamic-filters", this.span);
            }, duration, scheduledExecutorService2, remoteTaskStats, dynamicFilterService);
            this.taskStatusFetcher = new ContinuousTaskStatusFetcher(this::fatalUnacknowledgedFailure, createInitialTask.getTaskStatus(), duration2, jsonCodec, this.dynamicFiltersFetcher, executor, httpClient, () -> {
                return createSpanBuilder("task-status", this.span);
            }, duration, scheduledExecutorService2, remoteTaskStats);
            this.taskInfoFetcher = new TaskInfoFetcher(this::fatalUnacknowledgedFailure, this.taskStatusFetcher, createInitialTask, httpClient, () -> {
                return createSpanBuilder("task-info", this.span);
            }, duration3, jsonCodec3, duration, z2, executor, scheduledExecutorService, scheduledExecutorService2, remoteTaskStats, optional, SystemSessionProperties.getRetryPolicy(session));
            // On every status change: clean up once terminating or done, otherwise refresh the
            // split counters; end the task span when the state is done.
            this.taskStatusFetcher.addStateChangeListener(taskStatus -> {
                TaskState state = taskStatus.getState();
                if (state.isTerminatingOrDone()) {
                    cleanUpTask(state);
                } else {
                    partitionedSplitCountTracker.setPartitionedSplits(getPartitionedSplitsInfo());
                    updateSplitQueueSpace();
                }
                if (state.isDone()) {
                    this.span.end();
                }
            });
            // The collector is handed triggerUpdate as its callback and is registered as the
            // consumer for this task's outbound dynamic filter ids.
            this.outboundDynamicFiltersCollector = new DynamicFiltersCollector(this::triggerUpdate);
            QueryId queryId = taskId.getQueryId();
            int attemptId = taskId.getAttemptId();
            DynamicFiltersCollector dynamicFiltersCollector = this.outboundDynamicFiltersCollector;
            Objects.requireNonNull(dynamicFiltersCollector);
            dynamicFilterService.registerDynamicFilterConsumer(queryId, attemptId, set, dynamicFiltersCollector::updateDomains);
            // Publish the initial split counts and compute the initial queue-space flag.
            partitionedSplitCountTracker.setPartitionedSplits(getPartitionedSplitsInfo());
            updateSplitQueueSpace();
            setThreadName.close();
        } catch (Throwable th) {
            try {
                setThreadName.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    /** Returns the id this task was created with. */
    @Override
    public TaskId getTaskId() {
        return taskId;
    }

    /** Returns the node id this task was created with. */
    @Override
    public String getNodeId() {
        return nodeId;
    }

    /** Returns the task info currently held by the task info fetcher. */
    @Override
    public TaskInfo getTaskInfo() {
        TaskInfo current = taskInfoFetcher.getTaskInfo();
        return current;
    }

    /** Returns the task status currently held by the task status fetcher. */
    @Override
    public TaskStatus getTaskStatus() {
        TaskStatus current = taskStatusFetcher.getTaskStatus();
        return current;
    }

    /**
     * Marks the task as started, requests an initial update and then starts the dynamic filters,
     * task status and task info fetchers, in that order.
     */
    @Override
    public void start() {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
            started.set(true);
            triggerUpdate();
            dynamicFiltersFetcher.start();
            taskStatusFetcher.start();
            taskInfoFetcher.start();
        }
    }

    /**
     * Queues the given splits as pending and requests an update. Does nothing when the task is
     * terminating or done, or when termination has been requested locally.
     *
     * @param multimap splits to add, keyed by source plan node
     * @throws NullPointerException if {@code multimap} is null
     * @throws IllegalStateException if no-more-splits was already recorded for one of the sources
     */
    @Override
    public synchronized void addSplits(Multimap<PlanNodeId, Split> multimap) {
        Objects.requireNonNull(multimap, "splitsBySource is null");
        boolean taskFinishing = getTaskStatus().getState().isTerminatingOrDone();
        if (taskFinishing || terminating.get()) {
            return;
        }

        boolean needsUpdate = false;
        for (Map.Entry<PlanNodeId, Collection<Split>> bySource : multimap.asMap().entrySet()) {
            PlanNodeId sourceId = bySource.getKey();
            Collection<Split> splits = bySource.getValue();
            boolean partitionedSource = planFragment.isPartitionedSources(sourceId);
            Preconditions.checkState(!noMoreSplits.containsKey(sourceId), "noMoreSplits has already been set for %s", sourceId);

            int addedCount = 0;
            long addedWeight = 0;
            for (Split split : splits) {
                ScheduledSplit scheduled = new ScheduledSplit(nextSplitId.getAndIncrement(), sourceId, split);
                boolean newlyAdded = pendingSplits.put(sourceId, scheduled);
                // Only splits that were actually inserted for a partitioned source are counted.
                if (!newlyAdded || !partitionedSource) {
                    continue;
                }
                addedCount++;
                addedWeight = Math.addExact(addedWeight, split.getSplitWeight().getRawValue());
            }

            if (partitionedSource) {
                pendingSourceSplitCount += addedCount;
                pendingSourceSplitsWeight = Math.addExact(pendingSourceSplitsWeight, addedWeight);
                partitionedSplitCountTracker.setPartitionedSplits(getPartitionedSplitsInfo());
            }
            // Any visited source requests an update, whether or not a split was newly inserted.
            needsUpdate = true;
        }

        updateSplitQueueSpace();
        if (needsUpdate) {
            triggerUpdate();
        }
    }

    /**
     * Records that the given source will receive no more splits and requests an update. Ignored
     * when the source already has an entry or termination has been requested locally.
     */
    @Override
    public synchronized void noMoreSplits(PlanNodeId planNodeId) {
        boolean alreadyRecorded = this.noMoreSplits.containsKey(planNodeId);
        if (!alreadyRecorded && !terminating.get()) {
            this.noMoreSplits.put(planNodeId, true);
            triggerUpdate();
        }
    }

    /**
     * Replaces the stored output buffers when the supplied ones have a higher version, and
     * requests an update in that case. Ignored when the task is terminating or done, or when
     * termination has been requested locally.
     */
    @Override
    public void setOutputBuffers(OutputBuffers outputBuffers) {
        boolean taskFinishing = getTaskStatus().getState().isTerminatingOrDone();
        if (taskFinishing || terminating.get()) {
            return;
        }
        long requestedVersion = outputBuffers.getVersion();
        OutputBuffers replaced = this.outputBuffers.getAndUpdate(current -> {
            if (outputBuffers.getVersion() > current.getVersion()) {
                return outputBuffers;
            }
            return current;
        });
        if (requestedVersion > replaced.getVersion()) {
            triggerUpdate();
        }
    }

    /**
     * Moves the task from speculative to non-speculative and requests an update if the flag
     * actually changed.
     *
     * @param z must be {@code false}
     * @throws IllegalArgumentException if {@code z} is {@code true}
     */
    @Override
    public void setSpeculative(boolean z) {
        Preconditions.checkArgument(!z, "we can only move task from speculative to non-speculative");
        boolean flagChanged = speculative.compareAndSet(true, z);
        if (flagChanged) {
            triggerUpdate();
        }
    }

    /**
     * Returns the partitioned split count and weight attributed to this task: zero when the task
     * is done, only the running drivers while it is terminating, and otherwise the sum of
     * unacknowledged, queued and running.
     */
    @Override
    public PartitionedSplitsInfo getPartitionedSplitsInfo() {
        TaskStatus status = getTaskStatus();
        TaskState state = status.getState();
        if (state.isDone()) {
            return PartitionedSplitsInfo.forZeroSplits();
        }
        if (state.isTerminating()) {
            return PartitionedSplitsInfo.forSplitCountAndWeightSum(
                    status.getRunningPartitionedDrivers(),
                    status.getRunningPartitionedSplitsWeight());
        }
        PartitionedSplitsInfo unacknowledged = getUnacknowledgedPartitionedSplitsInfo();
        return PartitionedSplitsInfo.forSplitCountAndWeightSum(
                unacknowledged.getCount() + status.getQueuedPartitionedDrivers() + status.getRunningPartitionedDrivers(),
                unacknowledged.getWeightSum() + status.getQueuedPartitionedSplitsWeight() + status.getRunningPartitionedSplitsWeight());
    }

    /** Returns the count and weight of the locally pending partitioned-source splits. */
    public PartitionedSplitsInfo getUnacknowledgedPartitionedSplitsInfo() {
        int pendingCount = pendingSourceSplitCount;
        long pendingWeight = pendingSourceSplitsWeight;
        return PartitionedSplitsInfo.forSplitCountAndWeightSum(pendingCount, pendingWeight);
    }

    /**
     * Returns the queued partitioned split count and weight: zero when the task is terminating
     * or done, otherwise the unacknowledged splits plus the queued ones reported in the status.
     */
    @Override
    public PartitionedSplitsInfo getQueuedPartitionedSplitsInfo() {
        TaskStatus status = getTaskStatus();
        if (!status.getState().isTerminatingOrDone()) {
            PartitionedSplitsInfo unacknowledged = getUnacknowledgedPartitionedSplitsInfo();
            return PartitionedSplitsInfo.forSplitCountAndWeightSum(
                    unacknowledged.getCount() + status.getQueuedPartitionedDrivers(),
                    unacknowledged.getWeightSum() + status.getQueuedPartitionedSplitsWeight());
        }
        return PartitionedSplitsInfo.forZeroSplits();
    }

    /** Returns the number of locally pending partitioned-source splits. */
    @Override
    public int getUnacknowledgedPartitionedSplitCount() {
        return pendingSourceSplitCount;
    }

    /** Delegates to the task info fetcher's {@code retrieveAndDropSpoolingOutputStats()}. */
    @Override
    public SpoolingOutputStats.Snapshot retrieveAndDropSpoolingOutputStats() {
        SpoolingOutputStats.Snapshot snapshot = taskInfoFetcher.retrieveAndDropSpoolingOutputStats();
        return snapshot;
    }

    /** Returns the current value of {@code pendingSourceSplitCount}. */
    private int getPendingSourceSplitCount() {
        int pendingCount = pendingSourceSplitCount;
        return pendingCount;
    }

    /**
     * Returns the pending split weight plus the queued weight reported in the task status, or
     * zero when the task is terminating or done.
     */
    private long getQueuedPartitionedSplitsWeight() {
        TaskStatus status = getTaskStatus();
        return status.getState().isTerminatingOrDone()
                ? 0L
                : getPendingSourceSplitsWeight() + status.getQueuedPartitionedSplitsWeight();
    }

    /** Returns the current value of {@code pendingSourceSplitsWeight}. */
    private long getPendingSourceSplitsWeight() {
        long pendingWeight = pendingSourceSplitsWeight;
        return pendingWeight;
    }

    /** Registers the listener with the task status fetcher. */
    @Override
    public void addStateChangeListener(StateMachine.StateChangeListener<TaskStatus> stateChangeListener) {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
            taskStatusFetcher.addStateChangeListener(stateChangeListener);
        }
    }

    /** Registers the listener with the task info fetcher. */
    @Override
    public void addFinalTaskInfoListener(StateMachine.StateChangeListener<TaskInfo> stateChangeListener) {
        taskInfoFetcher.addFinalTaskInfoListener(stateChangeListener);
    }

    /**
     * Returns a future for split queue space: an already-completed one when the queue currently
     * has space, otherwise a new listener on the queue-space state change. The first call fixes
     * the weight threshold; later calls must pass the same value.
     *
     * @param j weight threshold
     * @throws IllegalArgumentException if a different threshold was registered earlier
     */
    @Override
    public synchronized ListenableFuture<Void> whenSplitQueueHasSpace(long j) {
        if (whenSplitQueueHasSpaceThreshold.isEmpty()) {
            whenSplitQueueHasSpaceThreshold = OptionalLong.of(j);
            updateSplitQueueSpace();
        } else {
            long registered = whenSplitQueueHasSpaceThreshold.getAsLong();
            Preconditions.checkArgument(j == registered, "Multiple split queue space notification thresholds not supported");
        }
        if (splitQueueHasSpace) {
            return Futures.immediateVoidFuture();
        }
        return this.whenSplitQueueHasSpace.createNewListener();
    }

    /**
     * Recomputes {@code splitQueueHasSpace}: the unacknowledged split count must be below
     * {@code maxUnacknowledgedSplits} and, when a threshold is registered, the queued weight must
     * be below it. Listeners are completed only when there is space and a threshold is registered.
     */
    private synchronized void updateSplitQueueSpace() {
        boolean hasSpace = false;
        if (getUnacknowledgedPartitionedSplitCount() < maxUnacknowledgedSplits) {
            hasSpace = whenSplitQueueHasSpaceThreshold.isEmpty()
                    || getQueuedPartitionedSplitsWeight() < whenSplitQueueHasSpaceThreshold.getAsLong();
        }
        splitQueueHasSpace = hasSpace;
        if (hasSpace && whenSplitQueueHasSpaceThreshold.isPresent()) {
            whenSplitQueueHasSpace.complete(null, executor);
        }
    }

    /**
     * Applies a successful update response: publishes the new task info, removes the sent splits
     * from the pending set, adjusts the pending counters for partitioned sources, and refreshes
     * the split tracker and queue-space flag.
     *
     * @param taskInfo task info returned by the update request
     * @param list split assignments that were sent in that request
     */
    private synchronized void processTaskUpdate(TaskInfo taskInfo, List<SplitAssignment> list) {
        updateTaskInfo(taskInfo);

        for (SplitAssignment assignment : list) {
            PlanNodeId sourceId = assignment.getPlanNodeId();
            boolean partitionedSource = planFragment.isPartitionedSources(sourceId);

            int removedCount = 0;
            long removedWeight = 0;
            for (ScheduledSplit sent : assignment.getSplits()) {
                boolean wasPending = pendingSplits.remove(sourceId, sent);
                if (wasPending && partitionedSource) {
                    removedCount++;
                    removedWeight = Math.addExact(removedWeight, sent.getSplit().getSplitWeight().getRawValue());
                }
            }

            if (assignment.isNoMoreSplits()) {
                noMoreSplits.put(sourceId, false);
            }
            if (partitionedSource) {
                pendingSourceSplitCount -= removedCount;
                pendingSourceSplitsWeight -= removedWeight;
            }
        }

        // Splits are still pending, so make sure the counter stays positive for the caller's check.
        if (pendingSourceSplitCount > 0) {
            pendingRequestsCounter.incrementAndGet();
        }
        partitionedSplitCountTracker.setPartitionedSplits(getPartitionedSplitsInfo());
        updateSplitQueueSpace();
    }

    /** Pushes the status contained in {@code taskInfo} to the status fetcher, then the info itself to the info fetcher. */
    private void updateTaskInfo(TaskInfo taskInfo) {
        TaskStatus latestStatus = taskInfo.getTaskStatus();
        taskStatusFetcher.updateTaskStatus(latestStatus);
        taskInfoFetcher.updateTaskInfo(taskInfo);
    }

    /** Submits {@code sendUpdate} to the executor. */
    private void scheduleUpdate() {
        Runnable update = this::sendUpdate;
        executor.execute(update);
    }

    /**
     * Requests an update. Does nothing before the task is started; otherwise increments the
     * pending-request counter and schedules an update only when the counter was zero.
     */
    private void triggerUpdate() {
        if (!started.get()) {
            return;
        }
        int previouslyPending = pendingRequestsCounter.getAndIncrement();
        if (previouslyPending == 0) {
            scheduleUpdate();
        }
    }

    /**
     * Adapts {@code splitBatchSize} to the size of the request that was just serialized.
     *
     * <p>An adjustment is attempted only when the request is larger than the limit and the batch
     * can still shrink, or smaller than the limit and the batch can still grow. The new size is
     * the split count of the first partitioned-source assignment scaled by
     * {@code (maxRequestSizeInBytes - requestSizeHeadroomInBytes) / j}, clamped to
     * {@code [guaranteedSplitsPerRequest, maxUnacknowledgedSplits]}.
     *
     * @param list split assignments contained in the serialized request
     * @param j size of the serialized request in bytes
     * @param i split batch size used to build the request
     * @return {@code true} if the request exceeds the limit and holds more splits than the new
     *         batch size, i.e. it should be dropped and rebuilt; {@code false} otherwise
     */
    @VisibleForTesting
    boolean adjustSplitBatchSize(List<SplitAssignment> list, long j, int i) {
        boolean canShrink = j > this.maxRequestSizeInBytes && i > this.guaranteedSplitsPerRequest;
        boolean canGrow = j < this.maxRequestSizeInBytes && i < this.maxUnacknowledgedSplits;
        if (!canShrink && !canGrow) {
            return false;
        }

        // Only the first partitioned-source assignment is used as the sample.
        int numSplits = 0;
        for (SplitAssignment assignment : list) {
            if (this.planFragment.isPartitionedSources(assignment.getPlanNodeId())) {
                numSplits = assignment.getSplits().size();
                break;
            }
        }

        int newSplitBatchSize = i;
        if (numSplits != 0) {
            long scaled = (numSplits * (this.maxRequestSizeInBytes - this.requestSizeHeadroomInBytes)) / j;
            // Clamp while still a long: narrowing first would wrap values above Integer.MAX_VALUE
            // and could clamp them to the lower bound instead of the upper one.
            newSplitBatchSize = (int) Math.max(this.guaranteedSplitsPerRequest, Math.min(this.maxUnacknowledgedSplits, scaled));
        }
        if (newSplitBatchSize != i) {
            log.debug("%s - Split batch size changed: prevSize=%s, newSize=%s", this.taskId, i, newSplitBatchSize);
            this.splitBatchSize.set(newSplitBatchSize);
        }
        if (numSplits <= newSplitBatchSize || j <= this.maxRequestSizeInBytes) {
            return false;
        }
        log.debug("%s - current taskUpdateRequestJson exceeded limit: %d, currentSplitBatchSize: %d, newSplitBatchSize: %d", this.taskId, j, i, newSplitBatchSize);
        return true;
    }

    /**
     * Runs {@code sendUpdateInternal} and reports anything it throws as a fatal failure wrapped
     * in a {@code GENERIC_INTERNAL_ERROR} {@link TrinoException}.
     */
    private void sendUpdate() {
        try {
            sendUpdateInternal();
        } catch (Throwable unexpected) {
            TrinoException wrapped = new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "unexpected error calling sendUpdate()", unexpected);
            fatalUnacknowledgedFailure(wrapped);
        }
    }

    /**
     * Builds a task update request from the current local state and posts it asynchronously.
     *
     * <p>Returns without sending when the task is terminating or done, when termination has been
     * requested locally, when the error tracker has not yet granted a request permit (the update
     * is retried once the permit future completes), or when adaptive sizing decides the request
     * must be rebuilt with a different split batch size.
     *
     * @throws IllegalStateException if the task is not started, no request is pending, or another
     *         update request is still in flight
     */
    private void sendUpdateInternal() {
        TaskStatus taskStatus = getTaskStatus();
        if (taskStatus.getState().isTerminatingOrDone() || this.terminating.get()) {
            return;
        }
        Preconditions.checkState(this.started.get());
        int currentPendingRequestsCounter = this.pendingRequestsCounter.get();
        Preconditions.checkState(currentPendingRequestsCounter > 0, "sendUpdate shouldn't be called without pending requests");

        // When the permit is not immediately available, retry once it is granted.
        ListenableFuture<Void> requestPermit = this.updateErrorTracker.acquireRequestPermit();
        if (!requestPermit.isDone()) {
            requestPermit.addListener(this::sendUpdate, this.executor);
            return;
        }

        int currentSplitBatchSize = this.splitBatchSize.get();
        List<SplitAssignment> splitAssignments = getSplitAssignments(currentSplitBatchSize);
        DynamicFiltersCollector.VersionedDynamicFilterDomains dynamicFilterDomains = this.outboundDynamicFiltersCollector.acknowledgeAndGetNewDomains(this.sentDynamicFiltersVersion.get());
        // The plan fragment is attached only while sendPlan is set.
        Optional<PlanFragment> fragment = this.sendPlan.get() ? Optional.of(this.planFragment.withoutEmbeddedJsonRepresentation()) : Optional.empty();
        byte[] taskUpdateRequestJson = this.taskUpdateRequestCodec.toJsonBytes(new TaskUpdateRequest(this.session.toSessionRepresentation(), this.session.getIdentity().getExtraCredentials(), this.stageSpan, fragment, splitAssignments, this.outputBuffers.get(), dynamicFilterDomains.getDynamicFilterDomains(), this.session.getExchangeEncryptionKey(), this.speculative.get()));

        // Drop this request and rebuild it when the adaptive sizing asks for it.
        if (this.adaptiveUpdateRequestSizeEnabled && adjustSplitBatchSize(splitAssignments, taskUpdateRequestJson.length, currentSplitBatchSize)) {
            scheduleUpdate();
            return;
        }

        if (fragment.isPresent()) {
            this.stats.updateWithPlanBytes(taskUpdateRequestJson.length);
        }
        if (!dynamicFilterDomains.getDynamicFilterDomains().isEmpty()) {
            this.stats.updateWithDynamicFilterBytes(taskUpdateRequestJson.length);
        }

        Request request = Request.Builder.preparePost().setUri(getHttpUriBuilder(taskStatus).build()).setHeader("Content-Type", MediaType.JSON_UTF_8.toString()).setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(taskUpdateRequestJson)).setSpanBuilder(createSpanBuilder("task-update", this.span)).build();
        this.updateErrorTracker.startRequest();

        // Must be a ListenableFuture (not a plain Future) so a callback can be attached below.
        ListenableFuture<FullJsonResponseHandler.JsonResponse<TaskInfo>> future = this.httpClient.executeAsync(request, FullJsonResponseHandler.createFullJsonResponseHandler(this.taskInfoCodec));
        Preconditions.checkState(this.currentRequest.getAndSet(future) == null, "There should be no previous request running");
        Futures.addCallback(future, new SimpleHttpResponseHandler<>(new UpdateResponseHandler(splitAssignments, dynamicFilterDomains.getVersion(), System.nanoTime(), currentPendingRequestsCounter), request.getUri(), this.stats), this.executor);
    }

    /**
     * Collects the split assignments to include in the next update request.
     *
     * <p>Walks the plan fragment's partitioned source nodes followed by its remote source nodes,
     * asks {@link #getSplitAssignment} for each node id, and keeps the non-null results.
     *
     * @param i split batch size forwarded to {@link #getSplitAssignment} for every node
     * @return immutable list of assignments, in node order
     */
    private synchronized List<SplitAssignment> getSplitAssignments(int i) {
        return Stream.concat(
                        this.planFragment.getPartitionedSourceNodes().stream(),
                        this.planFragment.getRemoteSourceNodes().stream())
                .filter(Objects::nonNull)
                .map(node -> node.getId())
                .map(planNodeId -> getSplitAssignment(planNodeId, i))
                .filter(Objects::nonNull)
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Builds the split assignment for a single plan node, or returns {@code null} when there is
     * nothing to send for it.
     *
     * <p>For a partitioned source with more pending splits than {@code i}, only the {@code i}
     * splits with the lowest sequence ids are included and the "no more splits" flag is withheld,
     * since further batches will follow.
     *
     * @param planNodeId plan node whose pending splits are read
     * @param i maximum number of splits to include for a partitioned source
     * @return the assignment, or {@code null} if there are no pending splits and the node's
     *         {@code noMoreSplits} entry is not {@code TRUE}
     */
    private synchronized SplitAssignment getSplitAssignment(PlanNodeId planNodeId, int i) {
        // Typed element access is required for the sequence-id comparator below.
        Set<ScheduledSplit> splits = this.pendingSplits.get(planNodeId);
        // The map entry is TRUE, as opposed to merely present.
        boolean pendingNoMoreSplits = Boolean.TRUE.equals(this.noMoreSplits.get(planNodeId));
        // Any entry at all is present for this node.
        boolean reportNoMoreSplits = this.noMoreSplits.containsKey(planNodeId);

        if (this.planFragment.isPartitionedSources(planNodeId) && i < splits.size()) {
            log.debug("%s - Splits are limited by splitBatchSize: splitBatchSize=%s, splits=%s, planNodeId=%s", this.taskId, i, splits.size(), planNodeId);
            splits = splits.stream()
                    .sorted(Comparator.comparingLong(ScheduledSplit::getSequenceId))
                    .limit(i)
                    .collect(Collectors.toCollection(LinkedHashSet::new));
            // Not the last batch for this node, so the flag must not be sent yet.
            reportNoMoreSplits = false;
        }

        if (splits.isEmpty() && !pendingNoMoreSplits) {
            return null;
        }
        return new SplitAssignment(planNodeId, splits, reportNoMoreSplits);
    }

    /**
     * Marks this task as terminating and, unless its last known state is already terminating or
     * done, schedules an asynchronous delete request with the {@code abort} parameter set to
     * {@code true}.
     *
     * <p>Only the first call that flips the {@code terminating} flag does any work; later calls
     * (including from {@link #cancel()} or {@link #failRemotely(Throwable)}) are no-ops.
     */
    @Override // io.trino.execution.RemoteTask
    public void abort() {
        if (!this.terminating.compareAndSet(false, true)) {
            return;
        }
        synchronized (this) {
            // try-with-resources so the SetThreadName scope is closed even if the body throws.
            try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", this.taskId)) {
                if (!getTaskStatus().getState().isTerminatingOrDone()) {
                    scheduleAsyncCleanupRequest("abort", true);
                }
            }
        }
    }

    /**
     * Marks this task as terminating and, unless its last known state is already terminating or
     * done, schedules an asynchronous delete request with the {@code abort} parameter set to
     * {@code false}.
     *
     * <p>Only the first call that flips the {@code terminating} flag does any work; later calls
     * (including from {@link #abort()} or {@link #failRemotely(Throwable)}) are no-ops.
     */
    @Override // io.trino.execution.RemoteTask
    public void cancel() {
        if (!this.terminating.compareAndSet(false, true)) {
            return;
        }
        synchronized (this) {
            // try-with-resources so the SetThreadName scope is closed even if the body throws.
            try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", this.taskId)) {
                if (!getTaskStatus().getState().isTerminatingOrDone()) {
                    scheduleAsyncCleanupRequest("cancel", false);
                }
            }
        }
    }

    /**
     * Releases local state once the task has reached a terminating or done state.
     *
     * <p>Always clears pending splits and their counters, refreshes the partitioned split tracker,
     * completes the "split queue has space" notification and acknowledges all outbound dynamic
     * filters. When the state is done it also stops the status fetcher, cancels any in-flight
     * update request and schedules a "cleanup" delete request. When the state is only terminating
     * it records the termination start time on first call and, on later calls, fails the task if
     * {@code taskTerminationTimeout} has elapsed.
     *
     * @param taskState last observed state; must be terminating or done
     * @throws IllegalStateException if {@code taskState} is neither terminating nor done
     */
    private void cleanUpTask(TaskState taskState) {
        Preconditions.checkState(taskState.isTerminatingOrDone(), "attempt to clean up a task that is not terminating or done: %s", taskState);

        synchronized (this) {
            this.pendingSplits.clear();
            this.pendingSourceSplitCount = 0;
            this.pendingSourceSplitsWeight = 0L;
            this.partitionedSplitCountTracker.setPartitionedSplits(getPartitionedSplitsInfo());
            this.splitQueueHasSpace = true;
            this.whenSplitQueueHasSpace.complete(null, this.executor);
        }
        this.outboundDynamicFiltersCollector.acknowledge(Long.MAX_VALUE);

        if (taskState.isDone()) {
            this.taskStatusFetcher.stop();
            Future<?> inFlightRequest = this.currentRequest.getAndSet(null);
            if (inFlightRequest != null) {
                inFlightRequest.cancel(true);
            }
            scheduleAsyncCleanupRequest("cleanup", true);
            return;
        }

        long terminationStart = this.terminationStartedNanos.get();
        if (terminationStart == 0) {
            // 0 means "not recorded yet", so a genuine nanoTime() of 0 is stored as 1 instead.
            long now = System.nanoTime();
            this.terminationStartedNanos.compareAndSet(0L, now == 0 ? 1 : now);
            return;
        }
        if (Duration.nanosSince(terminationStart).compareTo(this.taskTerminationTimeout) >= 0) {
            String message = String.format("Task %s failed to terminate after %s, last known state: %s", this.taskId, this.taskTerminationTimeout, taskState);
            fatalUnacknowledgedFailure(new TrinoException(StandardErrorCode.REMOTE_TASK_ERROR, message));
        }
    }

    /**
     * Schedules a cleanup call that issues a delete-task request.
     *
     * @param str action name used in error messages
     * @param z value for the request's {@code abort} parameter
     */
    private void scheduleAsyncCleanupRequest(String str, boolean z) {
        // The request is built lazily, only if the cleanup is actually scheduled.
        scheduleAsyncCleanupRequest(str, () -> buildDeleteTaskRequest(z));
    }

    /**
     * Schedules a cleanup call that issues a fail-task request.
     *
     * @param str action name used in error messages
     * @param failTaskRequest payload to send to the task's {@code fail} endpoint
     */
    private void scheduleAsyncCleanupRequest(String str, FailTaskRequest failTaskRequest) {
        // The request is built lazily, only if the cleanup is actually scheduled.
        scheduleAsyncCleanupRequest(str, () -> buildFailTaskRequest(failTaskRequest));
    }

    /**
     * Starts an asynchronous cleanup request with a fresh {@link Backoff}.
     *
     * <p>While the task is not done the request is always sent. Once the task is done, only the
     * first caller to flip the {@code cleanedUp} flag sends it.
     *
     * @param str action name used in error messages
     * @param supplier produces the HTTP request; invoked only when the request will be sent
     */
    private void scheduleAsyncCleanupRequest(String str, Supplier<Request> supplier) {
        boolean taskDone = getTaskStatus().getState().isDone();
        if (taskDone && !this.cleanedUp.compareAndSet(false, true)) {
            return;
        }
        Backoff cleanupBackoff = new Backoff(this.maxErrorDuration);
        doScheduleAsyncCleanupRequest(cleanupBackoff, supplier.get(), str);
    }

    /**
     * Builds the HTTP DELETE request for this task.
     *
     * @param z value sent as the {@code abort} query parameter ({@code "true"} or {@code "false"})
     * @return the request, targeting the URI derived from the current task status and carrying a
     *         {@code task-delete} span builder
     */
    private Request buildDeleteTaskRequest(boolean z) {
        // addParameter takes String values, so the flag has to be rendered as text.
        HttpUriBuilder uriBuilder = getHttpUriBuilder(getTaskStatus()).addParameter("abort", String.valueOf(z));
        return Request.Builder.prepareDelete()
                .setUri(uriBuilder.build())
                .setSpanBuilder(createSpanBuilder("task-delete", this.span))
                .build();
    }

    /**
     * Builds the HTTP POST request that reports a failure to this task.
     *
     * @param failTaskRequest payload, serialized to JSON with {@code failTaskRequestCodec}
     * @return the request, targeting the {@code fail} path under the URI derived from the current
     *         task status and carrying a {@code task-fail} span builder
     */
    private Request buildFailTaskRequest(FailTaskRequest failTaskRequest) {
        HttpUriBuilder failUri = getHttpUriBuilder(getTaskStatus()).appendPath("fail");
        Request.Builder post = Request.Builder.preparePost()
                .setUri(failUri.build())
                .setHeader("Content-Type", MediaType.JSON_UTF_8.toString());
        byte[] body = this.failTaskRequestCodec.toJsonBytes(failTaskRequest);
        return post
                .setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(body))
                .setSpanBuilder(createSpanBuilder("task-fail", this.span))
                .build();
    }

    /**
     * Executes a cleanup request asynchronously and handles its outcome on {@code executor}.
     *
     * <p>On success the returned {@link TaskInfo} is applied; if the task is then still neither
     * terminating nor done, the task is failed locally. On failure, nothing happens if the task
     * is already done. Otherwise the task is failed locally when the HTTP client is closed or the
     * backoff is depleted, and the same request is retried (immediately, or after the backoff
     * delay on {@code errorScheduledExecutor}) in all other cases.
     *
     * @param backoff retry budget shared across attempts of this request
     * @param request the cleanup request to send
     * @param str action name used in error messages
     */
    private void doScheduleAsyncCleanupRequest(final Backoff backoff, final Request request, final String str) {
        Futures.addCallback(this.httpClient.executeAsync(request, FullJsonResponseHandler.createFullJsonResponseHandler(this.taskInfoCodec)), new FutureCallback<FullJsonResponseHandler.JsonResponse<TaskInfo>>() {
            @Override
            public void onSuccess(FullJsonResponseHandler.JsonResponse<TaskInfo> jsonResponse) {
                try {
                    HttpRemoteTask.this.updateTaskInfo(jsonResponse.getValue());
                } finally {
                    // Runs exactly once whether or not the update threw: if the cleanup did not
                    // at least move the task to terminating, mark it failed.
                    TaskState state = HttpRemoteTask.this.getTaskInfo().getTaskStatus().getState();
                    if (!state.isTerminatingOrDone()) {
                        fatalAsyncCleanupFailure(new TrinoTransportException(StandardErrorCode.REMOTE_TASK_ERROR, HostAddress.fromUri(request.getUri()), String.format("Unable to %s task at %s, last known state was: %s", str, request.getUri(), state)));
                    }
                }
            }

            @Override
            public void onFailure(Throwable th) {
                if (HttpRemoteTask.this.getTaskInfo().getTaskStatus().getState().isDone()) {
                    return;
                }
                if ((th instanceof RejectedExecutionException) && HttpRemoteTask.this.httpClient.isClosed()) {
                    // Retrying cannot succeed once the client is closed.
                    String message = String.format("Unable to %s task at %s. HTTP client is closed.", str, request.getUri());
                    RequestErrorTracker.logError(th, message);
                    fatalAsyncCleanupFailure(new TrinoTransportException(StandardErrorCode.REMOTE_TASK_ERROR, HostAddress.fromUri(request.getUri()), message));
                    return;
                }
                if (backoff.failure()) {
                    String message = String.format("Unable to %s task at %s. Back off depleted.", str, request.getUri());
                    RequestErrorTracker.logError(th, message);
                    fatalAsyncCleanupFailure(new TrinoTransportException(StandardErrorCode.REMOTE_TASK_ERROR, HostAddress.fromUri(request.getUri()), message));
                    return;
                }
                long backoffDelayNanos = backoff.getBackoffDelayNanos();
                if (backoffDelayNanos == 0) {
                    HttpRemoteTask.this.doScheduleAsyncCleanupRequest(backoff, request, str);
                    return;
                }
                HttpRemoteTask.this.errorScheduledExecutor.schedule(
                        () -> HttpRemoteTask.this.doScheduleAsyncCleanupRequest(backoff, request, str),
                        backoffDelayNanos,
                        TimeUnit.NANOSECONDS);
            }

            /**
             * Records {@code trinoTransportException} as the cause of a local FAILED status,
             * unless the task is already done, in which case the existing status is kept and a
             * warning is logged. The task info is updated in both cases.
             */
            private void fatalAsyncCleanupFailure(TrinoTransportException trinoTransportException) {
                synchronized (HttpRemoteTask.this) {
                    // try-with-resources so the SetThreadName scope is closed even if the body throws.
                    try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", HttpRemoteTask.this.taskId)) {
                        TaskStatus taskStatus = HttpRemoteTask.this.getTaskStatus();
                        if (taskStatus.getState().isDone()) {
                            HttpRemoteTask.log.warn("Task %s already in terminal state %s; cannot overwrite with FAILED due to %s", taskStatus.getTaskId(), taskStatus.getState(), trinoTransportException);
                        } else {
                            // New failure goes first, followed by the failures already recorded.
                            taskStatus = TaskStatus.failWith(taskStatus, TaskState.FAILED, ImmutableList.builderWithExpectedSize(taskStatus.getFailures().size() + 1).add(Failures.toFailure(trinoTransportException)).addAll(taskStatus.getFailures()).build());
                        }
                        HttpRemoteTask.this.updateTaskInfo(HttpRemoteTask.this.getTaskInfo().withTaskStatus(taskStatus));
                    }
                }
            }
        }, this.executor);
    }

    /**
     * Fails the task locally with {@code th} as the newest failure.
     *
     * <p>If the task is already done, the task info is simply refreshed with the current status.
     * Otherwise a FAILED status is built with {@code th} prepended to the existing failures; for a
     * {@link TrinoTransportException} it is applied through {@code updateTaskInfo}, for any other
     * cause through the task status fetcher.
     *
     * @param th cause of the failure
     */
    private synchronized void fatalUnacknowledgedFailure(Throwable th) {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", new Object[]{this.taskId})) {
            TaskStatus current = getTaskStatus();
            if (current.getState().isDone()) {
                updateTaskInfo(getTaskInfo().withTaskStatus(current));
                return;
            }
            TaskStatus failed = TaskStatus.failWith(current, TaskState.FAILED, ImmutableList.builderWithExpectedSize(current.getFailures().size() + 1).add(Failures.toFailure(th)).addAll(current.getFailures()).build());
            if (th instanceof TrinoTransportException) {
                updateTaskInfo(getTaskInfo().withTaskStatus(failed));
                return;
            }
            this.taskStatusFetcher.updateTaskStatus(failed);
        }
    }

    /**
     * Marks this task as terminating and, unless its last known state is already terminating or
     * done, schedules an asynchronous request that posts {@code th} to the task's {@code fail}
     * endpoint.
     *
     * <p>Only the first call that flips the {@code terminating} flag does any work; later calls
     * (including from {@link #abort()} or {@link #cancel()}) are no-ops.
     *
     * @param th failure to report to the remote task
     */
    @Override // io.trino.execution.RemoteTask
    public void failRemotely(Throwable th) {
        if (!this.terminating.compareAndSet(false, true)) {
            return;
        }
        synchronized (this) {
            // try-with-resources so the SetThreadName scope is closed even if the body throws.
            try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", this.taskId)) {
                TaskStatus taskStatus = getTaskStatus();
                if (!taskStatus.getState().isTerminatingOrDone()) {
                    log.debug(th, "Remote task %s failed with %s", taskStatus.getSelf(), th);
                    scheduleAsyncCleanupRequest("fail", new FailTaskRequest(Failures.toFailure(th)));
                }
            }
        }
    }

    /**
     * Marks this task as terminating and, unless it is already done, pushes a FAILED status with
     * {@code th} prepended to the existing failures through the task status fetcher. No request
     * is sent to the remote task from this method.
     *
     * @param th cause of the failure; must not be {@code null}
     * @throws NullPointerException if {@code th} is {@code null}
     */
    @Override // io.trino.execution.RemoteTask
    public void failLocallyImmediately(Throwable th) {
        Objects.requireNonNull(th, "cause is null");
        // Set unconditionally: unlike abort/cancel, this proceeds even if termination already began.
        this.terminating.set(true);
        synchronized (this) {
            // try-with-resources so the SetThreadName scope is closed even if the body throws.
            try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", this.taskId)) {
                TaskStatus taskStatus = getTaskStatus();
                if (!taskStatus.getState().isDone()) {
                    this.taskStatusFetcher.updateTaskStatus(TaskStatus.failWith(taskStatus, TaskState.FAILED, ImmutableList.builderWithExpectedSize(taskStatus.getFailures().size() + 1).add(Failures.toFailure(th)).addAll(taskStatus.getFailures()).build()));
                }
            }
        }
    }

    /**
     * Creates a URI builder rooted at the task's own URI.
     *
     * @param taskStatus status whose {@code self} URI is used as the base
     * @return the builder, with a value-less {@code summarize} parameter added when
     *         {@code summarizeTaskInfo} is set
     */
    private HttpUriBuilder getHttpUriBuilder(TaskStatus taskStatus) {
        HttpUriBuilder builder = HttpUriBuilder.uriBuilderFrom(taskStatus.getSelf());
        if (!this.summarizeTaskInfo) {
            return builder;
        }
        builder.addParameter("summarize");
        return builder;
    }

    /**
     * Creates a span builder for an outgoing task request.
     *
     * @param str span name
     * @param span parent span, attached via the current {@link Context}
     * @return builder carrying the query id, stage id and task id of this task as attributes
     */
    private SpanBuilder createSpanBuilder(String str, Span span) {
        SpanBuilder builder = this.tracer.spanBuilder(str);
        builder = builder.setParent(Context.current().with(span));
        builder = builder.setAttribute(TrinoAttributes.QUERY_ID, this.taskId.getQueryId().toString());
        builder = builder.setAttribute(TrinoAttributes.STAGE_ID, this.taskId.getStageId().toString());
        return builder.setAttribute(TrinoAttributes.TASK_ID, this.taskId.toString());
    }

    /**
     * Returns a string form of this task built from its current {@link TaskInfo}.
     */
    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .addValue(getTaskInfo())
                .toString();
    }
}
