/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.buffer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;

public class LocalBufferPoolTest
extends TestLogger {
    // Size constants for the pools under test. They are not referenced by name in the
    // visible code; the literals 1024 and 128 passed to NetworkBufferPool in
    // setupLocalBufferPool() carry the same values.
    private static final int numBuffers = 1024;
    private static final int memorySegmentSize = 128;
    // Global pool; re-created before every test by setupLocalBufferPool().
    private NetworkBufferPool networkBufferPool;
    // Pool under test; created on top of networkBufferPool before every test
    // (some tests replace it with a differently configured instance).
    private BufferPool localBufferPool;
    // Shared thread pool used by the concurrent tests; stopped in shutdownExecutor().
    private static final ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * Builds a fresh global pool and a local pool on top of it before every test.
     */
    @Before
    public void setupLocalBufferPool() {
        final int totalSegments = 1024;
        final int segmentSize = 128;

        networkBufferPool = new NetworkBufferPool(totalSegments, segmentSize, 1);
        localBufferPool = new LocalBufferPool(networkBufferPool, 1);

        // A freshly created local pool reports no available segments.
        Assert.assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
    }

    /**
     * Tears down both pools after every test and checks that the test handed every
     * segment back to the global pool.
     *
     * @throws IOException declared by the original tear-down; propagated unchanged
     */
    @After
    public void destroyAndVerifyAllBuffersReturned() throws IOException {
        boolean alreadyDestroyed = localBufferPool.isDestroyed();
        if (!alreadyDestroyed) {
            localBufferPool.lazyDestroy();
        }

        // All 1024 segments must be back in the global pool once the local pool is gone.
        Assert.assertEquals(
                "Did not return all buffers to memory segment pool after test.",
                1024,
                networkBufferPool.getNumberOfAvailableMemorySegments());

        networkBufferPool.destroyAllBufferPools();
        networkBufferPool.destroy();
    }

    /**
     * Stops the shared executor once all tests of this class have run.
     */
    @AfterClass
    public static void shutdownExecutor() {
        executor.shutdownNow();
    }

    /**
     * Requests exactly as many buffers as the pool may hold, then one more, and
     * expects the extra request to return {@code null} without taking another
     * segment from the global pool.
     */
    @Test
    public void testRequestMoreThanAvailable() throws IOException {
        final int poolSize = 1024;
        localBufferPool.setNumBuffers(poolSize);

        List<Buffer> granted = new ArrayList<Buffer>(poolSize);
        int requestedSoFar = 0;
        while (requestedSoFar < poolSize) {
            Buffer next = localBufferPool.requestBuffer();
            requestedSoFar++;
            // Every successful request pulls exactly one segment from the global pool.
            Assert.assertEquals(requestedSoFar, getNumRequestedFromMemorySegmentPool());
            Assert.assertNotNull(next);
            granted.add(next);
        }

        // The pool is exhausted: one more request yields nothing and takes nothing.
        Buffer overflow = localBufferPool.requestBuffer();
        Assert.assertEquals(poolSize, getNumRequestedFromMemorySegmentPool());
        Assert.assertNull(overflow);

        for (Buffer held : granted) {
            held.recycleBuffer();
        }
    }

    /**
     * A request against a destroyed pool must be rejected with an
     * {@link IllegalStateException}.
     */
    @Test
    public void testRequestAfterDestroy() throws IOException {
        localBufferPool.lazyDestroy();

        try {
            localBufferPool.requestBuffer();
        } catch (IllegalStateException expected) {
            // This is the outcome the test is looking for.
            return;
        }
        Assert.fail("Call should have failed with an IllegalStateException");
    }

    /**
     * Destroys the pool while all of its buffers are still handed out and then
     * recycles them; the tear-down verifies that they reach the global pool.
     */
    @Test
    public void testRecycleAfterDestroy() throws IOException {
        final int poolSize = 1024;
        localBufferPool.setNumBuffers(poolSize);

        List<Buffer> outstanding = new ArrayList<Buffer>(poolSize);
        while (outstanding.size() < poolSize) {
            outstanding.add(localBufferPool.requestBuffer());
        }

        localBufferPool.lazyDestroy();

        // Destroying the pool does not take back buffers that are still handed out.
        Assert.assertEquals(poolSize, getNumRequestedFromMemorySegmentPool());

        for (Buffer held : outstanding) {
            held.recycleBuffer();
        }
    }

    /**
     * Shrinks the pool while all buffers are handed out and checks that each buffer
     * recycled afterwards goes straight back to the global pool.
     */
    @Test
    public void testRecycleExcessBuffersAfterRecycling() throws Exception {
        final int fullSize = 1024;
        final int reducedSize = 512;

        localBufferPool.setNumBuffers(fullSize);

        List<Buffer> held = new ArrayList<Buffer>(fullSize);
        for (int n = 0; n < fullSize; n++) {
            held.add(localBufferPool.requestBuffer());
        }
        Assert.assertEquals(fullSize, getNumRequestedFromMemorySegmentPool());

        // Shrinking alone returns nothing: every buffer is still in use.
        localBufferPool.setNumBuffers(reducedSize);
        Assert.assertEquals(fullSize, getNumRequestedFromMemorySegmentPool());

        // Each recycled buffer is now excess and leaves the local pool immediately.
        for (int recycled = 1; recycled < reducedSize; recycled++) {
            held.remove(0).recycleBuffer();
            Assert.assertEquals(fullSize - recycled, getNumRequestedFromMemorySegmentPool());
        }

        for (Buffer remaining : held) {
            remaining.recycleBuffer();
        }
    }

    /**
     * Fills the pool, recycles everything so all buffers sit idle in the local
     * pool, then shrinks the pool and expects the idle surplus to be dropped.
     */
    @Test
    public void testRecycleExcessBuffersAfterChangingNumBuffers() throws Exception {
        final int fullSize = 1024;
        final int reducedSize = 512;

        localBufferPool.setNumBuffers(fullSize);

        List<Buffer> held = new ArrayList<Buffer>(fullSize);
        while (held.size() < fullSize) {
            held.add(localBufferPool.requestBuffer());
        }

        // Hand everything back; the buffers stay cached in the local pool.
        for (Buffer each : held) {
            each.recycleBuffer();
        }
        Assert.assertEquals(fullSize, localBufferPool.getNumberOfAvailableMemorySegments());

        // Shrinking trims the cached buffers down to the new size.
        localBufferPool.setNumBuffers(reducedSize);
        Assert.assertEquals(reducedSize, localBufferPool.getNumberOfAvailableMemorySegments());
    }

    /**
     * Setting the pool size to 0 is expected to be rejected with an
     * {@link IllegalArgumentException}, while a size of 1 is accepted.
     */
    @Test(expected=IllegalArgumentException.class)
    public void testSetLessThanRequiredNumBuffers() throws IOException {
        // Accepted: setupLocalBufferPool() created the pool with 1 as its second
        // constructor argument (presumably the required minimum - confirm against
        // LocalBufferPool).
        this.localBufferPool.setNumBuffers(1);
        // Expected to throw.
        this.localBufferPool.setNumBuffers(0);
    }

    /**
     * Registers two listeners on an exhausted pool - one that asks for two
     * buffers, one that asks for a single buffer - and checks how often each is
     * notified as the two outstanding buffers are recycled.
     */
    @Test
    public void testPendingRequestWithListenersAfterRecycle() throws Exception {
        BufferListener wantsTwo = createBufferListener(2);
        BufferListener wantsOne = createBufferListener(1);

        // Exhaust a pool of two buffers.
        localBufferPool.setNumBuffers(2);
        Buffer first = localBufferPool.requestBuffer();
        Buffer second = localBufferPool.requestBuffer();
        Assert.assertNull(localBufferPool.requestBuffer());

        Assert.assertTrue(localBufferPool.addBufferListener(wantsTwo));
        Assert.assertTrue(localBufferPool.addBufferListener(wantsOne));

        // After the first recycle each listener has been notified exactly once.
        first.recycleBuffer();
        Mockito.verify(wantsOne, Mockito.times(1)).notifyBufferAvailable(Matchers.any(Buffer.class));
        Mockito.verify(wantsTwo, Mockito.times(1)).notifyBufferAvailable(Matchers.any(Buffer.class));

        // The second recycle only reaches the listener that still needs a buffer.
        second.recycleBuffer();
        Mockito.verify(wantsOne, Mockito.times(1)).notifyBufferAvailable(Matchers.any(Buffer.class));
        Mockito.verify(wantsTwo, Mockito.times(2)).notifyBufferAvailable(Matchers.any(Buffer.class));
    }

    /**
     * A listener waiting on an exhausted pool must be told exactly once that the
     * pool was destroyed.
     */
    @Test
    public void testCancelPendingRequestsAfterDestroy() throws IOException {
        BufferListener waiting = Mockito.mock(BufferListener.class);

        // Exhaust a single-buffer pool.
        localBufferPool.setNumBuffers(1);
        Buffer onlyBuffer = localBufferPool.requestBuffer();
        Assert.assertNull(localBufferPool.requestBuffer());

        localBufferPool.addBufferListener(waiting);
        localBufferPool.lazyDestroy();

        onlyBuffer.recycleBuffer();

        Mockito.verify(waiting, Mockito.times(1)).notifyBufferDestroyed();
    }

    /**
     * Runs 128 tasks in parallel, each requesting and immediately recycling 1024
     * buffers from a pool of 128 buffers, and expects every task to succeed.
     */
    @Test
    public void testConcurrentRequestRecycle() throws ExecutionException, InterruptedException, IOException {
        final int taskCount = 128;
        final int requestsPerTask = 1024;

        // One buffer per task.
        localBufferPool.setNumBuffers(taskCount);

        List<Future<Boolean>> pending = new ArrayList<Future<Boolean>>(taskCount);
        for (int t = 0; t < taskCount; t++) {
            pending.add(executor.submit(new BufferRequesterTask(localBufferPool, requestsPerTask)));
        }

        for (Future<Boolean> outcome : pending) {
            Assert.assertTrue(outcome.get());
        }
    }

    /**
     * Destroys the pool while another thread issues a blocking request against it
     * and expects that request to fail with an {@link IllegalStateException}
     * instead of blocking forever.
     */
    @Test
    public void testDestroyDuringBlockingRequest() throws Exception {
        // Pool size, and the number of buffers the requester takes before it blocks.
        final int numberOfBuffers = 1;

        localBufferPool.setNumBuffers(numberOfBuffers);

        final CountDownLatch sync = new CountDownLatch(1);

        Callable<List<Buffer>> requester = new Callable<List<Buffer>>() {

            @Override
            public List<Buffer> call() throws Exception {
                // Drain the pool so that the blocking request below cannot be served.
                List<Buffer> requested = Lists.newArrayList();
                for (int i = 0; i < numberOfBuffers; ++i) {
                    requested.add(Preconditions.checkNotNull(localBufferPool.requestBuffer()));
                }

                // Tell the test thread that the pool is now exhausted.
                sync.countDown();

                try {
                    localBufferPool.requestBufferBuilderBlocking();
                    Assert.fail("Call should have failed with an IllegalStateException");
                }
                catch (IllegalStateException expected) {
                    // The destroyed pool rejected the request, as intended.
                }
                return requested;
            }
        };

        Future<List<Buffer>> f = executor.submit(requester);

        sync.await();

        localBufferPool.lazyDestroy();

        // NOTE(review): this pause comes after the destroy call, so it does not make it
        // more likely that the requester is already blocked when the pool is destroyed.
        // Order kept as found; confirm whether the sleep was meant to precede lazyDestroy().
        Thread.sleep(50L);

        // Returns promptly if the blocked request was released by the destroy call.
        List<Buffer> requestedBuffers = f.get(60L, TimeUnit.SECONDS);
        for (Buffer buffer : requestedBuffers) {
            buffer.recycleBuffer();
        }
    }

    /**
     * Exercises a local pool created with constructor arguments (1, 2) - reported
     * by the pool as a maximum of 2 segments - under pool sizes 1, 2, 3 and 1
     * again, checking the number of available segments after every step.
     */
    @Test
    public void testBoundedBuffer() throws Exception {
        localBufferPool.lazyDestroy();
        localBufferPool = new LocalBufferPool(networkBufferPool, 1, 2);
        assertAvailableSegments(0);
        Assert.assertEquals(2, localBufferPool.getMaxNumberOfMemorySegments());

        Buffer first;
        Buffer second;

        // --- pool size 1: a single buffer can be handed out ---
        localBufferPool.setNumBuffers(1);
        assertAvailableSegments(0);
        first = localBufferPool.requestBuffer();
        Assert.assertNotNull(first);
        assertAvailableSegments(0);
        Assert.assertNull(localBufferPool.requestBuffer());
        assertAvailableSegments(0);
        first.recycleBuffer();
        assertAvailableSegments(1);

        // --- pool size 2: two buffers can be handed out ---
        localBufferPool.setNumBuffers(2);
        assertAvailableSegments(1);
        first = localBufferPool.requestBuffer();
        Assert.assertNotNull(first);
        assertAvailableSegments(0);
        second = localBufferPool.requestBuffer();
        Assert.assertNotNull(second);
        assertAvailableSegments(0);
        Assert.assertNull(localBufferPool.requestBuffer());
        assertAvailableSegments(0);
        first.recycleBuffer();
        assertAvailableSegments(1);
        second.recycleBuffer();
        assertAvailableSegments(2);

        // --- pool size 3: still only two buffers can be handed out ---
        localBufferPool.setNumBuffers(3);
        assertAvailableSegments(2);
        first = localBufferPool.requestBuffer();
        Assert.assertNotNull(first);
        assertAvailableSegments(1);
        second = localBufferPool.requestBuffer();
        Assert.assertNotNull(second);
        assertAvailableSegments(0);
        Assert.assertNull(localBufferPool.requestBuffer());
        assertAvailableSegments(0);
        first.recycleBuffer();
        assertAvailableSegments(1);
        second.recycleBuffer();
        assertAvailableSegments(2);

        // --- back to pool size 1: the surplus idle buffer is dropped ---
        localBufferPool.setNumBuffers(1);
        assertAvailableSegments(1);
        first = localBufferPool.requestBuffer();
        Assert.assertNotNull(first);
        assertAvailableSegments(0);
        Assert.assertNull(localBufferPool.requestBuffer());
        first.recycleBuffer();
        assertAvailableSegments(1);
    }

    /** Asserts the number of segments the local pool currently reports as available. */
    private void assertAvailableSegments(int expected) {
        Assert.assertEquals(expected, localBufferPool.getNumberOfAvailableMemorySegments());
    }

    /**
     * Checks how the pool's availability future reacts as buffers are requested
     * for, and recycled from, the target channels 0, 1 and 2.
     *
     * <p>The trailing constructor arguments (3, 1) are presumably the number of
     * channels and the per-channel buffer limit - confirm against LocalBufferPool.
     */
    @Test
    public void testMaxBuffersPerChannelAndAvailability() throws IOException, InterruptedException {
        localBufferPool.lazyDestroy();
        localBufferPool = new LocalBufferPool(networkBufferPool, 1, Integer.MAX_VALUE, null, 3, 1);
        localBufferPool.setNumBuffers(10);
        Assert.assertTrue(localBufferPool.getAvailableFuture().isDone());

        // One buffer each for channels 0 and 1: the pool stays available.
        BufferBuilder channel0First = localBufferPool.requestBufferBuilderBlocking(0);
        BufferBuilder channel1First = localBufferPool.requestBufferBuilderBlocking(1);
        Assert.assertTrue(localBufferPool.getAvailableFuture().isDone());

        // A second buffer for channel 0 makes the pool unavailable.
        BufferBuilder channel0Second = localBufferPool.requestBufferBuilderBlocking(0);
        Assert.assertFalse(localBufferPool.getAvailableFuture().isDone());

        // More requests are still served, but the pool remains unavailable.
        BufferBuilder channel0Third = localBufferPool.requestBufferBuilderBlocking(0);
        BufferBuilder channel2First = localBufferPool.requestBufferBuilderBlocking(2);
        BufferBuilder channel2Second = localBufferPool.requestBufferBuilderBlocking(2);
        Assert.assertFalse(localBufferPool.getAvailableFuture().isDone());

        // Recycling these four buffers one by one: only the last restores availability.
        recycleSegmentOf(channel1First);
        Assert.assertFalse(localBufferPool.getAvailableFuture().isDone());
        recycleSegmentOf(channel2First);
        Assert.assertFalse(localBufferPool.getAvailableFuture().isDone());
        recycleSegmentOf(channel0Second);
        Assert.assertFalse(localBufferPool.getAvailableFuture().isDone());
        recycleSegmentOf(channel0First);
        Assert.assertTrue(localBufferPool.getAvailableFuture().isDone());

        // Returning the remaining buffers keeps the pool available.
        recycleSegmentOf(channel0Third);
        Assert.assertTrue(localBufferPool.getAvailableFuture().isDone());
        recycleSegmentOf(channel2Second);
        Assert.assertTrue(localBufferPool.getAvailableFuture().isDone());
    }

    /** Returns the builder's memory segment through the builder's own recycler. */
    private static void recycleSegmentOf(BufferBuilder builder) {
        builder.getRecycler().recycle(builder.getMemorySegment());
    }

    /**
     * Follows the pool's availability future through a sequence of state changes:
     * requesting the only buffer, growing the pool, draining and refilling the
     * global pool, shrinking the local pool, and recycling the requested buffer.
     */
    @Test
    public void testIsAvailableOrNot() throws Exception {
        // The local pool is available right after set-up.
        Assert.assertTrue(localBufferPool.getAvailableFuture().isDone());

        // Taking the only buffer makes it unavailable.
        BufferBuilder bufferBuilder = Preconditions.checkNotNull(localBufferPool.requestBufferBuilderBlocking());
        CompletableFuture<?> availableFuture = localBufferPool.getAvailableFuture();
        Assert.assertFalse(availableFuture.isDone());

        // Growing the pool completes the pending future and makes it available again.
        localBufferPool.setNumBuffers(2);
        Assert.assertTrue(availableFuture.isDone());
        Assert.assertTrue(localBufferPool.getAvailableFuture().isDone());

        // Drain the global pool: the local pool can no longer obtain a segment.
        List<MemorySegment> segments = new ArrayList<MemorySegment>(1024);
        while (networkBufferPool.getNumberOfAvailableMemorySegments() > 0) {
            segments.add(Preconditions.checkNotNull(networkBufferPool.requestMemorySegment()));
        }
        Assert.assertFalse(localBufferPool.getAvailableFuture().isDone());

        // Hand the segments back to the global pool: available again.
        for (MemorySegment memorySegment : segments) {
            networkBufferPool.recycle(memorySegment);
        }
        Assert.assertTrue(localBufferPool.getAvailableFuture().isDone());

        // Shrink back to one buffer while that buffer is still in use: unavailable.
        localBufferPool.setNumBuffers(1);
        availableFuture = localBufferPool.getAvailableFuture();
        Assert.assertFalse(availableFuture.isDone());

        // Releasing the requested buffer completes the pending future.
        bufferBuilder.createBufferConsumer().close();
        Assert.assertTrue(localBufferPool.getAvailableFuture().isDone());
        Assert.assertTrue(availableFuture.isDone());
    }

    /**
     * Computes how many segments are currently taken out of the global pool.
     *
     * @return total number of segments minus the number still available
     */
    private int getNumRequestedFromMemorySegmentPool() {
        int total = networkBufferPool.getTotalNumberOfMemorySegments();
        int stillAvailable = networkBufferPool.getNumberOfAvailableMemorySegments();
        return total - stillAvailable;
    }

    /**
     * Creates a spied listener that recycles every buffer it is offered and keeps
     * asking for more until it has been notified {@code notificationTimes} times.
     *
     * @param notificationTimes number of notifications after which the listener
     *     reports that it needs no more buffers
     * @return the spy wrapping the listener, usable with Mockito verification
     */
    private BufferListener createBufferListener(final int notificationTimes) {
        BufferListener countingListener = new BufferListener() {
            private final AtomicInteger times = new AtomicInteger(0);

            @Override
            public BufferListener.NotificationResult notifyBufferAvailable(Buffer buffer) {
                // Count first, then give the buffer straight back to the pool.
                int seen = times.incrementAndGet();
                buffer.recycleBuffer();
                return seen < notificationTimes
                        ? BufferListener.NotificationResult.BUFFER_USED_NEED_MORE
                        : BufferListener.NotificationResult.BUFFER_USED_NO_NEED_MORE;
            }

            @Override
            public void notifyBufferDestroyed() {
                // Nothing to do.
            }
        };
        return PowerMockito.spy(countingListener);
    }

    /**
     * Task that requests a fixed number of buffers from a provider, recycling each
     * one immediately, and reports whether all requests succeeded.
     */
    private static class BufferRequesterTask implements Callable<Boolean> {
        private final BufferProvider bufferProvider;
        private final int numBuffersToRequest;

        private BufferRequesterTask(BufferProvider bufferProvider, int numBuffersToRequest) {
            this.bufferProvider = bufferProvider;
            this.numBuffersToRequest = numBuffersToRequest;
        }

        /**
         * @return {@code true} if every request returned a buffer; {@code false} if
         *     any request returned {@code null} or anything was thrown
         */
        @Override
        public Boolean call() throws Exception {
            int remaining = numBuffersToRequest;
            try {
                while (remaining > 0) {
                    Buffer granted = Preconditions.checkNotNull(bufferProvider.requestBuffer());
                    granted.recycleBuffer();
                    remaining--;
                }
                return true;
            }
            catch (Throwable failure) {
                // Any failure is reported through the result instead of being rethrown.
                return false;
            }
        }
    }
}

