package io.trino.operator;

import com.google.common.collect.ImmutableList;
import io.airlift.concurrent.Threads;
import io.airlift.slice.Slice;
import io.trino.RowPagesBuilder;
import io.trino.Session;
import io.trino.SessionTestUtils;
import io.trino.connector.CatalogServiceProvider;
import io.trino.memory.context.MemoryTrackingContext;
import io.trino.metadata.OutputTableHandle;
import io.trino.metadata.TestingFunctionResolution;
import io.trino.operator.AggregationOperator;
import io.trino.operator.DevNullOperator;
import io.trino.operator.TableWriterOperator;
import io.trino.operator.aggregation.TestingAggregationFunction;
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkId;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.WriterScalingOptions;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.split.PageSinkManager;
import io.trino.sql.analyzer.TypeSignatureProvider;
import io.trino.sql.planner.plan.AggregationNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.TableWriterNode;
import io.trino.sql.tree.QualifiedName;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingSession;
import io.trino.testing.TestingTaskContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:io/trino/operator/TestTableWriterOperator.class */
public class TestTableWriterOperator {
    private static final TestingAggregationFunction LONG_MAX = new TestingFunctionResolution().getAggregateFunction(QualifiedName.of("max"), TypeSignatureProvider.fromTypes(new Type[]{BigintType.BIGINT}));
    private ExecutorService executor;
    private ScheduledExecutorService scheduledExecutor;

    /* loaded from: input_file:io/trino/operator/TestTableWriterOperator$BlockingPageSink.class */
    private static class BlockingPageSink implements ConnectorPageSink {
        private CompletableFuture<?> future = new CompletableFuture<>();
        private CompletableFuture<Collection<Slice>> finishFuture = new CompletableFuture<>();

        private BlockingPageSink() {
        }

        public CompletableFuture<?> appendPage(Page page) {
            this.future = new CompletableFuture<>();
            return this.future;
        }

        public CompletableFuture<Collection<Slice>> finish() {
            this.finishFuture = new CompletableFuture<>();
            return this.finishFuture;
        }

        public void abort() {
        }

        void complete() {
            this.future.complete(null);
            this.finishFuture.complete(ImmutableList.of());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/operator/TestTableWriterOperator$ConstantPageSinkProvider.class */
    public static class ConstantPageSinkProvider implements ConnectorPageSinkProvider {
        private final ConnectorPageSink pageSink;

        private ConstantPageSinkProvider(ConnectorPageSink connectorPageSink) {
            this.pageSink = connectorPageSink;
        }

        public ConnectorPageSink createPageSink(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorOutputTableHandle connectorOutputTableHandle, ConnectorPageSinkId connectorPageSinkId) {
            return this.pageSink;
        }

        public ConnectorPageSink createPageSink(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorInsertTableHandle connectorInsertTableHandle, ConnectorPageSinkId connectorPageSinkId) {
            return this.pageSink;
        }
    }

    /* loaded from: input_file:io/trino/operator/TestTableWriterOperator$TableWriteInfoTestPageSink.class */
    private static class TableWriteInfoTestPageSink implements ConnectorPageSink {
        private final List<Page> pages = new ArrayList();

        private TableWriteInfoTestPageSink() {
        }

        public CompletableFuture<?> appendPage(Page page) {
            this.pages.add(page);
            return NOT_BLOCKED;
        }

        public CompletableFuture<Collection<Slice>> finish() {
            return CompletableFuture.completedFuture(ImmutableList.of());
        }

        public long getMemoryUsage() {
            long j = 0;
            Iterator<Page> it = this.pages.iterator();
            while (it.hasNext()) {
                j += it.next().getRetainedSizeInBytes();
            }
            return j;
        }

        public long getValidationCpuNanos() {
            long j = 0;
            while (this.pages.iterator().hasNext()) {
                j += r0.next().getPositionCount();
            }
            return j;
        }

        public void abort() {
        }
    }

    @BeforeClass
    public void setUp() {
        this.executor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed(getClass().getSimpleName() + "-%s"));
        this.scheduledExecutor = Executors.newScheduledThreadPool(2, Threads.daemonThreadsNamed(getClass().getSimpleName() + "-scheduledExecutor-%s"));
    }

    @AfterClass(alwaysRun = true)
    public void tearDown() {
        this.executor.shutdownNow();
        this.scheduledExecutor.shutdownNow();
    }

