package io.trino.execution.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.MoreFutures;
import io.trino.execution.RemoteTask;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.ScheduleResult;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.server.DynamicFilterService;
import io.trino.split.EmptySplit;
import io.trino.split.SplitSource;
import io.trino.sql.planner.plan.PlanNodeId;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;

/* loaded from: input_file:io/trino/execution/scheduler/SourcePartitionedScheduler.class */
public class SourcePartitionedScheduler implements SourceScheduler {
    private final StageExecution stageExecution;
    private final SplitSource splitSource;
    private final SplitPlacementPolicy splitPlacementPolicy;
    private final int splitBatchSize;
    private final PlanNodeId partitionedNode;
    private final DynamicFilterService dynamicFilterService;
    private final TableExecuteContextManager tableExecuteContextManager;
    private final BooleanSupplier anySourceTaskBlocked;
    private final PartitionIdAllocator partitionIdAllocator;
    private final Map<InternalNode, RemoteTask> scheduledTasks;
    private ListenableFuture<SplitSource.SplitBatch> nextSplitBatchFuture;
    private final Set<Split> pendingSplits = new HashSet();
    private ListenableFuture<Void> placementFuture = Futures.immediateVoidFuture();
    private State state = State.INITIALIZED;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/execution/scheduler/SourcePartitionedScheduler$State.class */
    public enum State {
        INITIALIZED,
        SPLITS_ADDED,
        SPLITS_SCHEDULED,
        FINISHED
    }

    private SourcePartitionedScheduler(StageExecution stageExecution, PlanNodeId planNodeId, SplitSource splitSource, SplitPlacementPolicy splitPlacementPolicy, int i, DynamicFilterService dynamicFilterService, TableExecuteContextManager tableExecuteContextManager, BooleanSupplier booleanSupplier, PartitionIdAllocator partitionIdAllocator, Map<InternalNode, RemoteTask> map) {
        this.stageExecution = (StageExecution) Objects.requireNonNull(stageExecution, "stageExecution is null");
        this.splitSource = (SplitSource) Objects.requireNonNull(splitSource, "splitSource is null");
        this.splitPlacementPolicy = (SplitPlacementPolicy) Objects.requireNonNull(splitPlacementPolicy, "splitPlacementPolicy is null");
        Preconditions.checkArgument(i > 0, "splitBatchSize must be at least one");
        this.splitBatchSize = i;
        this.partitionedNode = (PlanNodeId) Objects.requireNonNull(planNodeId, "partitionedNode is null");
        this.dynamicFilterService = (DynamicFilterService) Objects.requireNonNull(dynamicFilterService, "dynamicFilterService is null");
        this.tableExecuteContextManager = (TableExecuteContextManager) Objects.requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
        this.anySourceTaskBlocked = (BooleanSupplier) Objects.requireNonNull(booleanSupplier, "anySourceTaskBlocked is null");
        this.partitionIdAllocator = (PartitionIdAllocator) Objects.requireNonNull(partitionIdAllocator, "partitionIdAllocator is null");
        this.scheduledTasks = (Map) Objects.requireNonNull(map, "scheduledTasks is null");
    }

    @Override // io.trino.execution.scheduler.SourceScheduler
    public PlanNodeId getPlanNodeId() {
        return this.partitionedNode;
    }

    public static StageScheduler newSourcePartitionedSchedulerAsStageScheduler(StageExecution stageExecution, PlanNodeId planNodeId, SplitSource splitSource, SplitPlacementPolicy splitPlacementPolicy, int i, DynamicFilterService dynamicFilterService, TableExecuteContextManager tableExecuteContextManager, BooleanSupplier booleanSupplier) {
        return new StageScheduler() { // from class: io.trino.execution.scheduler.SourcePartitionedScheduler.1
            @Override // io.trino.execution.scheduler.StageScheduler
            public void start() {
                SourcePartitionedScheduler.this.start();
            }

            @Override // io.trino.execution.scheduler.StageScheduler
            public ScheduleResult schedule() {
                return SourcePartitionedScheduler.this.schedule();
            }

            @Override // io.trino.execution.scheduler.StageScheduler, java.io.Closeable, java.lang.AutoCloseable
            public void close() {
                SourcePartitionedScheduler.this.close();
            }
        };
    }

