/*
 * Decompiled with CFR 0.152.
 */
package io.trino.operator;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.node.NodeInfo;
import io.trino.FeaturesConfig;
import io.trino.RowPagesBuilder;
import io.trino.Session;
import io.trino.SessionTestUtils;
import io.trino.cache.SafeCaches;
import io.trino.exchange.DirectExchangeInput;
import io.trino.exchange.ExchangeInput;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.execution.buffer.TestingPagesSerdeFactory;
import io.trino.metadata.Split;
import io.trino.operator.DirectExchangeClientConfig;
import io.trino.operator.DirectExchangeClientFactory;
import io.trino.operator.DirectExchangeClientSupplier;
import io.trino.operator.DriverContext;
import io.trino.operator.ExchangeOperator;
import io.trino.operator.MergeOperator;
import io.trino.operator.Operator;
import io.trino.operator.OperatorAssertion;
import io.trino.operator.OperatorStats;
import io.trino.operator.PageAssertions;
import io.trino.operator.TestingExchangeHttpClientHandler;
import io.trino.operator.TestingTaskBuffer;
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.SortOrder;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import io.trino.split.RemoteSplit;
import io.trino.sql.gen.OrderingCompiler;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.TestingTaskContext;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests for {@link MergeOperator}.
 *
 * <p>Each test creates a merge operator, registers one remote split per upstream task, feeds
 * pages into the per-task {@link TestingTaskBuffer}s served by a {@link TestingHttpClient},
 * and checks that the single page produced by the operator matches the expected merged rows.
 */
@Test(singleThreaded=true)
public class TestMergeOperator {
    private static final TaskId TASK_1_ID = new TaskId(new StageId("query", 0), 0, 0);
    private static final TaskId TASK_2_ID = new TaskId(new StageId("query", 0), 1, 0);
    private static final TaskId TASK_3_ID = new TaskId(new StageId("query", 0), 2, 0);
    private final AtomicInteger operatorId = new AtomicInteger();
    private ScheduledExecutorService executor;
    private PagesSerdeFactory serdeFactory;
    private HttpClient httpClient;
    private DirectExchangeClientFactory exchangeClientFactory;
    private OrderingCompiler orderingCompiler;
    private LoadingCache<TaskId, TestingTaskBuffer> taskBuffers;

    /** Builds fresh collaborators (executor, task buffers, HTTP client, exchange client factory) for each test. */
    @BeforeMethod
    public void setUp() {
        this.executor = Executors.newSingleThreadScheduledExecutor(Threads.daemonThreadsNamed("test-merge-operator-%s"));
        this.serdeFactory = new TestingPagesSerdeFactory();
        this.taskBuffers = SafeCaches.buildNonEvictableCache(CacheBuilder.newBuilder(), CacheLoader.from(TestingTaskBuffer::new));
        this.httpClient = new TestingHttpClient(new TestingExchangeHttpClientHandler(this.taskBuffers, this.serdeFactory), this.executor);
        this.exchangeClientFactory = new DirectExchangeClientFactory(new NodeInfo("test"), new FeaturesConfig(), new DirectExchangeClientConfig(), this.httpClient, this.executor, new ExchangeManagerRegistry());
        this.orderingCompiler = new OrderingCompiler(new TypeOperators());
    }

    /**
     * Releases everything created by {@link #setUp()}.
     *
     * <p>This method runs even when {@code setUp} failed ({@code alwaysRun=true}), so every
     * collaborator is null-checked before use; otherwise a partially failed setup would raise a
     * {@link NullPointerException} here and hide the original failure.
     */
    @AfterMethod(alwaysRun=true)
    public void tearDown() {
        this.serdeFactory = null;
        this.orderingCompiler = null;
        this.taskBuffers = null;
        if (this.httpClient != null) {
            this.httpClient.close();
            this.httpClient = null;
        }
        if (this.executor != null) {
            this.executor.shutdownNow();
            this.executor = null;
        }
        if (this.exchangeClientFactory != null) {
            this.exchangeClientFactory.stop();
            this.exchangeClientFactory = null;
        }
    }