    @Test
    public void testBlockedPageSink() {
        BlockingPageSink blockingPageSink = new BlockingPageSink();
        Operator createTableWriterOperator = createTableWriterOperator(blockingPageSink);
        Assert.assertTrue(createTableWriterOperator.isBlocked().isDone());
        Assert.assertFalse(createTableWriterOperator.isFinished());
        Assert.assertTrue(createTableWriterOperator.needsInput());
        createTableWriterOperator.addInput(RowPagesBuilder.rowPagesBuilder(BigintType.BIGINT).row(42).build().get(0));
        Assert.assertFalse(createTableWriterOperator.isBlocked().isDone());
        Assert.assertFalse(createTableWriterOperator.isFinished());
        Assert.assertFalse(createTableWriterOperator.needsInput());
        Assert.assertNull(createTableWriterOperator.getOutput());
        blockingPageSink.complete();
        Assert.assertTrue(createTableWriterOperator.isBlocked().isDone());
        Assert.assertFalse(createTableWriterOperator.isFinished());
        Assert.assertTrue(createTableWriterOperator.needsInput());
        createTableWriterOperator.addInput(RowPagesBuilder.rowPagesBuilder(BigintType.BIGINT).row(44).build().get(0));
        Assert.assertFalse(createTableWriterOperator.isBlocked().isDone());
        Assert.assertFalse(createTableWriterOperator.isFinished());
        Assert.assertFalse(createTableWriterOperator.needsInput());
        createTableWriterOperator.finish();
        Assert.assertFalse(createTableWriterOperator.isBlocked().isDone());
        Assert.assertFalse(createTableWriterOperator.isFinished());
        Assert.assertFalse(createTableWriterOperator.needsInput());
        blockingPageSink.complete();
        ImmutableList of = ImmutableList.of(BigintType.BIGINT, VarbinaryType.VARBINARY);
        PageAssertions.assertPageEquals(of, createTableWriterOperator.getOutput(), RowPagesBuilder.rowPagesBuilder((Iterable<Type>) of).row(2, null).build().get(0));
        Assert.assertTrue(createTableWriterOperator.isBlocked().isDone());
        Assert.assertTrue(createTableWriterOperator.isFinished());
        Assert.assertFalse(createTableWriterOperator.needsInput());
    }

    @Test
    public void addInputFailsOnBlockedOperator() {
        Operator createTableWriterOperator = createTableWriterOperator(new BlockingPageSink());
        createTableWriterOperator.addInput(RowPagesBuilder.rowPagesBuilder(BigintType.BIGINT).row(42).build().get(0));
        Assert.assertFalse(createTableWriterOperator.isBlocked().isDone());
        Assert.assertFalse(createTableWriterOperator.needsInput());
        Assertions.assertThatThrownBy(() -> {
            createTableWriterOperator.addInput(RowPagesBuilder.rowPagesBuilder(BigintType.BIGINT).row(42).build().get(0));
        }).isInstanceOf(IllegalStateException.class).hasMessage("Operator does not need input");
    }

    @Test
    public void testTableWriterInfo() {
        TableWriterOperator createTableWriterOperator = createTableWriterOperator(new PageSinkManager(CatalogServiceProvider.singleton(TestingHandles.TEST_CATALOG_HANDLE, new ConstantPageSinkProvider(new TableWriteInfoTestPageSink()))), new DevNullOperator.DevNullOperatorFactory(1, new PlanNodeId("test")), ImmutableList.of(BigintType.BIGINT, VarbinaryType.VARBINARY));
        RowPagesBuilder rowPagesBuilder = RowPagesBuilder.rowPagesBuilder(BigintType.BIGINT);
        for (int i = 0; i < 100; i++) {
            rowPagesBuilder.addSequencePage(100, 0);
        }
        List<Page> build = rowPagesBuilder.build();
        long j = 0;
        long j2 = 0;
        for (int i2 = 0; i2 < build.size(); i2++) {
            Page page = build.get(i2);
            j += page.getRetainedSizeInBytes();
            j2 += page.getPositionCount();
            createTableWriterOperator.addInput(page);
            TableWriterOperator.TableWriterInfo info = createTableWriterOperator.getInfo();
            Assert.assertEquals(info.getPageSinkPeakMemoryUsage(), j);
            Assert.assertEquals((long) info.getValidationCpuTime().getValue(TimeUnit.NANOSECONDS), j2);
        }
    }

