/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyClient;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.util.NetUtils;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class NettyPartitionRequestClientTest {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetriggerPartitionRequest() throws Exception {
        long deadline = System.currentTimeMillis() + 30000L;
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = this.createPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler);
        int numExclusiveBuffers = 2;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelBuilder.newBuilder().setConnectionManager(InputChannelTestUtils.mockConnectionManagerWithPartitionRequestClient((PartitionRequestClient)client)).setInitialBackoff(1).setMaxBackoff(2).buildRemoteChannel(inputGate);
        try {
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.assignExclusiveSegments();
            inputChannel.requestSubpartition(0);
            Assert.assertTrue((boolean)channel.isWritable());
            Object readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assert.assertEquals((long)2L, (long)((NettyMessage.PartitionRequest)readFromOutbound).credit);
            inputGate.retriggerPartitionRequest(inputChannel.getPartitionId().getPartitionId());
            this.runAllScheduledPendingTasks(channel, deadline);
            readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assert.assertEquals((long)2L, (long)((NettyMessage.PartitionRequest)readFromOutbound).credit);
            inputGate.retriggerPartitionRequest(inputChannel.getPartitionId().getPartitionId());
            this.runAllScheduledPendingTasks(channel, deadline);
            readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assert.assertEquals((long)2L, (long)((NettyMessage.PartitionRequest)readFromOutbound).credit);
            Assert.assertNull((Object)channel.readOutbound());
        }
        finally {
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDoublePartitionRequest() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = this.createPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler);
        int numExclusiveBuffers = 2;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client);
        try {
            inputGate.setInputChannels(new InputChannel[]{inputChannel});
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.assignExclusiveSegments();
            inputChannel.requestSubpartition(0);
            Assert.assertTrue((boolean)channel.isWritable());
            Object readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.PartitionRequest)readFromOutbound).receiverId);
            Assert.assertEquals((long)2L, (long)((NettyMessage.PartitionRequest)readFromOutbound).credit);
            Assert.assertNull((Object)channel.readOutbound());
        }
        finally {
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testResumeConsumption() throws Exception {
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        NettyPartitionRequestClient client = this.createPartitionRequestClient((Channel)channel, (NetworkClientHandler)handler);
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1, (MemorySegmentProvider)networkBufferPool);
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, (PartitionRequestClient)client);
        try {
            BufferPool bufferPool = networkBufferPool.createBufferPool(6, 6);
            inputGate.setBufferPool(bufferPool);
            inputGate.assignExclusiveSegments();
            inputChannel.requestSubpartition(0);
            inputChannel.resumeConsumption();
            channel.runPendingTasks();
            Object readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            readFromOutbound = channel.readOutbound();
            Assert.assertThat((Object)readFromOutbound, (Matcher)Matchers.instanceOf(NettyMessage.ResumeConsumption.class));
            Assert.assertEquals((Object)inputChannel.getInputChannelId(), (Object)((NettyMessage.ResumeConsumption)readFromOutbound).receiverId);
            Assert.assertNull((Object)channel.readOutbound());
        }
        finally {
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    private NettyPartitionRequestClient createPartitionRequestClient(Channel tcpChannel, NetworkClientHandler clientHandler) throws Exception {
        int port = NetUtils.getAvailablePort();
        ConnectionID connectionID = new ConnectionID(new InetSocketAddress("localhost", port), 0);
        NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration());
        NettyClient nettyClient = new NettyClient(config);
        PartitionRequestClientFactory partitionRequestClientFactory = new PartitionRequestClientFactory(nettyClient);
        return new NettyPartitionRequestClient(tcpChannel, clientHandler, connectionID, partitionRequestClientFactory);
    }

    void runAllScheduledPendingTasks(EmbeddedChannel channel, long deadline) throws InterruptedException {
        while (channel.runScheduledPendingTasks() != -1L && System.currentTimeMillis() < deadline) {
            Thread.sleep(1L);
        }
    }
}

