package io.trino.operator.exchange;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.trino.SequencePageBuilder;
import io.trino.Session;
import io.trino.block.BlockAssertions;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.execution.scheduler.UniformNodeSelectorFactory;
import io.trino.metadata.InMemoryNodeManager;
import io.trino.metadata.InternalNode;
import io.trino.operator.InterpretedHashGenerator;
import io.trino.operator.PageAssertions;
import io.trino.operator.exchange.LocalExchange;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.connector.BucketFunction;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorBucketNodeMap;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import io.trino.spi.type.VarcharType;
import io.trino.sql.planner.NodePartitioningManager;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingSession;
import io.trino.testing.TestingTransactionHandle;
import io.trino.type.BlockTypeOperators;
import io.trino.util.FinalizerService;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:io/trino/operator/exchange/TestLocalExchange.class */
public class TestLocalExchange {
    // Channel types passed to the exchanges that partition on channel 0: a single BIGINT column.
    private static final List<Type> TYPES = ImmutableList.of(BigintType.BIGINT);
    // Retained and logical byte sizes of the reference page produced by createPage(42).
    private static final DataSize RETAINED_PAGE_SIZE = DataSize.ofBytes(createPage(42).getRetainedSizeInBytes());
    private static final DataSize PAGE_SIZE = DataSize.ofBytes(createPage(42).getSizeInBytes());
    // Default buffer limit handed to exchanges in tests that do not exercise back-pressure.
    private static final DataSize LOCAL_EXCHANGE_MAX_BUFFERED_BYTES = DataSize.of(32, DataSize.Unit.MEGABYTE);
    private static final BlockTypeOperators TYPE_OPERATOR_FACTORY = new BlockTypeOperators(new TypeOperators());
    private static final Session SESSION = TestingSession.testSessionBuilder().build();
    // Default writer-scaling threshold handed to exchanges in tests that do not exercise scaling.
    private static final DataSize WRITER_SCALING_MIN_DATA_PROCESSED = DataSize.of(32, DataSize.Unit.MEGABYTE);
    // Partitioning providers resolved by the NodePartitioningManager built in setUp(); tests register entries as needed.
    // Instantiated with the diamond operator so the map is not a raw type.
    private final ConcurrentMap<CatalogHandle, ConnectorNodePartitioningProvider> partitionManagers = new ConcurrentHashMap<>();
    private NodePartitioningManager nodePartitioningManager;

    /**
     * Rebuilds the {@link NodePartitioningManager} before every test method.
     *
     * <p>The manager is backed by an in-memory node manager created with an empty node array, a scheduler
     * configuration that includes the coordinator, and a catalog lookup that reads from
     * {@code partitionManagers} and rejects catalog handles that have not been registered.
     */
    @BeforeMethod
    public void setUp() {
        InMemoryNodeManager nodeManager = new InMemoryNodeManager(new InternalNode[0]);
        NodeSchedulerConfig schedulerConfig = new NodeSchedulerConfig().setIncludeCoordinator(true);
        NodeTaskMap nodeTaskMap = new NodeTaskMap(new FinalizerService());
        NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(nodeManager, schedulerConfig, nodeTaskMap));
        BlockTypeOperators blockTypeOperators = new BlockTypeOperators(new TypeOperators());