    @Test
    public void testStatisticsAggregation() throws Exception {
        PageSinkManager pageSinkManager = new PageSinkManager(CatalogServiceProvider.singleton(TestingHandles.TEST_CATALOG_HANDLE, new ConstantPageSinkProvider(new TableWriteInfoTestPageSink())));
        ImmutableList of = ImmutableList.of(BigintType.BIGINT, VarbinaryType.VARBINARY, BigintType.BIGINT);
        Session build = TestingSession.testSessionBuilder().setSystemProperty("statistics_cpu_timer_enabled", "true").build();
        DriverContext addDriverContext = TestingTaskContext.createTaskContext(this.executor, this.scheduledExecutor, build).addPipelineContext(0, true, true, false).addDriverContext();
        TableWriterOperator tableWriterOperator = (TableWriterOperator) createTableWriterOperator(pageSinkManager, new AggregationOperator.AggregationOperatorFactory(1, new PlanNodeId("test"), ImmutableList.of(LONG_MAX.createAggregatorFactory(AggregationNode.Step.SINGLE, ImmutableList.of(0), OptionalInt.empty()))), of, build, addDriverContext);
        tableWriterOperator.addInput(RowPagesBuilder.rowPagesBuilder(BigintType.BIGINT).row(42).build().get(0));
        tableWriterOperator.addInput(RowPagesBuilder.rowPagesBuilder(BigintType.BIGINT).row(43).build().get(0));
        Assert.assertTrue(tableWriterOperator.isBlocked().isDone());
        Assert.assertTrue(tableWriterOperator.needsInput());
        Assertions.assertThat(addDriverContext.getMemoryUsage()).isGreaterThan(0L);
        tableWriterOperator.finish();
        Assert.assertFalse(tableWriterOperator.isFinished());
        PageAssertions.assertPageEquals(of, tableWriterOperator.getOutput(), RowPagesBuilder.rowPagesBuilder((Iterable<Type>) of).row(null, null, 43).build().get(0));
        PageAssertions.assertPageEquals(of, tableWriterOperator.getOutput(), RowPagesBuilder.rowPagesBuilder((Iterable<Type>) of).row(2, null, null).build().get(0));
        Assert.assertTrue(tableWriterOperator.isBlocked().isDone());
        Assert.assertFalse(tableWriterOperator.needsInput());
        Assert.assertTrue(tableWriterOperator.isFinished());
        tableWriterOperator.close();
        assertMemoryIsReleased(tableWriterOperator);
        TableWriterOperator.TableWriterInfo info = tableWriterOperator.getInfo();
        Assertions.assertThat(info.getStatisticsWallTime().getValue(TimeUnit.NANOSECONDS)).isGreaterThan(0.0d);
        Assertions.assertThat(info.getStatisticsCpuTime().getValue(TimeUnit.NANOSECONDS)).isGreaterThan(0.0d);
    }

    private void assertMemoryIsReleased(TableWriterOperator tableWriterOperator) {
        MemoryTrackingContext operatorMemoryContext = tableWriterOperator.getOperatorContext().getOperatorMemoryContext();
        Assert.assertEquals(operatorMemoryContext.getUserMemory(), 0L);
        Assert.assertEquals(operatorMemoryContext.getRevocableMemory(), 0L);
        AggregationOperator statisticAggregationOperator = tableWriterOperator.getStatisticAggregationOperator();
        Assert.assertTrue(statisticAggregationOperator instanceof AggregationOperator);
        MemoryTrackingContext operatorMemoryContext2 = statisticAggregationOperator.getOperatorContext().getOperatorMemoryContext();
        Assert.assertEquals(operatorMemoryContext2.getUserMemory(), 0L);
        Assert.assertEquals(operatorMemoryContext2.getRevocableMemory(), 0L);
    }

    private Operator createTableWriterOperator(BlockingPageSink blockingPageSink) {
        return createTableWriterOperator(new PageSinkManager(CatalogServiceProvider.singleton(TestingHandles.TEST_CATALOG_HANDLE, new ConstantPageSinkProvider(blockingPageSink))), new DevNullOperator.DevNullOperatorFactory(1, new PlanNodeId("test")), ImmutableList.of(BigintType.BIGINT, VarbinaryType.VARBINARY));
    }

    private Operator createTableWriterOperator(PageSinkManager pageSinkManager, OperatorFactory operatorFactory, List<Type> list) {
        return createTableWriterOperator(pageSinkManager, operatorFactory, list, SessionTestUtils.TEST_SESSION);
    }

    private Operator createTableWriterOperator(PageSinkManager pageSinkManager, OperatorFactory operatorFactory, List<Type> list, Session session) {
        return createTableWriterOperator(pageSinkManager, operatorFactory, list, session, TestingTaskContext.createTaskContext(this.executor, this.scheduledExecutor, session).addPipelineContext(0, true, true, false).addDriverContext());
    }

    private Operator createTableWriterOperator(PageSinkManager pageSinkManager, OperatorFactory operatorFactory, List<Type> list, Session session, DriverContext driverContext) {
        SchemaTableName schemaTableName = new SchemaTableName("testSchema", "testTable");
        return new TableWriterOperator.TableWriterOperatorFactory(0, new PlanNodeId("test"), pageSinkManager, new TableWriterNode.CreateTarget(new OutputTableHandle(TestingHandles.TEST_CATALOG_HANDLE, schemaTableName, new ConnectorTransactionHandle() { // from class: io.trino.operator.TestTableWriterOperator.1
        }, new ConnectorOutputTableHandle() { // from class: io.trino.operator.TestTableWriterOperator.2
        }), schemaTableName, false, OptionalInt.empty(), WriterScalingOptions.DISABLED), ImmutableList.of(0), session, operatorFactory, list).createOperator(driverContext);
    }
}
