/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.net.InetAddress;
import java.util.List;
import javax.net.ssl.SSLSessionContext;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyClient;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyProtocol;
import org.apache.flink.runtime.io.network.netty.NettyServer;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringEncoder;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class NettyClientServerSslTest
extends TestLogger {
    @Parameterized.Parameter
    public String sslProvider;

    @Parameterized.Parameters(name="SSL provider = {0}")
    public static List<String> parameters() {
        return SSLUtilsTest.AVAILABLE_SSL_PROVIDERS;
    }

    @Test
    public void testValidSslConnection() throws Exception {
        this.testValidSslConnection(this.createSslConfig());
    }

    @Test
    public void testValidSslConnectionAdvanced() throws Exception {
        Configuration sslConfig = this.createSslConfig();
        sslConfig.setInteger(SecurityOptions.SSL_INTERNAL_SESSION_CACHE_SIZE, 1);
        sslConfig.setInteger(SecurityOptions.SSL_INTERNAL_SESSION_TIMEOUT, 1000);
        sslConfig.setInteger(SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT, 1000);
        sslConfig.setInteger(SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT, 1000);
        this.testValidSslConnection(sslConfig);
    }

    private void testValidSslConnection(Configuration sslConfig) throws Exception {
        OneShotLatch serverChannelInitComplete = new OneShotLatch();
        SslHandler[] serverSslHandler = new SslHandler[1];
        NoOpProtocol protocol = new NoOpProtocol();
        NettyConfig nettyConfig = NettyClientServerSslTest.createNettyConfig(sslConfig);
        NettyBufferPool bufferPool = new NettyBufferPool(1);
        NettyServer server = NettyTestUtil.initServer(nettyConfig, bufferPool, sslHandlerFactory -> new TestingServerChannelInitializer(protocol, (SSLHandlerFactory)sslHandlerFactory, serverChannelInitComplete, serverSslHandler));
        NettyClient client = NettyTestUtil.initClient(nettyConfig, protocol, bufferPool);
        NettyTestUtil.NettyServerAndClient serverAndClient = new NettyTestUtil.NettyServerAndClient(server, client);
        Channel ch = NettyTestUtil.connect(serverAndClient);
        SslHandler clientSslHandler = (SslHandler)ch.pipeline().get("ssl");
        NettyClientServerSslTest.assertEqualsOrDefault(sslConfig, (ConfigOption<Integer>)SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT, clientSslHandler.getHandshakeTimeoutMillis());
        NettyClientServerSslTest.assertEqualsOrDefault(sslConfig, (ConfigOption<Integer>)SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT, clientSslHandler.getCloseNotifyFlushTimeoutMillis());
        ch.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
        ch.writeAndFlush((Object)"test").sync();
        serverChannelInitComplete.await();
        Assert.assertNotNull((Object)serverSslHandler[0]);
        NettyClientServerSslTest.assertEqualsOrDefault(sslConfig, (ConfigOption<Integer>)SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT, serverSslHandler[0].getHandshakeTimeoutMillis());
        NettyClientServerSslTest.assertEqualsOrDefault(sslConfig, (ConfigOption<Integer>)SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT, serverSslHandler[0].getCloseNotifyFlushTimeoutMillis());
        SSLSessionContext sessionContext = serverSslHandler[0].engine().getSession().getSessionContext();
        Assert.assertNotNull((String)"bug in unit test setup: session context not available", (Object)sessionContext);
        NettyClientServerSslTest.assertEqualsOrDefault(sslConfig, (ConfigOption<Integer>)SecurityOptions.SSL_INTERNAL_SESSION_CACHE_SIZE, sessionContext.getSessionCacheSize());
        int sessionTimeout = sslConfig.getInteger(SecurityOptions.SSL_INTERNAL_SESSION_TIMEOUT);
        if (sessionTimeout != -1) {
            Assert.assertEquals((long)(sessionTimeout / 1000), (long)sessionContext.getSessionTimeout());
        } else {
            Assert.assertTrue((String)"default value (-1) should not be propagated", (sessionContext.getSessionTimeout() >= 0 ? 1 : 0) != 0);
        }
        NettyTestUtil.shutdown(serverAndClient);
    }

    private static void assertEqualsOrDefault(Configuration sslConfig, ConfigOption<Integer> option, long actual) {
        long expected = sslConfig.getInteger(option);
        if (expected != (long)((Integer)option.defaultValue()).intValue()) {
            Assert.assertEquals((long)expected, (long)actual);
        } else {
            Assert.assertTrue((String)("default value (" + option.defaultValue() + ") should not be propagated"), (actual >= 0L ? 1 : 0) != 0);
        }
    }

    @Test
    public void testInvalidSslConfiguration() throws Exception {
        NoOpProtocol protocol = new NoOpProtocol();
        Configuration config = this.createSslConfig();
        config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "invalidpassword");
        NettyConfig nettyConfig = NettyClientServerSslTest.createNettyConfig(config);
        NettyTestUtil.NettyServerAndClient serverAndClient = null;
        try {
            serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
            Assert.fail((String)"Created server and client from invalid configuration");
        }
        catch (Exception exception) {
            // empty catch block
        }
        NettyTestUtil.shutdown(serverAndClient);
    }

    @Test
    public void testSslHandshakeError() throws Exception {
        NoOpProtocol protocol = new NoOpProtocol();
        Configuration config = this.createSslConfig();
        config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
        NettyConfig nettyConfig = NettyClientServerSslTest.createNettyConfig(config);
        NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
        Channel ch = NettyTestUtil.connect(serverAndClient);
        ch.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
        Assert.assertFalse((boolean)ch.writeAndFlush((Object)"test").await().isSuccess());
        NettyTestUtil.shutdown(serverAndClient);
    }

    @Test
    public void testClientUntrustedCertificate() throws Exception {
        Configuration serverConfig = this.createSslConfig();
        Configuration clientConfig = this.createSslConfig();
        clientConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
        NettyConfig nettyServerConfig = NettyClientServerSslTest.createNettyConfig(serverConfig);
        NettyConfig nettyClientConfig = NettyClientServerSslTest.createNettyConfig(clientConfig);
        NettyBufferPool bufferPool = new NettyBufferPool(1);
        NoOpProtocol protocol = new NoOpProtocol();
        NettyServer server = NettyTestUtil.initServer(nettyServerConfig, protocol, bufferPool);
        NettyClient client = NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool);
        NettyTestUtil.NettyServerAndClient serverAndClient = new NettyTestUtil.NettyServerAndClient(server, client);
        Channel ch = NettyTestUtil.connect(serverAndClient);
        ch.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
        Assert.assertFalse((boolean)ch.writeAndFlush((Object)"test").await().isSuccess());
        NettyTestUtil.shutdown(serverAndClient);
    }

    @Test
    public void testSslPinningForValidFingerprint() throws Exception {
        NoOpProtocol protocol = new NoOpProtocol();
        Configuration config = this.createSslConfig();
        config.setString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT, SSLUtilsTest.getCertificateFingerprint(config, "flink.test"));
        NettyConfig nettyConfig = NettyClientServerSslTest.createNettyConfig(config);
        NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
        Channel ch = NettyTestUtil.connect(serverAndClient);
        ch.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
        Assert.assertTrue((boolean)ch.writeAndFlush((Object)"test").await().isSuccess());
        NettyTestUtil.shutdown(serverAndClient);
    }

    @Test
    public void testSslPinningForInvalidFingerprint() throws Exception {
        NoOpProtocol protocol = new NoOpProtocol();
        Configuration config = this.createSslConfig();
        config.setString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT, SSLUtilsTest.getCertificateFingerprint(config, "flink.test").replaceAll("[0-9A-Z]", "0"));
        NettyConfig nettyConfig = NettyClientServerSslTest.createNettyConfig(config);
        NettyTestUtil.NettyServerAndClient serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
        Channel ch = NettyTestUtil.connect(serverAndClient);
        ch.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
        Assert.assertFalse((boolean)ch.writeAndFlush((Object)"test").await().isSuccess());
        NettyTestUtil.shutdown(serverAndClient);
    }

    private Configuration createSslConfig() {
        return SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(this.sslProvider);
    }

    private static NettyConfig createNettyConfig(Configuration config) {
        return new NettyConfig(InetAddress.getLoopbackAddress(), NetUtils.getAvailablePort(), 1024, 1, config);
    }

    private static class TestingServerChannelInitializer
    extends NettyServer.ServerChannelInitializer {
        private final OneShotLatch latch;
        private final SslHandler[] serverHandler;

        TestingServerChannelInitializer(NettyProtocol protocol, SSLHandlerFactory sslHandlerFactory, OneShotLatch latch, SslHandler[] serverHandler) {
            super(protocol, sslHandlerFactory);
            this.latch = latch;
            this.serverHandler = serverHandler;
        }

        public void initChannel(SocketChannel channel) throws Exception {
            super.initChannel(channel);
            SslHandler sslHandler = (SslHandler)channel.pipeline().get("ssl");
            Assert.assertNotNull((Object)sslHandler);
            this.serverHandler[0] = sslHandler;
            this.latch.trigger();
        }
    }

    private static final class NoOpProtocol
    extends NettyProtocol {
        NoOpProtocol() {
            super(null, null);
        }

        public ChannelHandler[] getServerChannelHandlers() {
            return new ChannelHandler[0];
        }

        public ChannelHandler[] getClientChannelHandlers() {
            return new ChannelHandler[0];
        }
    }
}