    public static SourceScheduler newSourcePartitionedSchedulerAsSourceScheduler(StageExecution stageExecution, PlanNodeId planNodeId, SplitSource splitSource, SplitPlacementPolicy splitPlacementPolicy, int i, DynamicFilterService dynamicFilterService, TableExecuteContextManager tableExecuteContextManager, BooleanSupplier booleanSupplier, PartitionIdAllocator partitionIdAllocator, Map<InternalNode, RemoteTask> map) {
        return new SourcePartitionedScheduler(stageExecution, planNodeId, splitSource, splitPlacementPolicy, i, dynamicFilterService, tableExecuteContextManager, booleanSupplier, partitionIdAllocator, map);
    }

    @Override // io.trino.execution.scheduler.SourceScheduler
    public synchronized void start() {
        if (this.dynamicFilterService.isCollectingTaskNeeded(this.stageExecution.getStageId().getQueryId(), this.stageExecution.getFragment())) {
            this.stageExecution.beginScheduling();
            createTaskOnRandomNode();
        }
    }

    @Override // io.trino.execution.scheduler.SourceScheduler
    public synchronized ScheduleResult schedule() {
        if (this.state == State.FINISHED) {
            return new ScheduleResult(true, ImmutableSet.of(), 0);
        }
        int i = 0;
        Multimap<InternalNode, Split> of = ImmutableMultimap.of();
        ImmutableSet.Builder builder = ImmutableSet.builder();
        Optional empty = Optional.empty();
        boolean z = false;
        boolean z2 = false;
        if (this.state == State.SPLITS_SCHEDULED) {
            Verify.verify(this.nextSplitBatchFuture == null);
        } else if (this.pendingSplits.isEmpty()) {
            if (this.nextSplitBatchFuture == null) {
                this.nextSplitBatchFuture = this.splitSource.getNextBatch(this.splitBatchSize);
                long nanoTime = System.nanoTime();
                MoreFutures.addSuccessCallback(this.nextSplitBatchFuture, () -> {
                    this.stageExecution.recordGetSplitTime(nanoTime);
                });
            }
            if (this.nextSplitBatchFuture.isDone()) {
                SplitSource.SplitBatch splitBatch = (SplitSource.SplitBatch) MoreFutures.getFutureValue(this.nextSplitBatchFuture);
                this.nextSplitBatchFuture = null;
                this.pendingSplits.addAll(splitBatch.getSplits());
                if (splitBatch.isLastBatch()) {
                    if (this.state == State.INITIALIZED && this.pendingSplits.isEmpty()) {
                        this.pendingSplits.add(new Split(this.splitSource.getCatalogHandle(), new EmptySplit(this.splitSource.getCatalogHandle())));
                    }
                    this.state = State.SPLITS_SCHEDULED;
                }
            } else {
                empty = Optional.of(asVoid(this.nextSplitBatchFuture));
                z2 = true;
            }
        }
        if (!this.pendingSplits.isEmpty() && this.state == State.INITIALIZED) {
            this.state = State.SPLITS_ADDED;
        }
        if (empty.isEmpty() && !this.pendingSplits.isEmpty()) {
            if (this.placementFuture.isDone()) {
                SplitPlacementResult computeAssignments = this.splitPlacementPolicy.computeAssignments(this.pendingSplits);
                of = computeAssignments.getAssignments();
                Collection values = of.values();
                Set<Split> set = this.pendingSplits;
                Objects.requireNonNull(set);
                values.forEach((v1) -> {
                    r1.remove(v1);
                });
                i = 0 + of.size();
                if (!this.pendingSplits.isEmpty()) {
                    this.placementFuture = computeAssignments.getBlocked();
                    empty = Optional.of(this.placementFuture);
                    z = true;
                }
            } else {
                empty = Optional.of(this.placementFuture);
                z = true;
            }
        }
        builder.addAll(assignSplits(of));
        if (this.pendingSplits.isEmpty() && this.state == State.SPLITS_SCHEDULED) {
            this.state = State.FINISHED;
            this.splitSource.getTableExecuteSplitsInfo().ifPresent(list -> {
                this.tableExecuteContextManager.getTableExecuteContextForQuery(this.stageExecution.getStageId().getQueryId()).setSplitsInfo(list);
            });
            this.splitSource.close();
            return new ScheduleResult(true, builder.build(), i);
        }
        if (empty.isEmpty()) {
            return new ScheduleResult(false, builder.build(), i);
        }
        if (this.anySourceTaskBlocked.getAsBoolean()) {
            this.dynamicFilterService.unblockStageDynamicFilters(this.stageExecution.getStageId().getQueryId(), this.stageExecution.getAttemptId(), this.stageExecution.getFragment());
            if (z) {
                builder.addAll(finalizeTaskCreationIfNecessary());
            }
        }
        return new ScheduleResult(false, (Iterable<? extends RemoteTask>) builder.build(), (ListenableFuture<Void>) Futures.nonCancellationPropagating((ListenableFuture) empty.get()), z2 ? ScheduleResult.BlockedReason.WAITING_FOR_SOURCE : ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL, i);
    }

