package io.trino.operator;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import java.util.Objects;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:io/trino/operator/TestStreamingDirectExchangeBuffer.class */
/**
 * Tests for {@link StreamingDirectExchangeBuffer}.
 *
 * <p>Every buffer is created with a direct executor and a one kilobyte capacity and is managed
 * by try-with-resources, so it is closed exactly once even when an assertion fails.
 */
public class TestStreamingDirectExchangeBuffer {
    private static final StageId STAGE_ID = new StageId(new QueryId("query"), 0);
    private static final TaskId TASK_0 = new TaskId(STAGE_ID, 0, 0);
    private static final TaskId TASK_1 = new TaskId(STAGE_ID, 1, 0);
    private static final Slice PAGE_0 = Slices.utf8Slice("page0");
    private static final Slice PAGE_1 = Slices.utf8Slice("page-1");
    private static final Slice PAGE_2 = Slices.utf8Slice("page-_2");
    /** Capacity handed to every buffer created by these tests. */
    private static final DataSize BUFFER_CAPACITY = DataSize.of(1L, DataSize.Unit.KILOBYTE);

    /** Creates a fresh buffer backed by a direct executor and {@link #BUFFER_CAPACITY}. */
    private static StreamingDirectExchangeBuffer newBuffer() {
        return new StreamingDirectExchangeBuffer(MoreExecutors.directExecutor(), BUFFER_CAPACITY);
    }