    /** Feeds two pages from a single task and expects one output page containing only channel 1. */
    @Test
    public void testSingleStream() throws Exception {
        List<Type> types = ImmutableList.of(BigintType.BIGINT, BigintType.BIGINT);
        // try-with-resources guarantees the operator is closed even when an assertion fails.
        try (MergeOperator operator = this.createMergeOperator(
                types,
                ImmutableList.of(1),
                ImmutableList.of(0, 1),
                ImmutableList.of(SortOrder.ASC_NULLS_FIRST, SortOrder.ASC_NULLS_FIRST))) {
            Assert.assertFalse(operator.isFinished());
            Assert.assertFalse(operator.isBlocked().isDone());

            operator.addSplit(TestMergeOperator.createRemoteSplit(TASK_1_ID));
            Assert.assertFalse(operator.isFinished());
            Assert.assertFalse(operator.isBlocked().isDone());

            operator.noMoreSplits();

            List<Page> input = RowPagesBuilder.rowPagesBuilder(types)
                    .row(1, 1).row(2, 2)
                    .pageBreak()
                    .row(3, 3).row(4, 4)
                    .build();

            Assert.assertNull(operator.getOutput());
            Assert.assertFalse(operator.isFinished());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            this.taskBuffer(TASK_1_ID).addPage(input.get(0), false);
            OperatorAssertion.assertOperatorIsUnblocked(operator);
            Assert.assertNull(operator.getOutput());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            this.taskBuffer(TASK_1_ID).addPage(input.get(1), true);
            OperatorAssertion.assertOperatorIsUnblocked(operator);

            Page expected = RowPagesBuilder.rowPagesBuilder(BigintType.BIGINT)
                    .row(1).row(2).row(3).row(4)
                    .build().get(0);
            PageAssertions.assertPageEquals(
                    ImmutableList.of(BigintType.BIGINT),
                    Iterables.getOnlyElement(TestMergeOperator.pullAvailablePages(operator)),
                    expected);
        }
    }

    /**
     * Merges two tasks whose pages mix BIGINT and INTEGER columns, with the output channels
     * swapped relative to the source channels and a DESC/ASC sort order combination.
     */
    @Test
    public void testMergeDifferentTypes() throws Exception {
        List<Type> types = ImmutableList.of(BigintType.BIGINT, IntegerType.INTEGER);
        try (MergeOperator operator = this.createMergeOperator(
                types,
                ImmutableList.of(1, 0),
                ImmutableList.of(1, 0),
                ImmutableList.of(SortOrder.DESC_NULLS_FIRST, SortOrder.ASC_NULLS_FIRST))) {
            operator.addSplit(TestMergeOperator.createRemoteSplit(TASK_1_ID));
            operator.addSplit(TestMergeOperator.createRemoteSplit(TASK_2_ID));
            operator.noMoreSplits();

            List<Page> task1Pages = RowPagesBuilder.rowPagesBuilder(types)
                    .row(0, null).row(1, 4).row(2, 3)
                    .build();
            List<Page> task2Pages = RowPagesBuilder.rowPagesBuilder(types)
                    .row(null, 5).row(2, 5).row(4, 3)
                    .build();

            Assert.assertNull(operator.getOutput());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            this.taskBuffer(TASK_1_ID).addPages(task1Pages, true);
            OperatorAssertion.assertOperatorIsUnblocked(operator);
            Assert.assertNull(operator.getOutput());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            this.taskBuffer(TASK_2_ID).addPages(task2Pages, true);
            OperatorAssertion.assertOperatorIsUnblocked(operator);

            List<Type> outputTypes = ImmutableList.of(IntegerType.INTEGER, BigintType.BIGINT);
            Page expected = RowPagesBuilder.rowPagesBuilder(outputTypes)
                    .row(null, 0).row(5, null).row(5, 2)
                    .row(4, 1).row(3, 2).row(3, 4)
                    .build().get(0);
            PageAssertions.assertPageEquals(
                    outputTypes,
                    Iterables.getOnlyElement(TestMergeOperator.pullAvailablePages(operator)),
                    expected);
        }
    }

