package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.InetAddress;
import java.time.Duration;
import java.util.List;
import java.util.function.Function;
import javax.net.ssl.SSLSessionContext;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.netty.NettyServer;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringEncoder;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.NetUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.class */
class NettyClientServerSslTest {

    @Parameter
    private String sslProvider;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest$TestingServerChannelInitializer.class */
    public static class TestingServerChannelInitializer extends NettyServer.ServerChannelInitializer {
        private final OneShotLatch latch;
        private final SslHandler[] serverHandler;

        TestingServerChannelInitializer(NettyProtocol nettyProtocol, SSLHandlerFactory sSLHandlerFactory, OneShotLatch oneShotLatch, SslHandler[] sslHandlerArr) {
            super(nettyProtocol, sSLHandlerFactory);
            this.latch = oneShotLatch;
            this.serverHandler = sslHandlerArr;
        }

        public void initChannel(SocketChannel socketChannel) throws Exception {
            super.initChannel(socketChannel);
            SslHandler sslHandler = socketChannel.pipeline().get("ssl");
            Assertions.assertThat(sslHandler).isNotNull();
            this.serverHandler[0] = sslHandler;
            this.latch.trigger();
        }
    }

    NettyClientServerSslTest() {
    }

    @Parameters(name = "SSL provider = {0}")
    public static List<String> parameters() {
        return SSLUtilsTest.AVAILABLE_SSL_PROVIDERS;
    }

    @TestTemplate
    void testValidSslConnection() throws Exception {
        testValidSslConnection(createSslConfig());
    }

    @TestTemplate
    void testValidSslConnectionAdvanced() throws Exception {
        Configuration createSslConfig = createSslConfig();
        createSslConfig.set(SecurityOptions.SSL_INTERNAL_SESSION_CACHE_SIZE, 1);
        int millis = (int) Duration.ofHours(1L).toMillis();
        createSslConfig.set(SecurityOptions.SSL_INTERNAL_SESSION_TIMEOUT, Integer.valueOf(millis + 1));
        createSslConfig.set(SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT, Integer.valueOf(millis + 2));
        createSslConfig.set(SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT, Integer.valueOf(millis + 3));
        testValidSslConnection(createSslConfig);
    }