    private static <T> ListenableFuture<Void> asVoid(ListenableFuture<T> listenableFuture) {
        return Futures.transform(listenableFuture, obj -> {
            return null;
        }, MoreExecutors.directExecutor());
    }

    @Override // io.trino.execution.scheduler.SourceScheduler
    public void close() {
        this.splitSource.close();
    }

    private Set<RemoteTask> assignSplits(Multimap<InternalNode, Split> multimap) {
        ImmutableSet.Builder builder = ImmutableSet.builder();
        UnmodifiableIterator it = ImmutableSet.builder().addAll(multimap.keySet()).build().iterator();
        while (it.hasNext()) {
            InternalNode internalNode = (InternalNode) it.next();
            Multimap<PlanNodeId, Split> build = ImmutableMultimap.builder().putAll(this.partitionedNode, multimap.get(internalNode)).build();
            RemoteTask remoteTask = this.scheduledTasks.get(internalNode);
            if (remoteTask != null) {
                remoteTask.addSplits(build);
            } else {
                Optional<RemoteTask> scheduleTask = scheduleTask(internalNode, build);
                Objects.requireNonNull(builder);
                scheduleTask.ifPresent((v1) -> {
                    r1.add(v1);
                });
            }
        }
        return builder.build();
    }

    private void createTaskOnRandomNode() {
        Preconditions.checkState(this.scheduledTasks.isEmpty(), "Stage task is already scheduled on node");
        List<InternalNode> allNodes = this.splitPlacementPolicy.allNodes();
        Preconditions.checkState(allNodes.size() > 0, "No nodes available");
        scheduleTask(allNodes.get(ThreadLocalRandom.current().nextInt(0, allNodes.size())), ImmutableMultimap.of());
    }

    private Set<RemoteTask> finalizeTaskCreationIfNecessary() {
        if (this.stageExecution.getFragment().isLeaf()) {
            return ImmutableSet.of();
        }
        this.splitPlacementPolicy.lockDownNodes();
        Set<RemoteTask> set = (Set) this.splitPlacementPolicy.allNodes().stream().filter(internalNode -> {
            return !this.scheduledTasks.containsKey(internalNode);
        }).map(internalNode2 -> {
            return scheduleTask(internalNode2, ImmutableMultimap.of());
        }).filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).collect(ImmutableSet.toImmutableSet());
        this.stageExecution.transitionToSchedulingSplits();
        return set;
    }

    private Optional<RemoteTask> scheduleTask(InternalNode internalNode, Multimap<PlanNodeId, Split> multimap) {
        Optional<RemoteTask> scheduleTask = this.stageExecution.scheduleTask(internalNode, this.partitionIdAllocator.getNextId(), multimap);
        scheduleTask.ifPresent(remoteTask -> {
            this.scheduledTasks.put(internalNode, remoteTask);
        });
        return scheduleTask;
    }
}