    /** Merges three multi-page streams sorted on channel 0 while emitting all three source columns. */
    @Test
    public void testMultipleStreamsSameOutputColumns() throws Exception {
        List<Type> types = ImmutableList.of(BigintType.BIGINT, BigintType.BIGINT, BigintType.BIGINT);
        try (MergeOperator operator = this.createMergeOperator(
                types,
                ImmutableList.of(0, 1, 2),
                ImmutableList.of(0),
                ImmutableList.of(SortOrder.ASC_NULLS_FIRST))) {
            operator.addSplit(TestMergeOperator.createRemoteSplit(TASK_1_ID));
            operator.addSplit(TestMergeOperator.createRemoteSplit(TASK_2_ID));
            operator.addSplit(TestMergeOperator.createRemoteSplit(TASK_3_ID));
            operator.noMoreSplits();

            List<Page> source1Pages = RowPagesBuilder.rowPagesBuilder(types)
                    .row(1, 1, 2).row(8, 1, 1).row(19, 1, 3).row(27, 1, 4).row(41, 2, 5)
                    .pageBreak()
                    .row(55, 1, 2).row(89, 1, 3).row(101, 1, 4).row(202, 1, 3).row(399, 2, 2)
                    .pageBreak()
                    .row(400, 1, 1).row(401, 1, 7).row(402, 1, 6)
                    .build();
            List<Page> source2Pages = RowPagesBuilder.rowPagesBuilder(types)
                    .row(2, 1, 2).row(8, 1, 1).row(19, 1, 3).row(25, 1, 4).row(26, 2, 5)
                    .pageBreak()
                    .row(56, 1, 2).row(66, 1, 3).row(77, 1, 4).row(88, 1, 3).row(99, 2, 2)
                    .pageBreak()
                    .row(99, 1, 1).row(100, 1, 7).row(100, 1, 6)
                    .build();
            List<Page> source3Pages = RowPagesBuilder.rowPagesBuilder(types)
                    .row(88, 1, 3).row(89, 1, 3).row(90, 1, 3).row(91, 1, 4).row(92, 2, 5)
                    .pageBreak()
                    .row(93, 1, 2).row(94, 1, 3).row(95, 1, 4).row(97, 1, 3).row(98, 2, 2)
                    .build();

            Assert.assertNull(operator.getOutput());
            Assert.assertFalse(operator.isFinished());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            // Deliver the first page of each stream one at a time; no output is produced
            // until every stream has supplied at least one page.
            this.taskBuffer(TASK_1_ID).addPage(source1Pages.get(0), false);
            OperatorAssertion.assertOperatorIsUnblocked(operator);
            Assert.assertNull(operator.getOutput());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            this.taskBuffer(TASK_2_ID).addPage(source2Pages.get(0), false);
            OperatorAssertion.assertOperatorIsUnblocked(operator);
            Assert.assertNull(operator.getOutput());
            OperatorAssertion.assertOperatorIsBlocked(operator);

            this.taskBuffer(TASK_3_ID).addPage(source3Pages.get(0), false);
            OperatorAssertion.assertOperatorIsUnblocked(operator);

            // Deliver the remaining pages and mark each stream as complete.
            this.taskBuffer(TASK_1_ID).addPage(source1Pages.get(1), false);
            this.taskBuffer(TASK_2_ID).addPage(source2Pages.get(1), false);
            this.taskBuffer(TASK_3_ID).addPage(source3Pages.get(1), true);
            this.taskBuffer(TASK_2_ID).addPage(source2Pages.get(2), true);
            this.taskBuffer(TASK_1_ID).addPage(source1Pages.get(2), true);

            Page expected = RowPagesBuilder.rowPagesBuilder(types)
                    .row(1, 1, 2).row(2, 1, 2).row(8, 1, 1).row(8, 1, 1)
                    .row(19, 1, 3).row(19, 1, 3).row(25, 1, 4).row(26, 2, 5)
                    .row(27, 1, 4).row(41, 2, 5).row(55, 1, 2).row(56, 1, 2)
                    .row(66, 1, 3).row(77, 1, 4).row(88, 1, 3).row(88, 1, 3)
                    .row(89, 1, 3).row(89, 1, 3).row(90, 1, 3).row(91, 1, 4)
                    .row(92, 2, 5).row(93, 1, 2).row(94, 1, 3).row(95, 1, 4)
                    .row(97, 1, 3).row(98, 2, 2).row(99, 2, 2).row(99, 1, 1)
                    .row(100, 1, 7).row(100, 1, 6).row(101, 1, 4).row(202, 1, 3)
                    .row(399, 2, 2).row(400, 1, 1).row(401, 1, 7).row(402, 1, 6)
                    .build().get(0);
            PageAssertions.assertPageEquals(
                    types,
                    Iterables.getOnlyElement(TestMergeOperator.pullAvailablePages(operator)),
                    expected);
        }
    }