    /**
     * Walks through the normal lifecycle: two tasks are registered, pages are added and polled,
     * and the buffer reports finished only after both tasks have finished.
     */
    @Test
    public void testHappyPath() {
        try (StreamingDirectExchangeBuffer buffer = newBuffer()) {
            Assert.assertFalse(buffer.isFinished());
            ListenableFuture<?> initialBlocked = buffer.isBlocked();
            Assert.assertFalse(initialBlocked.isDone());
            Assert.assertNull(buffer.pollPage());

            buffer.addTask(TASK_0);
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(initialBlocked.isDone());
            Assert.assertNull(buffer.pollPage());

            buffer.addTask(TASK_1);
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(initialBlocked.isDone());
            Assert.assertNull(buffer.pollPage());

            buffer.noMoreTasks();
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(initialBlocked.isDone());
            Assert.assertNull(buffer.pollPage());

            // The first page completes the future obtained before any data was available.
            buffer.addPages(TASK_0, ImmutableList.of(PAGE_0));
            Assert.assertEquals(buffer.getBufferedPageCount(), 1);
            Assert.assertEquals(buffer.getRetainedSizeInBytes(), PAGE_0.getRetainedSize());
            Assert.assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_0.getRetainedSize());
            Assert.assertEquals(buffer.getRemainingCapacityInBytes(), BUFFER_CAPACITY.toBytes() - PAGE_0.getRetainedSize());
            Assert.assertFalse(buffer.isFinished());
            Assert.assertTrue(initialBlocked.isDone());
            Assert.assertEquals(buffer.pollPage(), PAGE_0);

            // Once drained, a newly requested future is pending again.
            ListenableFuture<?> blockedAfterDrain = buffer.isBlocked();
            Assert.assertEquals(buffer.getRetainedSizeInBytes(), 0L);
            Assert.assertEquals(buffer.getMaxRetainedSizeInBytes(), PAGE_0.getRetainedSize());
            Assert.assertEquals(buffer.getRemainingCapacityInBytes(), BUFFER_CAPACITY.toBytes());
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(blockedAfterDrain.isDone());

            buffer.taskFinished(TASK_0);
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(buffer.isBlocked().isDone());

            buffer.addPages(TASK_1, ImmutableList.of(PAGE_1, PAGE_2));
            long secondBatchSize = PAGE_1.getRetainedSize() + PAGE_2.getRetainedSize();
            Assert.assertEquals(buffer.getBufferedPageCount(), 2);
            Assert.assertEquals(buffer.getRetainedSizeInBytes(), secondBatchSize);
            Assert.assertEquals(buffer.getMaxRetainedSizeInBytes(), secondBatchSize);
            Assert.assertEquals(buffer.getRemainingCapacityInBytes(), BUFFER_CAPACITY.toBytes() - secondBatchSize);
            Assert.assertFalse(buffer.isFinished());
            Assert.assertTrue(buffer.isBlocked().isDone());
            Assert.assertEquals(buffer.pollPage(), PAGE_1);
            Assert.assertEquals(buffer.pollPage(), PAGE_2);

            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(buffer.isBlocked().isDone());
            Assert.assertEquals(buffer.getRetainedSizeInBytes(), 0L);
            Assert.assertEquals(buffer.getMaxRetainedSizeInBytes(), secondBatchSize);
            Assert.assertEquals(buffer.getRemainingCapacityInBytes(), BUFFER_CAPACITY.toBytes());

            // Finishing the last outstanding task finishes the buffer and releases waiters.
            buffer.taskFinished(TASK_1);
            Assert.assertTrue(buffer.isFinished());
            Assert.assertTrue(blockedAfterDrain.isDone());
        }
    }

    /**
     * Closing a buffer that still has unfinished tasks must leave it finished, unblocked and
     * without pages. The state after {@code close()} is what is under test, so the buffer is
     * closed explicitly instead of through try-with-resources.
     */
    @Test
    public void testClose() {
        StreamingDirectExchangeBuffer buffer = newBuffer();
        buffer.addTask(TASK_0);
        buffer.addTask(TASK_1);

        Assert.assertFalse(buffer.isFinished());
        Assert.assertFalse(buffer.isBlocked().isDone());
        Assert.assertNull(buffer.pollPage());

        buffer.close();

        Assert.assertTrue(buffer.isFinished());
        Assert.assertTrue(buffer.isBlocked().isDone());
        Assert.assertNull(buffer.pollPage());
    }

    /**
     * Checks {@code isFinished()} in three independent scenarios: no tasks at all, a single task
     * that finishes, and a single task that fails.
     */
    @Test
    public void testIsFinished() {
        // Scenario 1: declaring "no more tasks" on an empty buffer finishes it immediately.
        try (StreamingDirectExchangeBuffer buffer = newBuffer()) {
            Assert.assertFalse(buffer.isFinished());
            ListenableFuture<?> blocked = buffer.isBlocked();
            Assert.assertFalse(blocked.isDone());

            buffer.noMoreTasks();
            Assert.assertTrue(buffer.isFinished());
            Assert.assertTrue(blocked.isDone());
        }

        // Scenario 2: the buffer finishes only once its single task has finished.
        try (StreamingDirectExchangeBuffer buffer = newBuffer()) {
            Assert.assertFalse(buffer.isFinished());
            ListenableFuture<?> blocked = buffer.isBlocked();
            Assert.assertFalse(blocked.isDone());

            buffer.addTask(TASK_0);
            buffer.noMoreTasks();
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(blocked.isDone());

            buffer.taskFinished(TASK_0);
            Assert.assertTrue(buffer.isFinished());
            Assert.assertTrue(blocked.isDone());
        }

        // Scenario 3: a failed task marks the buffer failed (not finished), unblocks waiters,
        // and the failure is rethrown from pollPage().
        try (StreamingDirectExchangeBuffer buffer = newBuffer()) {
            Assert.assertFalse(buffer.isFinished());
            ListenableFuture<?> blocked = buffer.isBlocked();
            Assert.assertFalse(blocked.isDone());

            buffer.addTask(TASK_0);
            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(blocked.isDone());

            RuntimeException failure = new RuntimeException();
            buffer.taskFailed(TASK_0, failure);
            Assert.assertFalse(buffer.isFinished());
            Assert.assertTrue(buffer.isFailed());
            Assert.assertTrue(blocked.isDone());
            Assertions.assertThatThrownBy(buffer::pollPage).isEqualTo(failure);
        }
    }

    /**
     * Cancelling one future returned by {@code isBlocked()} must not complete the other futures
     * obtained from the same buffer; they complete only when the buffer finishes.
     */
    @Test
    public void testFutureCancellationDoesNotAffectOtherFutures() {
        try (StreamingDirectExchangeBuffer buffer = newBuffer()) {
            Assert.assertFalse(buffer.isFinished());
            ListenableFuture<?> first = buffer.isBlocked();
            ListenableFuture<?> second = buffer.isBlocked();
            ListenableFuture<?> cancelled = buffer.isBlocked();
            Assert.assertFalse(first.isDone());
            Assert.assertFalse(second.isDone());
            Assert.assertFalse(cancelled.isDone());

            cancelled.cancel(true);
            Assert.assertFalse(first.isDone());
            Assert.assertFalse(second.isDone());

            buffer.noMoreTasks();
            Assert.assertTrue(buffer.isFinished());
            Assert.assertTrue(first.isDone());
            Assert.assertTrue(second.isDone());
        }
    }

    /**
     * A task failure carrying {@code REMOTE_TASK_FAILED} must leave the buffer neither finished
     * nor failed, whether it is reported before or after {@code noMoreTasks()}.
     */
    @Test
    public void testRemoteTaskFailedError() {
        // Failure reported before noMoreTasks().
        try (StreamingDirectExchangeBuffer buffer = newBuffer()) {
            buffer.addTask(TASK_0);
            buffer.taskFailed(TASK_0, new TrinoException(StandardErrorCode.REMOTE_TASK_FAILED, "Remote task failed"));
            buffer.noMoreTasks();

            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(buffer.isFailed());
            Assert.assertNull(buffer.pollPage());
        }

        // Failure reported after noMoreTasks().
        try (StreamingDirectExchangeBuffer buffer = newBuffer()) {
            buffer.addTask(TASK_0);
            buffer.noMoreTasks();
            buffer.taskFailed(TASK_0, new TrinoException(StandardErrorCode.REMOTE_TASK_FAILED, "Remote task failed"));

            Assert.assertFalse(buffer.isFinished());
            Assert.assertFalse(buffer.isFailed());
            Assert.assertNull(buffer.pollPage());
        }
    }

    /**
     * Two futures requested while the buffer is empty are completed one at a time: the first
     * after the first add/poll cycle, the second only after the next one.
     */
    @Test
    public void testSingleWakeUp() {
        try (StreamingDirectExchangeBuffer buffer = newBuffer()) {
            Assert.assertFalse(buffer.isFinished());
            ListenableFuture<?> first = buffer.isBlocked();
            ListenableFuture<?> second = buffer.isBlocked();
            Assert.assertFalse(first.isDone());
            Assert.assertFalse(second.isDone());

            buffer.addTask(TASK_0);
            buffer.addPages(TASK_0, ImmutableList.of(PAGE_0));
            buffer.pollPage();
            Assert.assertTrue(first.isDone());
            Assert.assertFalse(second.isDone());

            buffer.addPages(TASK_0, ImmutableList.of(PAGE_0));
            buffer.pollPage();
            Assert.assertTrue(second.isDone());
        }
    }
}