    private void testValidSslConnection(Configuration configuration) throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        SslHandler[] sslHandlerArr = new SslHandler[1];
        NettyTestUtil.NoOpProtocol noOpProtocol = new NettyTestUtil.NoOpProtocol();
        NetUtils.Port availablePort = NetUtils.getAvailablePort();
        try {
            NettyConfig createNettyConfig = createNettyConfig(configuration, availablePort);
            NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
            NettyTestUtil.NettyServerAndClient nettyServerAndClient = new NettyTestUtil.NettyServerAndClient(NettyTestUtil.initServer(createNettyConfig, nettyBufferPool, (Function<SSLHandlerFactory, NettyServer.ServerChannelInitializer>) sSLHandlerFactory -> {
                return new TestingServerChannelInitializer(noOpProtocol, sSLHandlerFactory, oneShotLatch, sslHandlerArr);
            }), NettyTestUtil.initClient(createNettyConfig, noOpProtocol, nettyBufferPool));
            if (availablePort != null) {
                availablePort.close();
            }
            Assertions.assertThat(nettyServerAndClient).withFailMessage("serverAndClient is null due to fail to get a free port", new Object[0]).isNotNull();
            Channel connect = NettyTestUtil.connect(nettyServerAndClient);
            SslHandler sslHandler = connect.pipeline().get("ssl");
            assertEqualsOrDefault(configuration, SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT, sslHandler.getHandshakeTimeoutMillis());
            assertEqualsOrDefault(configuration, SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT, sslHandler.getCloseNotifyFlushTimeoutMillis());
            connect.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
            connect.writeAndFlush("test").sync();
            oneShotLatch.await();
            Assertions.assertThat(sslHandlerArr[0]).isNotNull();
            assertEqualsOrDefault(configuration, SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT, sslHandlerArr[0].getHandshakeTimeoutMillis());
            assertEqualsOrDefault(configuration, SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT, sslHandlerArr[0].getCloseNotifyFlushTimeoutMillis());
            SSLSessionContext sessionContext = sslHandlerArr[0].engine().getSession().getSessionContext();
            Assertions.assertThat(sessionContext).withFailMessage("bug in unit test setup: session context not available", new Object[0]).isNotNull();
            assertEqualsOrDefault(configuration, SecurityOptions.SSL_INTERNAL_SESSION_CACHE_SIZE, sessionContext.getSessionCacheSize());
            int intValue = ((Integer) configuration.get(SecurityOptions.SSL_INTERNAL_SESSION_TIMEOUT)).intValue();
            if (intValue != -1) {
                Assertions.assertThat(sessionContext.getSessionTimeout()).isEqualTo(intValue / HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS);
            } else {
                Assertions.assertThat(sessionContext.getSessionTimeout()).withFailMessage("default value (-1) should not be propagated", new Object[0]).isGreaterThanOrEqualTo(0);
            }
            NettyTestUtil.shutdown(nettyServerAndClient);
        } catch (Throwable th) {
            if (availablePort != null) {
                try {
                    availablePort.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static void assertEqualsOrDefault(Configuration configuration, ConfigOption<Integer> configOption, long j) {
        long intValue = ((Integer) configuration.get(configOption)).intValue();
        if (intValue != ((Integer) configOption.defaultValue()).intValue()) {
            Assertions.assertThat(j).isEqualTo(intValue);
        } else {
            Assertions.assertThat(j).withFailMessage("default value (%d) should not be propagated", new Object[]{configOption.defaultValue()}).isGreaterThanOrEqualTo(0L);
        }
    }

    @TestTemplate
    public void testInvalidSslConfiguration() throws Exception {
        NettyTestUtil.NoOpProtocol noOpProtocol = new NettyTestUtil.NoOpProtocol();
        Configuration createSslConfig = createSslConfig();
        createSslConfig.set(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "invalidpassword");
        NetUtils.Port availablePort = NetUtils.getAvailablePort();
        try {
            NettyConfig createNettyConfig = createNettyConfig(createSslConfig, availablePort);
            Assertions.assertThatThrownBy(() -> {
                NettyTestUtil.initServerAndClient(noOpProtocol, createNettyConfig);
            }).withFailMessage("Created server and client from invalid configuration", new Object[0]).isInstanceOf(IOException.class);
            if (availablePort != null) {
                availablePort.close();
            }
        } catch (Throwable th) {
            if (availablePort != null) {
                try {
                    availablePort.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @TestTemplate
    void testSslHandshakeError() throws Exception {
        NettyTestUtil.NoOpProtocol noOpProtocol = new NettyTestUtil.NoOpProtocol();
        Configuration createSslConfig = createSslConfig();
        createSslConfig.set(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
        NetUtils.Port availablePort = NetUtils.getAvailablePort();
        try {
            NettyTestUtil.NettyServerAndClient initServerAndClient = NettyTestUtil.initServerAndClient(noOpProtocol, createNettyConfig(createSslConfig, availablePort));
            if (availablePort != null) {
                availablePort.close();
            }
            Assertions.assertThat(initServerAndClient).withFailMessage("serverAndClient is null due to fail to get a free port", new Object[0]).isNotNull();
            Channel connect = NettyTestUtil.connect(initServerAndClient);
            connect.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
            Assertions.assertThat(connect.writeAndFlush("test").await().isSuccess()).isFalse();
            NettyTestUtil.shutdown(initServerAndClient);
        } catch (Throwable th) {
            if (availablePort != null) {
                try {
                    availablePort.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @TestTemplate
    void testClientUntrustedCertificate() throws Exception {
        Configuration createSslConfig = createSslConfig();
        Configuration createSslConfig2 = createSslConfig();
        createSslConfig2.set(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
        NetUtils.Port availablePort = NetUtils.getAvailablePort();
        try {
            NetUtils.Port availablePort2 = NetUtils.getAvailablePort();
            try {
                NettyConfig createNettyConfig = createNettyConfig(createSslConfig, availablePort);
                NettyConfig createNettyConfig2 = createNettyConfig(createSslConfig2, availablePort2);
                NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
                NettyTestUtil.NoOpProtocol noOpProtocol = new NettyTestUtil.NoOpProtocol();
                NettyTestUtil.NettyServerAndClient nettyServerAndClient = new NettyTestUtil.NettyServerAndClient(NettyTestUtil.initServer(createNettyConfig, noOpProtocol, nettyBufferPool), NettyTestUtil.initClient(createNettyConfig2, noOpProtocol, nettyBufferPool));
                if (availablePort2 != null) {
                    availablePort2.close();
                }
                if (availablePort != null) {
                    availablePort.close();
                }
                Assertions.assertThat(nettyServerAndClient).withFailMessage("serverAndClient is null due to fail to get a free port", new Object[0]).isNotNull();
                Channel connect = NettyTestUtil.connect(nettyServerAndClient);
                connect.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
                Assertions.assertThat(connect.writeAndFlush("test").await().isSuccess()).isFalse();
                NettyTestUtil.shutdown(nettyServerAndClient);
            } catch (Throwable th) {
                if (availablePort2 != null) {
                    try {
                        availablePort2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (availablePort != null) {
                try {
                    availablePort.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @TestTemplate
    void testSslPinningForValidFingerprint() throws Exception {
        NettyTestUtil.NoOpProtocol noOpProtocol = new NettyTestUtil.NoOpProtocol();
        Configuration createSslConfig = createSslConfig();
        createSslConfig.set(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT, SSLUtilsTest.getCertificateFingerprint(createSslConfig, "flink.test"));
        NetUtils.Port availablePort = NetUtils.getAvailablePort();
        try {
            NettyTestUtil.NettyServerAndClient initServerAndClient = NettyTestUtil.initServerAndClient(noOpProtocol, createNettyConfig(createSslConfig, availablePort));
            if (availablePort != null) {
                availablePort.close();
            }
            Assertions.assertThat(initServerAndClient).withFailMessage("serverAndClient is null due to fail to get a free port", new Object[0]).isNotNull();
            Channel connect = NettyTestUtil.connect(initServerAndClient);
            connect.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
            Assertions.assertThat(connect.writeAndFlush("test").await().isSuccess()).isTrue();
            NettyTestUtil.shutdown(initServerAndClient);
        } catch (Throwable th) {
            if (availablePort != null) {
                try {
                    availablePort.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @TestTemplate
    void testSslPinningForInvalidFingerprint() throws Exception {
        NettyTestUtil.NoOpProtocol noOpProtocol = new NettyTestUtil.NoOpProtocol();
        Configuration createSslConfig = createSslConfig();
        createSslConfig.set(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT, SSLUtilsTest.getCertificateFingerprint(createSslConfig, "flink.test").replaceAll("[0-9A-Z]", "0"));
        NetUtils.Port availablePort = NetUtils.getAvailablePort();
        try {
            NettyTestUtil.NettyServerAndClient initServerAndClient = NettyTestUtil.initServerAndClient(noOpProtocol, createNettyConfig(createSslConfig, availablePort));
            if (availablePort != null) {
                availablePort.close();
            }
            Assertions.assertThat(initServerAndClient).withFailMessage("serverAndClient is null due to fail to get a free port", new Object[0]).isNotNull();
            Channel connect = NettyTestUtil.connect(initServerAndClient);
            connect.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
            Assertions.assertThat(connect.writeAndFlush("test").await().isSuccess()).isFalse();
            NettyTestUtil.shutdown(initServerAndClient);
        } catch (Throwable th) {
            if (availablePort != null) {
                try {
                    availablePort.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private Configuration createSslConfig() {
        return SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(this.sslProvider);
    }

    private static NettyConfig createNettyConfig(Configuration configuration, NetUtils.Port port) {
        return new NettyConfig(InetAddress.getLoopbackAddress(), port.getPort(), HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE, 1, configuration);
    }
}