    /** Returns the testing buffer for {@code taskId} from the {@code taskBuffers} loading cache. */
    private TestingTaskBuffer taskBuffer(TaskId taskId) {
        return this.taskBuffers.getUnchecked(taskId);
    }

    /**
     * Creates a {@link MergeOperator} with a unique operator id, wired to this test's exchange
     * client factory, serde factory and ordering compiler.
     *
     * @param sourceTypes types of the pages supplied by the remote tasks
     * @param outputChannels source channels to emit, in output order
     * @param sortChannels source channels the merge is ordered by
     * @param sortOrder sort order for each entry of {@code sortChannels}
     * @return the newly created operator
     */
    private MergeOperator createMergeOperator(List<Type> sourceTypes, List<Integer> outputChannels, List<Integer> sortChannels, List<SortOrder> sortOrder) {
        int mergeOperatorId = this.operatorId.getAndIncrement();
        MergeOperator.MergeOperatorFactory factory = new MergeOperator.MergeOperatorFactory(
                mergeOperatorId,
                new PlanNodeId("plan_node_id" + mergeOperatorId),
                this.exchangeClientFactory,
                this.serdeFactory,
                this.orderingCompiler,
                sourceTypes,
                outputChannels,
                sortChannels,
                sortOrder);
        DriverContext driverContext = TestingTaskContext.createTaskContext(this.executor, this.executor, SessionTestUtils.TEST_SESSION)
                .addPipelineContext(0, true, true, false)
                .addDriverContext();
        return (MergeOperator)factory.createOperator(driverContext);
    }

    /** Builds a remote split whose exchange input points at {@code taskId} under {@code http://localhost/}. */
    private static Split createRemoteSplit(TaskId taskId) {
        return new Split(ExchangeOperator.REMOTE_CATALOG_HANDLE, new RemoteSplit(new DirectExchangeInput(taskId, "http://localhost/" + taskId)));
    }

    /**
     * Polls {@code operator} for output until it reports finished or ten seconds have elapsed,
     * then closes it, destroys its operator context and verifies that the single nested operator
     * stats entry reports zero user memory reservation.
     *
     * @param operator the operator to drain; must already be unblocked
     * @return every non-null page returned by {@code getOutput()}, in order
     * @throws Exception if the operator fails or the polling sleep is interrupted
     */
    private static List<Page> pullAvailablePages(Operator operator) throws Exception {
        long endTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
        List<Page> outputPages = new ArrayList<>();
        OperatorAssertion.assertOperatorIsUnblocked(operator);
        // Compare via subtraction so the deadline check stays correct if nanoTime wraps around.
        while (!operator.isFinished() && System.nanoTime() - endTime < 0L) {
            Assert.assertFalse(operator.needsInput());
            Page outputPage = operator.getOutput();
            if (outputPage != null) {
                outputPages.add(outputPage);
                continue;
            }
            // Nothing available yet; back off briefly before polling again.
            Thread.sleep(10L);
        }
        Assert.assertFalse(operator.needsInput(), "Operator still wants input");
        Assert.assertTrue(operator.isFinished(), "Expected operator to be finished");
        operator.close();
        operator.getOperatorContext().destroy();
        Assert.assertEquals(Iterables.getOnlyElement(operator.getOperatorContext().getNestedOperatorStats()).getUserMemoryReservation().toBytes(), 0L);
        return outputPages;
    }
}

