package org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal;

import java.net.URI;
import org.apache.flink.kinesis.shaded.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.kinesis.shaded.io.netty.channel.Channel;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelHandler;
import org.apache.flink.kinesis.shaded.io.netty.channel.EventLoop;
import org.apache.flink.kinesis.shaded.io.netty.channel.pool.ChannelPool;
import org.apache.flink.kinesis.shaded.io.netty.channel.pool.ChannelPoolHandler;
import org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslContext;
import org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler;
import org.apache.flink.kinesis.shaded.io.netty.util.AttributeKey;
import org.apache.flink.kinesis.shaded.io.netty.util.concurrent.Future;
import org.apache.flink.kinesis.shaded.io.netty.util.concurrent.Promise;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.annotations.SdkInternalApi;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.annotations.SdkTestInternalApi;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.Logger;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.StringUtils;

/**
 * A {@link ChannelPool} wrapper that makes sure a proxy tunnel has been set up on a channel before
 * the channel is handed to the caller.
 *
 * <p>Channels are acquired from, and released to, the wrapped {@code delegate} pool. The first time
 * a given channel is acquired, a tunnel-initialization handler (produced by the
 * {@link InitHandlerSupplier}) is appended to its pipeline; once that handler reports success the
 * channel is tagged with {@link #TUNNEL_ESTABLISHED_KEY} so that later acquisitions of the same
 * channel skip the setup.
 */
@SdkInternalApi
public class Http1TunnelConnectionPool implements ChannelPool {
    /** Channel attribute that is set to {@code true} once tunnel setup has completed for the channel. */
    static final AttributeKey<Boolean> TUNNEL_ESTABLISHED_KEY = NettyUtils.getOrCreateAttributeKey("aws.http.nio.netty.async.Http1TunnelConnectionPool.tunnelEstablished");
    private static final Logger log = Logger.loggerFor(Http1TunnelConnectionPool.class);

    private final EventLoop eventLoop;
    private final ChannelPool delegate;
    private final SslContext sslContext;
    private final URI proxyAddress;
    private final String proxyUser;
    private final String proxyPassword;
    private final URI remoteAddress;
    private final ChannelPoolHandler handler;
    private final InitHandlerSupplier initHandlerSupplier;

    /**
     * Factory for the handler that is appended to a channel's pipeline to initialize the tunnel.
     * Exists so that a different handler can be supplied through the test-only constructor.
     */
    @SdkTestInternalApi
    @FunctionalInterface
    public interface InitHandlerSupplier {
        /**
         * Creates the tunnel-initialization handler for one channel.
         *
         * @param sourcePool the pool the channel was acquired from
         * @param username proxy user name; {@code null} when none was configured
         * @param password proxy password; {@code null} when none was configured
         * @param remoteAddress the address passed to the pool as the remote (tunnel target) address
         * @param tunnelInitPromise promise this pool listens on to learn whether tunnel setup
         *                          succeeded or failed
         * @return the handler to append to the channel pipeline
         */
        ChannelHandler newInitHandler(ChannelPool sourcePool, String username, String password, URI remoteAddress, Promise<Channel> tunnelInitPromise);
    }

