package io.trino.execution.buffer;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.Threads;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.SimpleLocalMemoryContext;
import io.trino.spi.Page;
import io.trino.spi.QueryId;
import io.trino.spi.type.BigintType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.testng.Assert;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
/* loaded from: input_file:io/trino/execution/buffer/TestArbitraryOutputBuffer.class */
public class TestArbitraryOutputBuffer {
    private static final String TASK_INSTANCE_ID = "task-instance-id";
    private static final ImmutableList<BigintType> TYPES = ImmutableList.of(BigintType.BIGINT);
    private static final PipelinedOutputBuffers.OutputBufferId FIRST = new PipelinedOutputBuffers.OutputBufferId(0);
    private static final PipelinedOutputBuffers.OutputBufferId SECOND = new PipelinedOutputBuffers.OutputBufferId(1);
    private ScheduledExecutorService stateNotificationExecutor;

    @BeforeAll
    public void setUp() {
        this.stateNotificationExecutor = Executors.newScheduledThreadPool(5, Threads.daemonThreadsNamed(getClass().getSimpleName() + "-%s"));
    }

    @AfterAll
    public void tearDown() {
        if (this.stateNotificationExecutor != null) {
            this.stateNotificationExecutor.shutdownNow();
            this.stateNotificationExecutor = null;
        }
    }

    @Test
    public void testInvalidConstructorArg() {
        Assertions.assertThatThrownBy(() -> {
            createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), DataSize.ofBytes(0L));
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("maxBufferSize must be at least 1");
        Assertions.assertThatThrownBy(() -> {
            createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), DataSize.ofBytes(0L));
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("maxBufferSize must be at least 1");
    }

