package io.trino.execution.scheduler;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import io.airlift.concurrent.Threads;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.opentelemetry.api.trace.Span;
import io.trino.client.NodeVersion;
import io.trino.cost.StatsAndCosts;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.RemoteTask;
import io.trino.execution.StageId;
import io.trino.execution.StateMachine;
import io.trino.execution.TaskId;
import io.trino.execution.TaskState;
import io.trino.execution.TaskStatus;
import io.trino.execution.TestingRemoteTaskFactory;
import io.trino.execution.buffer.OutputBufferStatus;
import io.trino.execution.scheduler.StageExecution;
import io.trino.metadata.InMemoryNodeManager;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.spi.type.VarcharType;
import io.trino.sql.planner.Partitioning;
import io.trino.sql.planner.PartitioningScheme;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingMetadata;
import io.trino.testing.TestingSession;
import io.trino.util.FinalizerService;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Tests how many new writer tasks {@link ScaledWriterScheduler#schedule()} reports for
 * different combinations of task statuses and writer-node limits.
 *
 * <p>Every scheduler under test reads its task statuses from an {@link AtomicReference}, so a
 * test can swap the visible status list between consecutive {@code schedule()} calls.
 */
public class TestScaledWriterScheduler {
    private static final PlanNodeId TABLE_SCAN_NODE_ID = new PlanNodeId("plan_id");
    private static final InternalNode NODE_1 = new InternalNode("node-1", URI.create("http://127.0.0.1:11"), new NodeVersion("version-1"), false);
    private static final InternalNode NODE_2 = new InternalNode("node-2", URI.create("http://127.0.0.1:12"), new NodeVersion("version-1"), false);
    private static final InternalNode NODE_3 = new InternalNode("node-3", URI.create("http://127.0.0.1:13"), new NodeVersion("version-1"), false);

    /**
     * Minimal {@link StageExecution} stub. Only {@link #scheduleTask} is functional; every other
     * method throws {@link UnsupportedOperationException} so that an unexpected call from the
     * scheduler fails the test loudly instead of silently returning a default.
     */
    public static class TestingStageExecution implements StageExecution {
        private final PlanFragment fragment;

        /**
         * @param planFragment fragment handed to every task created by {@link #scheduleTask}
         * @throws NullPointerException if {@code planFragment} is null
         */
        public TestingStageExecution(PlanFragment planFragment) {
            this.fragment = Objects.requireNonNull(planFragment, "fragment is null");
        }

        public PlanFragment getFragment() {
            throw new UnsupportedOperationException();
        }

        public boolean isAnyTaskBlocked() {
            throw new UnsupportedOperationException();
        }

        public StageExecution.State getState() {
            throw new UnsupportedOperationException();
        }

        public void addStateChangeListener(StateMachine.StateChangeListener<StageExecution.State> stateChangeListener) {
            throw new UnsupportedOperationException();
        }

        public StageId getStageId() {
            throw new UnsupportedOperationException();
        }

        public int getAttemptId() {
            throw new UnsupportedOperationException();
        }

        public Span getStageSpan() {
            throw new UnsupportedOperationException();
        }

        public void beginScheduling() {
            throw new UnsupportedOperationException();
        }

        public void transitionToSchedulingSplits() {
            throw new UnsupportedOperationException();
        }

        public TaskLifecycleListener getTaskLifecycleListener() {
            throw new UnsupportedOperationException();
        }

        public void schedulingComplete() {
            throw new UnsupportedOperationException();
        }

        public void schedulingComplete(PlanNodeId planNodeId) {
            throw new UnsupportedOperationException();
        }

        public void cancel() {
            throw new UnsupportedOperationException();
        }

        public void abort() {
            throw new UnsupportedOperationException();
        }

        public void recordGetSplitTime(long start) {
            throw new UnsupportedOperationException();
        }

        /**
         * Always succeeds and returns a fresh testing task for the stored fragment. The node,
         * partition and splits arguments are ignored; the task id and node id are fixed strings.
         */
        public Optional<RemoteTask> scheduleTask(InternalNode node, int partition, Multimap<PlanNodeId, Split> initialSplits) {
            RemoteTask task = new TestingRemoteTaskFactory.TestingRemoteTask(TaskId.valueOf("taskId"), "nodeId", this.fragment);
            return Optional.of(task);
        }

        public void failTask(TaskId taskId, Throwable failureCause) {
            throw new UnsupportedOperationException();
        }

        public List<RemoteTask> getAllTasks() {
            throw new UnsupportedOperationException();
        }

        public List<TaskStatus> getTaskStatuses() {
            throw new UnsupportedOperationException();
        }

        public Optional<ExecutionFailureInfo> getFailureCause() {
            throw new UnsupportedOperationException();
        }
    }

    /** One flagged status out of three with equal data sizes: no new task expected. */
    @Test
    public void testGetNewTaskCountWithUnderutilizedTasksWithoutSkewness() {
        ScaledWriterScheduler scheduler = buildScaleWriterSchedulerWithInitialTasks(
                buildTaskStatus(true, 12345L),
                buildTaskStatus(false, 12345L),
                buildTaskStatus(false, 12345L));

        Assert.assertEquals(newTaskCount(scheduler), 0);
    }

    /** Two flagged statuses out of three with equal data sizes: one new task expected. */
    @Test
    public void testGetNewTaskCountWithOverutilizedTasksWithoutSkewness() {
        ScaledWriterScheduler scheduler = buildScaleWriterSchedulerWithInitialTasks(
                buildTaskStatus(true, 12345L),
                buildTaskStatus(true, 12345L),
                buildTaskStatus(false, 12345L));

        Assert.assertEquals(newTaskCount(scheduler), 1);
    }

    /** Only the status with by far the largest data size is flagged: one new task expected. */
    @Test
    public void testGetNewTaskCountWithOverutilizedSkewedTaskAndUnderutilizedNonSkewedTasks() {
        ScaledWriterScheduler scheduler = buildScaleWriterSchedulerWithInitialTasks(
                buildTaskStatus(true, 1234567L),
                buildTaskStatus(false, 12345L),
                buildTaskStatus(false, 123456L));

        Assert.assertEquals(newTaskCount(scheduler), 1);
    }

    /** The two smaller statuses are flagged, the largest one is not: one new task expected. */
    @Test
    public void testGetNewTaskCountWithUnderutilizedSkewedTaskAndOverutilizedNonSkewedTasks() {
        ScaledWriterScheduler scheduler = buildScaleWriterSchedulerWithInitialTasks(
                buildTaskStatus(true, 12345L),
                buildTaskStatus(true, 123456L),
                buildTaskStatus(false, 1234567L));

        Assert.assertEquals(newTaskCount(scheduler), 1);
    }

    /** Writer counts 1/1/2 with 32MB/32MB/64MB of writer data: one new task expected. */
    @Test
    public void testGetNewTaskCountWhenWriterDataProcessedIsGreaterThanMinForScaleUp() {
        ScaledWriterScheduler scheduler = buildScaleWriterSchedulerWithInitialTasks(
                buildTaskStatus(1, DataSize.of(32L, DataSize.Unit.MEGABYTE)),
                buildTaskStatus(1, DataSize.of(32L, DataSize.Unit.MEGABYTE)),
                buildTaskStatus(2, DataSize.of(64L, DataSize.Unit.MEGABYTE)));

        Assert.assertEquals(newTaskCount(scheduler), 1);
    }

    /** Writer counts 1/1/2 with only 32MB of writer data each: no new task expected. */
    @Test
    public void testGetNewTaskCountWhenWriterDataProcessedIsLessThanMinForScaleUp() {
        ScaledWriterScheduler scheduler = buildScaleWriterSchedulerWithInitialTasks(
                buildTaskStatus(1, DataSize.of(32L, DataSize.Unit.MEGABYTE)),
                buildTaskStatus(1, DataSize.of(32L, DataSize.Unit.MEGABYTE)),
                buildTaskStatus(2, DataSize.of(32L, DataSize.Unit.MEGABYTE)));

        Assert.assertEquals(newTaskCount(scheduler), 0);
    }

    /** The third status carries an empty writer count and 0MB of writer data: no new task expected. */
    @Test
    public void testGetNewTaskCountWhenExistingWriterTaskMaxWriterCountIsEmpty() {
        ScaledWriterScheduler scheduler = buildScaleWriterSchedulerWithInitialTasks(
                buildTaskStatus(1, DataSize.of(32L, DataSize.Unit.MEGABYTE)),
                buildTaskStatus(2, DataSize.of(100L, DataSize.Unit.MEGABYTE)),
                buildTaskStatus(true, 12345L, Optional.empty(), DataSize.of(0L, DataSize.Unit.MEGABYTE)));

        Assert.assertEquals(newTaskCount(scheduler), 0);
    }

    /** With a limit of 2, the second {@code schedule()} call still yields one new task. */
    @Test
    public void testNewTaskCountWhenNodesUpperLimitIsNotExceeded() {
        AtomicReference<List<TaskStatus>> taskStatusProvider = new AtomicReference<>(ImmutableList.of(buildTaskStatus(true, 123456L)));
        ScaledWriterScheduler scheduler = buildScaledWriterScheduler(taskStatusProvider, 2);

        // The first call's result is intentionally ignored; only the second call is under test.
        scheduler.schedule();

        Assert.assertEquals(newTaskCount(scheduler), 1);
    }

    /** With a limit of 1, the second {@code schedule()} call yields no new task. */
    @Test
    public void testNewTaskCountWhenNodesUpperLimitIsExceeded() {
        AtomicReference<List<TaskStatus>> taskStatusProvider = new AtomicReference<>(ImmutableList.of(buildTaskStatus(true, 123456L)));
        ScaledWriterScheduler scheduler = buildScaledWriterScheduler(taskStatusProvider, 1);

        // The first call's result is intentionally ignored; only the second call is under test.
        scheduler.schedule();

        Assert.assertEquals(newTaskCount(scheduler), 0);
    }

    /** Runs one scheduling round and returns how many new tasks it reported. */
    private static int newTaskCount(ScaledWriterScheduler scheduler) {
        return scheduler.schedule().getNewTasks().size();
    }

    /**
     * Builds a scheduler (limit 100) and drives it through three scheduling rounds, each of which
     * must report exactly one new task. The visible status list grows by one entry after each of
     * the first two rounds; after the third round all three statuses are visible, so the caller's
     * next {@code schedule()} call is the one under test.
     */
    private ScaledWriterScheduler buildScaleWriterSchedulerWithInitialTasks(TaskStatus taskStatus, TaskStatus taskStatus2, TaskStatus taskStatus3) {
        AtomicReference<List<TaskStatus>> taskStatusProvider = new AtomicReference<>(ImmutableList.of());
        ScaledWriterScheduler scheduler = buildScaledWriterScheduler(taskStatusProvider, 100);

        Assert.assertEquals(newTaskCount(scheduler), 1);

        taskStatusProvider.set(ImmutableList.of(taskStatus));
        Assert.assertEquals(newTaskCount(scheduler), 1);

        taskStatusProvider.set(ImmutableList.of(taskStatus, taskStatus2));
        Assert.assertEquals(newTaskCount(scheduler), 1);

        taskStatusProvider.set(ImmutableList.of(taskStatus, taskStatus2, taskStatus3));
        return scheduler;
    }

    /**
     * Creates a {@link ScaledWriterScheduler} over a three-node in-memory cluster.
     *
     * @param taskStatusProvider live source of task statuses; both status suppliers passed to the
     *        scheduler read from this same reference
     * @param maxWriterNodeCount last constructor argument of the scheduler; judging by the test
     *        names this is the upper limit on writer nodes (TODO confirm against the constructor)
     * @throws NullPointerException if {@code taskStatusProvider} is null
     */
    private ScaledWriterScheduler buildScaledWriterScheduler(AtomicReference<List<TaskStatus>> taskStatusProvider, int maxWriterNodeCount) {
        Objects.requireNonNull(taskStatusProvider, "taskStatusProvider is null");
        Supplier<List<TaskStatus>> taskStatuses = taskStatusProvider::get;

        return new ScaledWriterScheduler(
                new TestingStageExecution(createFragment()),
                taskStatuses,
                taskStatuses,
                new UniformNodeSelectorFactory(
                        new InMemoryNodeManager(new InternalNode[]{NODE_1, NODE_2, NODE_3}),
                        new NodeSchedulerConfig().setIncludeCoordinator(true),
                        new NodeTaskMap(new FinalizerService()))
                        .createNodeSelector(TestingSession.testSessionBuilder().build(), Optional.empty()),
                // The pool is never shut down by these tests, so use daemon threads to make sure
                // leftover workers cannot keep the JVM alive after the test run.
                Executors.newScheduledThreadPool(10, Threads.daemonThreadsNamed("task-notification-%s")),
                DataSize.of(32L, DataSize.Unit.MEGABYTE),
                maxWriterNodeCount);
    }

    /** Builds a status with a writer count of 1 and 32MB of writer data. */
    private static TaskStatus buildTaskStatus(boolean overutilized, long outputDataSizeBytes) {
        return buildTaskStatus(overutilized, outputDataSizeBytes, Optional.of(1), DataSize.of(32L, DataSize.Unit.MEGABYTE));
    }

    /** Builds a flagged status with a 12345-byte data size and the given writer count and writer data size. */
    private static TaskStatus buildTaskStatus(int maxWriterCount, DataSize writerInputDataSize) {
        return buildTaskStatus(true, 12345L, Optional.of(maxWriterCount), writerInputDataSize);
    }

    /**
     * Builds a RUNNING {@link TaskStatus} in which only the four arguments vary.
     *
     * <p>NOTE(review): the parameter names follow the test method names; the meaning of each
     * positional {@code TaskStatus} / {@code OutputBufferStatus} argument is not visible in this
     * file and should be confirmed against those constructors.
     */
    private static TaskStatus buildTaskStatus(boolean overutilized, long outputDataSizeBytes, Optional<Integer> maxWriterCount, DataSize writerInputDataSize) {
        return new TaskStatus(
                TaskId.valueOf("taskId"),
                "task-instance-id",
                0L,
                TaskState.RUNNING,
                URI.create("fake://task/taskId/node/some_node"),
                "some_node",
                false,
                ImmutableList.of(),
                0,
                0,
                new OutputBufferStatus(OptionalLong.empty(), overutilized, false),
                DataSize.ofBytes(outputDataSizeBytes),
                writerInputDataSize,
                DataSize.of(1L, DataSize.Unit.MEGABYTE),
                maxWriterCount,
                DataSize.of(1L, DataSize.Unit.MEGABYTE),
                DataSize.of(1L, DataSize.Unit.MEGABYTE),
                DataSize.of(0L, DataSize.Unit.MEGABYTE),
                0L,
                Duration.valueOf("0s"),
                0L,
                1L,
                1L);
    }

    /** Creates a single-column table-scan fragment used by every task the stub stage schedules. */
    private static PlanFragment createFragment() {
        Symbol symbol = new Symbol("column");

        return new PlanFragment(
                new PlanFragmentId("plan_id"),
                TableScanNode.newInstance(
                        TABLE_SCAN_NODE_ID,
                        TestingHandles.TEST_TABLE_HANDLE,
                        ImmutableList.of(symbol),
                        ImmutableMap.of(symbol, new TestingMetadata.TestingColumnHandle("column")),
                        false,
                        Optional.empty()),
                ImmutableMap.of(symbol, VarcharType.VARCHAR),
                SystemPartitioningHandle.SOURCE_DISTRIBUTION,
                Optional.empty(),
                ImmutableList.of(TABLE_SCAN_NODE_ID),
                new PartitioningScheme(Partitioning.create(SystemPartitioningHandle.SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol)),
                StatsAndCosts.empty(),
                ImmutableList.of(),
                Optional.empty());
    }
}