    /**
     * Creates a pool that uses {@link ProxyTunnelInitHandler} for tunnel setup and passes the given
     * proxy credentials to it.
     *
     * @param eventLoop event loop used to create the promises handed out by this pool
     * @param delegate pool that actually provides and takes back the channels
     * @param sslContext SSL context used when the proxy address has the {@code https} scheme; may be {@code null}
     * @param proxyAddress address of the proxy
     * @param proxyUser proxy user name, may be {@code null}
     * @param proxyPassword proxy password, may be {@code null}
     * @param remoteAddress address forwarded to the tunnel-initialization handler
     * @param handler pool handler notified via {@code channelCreated} once a tunnel is established
     */
    public Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, URI proxyAddress, String proxyUser, String proxyPassword, URI remoteAddress, ChannelPoolHandler handler) {
        this(eventLoop, delegate, sslContext, proxyAddress, proxyUser, proxyPassword, remoteAddress, handler, ProxyTunnelInitHandler::new);
    }

    /**
     * Creates a pool without proxy credentials; equivalent to the credential-taking constructor with
     * {@code null} user name and password.
     *
     * @param eventLoop event loop used to create the promises handed out by this pool
     * @param delegate pool that actually provides and takes back the channels
     * @param sslContext SSL context used when the proxy address has the {@code https} scheme; may be {@code null}
     * @param proxyAddress address of the proxy
     * @param remoteAddress address forwarded to the tunnel-initialization handler
     * @param handler pool handler notified via {@code channelCreated} once a tunnel is established
     */
    public Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, URI proxyAddress, URI remoteAddress, ChannelPoolHandler handler) {
        this(eventLoop, delegate, sslContext, proxyAddress, null, null, remoteAddress, handler, ProxyTunnelInitHandler::new);
    }

    /**
     * Full constructor; additionally lets the caller choose how the tunnel-initialization handler is created.
     */
    @SdkTestInternalApi
    Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, URI proxyAddress, String proxyUser, String proxyPassword, URI remoteAddress, ChannelPoolHandler handler, InitHandlerSupplier initHandlerSupplier) {
        this.eventLoop = eventLoop;
        this.delegate = delegate;
        this.sslContext = sslContext;
        this.proxyAddress = proxyAddress;
        this.proxyUser = proxyUser;
        this.proxyPassword = proxyPassword;
        this.remoteAddress = remoteAddress;
        this.handler = handler;
        this.initHandlerSupplier = initHandlerSupplier;
    }

    /**
     * Acquires a channel using a fresh promise created on this pool's event loop.
     *
     * @return future completed with a channel whose tunnel has been set up, or failed with the cause
     */
    @Override
    public Future<Channel> acquire() {
        return acquire(this.eventLoop.newPromise());
    }

    /**
     * Acquires a channel from the delegate pool and completes {@code promise} once the tunnel on that
     * channel is ready. A failure to acquire from the delegate fails {@code promise} with the same cause.
     *
     * @param promise promise to complete with the tunnelled channel
     * @return the given {@code promise}
     */
    @Override
    public Future<Channel> acquire(Promise<Channel> promise) {
        // A separate promise is used for the delegate so that the caller's promise is only
        // completed after tunnel setup, not as soon as the raw channel is available.
        Promise<Channel> delegatePromise = this.eventLoop.newPromise();
        this.delegate.acquire(delegatePromise).addListener((Future<Channel> acquired) -> {
            if (acquired.isSuccess()) {
                setupChannel(acquired.getNow(), promise);
            } else {
                promise.setFailure(acquired.cause());
            }
        });
        return promise;
    }

    /**
     * Releases the channel to the delegate pool using a fresh promise created on this pool's event loop.
     */
    @Override
    public Future<Void> release(Channel channel) {
        return release(channel, this.eventLoop.newPromise());
    }

    /**
     * Releases the channel to the delegate pool.
     */
    @Override
    public Future<Void> release(Channel channel, Promise<Void> promise) {
        return this.delegate.release(channel, promise);
    }

    /**
     * Closes the delegate pool.
     */
    @Override
    public void close() {
        this.delegate.close();
    }

    /**
     * Completes {@code promise} with {@code channel} once a tunnel is available on it, installing the
     * SSL handler (if required) and the tunnel-initialization handler when the channel has not been
     * tagged as tunnelled yet.
     */
    private void setupChannel(Channel channel, Promise<Channel> promise) {
        if (isTunnelEstablished(channel)) {
            log.debug(() -> String.format("Tunnel already established for %s", channel.id().asShortText()));
            promise.setSuccess(channel);
            return;
        }

        log.debug(() -> String.format("Tunnel not yet established for channel %s. Establishing tunnel now.", channel.id().asShortText()));

        Promise<Channel> tunnelPromise = this.eventLoop.newPromise();

        SslHandler sslHandler = createSslHandlerIfNeeded(channel.alloc());
        if (sslHandler != null) {
            channel.pipeline().addLast(sslHandler);
        }
        channel.pipeline().addLast(this.initHandlerSupplier.newInitHandler(this.delegate, this.proxyUser, this.proxyPassword, this.remoteAddress, tunnelPromise));

        tunnelPromise.addListener((Future<Channel> tunnelResult) -> {
            if (!tunnelResult.isSuccess()) {
                failTunnelSetup(channel, promise, tunnelResult.cause());
                return;
            }

            Channel tunnel = tunnelResult.getNow();
            try {
                this.handler.channelCreated(tunnel);
            } catch (Exception e) {
                // Without this, the exception would escape the listener and the caller's promise
                // would never be completed, while the channel stayed checked out of the delegate.
                failTunnelSetup(channel, promise, e);
                return;
            }
            tunnel.attr(TUNNEL_ESTABLISHED_KEY).set(true);
            promise.setSuccess(tunnel);
        });
    }

    /**
     * Shared failure path for tunnel setup: closes the channel, gives it back to the delegate pool,
     * logs the cause and fails the caller's promise with it.
     */
    private void failTunnelSetup(Channel channel, Promise<Channel> promise, Throwable cause) {
        channel.close();
        this.delegate.release(channel);
        log.error(() -> String.format("Unable to establish tunnel for channel %s", channel.id().asShortText()), cause);
        promise.setFailure(cause);
    }

    /**
     * Builds an {@link SslHandler} targeting the proxy host and port when an SSL context is configured
     * and the proxy address scheme is {@code https} (compared case-insensitively).
     *
     * @return the new handler, or {@code null} when no SSL handler is needed
     */
    private SslHandler createSslHandlerIfNeeded(ByteBufAllocator alloc) {
        if (this.sslContext == null) {
            return null;
        }
        String scheme = StringUtils.lowerCase(this.proxyAddress.getScheme());
        if (!"https".equals(scheme)) {
            return null;
        }
        return NettyUtils.newSslHandler(this.sslContext, alloc, this.proxyAddress.getHost(), this.proxyAddress.getPort());
    }

    /**
     * Tells whether the channel carries {@link #TUNNEL_ESTABLISHED_KEY} with value {@code true};
     * an unset attribute counts as not established.
     */
    private static boolean isTunnelEstablished(Channel channel) {
        return Boolean.TRUE.equals(channel.attr(TUNNEL_ESTABLISHED_KEY).get());
    }
}