    @Test
    public void testSimple() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(10));
        for (int i = 0; i < 3; i++) {
            addPage(createArbitraryBuffer, BufferTestUtils.createPage(i));
        }
        PipelinedOutputBuffers withBuffer = PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0);
        createArbitraryBuffer.setOutputBuffers(withBuffer);
        assertQueueState(createArbitraryBuffer, 3, FIRST, 0, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), BufferTestUtils.createPage(1), BufferTestUtils.createPage(2)));
        assertQueueState(createArbitraryBuffer, 0, FIRST, 3, 0);
        createArbitraryBuffer.get(FIRST, 3L, BufferTestUtils.sizeOfPages(1)).cancel(true);
        assertQueueState(createArbitraryBuffer, 0, FIRST, 0, 3);
        for (int i2 = 3; i2 < 13; i2++) {
            addPage(createArbitraryBuffer, BufferTestUtils.createPage(i2));
        }
        assertQueueState(createArbitraryBuffer, 9, FIRST, 1, 3);
        ListenableFuture<Void> enqueuePage = enqueuePage(createArbitraryBuffer, BufferTestUtils.createPage(13));
        Assert.assertFalse(enqueuePage.isDone());
        assertQueueState(createArbitraryBuffer, 10, FIRST, 1, 3);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 3L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), bufferResult(3L, BufferTestUtils.createPage(3), new Page[0]));
        assertQueueState(createArbitraryBuffer, 10, FIRST, 1, 3);
        Assert.assertFalse(enqueuePage.isDone());
        PipelinedOutputBuffers withBuffer2 = withBuffer.withBuffer(SECOND, 0);
        createArbitraryBuffer.setOutputBuffers(withBuffer2);
        assertQueueState(createArbitraryBuffer, 10, SECOND, 0, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, SECOND, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(4), BufferTestUtils.createPage(5), BufferTestUtils.createPage(6), BufferTestUtils.createPage(7), BufferTestUtils.createPage(8), BufferTestUtils.createPage(9), BufferTestUtils.createPage(10), BufferTestUtils.createPage(11), BufferTestUtils.createPage(12), BufferTestUtils.createPage(13)));
        assertQueueState(createArbitraryBuffer, 0, SECOND, 10, 0);
        createArbitraryBuffer.get(SECOND, 10L, BufferTestUtils.sizeOfPages(10)).cancel(true);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 0, 10);
        createArbitraryBuffer.setOutputBuffers(withBuffer2.withNoMoreBufferIds());
        assertQueueState(createArbitraryBuffer, 0, FIRST, 1, 3);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 0, 10);
        BufferTestUtils.assertFutureIsDone(enqueuePage);
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(14));
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(15));
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(16));
        assertQueueState(createArbitraryBuffer, 2, FIRST, 1, 3);
        assertQueueState(createArbitraryBuffer, 2, SECOND, 1, 10);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, SECOND, 10L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), bufferResult(10L, BufferTestUtils.createPage(14), new Page[0]));
        assertQueueState(createArbitraryBuffer, 2, FIRST, 1, 3);
        assertQueueState(createArbitraryBuffer, 2, SECOND, 1, 10);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 4L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(4L, BufferTestUtils.createPage(15), BufferTestUtils.createPage(16)));
        assertQueueState(createArbitraryBuffer, 0, FIRST, 2, 4);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 1, 10);
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_BUFFERS);
        createArbitraryBuffer.setNoMorePages();
        assertQueueState(createArbitraryBuffer, 0, FIRST, 2, 4);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 1, 10);
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.FLUSHING);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 6L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 6L, true));
        assertQueueState(createArbitraryBuffer, 0, FIRST, 0, 6);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 1, 10);
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.FLUSHING);
        createArbitraryBuffer.destroy(FIRST);
        assertQueueClosed(createArbitraryBuffer, 0, FIRST, 6);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 1, 10);
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.FLUSHING);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, SECOND, 11L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 11L, true));
        assertQueueState(createArbitraryBuffer, 0, SECOND, 0, 11);
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.FLUSHING);
        createArbitraryBuffer.destroy(SECOND);
        assertQueueClosed(createArbitraryBuffer, 0, FIRST, 6);
        assertQueueClosed(createArbitraryBuffer, 0, SECOND, 11);
        BufferTestUtils.assertFinished(createArbitraryBuffer);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 6L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 6L, true));
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, SECOND, 11L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 11L, true));
    }

    @Test
    public void testAcknowledge() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(10));
        for (int i = 0; i < 3; i++) {
            addPage(createArbitraryBuffer, BufferTestUtils.createPage(i));
        }
        createArbitraryBuffer.setOutputBuffers(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0));
        assertQueueState(createArbitraryBuffer, 3, FIRST, 0, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), BufferTestUtils.createPage(1), BufferTestUtils.createPage(2)));
        BufferTestUtils.acknowledgeBufferResult(createArbitraryBuffer, FIRST, 2L);
        assertQueueState(createArbitraryBuffer, 0, FIRST, 1, 2);
        BufferTestUtils.acknowledgeBufferResult(createArbitraryBuffer, FIRST, 3L);
        assertQueueState(createArbitraryBuffer, 0, FIRST, 0, 3);
        try {
            BufferTestUtils.acknowledgeBufferResult(createArbitraryBuffer, FIRST, 4L);
        } catch (IllegalArgumentException e) {
            Assert.assertEquals(e.getMessage(), "Invalid sequence id");
        }
        for (int i2 = 3; i2 < 6; i2++) {
            addPage(createArbitraryBuffer, BufferTestUtils.createPage(i2));
        }
        assertQueueState(createArbitraryBuffer, 3, FIRST, 0, 3);
        createArbitraryBuffer.get(FIRST, 3L, BufferTestUtils.sizeOfPages(1)).cancel(true);
        assertQueueState(createArbitraryBuffer, 2, FIRST, 1, 3);
    }

    @Test
    public void testBufferFull() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(2));
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(1));
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(2));
        enqueuePage(createArbitraryBuffer, BufferTestUtils.createPage(3));
    }

    @Test
    public void testDuplicateRequests() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(10));
        for (int i = 0; i < 3; i++) {
            addPage(createArbitraryBuffer, BufferTestUtils.createPage(i));
        }
        assertQueueState(createArbitraryBuffer, 3, FIRST, 0, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), BufferTestUtils.createPage(1), BufferTestUtils.createPage(2)));
        assertQueueState(createArbitraryBuffer, 0, FIRST, 3, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), BufferTestUtils.createPage(1), BufferTestUtils.createPage(2)));
        assertQueueState(createArbitraryBuffer, 0, FIRST, 3, 0);
        createArbitraryBuffer.get(FIRST, 3L, BufferTestUtils.sizeOfPages(10)).cancel(true);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, false));
        assertQueueState(createArbitraryBuffer, 0, FIRST, 0, 3);
    }

    @Test
    public void testAddQueueAfterCreation() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(10));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_BUFFERS);
        Assertions.assertThatThrownBy(() -> {
            createArbitraryBuffer.setOutputBuffers(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withBuffer(SECOND, 0).withNoMoreBufferIds());
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("Expected buffer to not change after no more buffers is set");
    }

    @Test
    public void testAddAfterFinish() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(10));
        createArbitraryBuffer.setNoMorePages();
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(0));
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(1));
        Assert.assertEquals(createArbitraryBuffer.getInfo().getTotalPagesSent(), 0L);
    }

    @Test
    public void testAddQueueAfterNoMoreQueues() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(10));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.OPEN);
        createArbitraryBuffer.setOutputBuffers(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withNoMoreBufferIds());
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_BUFFERS);
        createArbitraryBuffer.setOutputBuffers(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withNoMoreBufferIds());
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_BUFFERS);
        createArbitraryBuffer.setOutputBuffers(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withNoMoreBufferIds());
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_BUFFERS);
        PipelinedOutputBuffers withNoMoreBufferIds = PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds();
        Assertions.assertThatThrownBy(() -> {
            createArbitraryBuffer.setOutputBuffers(withNoMoreBufferIds);
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("Expected buffer to not change after no more buffers is set");
    }

    @Test
    public void testAddAfterDestroy() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(10));
        createArbitraryBuffer.destroy();
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(0));
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(1));
        Assert.assertEquals(createArbitraryBuffer.getInfo().getTotalPagesSent(), 0L);
    }

    @Test
    public void testGetBeforeCreate() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(10));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.OPEN);
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(1));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(33));
        Assert.assertTrue(listenableFuture.isDone());
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(listenableFuture, BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(33), new Page[0]));
    }

    @Test
    public void testResumeFromPreviousPosition() {
        PipelinedOutputBuffers createInitial = PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY);
        PipelinedOutputBuffers.OutputBufferId[] outputBufferIdArr = new PipelinedOutputBuffers.OutputBufferId[5];
        for (int i = 0; i < outputBufferIdArr.length; i++) {
            outputBufferIdArr[i] = new PipelinedOutputBuffers.OutputBufferId(i);
            createInitial = createInitial.withBuffer(outputBufferIdArr[i], i);
        }
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(createInitial, BufferTestUtils.sizeOfPages(5));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.OPEN);
        HashMap hashMap = new HashMap();
        for (PipelinedOutputBuffers.OutputBufferId outputBufferId : outputBufferIdArr) {
            hashMap.put(outputBufferId, createArbitraryBuffer.get(outputBufferId, 0L, BufferTestUtils.sizeOfPages(1)));
        }
        Assertions.assertThat(hashMap.values()).noneMatch((v0) -> {
            return v0.isDone();
        });
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < outputBufferIdArr.length; i2++) {
            addPage(createArbitraryBuffer, BufferTestUtils.createPage(33));
            Assertions.assertThat(arrayList).allMatch(listenableFuture -> {
                return !listenableFuture.isDone();
            }, "No secondary reads should complete until after all first reads");
            List list = (List) hashMap.entrySet().stream().filter(entry -> {
                return ((ListenableFuture) entry.getValue()).isDone();
            }).map((v0) -> {
                return v0.getKey();
            }).collect(Collectors.toList());
            Assert.assertEquals(list.size(), 1, "One completed buffer read per page addition");
            PipelinedOutputBuffers.OutputBufferId outputBufferId2 = (PipelinedOutputBuffers.OutputBufferId) list.get(0);
            arrayList.add(createArbitraryBuffer.get(outputBufferId2, BufferTestUtils.getFuture((ListenableFuture) hashMap.remove(outputBufferId2), BufferTestUtils.NO_WAIT).getNextToken(), BufferTestUtils.sizeOfPages(1)));
        }
        Assert.assertEquals(arrayList.size(), outputBufferIdArr.length);
        for (int i3 = 0; i3 < outputBufferIdArr.length; i3++) {
            addPage(createArbitraryBuffer, BufferTestUtils.createPage(33));
            Assert.assertTrue(((ListenableFuture) arrayList.get(i3)).isDone(), "Invalid second read completion order at index: " + i3);
        }
    }

    @Test
    public void testUseUndeclaredBufferAfterFinalBuffersSet() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(10));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_BUFFERS);
        Assertions.assertThatThrownBy(() -> {
            createArbitraryBuffer.get(SECOND, 0L, BufferTestUtils.sizeOfPages(1));
        }).isInstanceOf(IllegalStateException.class).hasMessage("No more buffers already set");
    }

    @Test
    public void testAbortBeforeCreate() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(10));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.OPEN);
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(1));
        Assert.assertFalse(listenableFuture.isDone());
        createArbitraryBuffer.destroy(FIRST);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(listenableFuture, BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, false));
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, true));
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(33));
        createArbitraryBuffer.setOutputBuffers(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0));
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, true));
    }

    @Test
    public void testFullBufferBlocksWriter() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withBuffer(SECOND, 0).withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(2));
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(1));
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(2));
        enqueuePage(createArbitraryBuffer, BufferTestUtils.createPage(3));
    }

    @Test
    public void testAbort() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(10));
        for (int i = 0; i < 10; i++) {
            addPage(createArbitraryBuffer, BufferTestUtils.createPage(i));
        }
        createArbitraryBuffer.setNoMorePages();
        PipelinedOutputBuffers withBuffer = PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0);
        createArbitraryBuffer.setOutputBuffers(withBuffer);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        createArbitraryBuffer.destroy(FIRST);
        assertQueueClosed(createArbitraryBuffer, 9, FIRST, 0);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 1L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, true));
        createArbitraryBuffer.setOutputBuffers(withBuffer.withBuffer(SECOND, 0).withNoMoreBufferIds());
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, SECOND, 0L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(1), new Page[0]));
        createArbitraryBuffer.destroy(SECOND);
        assertQueueClosed(createArbitraryBuffer, 0, SECOND, 0);
        BufferTestUtils.assertFinished(createArbitraryBuffer);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, SECOND, 1L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, true));
    }

    @Test
    public void testFinishClosesEmptyQueues() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withBuffer(SECOND, 0).withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(10));
        createArbitraryBuffer.setNoMorePages();
        assertQueueState(createArbitraryBuffer, 0, FIRST, 0, 0);
        assertQueueState(createArbitraryBuffer, 0, SECOND, 0, 0);
        createArbitraryBuffer.destroy(FIRST);
        createArbitraryBuffer.destroy(SECOND);
        assertQueueClosed(createArbitraryBuffer, 0, FIRST, 0);
        assertQueueClosed(createArbitraryBuffer, 0, SECOND, 0);
    }

    @Test
    public void testAbortFreesReader() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(10));
        createArbitraryBuffer.setOutputBuffers(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.OPEN);
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(0));
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(listenableFuture, BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        ListenableFuture listenableFuture2 = createArbitraryBuffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(listenableFuture2.isDone());
        createArbitraryBuffer.destroy(FIRST);
        assertQueueClosed(createArbitraryBuffer, 0, FIRST, 1);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(listenableFuture2, BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 1L, false));
    }

    @Test
    public void testFinishFreesReader() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(10));
        createArbitraryBuffer.setOutputBuffers(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.OPEN);
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(0));
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(listenableFuture, BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        ListenableFuture listenableFuture2 = createArbitraryBuffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(listenableFuture2.isDone());
        assertQueueState(createArbitraryBuffer, 0, FIRST, 0, 1);
        createArbitraryBuffer.destroy(FIRST);
        assertQueueClosed(createArbitraryBuffer, 0, FIRST, 1);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(listenableFuture2, BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 1L, false));
    }

    @Test
    public void testFinishFreesWriter() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(5));
        createArbitraryBuffer.setOutputBuffers(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds());
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_BUFFERS);
        for (int i = 0; i < 5; i++) {
            addPage(createArbitraryBuffer, BufferTestUtils.createPage(i));
        }
        ListenableFuture<Void> enqueuePage = enqueuePage(createArbitraryBuffer, BufferTestUtils.createPage(5));
        ListenableFuture<Void> enqueuePage2 = enqueuePage(createArbitraryBuffer, BufferTestUtils.createPage(6));
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.MAX_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        createArbitraryBuffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(100)).cancel(true);
        Assert.assertFalse(enqueuePage.isDone());
        Assert.assertFalse(enqueuePage2.isDone());
        createArbitraryBuffer.setNoMorePages();
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.FLUSHING);
        BufferTestUtils.assertFutureIsDone(enqueuePage);
        BufferTestUtils.assertFutureIsDone(enqueuePage2);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 1L, BufferTestUtils.sizeOfPages(100), BufferTestUtils.NO_WAIT), bufferResult(1L, BufferTestUtils.createPage(1), BufferTestUtils.createPage(2), BufferTestUtils.createPage(3), BufferTestUtils.createPage(4), BufferTestUtils.createPage(5), BufferTestUtils.createPage(6)));
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 7L, BufferTestUtils.sizeOfPages(100), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 7L, true));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.FLUSHING);
        createArbitraryBuffer.destroy(FIRST);
        BufferTestUtils.assertFinished(createArbitraryBuffer);
    }

    @Test
    public void testDestroyFreesReader() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(5));
        createArbitraryBuffer.setOutputBuffers(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds());
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_BUFFERS);
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(0));
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(listenableFuture, BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        ListenableFuture listenableFuture2 = createArbitraryBuffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(listenableFuture2.isDone());
        createArbitraryBuffer.destroy();
        assertQueueClosed(createArbitraryBuffer, 0, FIRST, 1);
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(listenableFuture2, BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 1L, false));
    }

    @Test
    public void testDestroyFreesWriter() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(5));
        createArbitraryBuffer.setOutputBuffers(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds());
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_BUFFERS);
        for (int i = 0; i < 5; i++) {
            addPage(createArbitraryBuffer, BufferTestUtils.createPage(i));
        }
        ListenableFuture<Void> enqueuePage = enqueuePage(createArbitraryBuffer, BufferTestUtils.createPage(5));
        ListenableFuture<Void> enqueuePage2 = enqueuePage(createArbitraryBuffer, BufferTestUtils.createPage(6));
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.MAX_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        createArbitraryBuffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(1)).cancel(true);
        Assert.assertFalse(enqueuePage.isDone());
        Assert.assertFalse(enqueuePage2.isDone());
        createArbitraryBuffer.destroy();
        BufferTestUtils.assertFinished(createArbitraryBuffer);
        BufferTestUtils.assertFutureIsDone(enqueuePage);
        BufferTestUtils.assertFutureIsDone(enqueuePage2);
    }

    @Test
    public void testFailDoesNotFreeReader() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(5));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_BUFFERS);
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(0));
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(listenableFuture, BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        ListenableFuture listenableFuture2 = createArbitraryBuffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(listenableFuture2.isDone());
        createArbitraryBuffer.abort();
        Assert.assertFalse(listenableFuture2.isDone());
        Assert.assertFalse(createArbitraryBuffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10)).isDone());
    }

    @Test
    public void testFailFreesWriter() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds(), BufferTestUtils.sizeOfPages(5));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_BUFFERS);
        for (int i = 0; i < 5; i++) {
            addPage(createArbitraryBuffer, BufferTestUtils.createPage(i));
        }
        ListenableFuture<Void> enqueuePage = enqueuePage(createArbitraryBuffer, BufferTestUtils.createPage(5));
        ListenableFuture<Void> enqueuePage2 = enqueuePage(createArbitraryBuffer, BufferTestUtils.createPage(6));
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.MAX_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        createArbitraryBuffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(1)).cancel(true);
        Assert.assertFalse(enqueuePage.isDone());
        Assert.assertFalse(enqueuePage2.isDone());
        createArbitraryBuffer.abort();
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.ABORTED);
        BufferTestUtils.assertFutureIsDone(enqueuePage);
        BufferTestUtils.assertFutureIsDone(enqueuePage2);
    }

    @Test
    public void testAddBufferAfterFail() {
        PipelinedOutputBuffers withBuffer = PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0);
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(withBuffer, BufferTestUtils.sizeOfPages(5));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.OPEN);
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(0));
        BufferTestUtils.assertBufferResultEquals(TYPES, BufferTestUtils.getFuture(listenableFuture, BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        createArbitraryBuffer.abort();
        PipelinedOutputBuffers withBuffer2 = withBuffer.withBuffer(SECOND, 0);
        createArbitraryBuffer.setOutputBuffers(withBuffer2);
        Assert.assertFalse(createArbitraryBuffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10)).isDone());
        Assert.assertFalse(createArbitraryBuffer.get(SECOND, 0L, BufferTestUtils.sizeOfPages(10)).isDone());
        createArbitraryBuffer.setOutputBuffers(withBuffer2.withNoMoreBufferIds());
        Assert.assertFalse(createArbitraryBuffer.get(FIRST, 1L, BufferTestUtils.sizeOfPages(10)).isDone());
        Assert.assertFalse(createArbitraryBuffer.get(SECOND, 0L, BufferTestUtils.sizeOfPages(10)).isDone());
    }

    @Test
    public void testBufferCompletion() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(5));
        createArbitraryBuffer.setOutputBuffers(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0).withNoMoreBufferIds());
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_BUFFERS);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            Page createPage = BufferTestUtils.createPage(i);
            addPage(createArbitraryBuffer, createPage);
            arrayList.add(createPage);
        }
        createArbitraryBuffer.setNoMorePages();
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, BufferTestUtils.sizeOfPages(5), BufferTestUtils.MAX_WAIT), BufferTestUtils.createBufferResult(TASK_INSTANCE_ID, 0L, arrayList));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.FLUSHING);
        createArbitraryBuffer.destroy(FIRST);
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.FINISHED);
    }

    @Test
    public void testNoMorePagesFreesReader() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(10));
        createArbitraryBuffer.setOutputBuffers(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.OPEN);
        ListenableFuture listenableFuture = createArbitraryBuffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(10));
        Assert.assertFalse(listenableFuture.isDone());
        createArbitraryBuffer.setNoMorePages();
        Assert.assertTrue(listenableFuture.isDone());
        Assert.assertTrue(createArbitraryBuffer.get(FIRST, 0L, BufferTestUtils.sizeOfPages(10)).isDone());
    }

    @Test
    public void testFinishBeforeNoMoreBuffers() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(10));
        for (int i = 0; i < 3; i++) {
            addPage(createArbitraryBuffer, BufferTestUtils.createPage(i));
        }
        createArbitraryBuffer.setNoMorePages();
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_PAGES);
        PipelinedOutputBuffers withBuffer = PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY).withBuffer(FIRST, 0);
        createArbitraryBuffer.setOutputBuffers(withBuffer);
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_PAGES);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 0L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), bufferResult(0L, BufferTestUtils.createPage(0), new Page[0]));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_PAGES);
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 1L, BufferTestUtils.sizeOfPages(10), BufferTestUtils.NO_WAIT), bufferResult(1L, BufferTestUtils.createPage(1), BufferTestUtils.createPage(2)));
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, FIRST, 3L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 3L, true));
        Assert.assertEquals(createArbitraryBuffer.getState(), BufferState.NO_MORE_PAGES);
        createArbitraryBuffer.destroy(FIRST);
        assertQueueClosed(createArbitraryBuffer, 0, FIRST, 3);
        BufferTestUtils.assertFinished(createArbitraryBuffer);
        createArbitraryBuffer.setOutputBuffers(withBuffer.withBuffer(SECOND, 0));
        BufferTestUtils.assertBufferResultEquals(TYPES, getBufferResult(createArbitraryBuffer, SECOND, 0L, BufferTestUtils.sizeOfPages(1), BufferTestUtils.NO_WAIT), BufferResult.emptyResults(TASK_INSTANCE_ID, 0L, true));
    }

    @Test
    public void testForceFreeMemory() {
        ArbitraryOutputBuffer createArbitraryBuffer = createArbitraryBuffer(PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY), BufferTestUtils.sizeOfPages(10));
        for (int i = 0; i < 3; i++) {
            addPage(createArbitraryBuffer, BufferTestUtils.createPage(i));
        }
        OutputBufferMemoryManager memoryManager = createArbitraryBuffer.getMemoryManager();
        Assert.assertTrue(memoryManager.getBufferedBytes() > 0);
        createArbitraryBuffer.forceFreeMemory();
        Assert.assertEquals(memoryManager.getBufferedBytes(), 0L);
        addPage(createArbitraryBuffer, BufferTestUtils.createPage(1));
        Assert.assertEquals(memoryManager.getBufferedBytes(), 0L);
    }

    private static BufferResult getBufferResult(OutputBuffer outputBuffer, PipelinedOutputBuffers.OutputBufferId outputBufferId, long j, DataSize dataSize, Duration duration) {
        return BufferTestUtils.getFuture(outputBuffer.get(outputBufferId, j, dataSize), duration);
    }

    private static ListenableFuture<Void> enqueuePage(OutputBuffer outputBuffer, Page page) {
        outputBuffer.enqueue(ImmutableList.of(BufferTestUtils.serializePage(page)));
        ListenableFuture<Void> isFull = outputBuffer.isFull();
        Assert.assertFalse(isFull.isDone());
        return isFull;
    }

    private static void addPage(OutputBuffer outputBuffer, Page page) {
        outputBuffer.enqueue(ImmutableList.of(BufferTestUtils.serializePage(page)));
        Assert.assertTrue(outputBuffer.isFull().isDone(), "Expected add page to not block");
    }

    private static void assertQueueState(OutputBuffer outputBuffer, int i, PipelinedOutputBuffers.OutputBufferId outputBufferId, int i2, int i3) {
        OutputBufferInfo info = outputBuffer.getInfo();
        Assert.assertEquals(info.getTotalBufferedPages() - ((List) info.getPipelinedBufferStates().orElse(ImmutableList.of())).stream().mapToInt((v0) -> {
            return v0.getBufferedPages();
        }).sum(), i, "unassignedPages");
        Assert.assertEquals((PipelinedBufferInfo) ((List) info.getPipelinedBufferStates().orElse(ImmutableList.of())).stream().filter(pipelinedBufferInfo -> {
            return pipelinedBufferInfo.getBufferId().equals(outputBufferId);
        }).findAny().orElse(null), new PipelinedBufferInfo(outputBufferId, i2 + i3, i2 + i3, i2, BufferTestUtils.sizeOfPages(i2).toBytes(), i3, false));
    }

    private static void assertQueueClosed(OutputBuffer outputBuffer, int i, PipelinedOutputBuffers.OutputBufferId outputBufferId, int i2) {
        OutputBufferInfo info = outputBuffer.getInfo();
        Assert.assertEquals(info.getTotalBufferedPages() - ((List) info.getPipelinedBufferStates().orElse(ImmutableList.of())).stream().mapToInt((v0) -> {
            return v0.getBufferedPages();
        }).sum(), i, "unassignedPages");
        PipelinedBufferInfo pipelinedBufferInfo = (PipelinedBufferInfo) ((List) info.getPipelinedBufferStates().orElse(ImmutableList.of())).stream().filter(pipelinedBufferInfo2 -> {
            return pipelinedBufferInfo2.getBufferId().equals(outputBufferId);
        }).findAny().orElse(null);
        Assert.assertEquals(pipelinedBufferInfo.getBufferedPages(), 0);
        Assert.assertEquals(pipelinedBufferInfo.getPagesSent(), i2);
        Assert.assertTrue(pipelinedBufferInfo.isFinished());
    }

    private ArbitraryOutputBuffer createArbitraryBuffer(OutputBuffers outputBuffers, DataSize dataSize) {
        ArbitraryOutputBuffer arbitraryOutputBuffer = new ArbitraryOutputBuffer(TASK_INSTANCE_ID, new OutputBufferStateMachine(new TaskId(new StageId(new QueryId("query"), 0), 0, 0), this.stateNotificationExecutor), dataSize, () -> {
            return new SimpleLocalMemoryContext(AggregatedMemoryContext.newSimpleAggregatedMemoryContext(), "test");
        }, this.stateNotificationExecutor);
        arbitraryOutputBuffer.setOutputBuffers(outputBuffers);
        return arbitraryOutputBuffer;
    }

    private static BufferResult bufferResult(long j, Page page, Page... pageArr) {
        return BufferTestUtils.createBufferResult(TASK_INSTANCE_ID, j, ImmutableList.builder().add(page).add(pageArr).build());
    }
}
