/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.buffer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsCollectionContaining;
import org.hamcrest.core.IsNot;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class NetworkBufferPoolTest
extends TestLogger {
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Test
    public void testCreatePoolAfterDestroy() {
        try {
            int bufferSize = 128;
            int numBuffers = 10;
            NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 1);
            Assert.assertEquals((long)10L, (long)globalPool.getTotalNumberOfMemorySegments());
            Assert.assertEquals((long)10L, (long)globalPool.getNumberOfAvailableMemorySegments());
            Assert.assertEquals((long)0L, (long)globalPool.getNumberOfRegisteredBufferPools());
            globalPool.destroy();
            Assert.assertTrue((boolean)globalPool.isDestroyed());
            try {
                globalPool.createBufferPool(2, 2);
                Assert.fail((String)"Should throw an IllegalStateException");
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
            try {
                globalPool.createBufferPool(2, 10);
                Assert.fail((String)"Should throw an IllegalStateException");
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
            try {
                globalPool.createBufferPool(2, Integer.MAX_VALUE);
                Assert.fail((String)"Should throw an IllegalStateException");
            }
            catch (IllegalStateException illegalStateException) {}
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testDestroyAll() {
        try {
            NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 1);
            BufferPool fixedPool = globalPool.createBufferPool(2, 2);
            BufferPool boundedPool = globalPool.createBufferPool(0, 1);
            BufferPool nonFixedPool = globalPool.createBufferPool(5, Integer.MAX_VALUE);
            Assert.assertEquals((long)2L, (long)fixedPool.getNumberOfRequiredMemorySegments());
            Assert.assertEquals((long)0L, (long)boundedPool.getNumberOfRequiredMemorySegments());
            Assert.assertEquals((long)5L, (long)nonFixedPool.getNumberOfRequiredMemorySegments());
            ArrayList<Buffer> buffers = new ArrayList<Buffer>(globalPool.getTotalNumberOfMemorySegments());
            block8: for (int i = 0; i < 10; ++i) {
                for (BufferPool bp : new BufferPool[]{fixedPool, boundedPool, nonFixedPool}) {
                    Buffer buffer = bp.requestBuffer();
                    if (buffer == null) continue;
                    Assert.assertNotNull((Object)buffer.getMemorySegment());
                    buffers.add(buffer);
                    continue block8;
                }
            }
            Assert.assertEquals((long)globalPool.getTotalNumberOfMemorySegments(), (long)buffers.size());
            Assert.assertNull((Object)fixedPool.requestBuffer());
            Assert.assertNull((Object)boundedPool.requestBuffer());
            Assert.assertNull((Object)nonFixedPool.requestBuffer());
            globalPool.destroyAllBufferPools();
            Assert.assertFalse((boolean)globalPool.isDestroyed());
            Assert.assertTrue((boolean)fixedPool.isDestroyed());
            Assert.assertTrue((boolean)boundedPool.isDestroyed());
            Assert.assertTrue((boolean)nonFixedPool.isDestroyed());
            Assert.assertEquals((long)0L, (long)globalPool.getNumberOfRegisteredBufferPools());
            Assert.assertEquals((long)0L, (long)globalPool.getNumberOfAvailableMemorySegments());
            for (Buffer b : buffers) {
                b.recycleBuffer();
            }
            Assert.assertEquals((long)globalPool.getTotalNumberOfMemorySegments(), (long)globalPool.getNumberOfAvailableMemorySegments());
            try {
                fixedPool.requestBuffer();
                Assert.fail((String)"Should fail with an IllegalStateException");
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
            try {
                boundedPool.requestBuffer();
                Assert.fail((String)"Should fail with an IllegalStateException");
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
            try {
                nonFixedPool.requestBuffer();
                Assert.fail((String)"Should fail with an IllegalStateException");
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
            Assert.assertNotNull((Object)globalPool.createBufferPool(10, Integer.MAX_VALUE));
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
        int numBuffers = 10;
        NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 5);
        List memorySegments = Collections.emptyList();
        try {
            memorySegments = globalPool.requestMemorySegments();
            Assert.assertEquals((long)memorySegments.size(), (long)5L);
            globalPool.recycleMemorySegments((Collection)memorySegments);
            memorySegments.clear();
            Assert.assertEquals((long)globalPool.getNumberOfAvailableMemorySegments(), (long)10L);
        }
        finally {
            globalPool.recycleMemorySegments((Collection)memorySegments);
            globalPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
        int numBuffers = 10;
        NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 11);
        try {
            globalPool.requestMemorySegments();
            Assert.fail((String)"Should throw an IOException");
        }
        catch (IOException e) {
            Assert.assertEquals((long)globalPool.getNumberOfAvailableMemorySegments(), (long)10L);
        }
        finally {
            globalPool.destroy();
        }
    }

    @Test(expected=IllegalArgumentException.class)
    public void testRequestMemorySegmentsWithInvalidArgument() {
        NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 0);
        globalPool.destroy();
        Assert.fail((String)"Should throw an IllegalArgumentException");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException {
        BufferPool lbp1;
        List memorySegments;
        NetworkBufferPool networkBufferPool;
        block6: {
            int numBuffers = 10;
            networkBufferPool = new NetworkBufferPool(10, 128, 5);
            ArrayList<Buffer> buffers = new ArrayList<Buffer>(10);
            memorySegments = Collections.emptyList();
            Thread bufferRecycler = null;
            lbp1 = null;
            try {
                lbp1 = networkBufferPool.createBufferPool(5, 10);
                for (int i = 0; i < 10; ++i) {
                    Buffer buffer = lbp1.requestBuffer();
                    buffers.add(buffer);
                    Assert.assertNotNull((Object)buffer);
                }
                OneShotLatch isRunning = new OneShotLatch();
                bufferRecycler = new Thread(() -> {
                    try {
                        isRunning.trigger();
                        Thread.sleep(100L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    for (Buffer buffer : buffers) {
                        buffer.recycleBuffer();
                    }
                });
                bufferRecycler.start();
                isRunning.await();
                memorySegments = networkBufferPool.requestMemorySegments();
                Assert.assertThat((Object)memorySegments, (Matcher)IsNot.not((Matcher)IsCollectionContaining.hasItem((Matcher)CoreMatchers.nullValue())));
                if (bufferRecycler == null) break block6;
            }
            catch (Throwable throwable) {
                if (bufferRecycler != null) {
                    bufferRecycler.join();
                }
                if (lbp1 != null) {
                    lbp1.lazyDestroy();
                }
                networkBufferPool.recycleMemorySegments(memorySegments);
                networkBufferPool.destroy();
                throw throwable;
            }
            bufferRecycler.join();
        }
        if (lbp1 != null) {
            lbp1.lazyDestroy();
        }
        networkBufferPool.recycleMemorySegments((Collection)memorySegments);
        networkBufferPool.destroy();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestMemorySegmentsExceptionDuringBufferRedistribution() throws IOException {
        int numBuffers = 3;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(3, 128, 2);
        ArrayList<Buffer> buffers = new ArrayList<Buffer>(3);
        List memorySegments = Collections.emptyList();
        BufferPool bufferPool = networkBufferPool.createBufferPool(1, 3, numBuffersToRecycle -> {
            throw new TestIOException();
        }, 0, Integer.MAX_VALUE);
        try {
            for (int i = 0; i < 2; ++i) {
                Buffer buffer = bufferPool.requestBuffer();
                buffers.add(buffer);
                Assert.assertNotNull((Object)buffer);
            }
            memorySegments = networkBufferPool.requestMemorySegments();
            Assert.fail((String)"Requesting memory segments should have thrown during buffer pool redistribution.");
        }
        catch (TestIOException e) {
            this.expectedException.expect(TestIOException.class);
            networkBufferPool.createBufferPool(2, 2);
        }
        finally {
            for (Buffer buffer : buffers) {
                buffer.recycleBuffer();
            }
            bufferPool.lazyDestroy();
            networkBufferPool.recycleMemorySegments((Collection)memorySegments);
            networkBufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCreateBufferPoolExceptionDuringBufferRedistribution() throws IOException {
        int numBuffers = 3;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(3, 128, 1);
        ArrayList<Buffer> buffers = new ArrayList<Buffer>(3);
        BufferPool bufferPool = networkBufferPool.createBufferPool(1, 3, numBuffersToRecycle -> {
            throw new TestIOException();
        }, 0, Integer.MAX_VALUE);
        try {
            for (int i = 0; i < 3; ++i) {
                Buffer buffer = bufferPool.requestBuffer();
                buffers.add(buffer);
                Assert.assertNotNull((Object)buffer);
            }
            try {
                networkBufferPool.createBufferPool(1, 3);
                Assert.fail((String)"Should have failed because the other buffer pool does not support memory release.");
            }
            catch (TestIOException testIOException) {
                // empty catch block
            }
            for (Buffer buffer : buffers) {
                buffer.recycleBuffer();
            }
            buffers.clear();
            bufferPool.lazyDestroy();
            bufferPool = networkBufferPool.createBufferPool(3, 3);
        }
        finally {
            for (Buffer buffer : buffers) {
                buffer.recycleBuffer();
            }
            bufferPool.lazyDestroy();
            networkBufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestMemorySegmentsInterruptable() throws Exception {
        int numBuffers = 10;
        final NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 10);
        MemorySegment segment = globalPool.requestMemorySegment();
        Assert.assertNotNull((Object)segment);
        final OneShotLatch isRunning = new OneShotLatch();
        CheckedThread asyncRequest = new CheckedThread(){

            public void go() throws Exception {
                isRunning.trigger();
                globalPool.requestMemorySegments();
            }
        };
        asyncRequest.start();
        isRunning.await();
        Thread.sleep(10L);
        globalPool.destroy();
        segment.free();
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("destroyed");
        try {
            asyncRequest.sync();
        }
        finally {
            globalPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestMemorySegmentsInterruptable2() throws Exception {
        int numBuffers = 10;
        final NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 10);
        MemorySegment segment = globalPool.requestMemorySegment();
        Assert.assertNotNull((Object)segment);
        final OneShotLatch isRunning = new OneShotLatch();
        CheckedThread asyncRequest = new CheckedThread(){

            public void go() throws Exception {
                isRunning.trigger();
                globalPool.requestMemorySegments();
            }
        };
        asyncRequest.start();
        isRunning.await();
        Thread.sleep(10L);
        asyncRequest.interrupt();
        globalPool.recycle(segment);
        try {
            asyncRequest.sync();
        }
        catch (IOException e) {
            Assert.assertThat((Object)e, (Matcher)Matchers.hasProperty((String)"cause", (Matcher)CoreMatchers.instanceOf(InterruptedException.class)));
            globalPool.createBufferPool(10, 10);
        }
        finally {
            globalPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestMemorySegmentsTimeout() throws Exception {
        int numBuffers = 10;
        int numberOfSegmentsToRequest = 2;
        Duration requestSegmentsTimeout = Duration.ofMillis(50L);
        final NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 2, requestSegmentsTimeout);
        BufferPool localBufferPool = globalPool.createBufferPool(0, 10);
        for (int i = 0; i < 10; ++i) {
            localBufferPool.requestBuffer();
        }
        Assert.assertEquals((long)0L, (long)globalPool.getNumberOfAvailableMemorySegments());
        CheckedThread asyncRequest = new CheckedThread(){

            public void go() throws Exception {
                globalPool.requestMemorySegments();
            }
        };
        asyncRequest.start();
        this.expectedException.expect(IOException.class);
        this.expectedException.expectMessage("Timeout");
        try {
            asyncRequest.sync();
        }
        finally {
            globalPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testIsAvailableOrNotAfterRequestAndRecycleSingleSegment() throws Exception {
        int numBuffers = 2;
        NetworkBufferPool globalPool = new NetworkBufferPool(2, 128, 1);
        try {
            Assert.assertTrue((boolean)globalPool.getAvailableFuture().isDone());
            MemorySegment segment1 = (MemorySegment)Preconditions.checkNotNull((Object)globalPool.requestMemorySegment());
            Assert.assertTrue((boolean)globalPool.getAvailableFuture().isDone());
            MemorySegment segment2 = (MemorySegment)Preconditions.checkNotNull((Object)globalPool.requestMemorySegment());
            Assert.assertFalse((boolean)globalPool.getAvailableFuture().isDone());
            CompletableFuture availableFuture = globalPool.getAvailableFuture();
            globalPool.recycle(segment1);
            Assert.assertTrue((boolean)availableFuture.isDone());
            Assert.assertTrue((boolean)globalPool.getAvailableFuture().isDone());
            globalPool.recycle(segment2);
            Assert.assertTrue((boolean)globalPool.getAvailableFuture().isDone());
        }
        finally {
            globalPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=10000L)
    public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments() throws Exception {
        int numberOfSegmentsToRequest = 5;
        int numBuffers = 10;
        final NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 5);
        try {
            Assert.assertTrue((boolean)globalPool.getAvailableFuture().isDone());
            List segments1 = globalPool.requestMemorySegments();
            Assert.assertTrue((boolean)globalPool.getAvailableFuture().isDone());
            Assert.assertEquals((long)5L, (long)segments1.size());
            List segments2 = globalPool.requestMemorySegments();
            Assert.assertFalse((boolean)globalPool.getAvailableFuture().isDone());
            Assert.assertEquals((long)5L, (long)segments2.size());
            final CountDownLatch latch = new CountDownLatch(1);
            final ArrayList segments3 = new ArrayList(5);
            CheckedThread asyncRequest = new CheckedThread(){

                public void go() throws Exception {
                    segments3.addAll(globalPool.requestMemorySegments());
                    latch.countDown();
                }
            };
            asyncRequest.start();
            CompletableFuture availableFuture = globalPool.getAvailableFuture();
            globalPool.recycleMemorySegments((Collection)segments1);
            Assert.assertTrue((boolean)availableFuture.isDone());
            latch.await();
            Assert.assertFalse((boolean)globalPool.getAvailableFuture().isDone());
            Assert.assertEquals((long)5L, (long)segments3.size());
            globalPool.recycleMemorySegments((Collection)segments2);
            Assert.assertTrue((boolean)globalPool.getAvailableFuture().isDone());
            globalPool.recycleMemorySegments(segments3);
            Assert.assertTrue((boolean)globalPool.getAvailableFuture().isDone());
        }
        finally {
            globalPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=10000L)
    public void testBlockingRequestFromMultiLocalBufferPool() throws Exception {
        int localPoolRequiredSize = 5;
        int localPoolMaxSize = 10;
        int numLocalBufferPool = 2;
        int numberOfSegmentsToRequest = 10;
        int numBuffers = 20;
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        NetworkBufferPool globalPool = new NetworkBufferPool(20, 128, 10);
        ArrayList<BufferPool> localBufferPools = new ArrayList<BufferPool>(2);
        try {
            for (int i = 0; i < 2; ++i) {
                BufferPool localPool = globalPool.createBufferPool(5, 10);
                localBufferPools.add(localPool);
                Assert.assertTrue((boolean)localPool.getAvailableFuture().isDone());
            }
            ArrayList<MemorySegment> segments = new ArrayList<MemorySegment>(9);
            for (int i = 0; i < 9; ++i) {
                segments.add(globalPool.requestMemorySegment());
            }
            List exclusiveSegments = globalPool.requestMemorySegments();
            Assert.assertTrue((boolean)globalPool.getAvailableFuture().isDone());
            for (BufferPool localPool : localBufferPools) {
                Assert.assertTrue((boolean)localPool.getAvailableFuture().isDone());
            }
            CountDownLatch latch = new CountDownLatch(2);
            ArrayBlockingQueue segmentsRequested = new ArrayBlockingQueue(20);
            AtomicReference cause = new AtomicReference();
            for (BufferPool localPool : localBufferPools) {
                executorService.submit(() -> {
                    try {
                        for (int num = 10; num > 0; --num) {
                            segmentsRequested.add(localPool.requestBufferBuilderBlocking());
                        }
                    }
                    catch (Exception e) {
                        cause.set(e);
                    }
                    finally {
                        latch.countDown();
                    }
                });
            }
            while (globalPool.getNumberOfAvailableMemorySegments() > 0) {
                Thread.sleep(100L);
                Assert.assertNull(cause.get());
            }
            CompletableFuture globalPoolAvailableFuture = globalPool.getAvailableFuture();
            Assert.assertFalse((boolean)globalPoolAvailableFuture.isDone());
            ArrayList<CompletableFuture> localPoolAvailableFutures = new ArrayList<CompletableFuture>(2);
            for (BufferPool localPool : localBufferPools) {
                CompletableFuture localPoolAvailableFuture = localPool.getAvailableFuture();
                localPoolAvailableFutures.add(localPoolAvailableFuture);
                Assert.assertFalse((boolean)localPoolAvailableFuture.isDone());
            }
            for (MemorySegment segment : segments) {
                globalPool.recycle(segment);
            }
            globalPool.recycleMemorySegments((Collection)exclusiveSegments);
            Assert.assertTrue((boolean)globalPoolAvailableFuture.isDone());
            for (CompletableFuture localPoolAvailableFuture : localPoolAvailableFutures) {
                Assert.assertTrue((boolean)localPoolAvailableFuture.isDone());
            }
            latch.await();
            Assert.assertNull(cause.get());
            Assert.assertEquals((long)0L, (long)globalPool.getNumberOfAvailableMemorySegments());
            Assert.assertFalse((boolean)globalPool.getAvailableFuture().isDone());
            for (BufferPool localPool : localBufferPools) {
                Assert.assertFalse((boolean)localPool.getAvailableFuture().isDone());
                Assert.assertEquals((long)10L, (long)localPool.bestEffortGetNumOfUsedBuffers());
            }
            for (BufferBuilder bufferBuilder : segmentsRequested) {
                bufferBuilder.createBufferConsumer().close();
            }
        }
        finally {
            for (BufferPool bufferPool : localBufferPools) {
                bufferPool.lazyDestroy();
            }
            executorService.shutdown();
            globalPool.destroy();
        }
    }

    private static final class TestIOException
    extends IOException {
        private static final long serialVersionUID = -814705441998024472L;

        private TestIOException() {
        }
    }
}