        nodePartitioningManager = new NodePartitioningManager(
                nodeScheduler,
                blockTypeOperators,
                catalogHandle -> {
                    ConnectorNodePartitioningProvider provider = partitionManagers.get(catalogHandle);
                    Preconditions.checkArgument(provider != null, "No partition manager for catalog handle: %s", catalogHandle);
                    return provider;
                });
    }

    /**
     * Drives a {@code SINGLE_DISTRIBUTION} exchange with one sink and one source.
     *
     * <p>Checks that the exchange exposes a single buffer, that a pending read future completes once the
     * first page is added, that buffered page and byte counts follow every add/remove, and that the source
     * reports finished only after the sink has finished and all remaining pages have been removed.
     */
    @Test
    public void testGatherSingleWriter() {
        LocalExchange gatherExchange = new LocalExchange(
                nodePartitioningManager,
                SESSION,
                8,
                SystemPartitioningHandle.SINGLE_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                DataSize.ofBytes(retainedSizeOfPages(99)),
                TYPE_OPERATOR_FACTORY,
                WRITER_SCALING_MIN_DATA_PROCESSED);

        run(gatherExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 1);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();

            LocalExchangeSource source = exchange.getNextSource();
            assertSource(source, 0);

            LocalExchangeSink sink = sinkFactory.createSink();
            sinkFactory.close();
            assertSinkCanWrite(sink);
            assertSource(source, 0);

            // Wildcard instead of a raw type: only completion of the future is inspected, never its value.
            ListenableFuture<?> readFuture = source.waitForReading();
            Assert.assertFalse(readFuture.isDone());

            // The first page must complete the pending read future.
            sink.addPage(createPage(0));
            Assert.assertTrue(readFuture.isDone());
            assertExchangeTotalBufferedBytes(exchange, 1);
            assertSource(source, 1);

            sink.addPage(createPage(1));
            assertSource(source, 2);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // Pages come back in the order they were added.
            assertRemovePage(source, createPage(0));
            assertSource(source, 1);
            assertExchangeTotalBufferedBytes(exchange, 1);

            assertRemovePage(source, createPage(1));
            assertSource(source, 0);
            assertExchangeTotalBufferedBytes(exchange, 0);

            sink.addPage(createPage(2));
            sink.addPage(createPage(3));
            assertSource(source, 2);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // Finishing the sink leaves the already buffered pages readable.
            sink.finish();
            assertSinkFinished(sink);
            assertSource(source, 2);

            assertRemovePage(source, createPage(2));
            assertSource(source, 1);
            assertSinkFinished(sink);
            assertExchangeTotalBufferedBytes(exchange, 1);

            assertRemovePage(source, createPage(3));
            assertSourceFinished(source);
            assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * Writes 100 pages through one sink into a two-buffer {@code FIXED_ARBITRARY_DISTRIBUTION} exchange.
     *
     * <p>After every page the combined buffered bytes and pages of both sources must match the number of
     * pages written so far; at the end each source must hold at least one page.
     */
    @Test
    public void testRandom() {
        LocalExchange randomExchange = new LocalExchange(
                nodePartitioningManager,
                SESSION,
                2,
                SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATOR_FACTORY,
                WRITER_SCALING_MIN_DATA_PROCESSED);

        run(randomExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);

            for (int pagesWritten = 1; pagesWritten <= 100; pagesWritten++) {
                sink.addPage(createPage(0));
                assertExchangeTotalBufferedBytes(exchange, pagesWritten);

                LocalExchangeBufferInfo infoA = sourceA.getBufferInfo();
                LocalExchangeBufferInfo infoB = sourceB.getBufferInfo();
                Assert.assertEquals(infoA.getBufferedBytes() + infoB.getBufferedBytes(), retainedSizeOfPages(pagesWritten));
                Assert.assertEquals(infoA.getBufferedPages() + infoB.getBufferedPages(), pagesWritten);
            }

            // Both buffers must have received some of the pages.
            Assert.assertTrue(sourceA.getBufferInfo().getBufferedPages() > 0);
            Assert.assertTrue(sourceB.getBufferInfo().getBufferedPages() > 0);
            assertExchangeTotalBufferedBytes(exchange, 100);
        });
    }

    /**
     * Checks how pages spread over three buffers of a {@code SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION}
     * exchange whose buffer limit is the retained size of four pages and whose writer-scaling threshold
     * is the size of two pages.
     *
     * <p>The expected per-source page counts are asserted after each batch of writes.
     */
    @Test
    public void testScaleWriter() {
        LocalExchange scalingExchange = new LocalExchange(
                nodePartitioningManager,
                SESSION,
                3,
                SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                DataSize.ofBytes(retainedSizeOfPages(4)),
                TYPE_OPERATOR_FACTORY,
                DataSize.ofBytes(sizeOfPages(2)));

        run(scalingExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 3);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);
            LocalExchangeSource sourceC = exchange.getNextSource();
            assertSource(sourceC, 0);

            // First two pages both land in the first buffer.
            sink.addPage(createPage(0));
            sink.addPage(createPage(0));
            Assert.assertEquals(sourceA.getBufferInfo().getBufferedPages(), 2);
            Assert.assertEquals(sourceB.getBufferInfo().getBufferedPages(), 0);
            Assert.assertEquals(sourceC.getBufferInfo().getBufferedPages(), 0);

            // The third page is expected in the second buffer.
            sink.addPage(createPage(0));
            Assert.assertEquals(sourceA.getBufferInfo().getBufferedPages(), 2);
            Assert.assertEquals(sourceB.getBufferInfo().getBufferedPages(), 1);
            Assert.assertEquals(sourceC.getBufferInfo().getBufferedPages(), 0);

            // Drain the first buffer, then write three more pages.
            assertRemovePage(sourceA, createPage(0));
            assertRemovePage(sourceA, createPage(0));
            for (int written = 0; written < 3; written++) {
                sink.addPage(createPage(0));
            }
            Assert.assertEquals(sourceA.getBufferInfo().getBufferedPages(), 1);
            Assert.assertEquals(sourceB.getBufferInfo().getBufferedPages(), 2);
            Assert.assertEquals(sourceC.getBufferInfo().getBufferedPages(), 1);
        });
    }

    /**
     * Verifies that six pages written to a three-buffer {@code SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION}
     * exchange all stay in the first buffer when the buffer limit is the retained size of four pages but
     * the writer-scaling threshold is the size of ten pages.
     */
    @Test
    public void testNoWriterScalingWhenOnlyBufferSizeLimitIsExceeded() {
        LocalExchange scalingExchange = new LocalExchange(
                nodePartitioningManager,
                SESSION,
                3,
                SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                DataSize.ofBytes(retainedSizeOfPages(4)),
                TYPE_OPERATOR_FACTORY,
                DataSize.ofBytes(sizeOfPages(10)));

        run(scalingExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 3);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);
            LocalExchangeSource sourceC = exchange.getNextSource();
            assertSource(sourceC, 0);

            for (int written = 0; written < 6; written++) {
                sink.addPage(createPage(0));
            }

            // Only the first buffer received pages.
            Assert.assertEquals(sourceA.getBufferInfo().getBufferedPages(), 6);
            Assert.assertEquals(sourceB.getBufferInfo().getBufferedPages(), 0);
            Assert.assertEquals(sourceC.getBufferInfo().getBufferedPages(), 0);
        });
    }

    /**
     * Verifies that nine pages written to a three-buffer {@code SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION}
     * exchange all stay in the first buffer when the writer-scaling threshold is only the size of two
     * pages but the buffer limit is the retained size of twenty pages.
     */
    @Test
    public void testNoWriterScalingWhenOnlyWriterScalingMinDataProcessedLimitIsExceeded() {
        LocalExchange scalingExchange = new LocalExchange(
                nodePartitioningManager,
                SESSION,
                3,
                SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                DataSize.ofBytes(retainedSizeOfPages(20)),
                TYPE_OPERATOR_FACTORY,
                DataSize.ofBytes(sizeOfPages(2)));

        run(scalingExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 3);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);
            LocalExchangeSource sourceC = exchange.getNextSource();
            assertSource(sourceC, 0);

            // Nine pages in total. The counter that used to be updated after the eighth page was never
            // read by the exchange or by any assertion, so it has been dropped.
            for (int written = 0; written < 9; written++) {
                sink.addPage(createPage(0));
            }

            // Only the first buffer received pages.
            Assert.assertEquals(sourceA.getBufferInfo().getBufferedPages(), 9);
            Assert.assertEquals(sourceB.getBufferInfo().getBufferedPages(), 0);
            Assert.assertEquals(sourceC.getBufferInfo().getBufferedPages(), 0);
        });
    }

    /**
     * Feeds a skewed stream of pages into a four-buffer exchange for each handle supplied by the
     * {@code scalingPartitionHandles} data provider and asserts the page count of every source after
     * each batch.
     *
     * <p>The exchange is created with a buffer limit of two retained pages, a 10kB writer-scaling
     * threshold and the session property {@code skewed_partition_min_data_processed_rebalance_threshold}
     * set to 20kB.
     *
     * @param partitioningHandle partitioning handle under test
     */
    @Test(dataProvider = "scalingPartitionHandles")
    public void testScalingForSkewedWriters(PartitioningHandle partitioningHandle) {
        Session session = TestingSession.testSessionBuilder()
                .setSystemProperty("skewed_partition_min_data_processed_rebalance_threshold", "20kB")
                .build();
        LocalExchange skewedExchange = new LocalExchange(
                nodePartitioningManager,
                session,
                4,
                partitioningHandle,
                ImmutableList.of(0),
                TYPES,
                Optional.empty(),
                DataSize.ofBytes(retainedSizeOfPages(2)),
                TYPE_OPERATOR_FACTORY,
                DataSize.of(10L, DataSize.Unit.KILOBYTE));

        run(skewedExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 4);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);
            LocalExchangeSource sourceC = exchange.getNextSource();
            assertSource(sourceC, 0);
            LocalExchangeSource sourceD = exchange.getNextSource();
            assertSource(sourceD, 0);

            // Batch 1: two large pages followed by two small ones.
            sink.addPage(createSingleValuePage(0, 1000));
            sink.addPage(createSingleValuePage(0, 1000));
            sink.addPage(createSingleValuePage(1, 2));
            sink.addPage(createSingleValuePage(1, 2));
            assertSource(sourceA, 2);
            assertSource(sourceB, 0);
            assertSource(sourceC, 0);
            assertSource(sourceD, 2);

            // Batch 2: four more large pages.
            for (int written = 0; written < 4; written++) {
                sink.addPage(createSingleValuePage(0, 1000));
            }
            assertSource(sourceA, 2);
            assertSource(sourceB, 2);
            assertSource(sourceC, 0);
            assertSource(sourceD, 4);

            // Batch 3: four more large pages.
            for (int written = 0; written < 4; written++) {
                sink.addPage(createSingleValuePage(0, 1000));
            }
            assertSource(sourceA, 2);
            assertSource(sourceB, 4);
            assertSource(sourceC, 1);
            assertSource(sourceD, 5);

            // Batch 4: six more large pages.
            for (int written = 0; written < 6; written++) {
                sink.addPage(createSingleValuePage(0, 1000));
            }
            assertSource(sourceA, 4);
            assertSource(sourceB, 5);
            assertSource(sourceC, 3);
            assertSource(sourceD, 6);
        });
    }

    /**
     * Verifies, for each handle from the {@code scalingPartitionHandles} data provider, that a skewed
     * stream keeps using only the first and the last of four buffers when the writer-scaling threshold
     * is 50MB, even though the buffer limit is just two retained pages.
     *
     * @param partitioningHandle partitioning handle under test
     */
    @Test(dataProvider = "scalingPartitionHandles")
    public void testNoScalingWhenDataWrittenIsLessThanMinFileSize(PartitioningHandle partitioningHandle) {
        Session session = TestingSession.testSessionBuilder()
                .setSystemProperty("skewed_partition_min_data_processed_rebalance_threshold", "20kB")
                .build();
        LocalExchange skewedExchange = new LocalExchange(
                nodePartitioningManager,
                session,
                4,
                partitioningHandle,
                ImmutableList.of(0),
                TYPES,
                Optional.empty(),
                DataSize.ofBytes(retainedSizeOfPages(2)),
                TYPE_OPERATOR_FACTORY,
                DataSize.of(50L, DataSize.Unit.MEGABYTE));

        run(skewedExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 4);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);
            LocalExchangeSource sourceC = exchange.getNextSource();
            assertSource(sourceC, 0);
            LocalExchangeSource sourceD = exchange.getNextSource();
            assertSource(sourceD, 0);

            // Batch 1: two large pages followed by two small ones.
            sink.addPage(createSingleValuePage(0, 1000));
            sink.addPage(createSingleValuePage(0, 1000));
            sink.addPage(createSingleValuePage(1, 2));
            sink.addPage(createSingleValuePage(1, 2));
            assertSource(sourceA, 2);
            assertSource(sourceB, 0);
            assertSource(sourceC, 0);
            assertSource(sourceD, 2);

            // Batch 2: four more large pages, all expected in the last buffer.
            for (int written = 0; written < 4; written++) {
                sink.addPage(createSingleValuePage(0, 1000));
            }
            assertSource(sourceA, 2);
            assertSource(sourceB, 0);
            assertSource(sourceC, 0);
            assertSource(sourceD, 6);
        });
    }

    /**
     * Verifies, for each handle from the {@code scalingPartitionHandles} data provider, that a skewed
     * stream keeps using only the first and the last of four buffers when the buffer limit is 50MB,
     * even though the writer-scaling threshold is just 10kB.
     *
     * @param partitioningHandle partitioning handle under test
     */
    @Test(dataProvider = "scalingPartitionHandles")
    public void testNoScalingWhenBufferUtilizationIsLessThanLimit(PartitioningHandle partitioningHandle) {
        Session session = TestingSession.testSessionBuilder()
                .setSystemProperty("skewed_partition_min_data_processed_rebalance_threshold", "20kB")
                .build();
        LocalExchange skewedExchange = new LocalExchange(
                nodePartitioningManager,
                session,
                4,
                partitioningHandle,
                ImmutableList.of(0),
                TYPES,
                Optional.empty(),
                DataSize.of(50L, DataSize.Unit.MEGABYTE),
                TYPE_OPERATOR_FACTORY,
                DataSize.of(10L, DataSize.Unit.KILOBYTE));

        run(skewedExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 4);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);
            LocalExchangeSource sourceC = exchange.getNextSource();
            assertSource(sourceC, 0);
            LocalExchangeSource sourceD = exchange.getNextSource();
            assertSource(sourceD, 0);

            // Batch 1: two large pages followed by two small ones.
            sink.addPage(createSingleValuePage(0, 1000));
            sink.addPage(createSingleValuePage(0, 1000));
            sink.addPage(createSingleValuePage(1, 2));
            sink.addPage(createSingleValuePage(1, 2));
            assertSource(sourceA, 2);
            assertSource(sourceB, 0);
            assertSource(sourceC, 0);
            assertSource(sourceD, 2);

            // Batch 2: four more large pages, all expected in the last buffer.
            for (int written = 0; written < 4; written++) {
                sink.addPage(createSingleValuePage(0, 1000));
            }
            assertSource(sourceA, 2);
            assertSource(sourceB, 0);
            assertSource(sourceC, 0);
            assertSource(sourceD, 6);
        });
    }

    /**
     * Writes two rounds of one {@code createSingleValuePage(0, 1000)} page and one
     * {@code createSingleValuePage(1, 1000)} page into a two-buffer {@code SCALED_WRITER_HASH_DISTRIBUTION}
     * exchange and expects both sources to hold the same number of pages after each round.
     */
    @Test
    public void testNoScalingWhenNoWriterSkewness() {
        Session session = TestingSession.testSessionBuilder()
                .setSystemProperty("skewed_partition_min_data_processed_rebalance_threshold", "20kB")
                .build();
        LocalExchange balancedExchange = new LocalExchange(
                nodePartitioningManager,
                session,
                2,
                SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION,
                ImmutableList.of(0),
                TYPES,
                Optional.empty(),
                DataSize.ofBytes(retainedSizeOfPages(2)),
                TYPE_OPERATOR_FACTORY,
                DataSize.of(50L, DataSize.Unit.KILOBYTE));

        run(balancedExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);

            // After each round both sources are expected to have grown by exactly one page.
            for (int round = 1; round <= 2; round++) {
                sink.addPage(createSingleValuePage(0, 1000));
                sink.addPage(createSingleValuePage(1, 1000));
                assertSource(sourceA, round);
                assertSource(sourceB, round);
            }
        });
    }

    /**
     * Drives a two-buffer {@code FIXED_PASSTHROUGH_DISTRIBUTION} exchange with two sinks and a buffer
     * limit of one retained page.
     *
     * <p>Asserts that a page from the first sink shows up only in the first source and a page from the
     * second sink only in the second source, checks which sink is write-blocked after each step, and
     * checks the finished state of sinks and sources once both sources have been finished.
     */
    @Test
    public void testPassthrough() {
        LocalExchange passthroughExchange = new LocalExchange(
                nodePartitioningManager,
                SESSION,
                2,
                SystemPartitioningHandle.FIXED_PASSTHROUGH_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                DataSize.ofBytes(retainedSizeOfPages(1)),
                TYPE_OPERATOR_FACTORY,
                WRITER_SCALING_MIN_DATA_PROCESSED);

        run(passthroughExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sinkA = sinkFactory.createSink();
            LocalExchangeSink sinkB = sinkFactory.createSink();
            assertSinkCanWrite(sinkA);
            assertSinkCanWrite(sinkB);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);

            // A page from the first sink reaches only the first source and blocks that sink.
            sinkA.addPage(createPage(0));
            assertSource(sourceA, 1);
            assertSource(sourceB, 0);
            assertSinkWriteBlocked(sinkA);
            assertSinkCanWrite(sinkB);

            // A page from the second sink reaches only the second source.
            sinkB.addPage(createPage(1));
            assertSource(sourceA, 1);
            assertSource(sourceB, 1);
            assertSinkWriteBlocked(sinkA);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // Draining the first source unblocks the first sink only.
            assertRemovePage(sourceA, createPage(0));
            assertSource(sourceA, 0);
            assertSinkCanWrite(sinkA);
            assertSinkWriteBlocked(sinkB);
            assertExchangeTotalBufferedBytes(exchange, 1);

            sinkA.finish();
            assertSinkFinished(sinkA);
            assertSource(sourceB, 1);

            sourceA.finish();
            sourceB.finish();
            assertRemovePage(sourceB, createPage(1));
            assertSourceFinished(sourceA);
            assertSourceFinished(sourceB);
            assertSinkFinished(sinkB);
            assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * Drives a two-buffer {@code FIXED_HASH_DISTRIBUTION} exchange partitioned on channel 0.
     *
     * <p>Each page added through the sink is expected to produce one page in each source. The test then
     * drains the first source, finishes the sink, drains the second source and checks that both sources
     * end up finished with nothing left buffered.
     */
    @Test
    public void testPartition() {
        LocalExchange hashExchange = new LocalExchange(
                nodePartitioningManager,
                SESSION,
                2,
                SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION,
                ImmutableList.of(0),
                TYPES,
                Optional.empty(),
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATOR_FACTORY,
                WRITER_SCALING_MIN_DATA_PROCESSED);

        run(hashExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);

            // Every input page is split, so each source gains one page per write.
            sink.addPage(createPage(0));
            assertSource(sourceA, 1);
            assertSource(sourceB, 1);
            Assert.assertTrue(sourceA.getBufferInfo().getBufferedBytes() + sourceB.getBufferInfo().getBufferedBytes() >= retainedSizeOfPages(1));

            sink.addPage(createPage(0));
            assertSource(sourceA, 2);
            assertSource(sourceB, 2);
            Assert.assertTrue(sourceA.getBufferInfo().getBufferedBytes() + sourceB.getBufferInfo().getBufferedBytes() >= retainedSizeOfPages(2));

            // Drain the first source while the second keeps its pages.
            assertPartitionedRemovePage(sourceA, 0, 2);
            assertSource(sourceA, 1);
            assertSource(sourceB, 2);

            assertPartitionedRemovePage(sourceA, 0, 2);
            assertSource(sourceA, 0);
            assertSource(sourceB, 2);

            // With the sink finished, the already empty first source is finished as well.
            sink.finish();
            assertSinkFinished(sink);
            assertSourceFinished(sourceA);
            assertSource(sourceB, 2);

            assertPartitionedRemovePage(sourceB, 1, 2);
            assertSourceFinished(sourceA);
            assertSource(sourceB, 1);

            assertPartitionedRemovePage(sourceB, 1, 2);
            assertSourceFinished(sourceA);
            assertSourceFinished(sourceB);
            assertExchangeTotalBufferedBytes(exchange, 0);
        });
    }

    /**
     * Drives a two-buffer exchange whose partitioning comes from a connector-provided
     * {@link ConnectorNodePartitioningProvider} registered under {@code TestingHandles.TEST_CATALOG_HANDLE}.
     *
     * <p>The provider's bucket function returns bucket 0 when the BIGINT value in block 0 of the page it
     * is given equals 42 and bucket 1 otherwise. The test writes two sequence pages and asserts which
     * source receives each one.
     */
    @Test
    public void testPartitionCustomPartitioning() {
        ConnectorPartitioningHandle customHandle = new ConnectorPartitioningHandle() {};
        ConnectorNodePartitioningProvider customProvider = new ConnectorNodePartitioningProvider() {
            @Override
            public Optional<ConnectorBucketNodeMap> getBucketNodeMapping(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle) {
                return Optional.of(ConnectorBucketNodeMap.createBucketNodeMap(2));
            }

            @Override
            public BucketFunction getBucketFunction(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle, List<Type> partitionChannelTypes, int bucketCount) {
                // Bucket 0 for the value 42, bucket 1 for everything else.
                return (page, position) -> BigintType.BIGINT.getLong(page.getBlock(0), position) == 42 ? 0 : 1;
            }
        };

        // Parameterized instead of a raw ImmutableList so the helper calls below are type-checked.
        ImmutableList<Type> types = ImmutableList.of(VarcharType.VARCHAR, BigintType.BIGINT);
        partitionManagers.put(TestingHandles.TEST_CATALOG_HANDLE, customProvider);

        PartitioningHandle partitioning = new PartitioningHandle(
                Optional.of(TestingHandles.TEST_CATALOG_HANDLE),
                Optional.of(TestingTransactionHandle.create()),
                customHandle);
        LocalExchange customExchange = new LocalExchange(
                nodePartitioningManager,
                SESSION,
                2,
                partitioning,
                ImmutableList.of(1),
                ImmutableList.of(BigintType.BIGINT),
                Optional.empty(),
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATOR_FACTORY,
                WRITER_SCALING_MIN_DATA_PROCESSED);

        run(customExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sink = sinkFactory.createSink();
            assertSinkCanWrite(sink);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);

            // The first page is expected to arrive, unchanged, in the second source only.
            Page firstPage = SequencePageBuilder.createSequencePage(types, 1, 100, 42);
            sink.addPage(firstPage);
            assertSource(sourceB, 1);
            assertSource(sourceA, 0);
            assertRemovePage(types, sourceB, firstPage);
            assertSource(sourceB, 0);

            // The second page is expected to arrive, unchanged, in the first source only.
            Page secondPage = SequencePageBuilder.createSequencePage(types, 100, 100, 43);
            sink.addPage(secondPage);
            assertSource(sourceB, 0);
            assertSource(sourceA, 1);
            assertRemovePage(types, sourceA, secondPage);
            assertSource(sourceA, 0);
        });
    }

    /**
     * Checks, on a two-buffer {@code FIXED_ARBITRARY_DISTRIBUTION} exchange with two sinks, that both
     * sinks stay writable after only one source has finished and that both are reported finished once
     * the second source has finished too.
     */
    @Test
    public void writeUnblockWhenAllReadersFinish() {
        LocalExchange arbitraryExchange = new LocalExchange(
                nodePartitioningManager,
                SESSION,
                2,
                SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
                TYPE_OPERATOR_FACTORY,
                WRITER_SCALING_MIN_DATA_PROCESSED);

        run(arbitraryExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();
            LocalExchangeSink sinkA = sinkFactory.createSink();
            assertSinkCanWrite(sinkA);
            LocalExchangeSink sinkB = sinkFactory.createSink();
            assertSinkCanWrite(sinkB);
            sinkFactory.close();

            LocalExchangeSource sourceA = exchange.getNextSource();
            assertSource(sourceA, 0);
            LocalExchangeSource sourceB = exchange.getNextSource();
            assertSource(sourceB, 0);

            // One reader finishing leaves the writers untouched.
            sourceA.finish();
            assertSourceFinished(sourceA);
            assertSinkCanWrite(sinkA);
            assertSinkCanWrite(sinkB);

            // Once the last reader finishes, both writers are finished.
            sourceB.finish();
            assertSourceFinished(sourceB);
            assertSinkFinished(sinkA);
            assertSinkFinished(sinkB);
        });
    }

    /**
     * Checks that, for a two-buffer exchange using {@code FIXED_PASSTHROUGH_DISTRIBUTION} with a
     * {@code DataSize.ofBytes(2L)} buffer limit, blocked writers are released and sinks become
     * finished only after every source has been finished and its buffered page removed.
     */
    @Test
    public void writeUnblockWhenAllReadersFinishAndPagesConsumed() {
        LocalExchange passthroughExchange = new LocalExchange(
                this.nodePartitioningManager,
                SESSION,
                2,
                SystemPartitioningHandle.FIXED_PASSTHROUGH_DISTRIBUTION,
                ImmutableList.of(),
                ImmutableList.of(),
                Optional.empty(),
                DataSize.ofBytes(2L),
                TYPE_OPERATOR_FACTORY,
                WRITER_SCALING_MIN_DATA_PROCESSED);

        run(passthroughExchange, exchange -> {
            Assert.assertEquals(exchange.getBufferCount(), 2);
            assertExchangeTotalBufferedBytes(exchange, 0);

            LocalExchange.LocalExchangeSinkFactory sinkFactory = exchange.createSinkFactory();
            sinkFactory.noMoreSinkFactories();

            // Capture each sink's "finished" future up front so it can be re-checked at the end.
            LocalExchangeSink firstSink = sinkFactory.createSink();
            assertSinkCanWrite(firstSink);
            ListenableFuture<?> firstSinkFinished = firstSink.isFinished();
            Assert.assertFalse(firstSinkFinished.isDone());

            LocalExchangeSink secondSink = sinkFactory.createSink();
            assertSinkCanWrite(secondSink);
            ListenableFuture<?> secondSinkFinished = secondSink.isFinished();
            Assert.assertFalse(secondSinkFinished.isDone());

            sinkFactory.close();

            LocalExchangeSource firstSource = exchange.getNextSource();
            assertSource(firstSource, 0);
            LocalExchangeSource secondSource = exchange.getNextSource();
            assertSource(secondSource, 0);

            // One page per sink is enough to block further writes on both sinks.
            firstSink.addPage(createPage(0));
            ListenableFuture<Void> firstWriteBlocked = assertSinkWriteBlocked(firstSink);
            secondSink.addPage(createPage(0));
            ListenableFuture<Void> secondWriteBlocked = assertSinkWriteBlocked(secondSink);

            assertSource(firstSource, 1);
            assertSource(secondSource, 1);
            assertExchangeTotalBufferedBytes(exchange, 2);

            // finish() on a source that still buffers a page keeps the page readable; the source
            // only reports finished after that page has been removed.
            firstSource.finish();
            assertSource(firstSource, 1);
            assertRemovePage(firstSource, createPage(0));
            assertSourceFinished(firstSource);
            assertExchangeTotalBufferedBytes(exchange, 1);

            // The second source is untouched so far, and its sink is still blocked.
            assertSource(secondSource, 1);
            assertSinkWriteBlocked(secondSink);

            secondSource.finish();
            assertSource(secondSource, 1);
            assertRemovePage(secondSource, createPage(0));
            assertSourceFinished(secondSource);
            assertExchangeTotalBufferedBytes(exchange, 0);

            // With all sources finished and drained, every captured future must be complete.
            Assert.assertTrue(firstWriteBlocked.isDone());
            Assert.assertTrue(secondWriteBlocked.isDone());
            Assert.assertTrue(firstSinkFinished.isDone());
            Assert.assertTrue(secondSinkFinished.isDone());
            assertSinkFinished(firstSink);
            assertSinkFinished(secondSink);
        });
    }

    /**
     * TestNG data provider supplying the partitioning handles used by the scaling tests.
     *
     * @return one row per handle: the system {@code SCALED_WRITER_HASH_DISTRIBUTION} handle and
     *         the handle produced by {@link #getCustomScalingPartitioningHandle()}
     */
    @DataProvider
    public Object[][] scalingPartitionHandles() {
        // The outer array must be Object[][]; an Object[] is not assignable to the declared return type.
        return new Object[][] {
                {SystemPartitioningHandle.SCALED_WRITER_HASH_DISTRIBUTION},
                {getCustomScalingPartitioningHandle()}
        };
    }

    /**
     * Creates a connector-backed partitioning handle for {@code TestingHandles.TEST_CATALOG_HANDLE}
     * and registers a matching {@link ConnectorNodePartitioningProvider} in
     * {@code partitionManagers} under that catalog handle.
     *
     * <p>The registered provider reports a bucket-to-node map created with
     * {@code createBucketNodeMap(4)} and a bucket function that sends rows whose first-channel
     * BIGINT value is {@code 0} to bucket 2 and every other row to bucket 1.
     *
     * @return a new {@link PartitioningHandle} wrapping a fresh anonymous connector handle
     */
    private PartitioningHandle getCustomScalingPartitioningHandle() {
        ConnectorPartitioningHandle customHandle = new ConnectorPartitioningHandle() {};

        ConnectorNodePartitioningProvider provider = new ConnectorNodePartitioningProvider() {
            @Override
            public Optional<ConnectorBucketNodeMap> getBucketNodeMapping(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle) {
                return Optional.of(ConnectorBucketNodeMap.createBucketNodeMap(4));
            }

            @Override
            public BucketFunction getBucketFunction(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle, List<Type> partitionChannelTypes, int bucketCount) {
                // Only two buckets are ever produced: 2 for value 0, otherwise 1.
                return (page, position) -> BigintType.BIGINT.getLong(page.getBlock(0), position) == 0 ? 2 : 1;
            }
        };
        this.partitionManagers.put(TestingHandles.TEST_CATALOG_HANDLE, provider);

        return new PartitioningHandle(
                Optional.of(TestingHandles.TEST_CATALOG_HANDLE),
                Optional.of(TestingTransactionHandle.create()),
                customHandle,
                true);
    }

    /**
     * Executes a test body against the given exchange by passing the exchange to the consumer.
     *
     * @param localExchange the exchange under test
     * @param consumer the test logic to invoke with {@code localExchange}
     */
    private void run(LocalExchange localExchange, Consumer<LocalExchange> consumer) {
        consumer.accept(localExchange);
    }

    /**
     * Asserts that an unfinished source buffers exactly {@code i} pages.
     *
     * <p>When {@code i} is zero the source must not be readable, {@code removePage()} must
     * return {@code null}, and the buffered byte count must be zero. Otherwise the source must
     * be readable and report a positive buffered byte count.
     *
     * @param localExchangeSource the source to inspect
     * @param i the expected number of buffered pages
     */
    private static void assertSource(LocalExchangeSource localExchangeSource, int i) {
        // Buffer info is read once, before any of the calls below.
        LocalExchangeBufferInfo snapshot = localExchangeSource.getBufferInfo();
        Assert.assertEquals(snapshot.getBufferedPages(), i);
        Assert.assertFalse(localExchangeSource.isFinished());

        if (i == 0) {
            Assert.assertFalse(localExchangeSource.waitForReading().isDone());
            Assert.assertNull(localExchangeSource.removePage());
            // Polling an empty source must not change its state.
            Assert.assertFalse(localExchangeSource.waitForReading().isDone());
            Assert.assertFalse(localExchangeSource.isFinished());
            Assert.assertEquals(snapshot.getBufferedBytes(), 0L);
        } else {
            Assert.assertTrue(localExchangeSource.waitForReading().isDone());
            Assert.assertTrue(snapshot.getBufferedBytes() > 0);
        }
    }

    /**
     * Asserts that a source is finished: it buffers no pages or bytes, its read future is done,
     * and {@code removePage()} returns {@code null} without changing that state.
     *
     * @param localExchangeSource the source expected to be finished
     */
    private static void assertSourceFinished(LocalExchangeSource localExchangeSource) {
        Assert.assertTrue(localExchangeSource.isFinished());

        LocalExchangeBufferInfo snapshot = localExchangeSource.getBufferInfo();
        Assert.assertEquals(snapshot.getBufferedPages(), 0);
        Assert.assertEquals(snapshot.getBufferedBytes(), 0L);

        Assert.assertTrue(localExchangeSource.waitForReading().isDone());
        Assert.assertNull(localExchangeSource.removePage());

        // Re-check after the removePage() attempt: the source must still be done and finished.
        Assert.assertTrue(localExchangeSource.waitForReading().isDone());
        Assert.assertTrue(localExchangeSource.isFinished());
    }

    /**
     * Convenience overload of {@link #assertRemovePage(List, LocalExchangeSource, Page)} that
     * compares pages using the class-level {@code TYPES}.
     *
     * @param localExchangeSource the source to remove a page from
     * @param page the page the removed page is expected to equal
     */
    private static void assertRemovePage(LocalExchangeSource localExchangeSource, Page page) {
        assertRemovePage(TYPES, localExchangeSource, page);
    }

    /**
     * Removes one page from a readable source and asserts that it matches the expected page.
     *
     * @param list the column types used to compare the pages
     * @param localExchangeSource the source to remove a page from; its read future must be done
     * @param page the expected page
     */
    private static void assertRemovePage(List<Type> list, LocalExchangeSource localExchangeSource, Page page) {
        Assert.assertTrue(localExchangeSource.waitForReading().isDone());

        Page actual = localExchangeSource.removePage();
        Assert.assertNotNull(actual);

        int expectedChannels = page.getChannelCount();
        Assert.assertEquals(actual.getChannelCount(), expectedChannels);
        PageAssertions.assertPageEquals(list, actual, page);
    }

    /**
     * Removes one page from a readable source and asserts that every position in it maps to
     * partition {@code i}. The partition is computed by a {@link LocalPartitionGenerator} that
     * hashes channel 0 of {@code TYPES} and is constructed with {@code i2}.
     *
     * @param localExchangeSource the source to remove a page from; its read future must be done
     * @param i the partition every position is expected to map to
     * @param i2 the second constructor argument of the partition generator (presumably the
     *        partition count; verify against {@code LocalPartitionGenerator})
     */
    private static void assertPartitionedRemovePage(LocalExchangeSource localExchangeSource, int i, int i2) {
        Assert.assertTrue(localExchangeSource.waitForReading().isDone());

        Page actual = localExchangeSource.removePage();
        Assert.assertNotNull(actual);

        InterpretedHashGenerator hashGenerator = new InterpretedHashGenerator(TYPES, new int[]{0}, TYPE_OPERATOR_FACTORY);
        LocalPartitionGenerator partitionGenerator = new LocalPartitionGenerator(hashGenerator, i2);

        int positionCount = actual.getPositionCount();
        for (int position = 0; position < positionCount; position++) {
            Assert.assertEquals(partitionGenerator.getPartition(actual, position), i);
        }
    }

    /**
     * Asserts that a sink is not finished and that its write future is already done.
     *
     * @param localExchangeSink the sink expected to accept writes
     */
    private static void assertSinkCanWrite(LocalExchangeSink localExchangeSink) {
        ListenableFuture<?> finishedFuture = localExchangeSink.isFinished();
        Assert.assertFalse(finishedFuture.isDone());

        ListenableFuture<?> writeFuture = localExchangeSink.waitForWriting();
        Assert.assertTrue(writeFuture.isDone());
    }

    /**
     * Asserts that a sink is not finished and that its write future is not yet done.
     *
     * @param localExchangeSink the sink expected to be blocked
     * @return the pending write future, so callers can check later that it completes
     */
    private static ListenableFuture<Void> assertSinkWriteBlocked(LocalExchangeSink localExchangeSink) {
        ListenableFuture<?> finishedFuture = localExchangeSink.isFinished();
        Assert.assertFalse(finishedFuture.isDone());

        ListenableFuture<Void> pendingWrite = localExchangeSink.waitForWriting();
        Assert.assertFalse(pendingWrite.isDone());
        return pendingWrite;
    }

    /**
     * Asserts that a sink is finished and stays finished after another page is added to it.
     * The finished and write futures must both be done before and after the extra
     * {@code addPage} call.
     *
     * @param localExchangeSink the sink expected to be finished
     */
    private static void assertSinkFinished(LocalExchangeSink localExchangeSink) {
        ListenableFuture<?> finishedBefore = localExchangeSink.isFinished();
        Assert.assertTrue(finishedBefore.isDone());
        ListenableFuture<?> writeBefore = localExchangeSink.waitForWriting();
        Assert.assertTrue(writeBefore.isDone());

        // Adding a page to a finished sink must not change its state.
        Page extraPage = createPage(0);
        localExchangeSink.addPage(extraPage);

        ListenableFuture<?> finishedAfter = localExchangeSink.isFinished();
        Assert.assertTrue(finishedAfter.isDone());
        ListenableFuture<?> writeAfter = localExchangeSink.waitForWriting();
        Assert.assertTrue(writeAfter.isDone());
    }

    /**
     * Asserts that the buffered bytes summed over all sources of the exchange equal the
     * retained size of {@code i} pages, as computed by {@link #retainedSizeOfPages(int)}.
     *
     * @param localExchange the exchange whose sources are inspected
     * @param i the expected number of buffered pages across the whole exchange
     */
    private static void assertExchangeTotalBufferedBytes(LocalExchange localExchange, int i) {
        long totalBufferedBytes = IntStream.range(0, localExchange.getBufferCount())
                .mapToLong(index -> localExchange.getSource(index).getBufferInfo().getBufferedBytes())
                .sum();
        Assert.assertEquals(totalBufferedBytes, retainedSizeOfPages(i));
    }

    /**
     * Builds a test page via {@code SequencePageBuilder.createSequencePage(TYPES, 100, i)}.
     *
     * @param i forwarded as the last argument of {@code createSequencePage}
     *        (presumably the initial sequence value; verify against {@code SequencePageBuilder})
     * @return the page produced by {@code SequencePageBuilder}
     */
    private static Page createPage(int i) {
        return SequencePageBuilder.createSequencePage(TYPES, 100, i);
    }

    /**
     * Builds a single-channel page in which every position holds the same long value.
     *
     * @param i the value repeated at every position
     * @param i2 the number of positions in the page
     * @return a page with one longs block containing {@code i2} copies of {@code i}
     */
    private static Page createSingleValuePage(int i, int i2) {
        // A typed list avoids the raw-type cast and its unchecked conversion.
        List<Long> values = IntStream.range(0, i2)
                .mapToObj(ignored -> Long.valueOf(i))
                .collect(ImmutableList.toImmutableList());
        Block block = BlockAssertions.createLongsBlock(values);
        return new Page(new Block[]{block});
    }

    /**
     * Returns {@code PAGE_SIZE} in bytes multiplied by the given page count.
     *
     * @param i the number of pages
     * @return {@code i} times the byte value of {@code PAGE_SIZE}
     */
    private static long sizeOfPages(int i) {
        long bytesPerPage = PAGE_SIZE.toBytes();
        return i * bytesPerPage;
    }

    /**
     * Returns {@code RETAINED_PAGE_SIZE} in bytes multiplied by the given page count.
     *
     * @param i the number of pages
     * @return {@code i} times the byte value of {@code RETAINED_PAGE_SIZE}
     */
    public static long retainedSizeOfPages(int i) {
        long retainedBytesPerPage = RETAINED_PAGE_SIZE.toBytes();
        return i * retainedBytesPerPage;
    }
}
