package org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.kinesis.shaded.io.netty.channel.Channel;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelDuplexHandler;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelHandler;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.kinesis.shaded.io.netty.channel.EventLoop;
import org.apache.flink.kinesis.shaded.io.netty.channel.EventLoopGroup;
import org.apache.flink.kinesis.shaded.io.netty.channel.pool.ChannelPool;
import org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2Connection;
import org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2Exception;
import org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2Stream;
import org.apache.flink.kinesis.shaded.io.netty.util.AttributeKey;
import org.apache.flink.kinesis.shaded.io.netty.util.concurrent.Future;
import org.apache.flink.kinesis.shaded.io.netty.util.concurrent.Promise;
import org.apache.flink.kinesis.shaded.io.netty.util.concurrent.PromiseCombiner;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.annotations.SdkInternalApi;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.annotations.SdkTestInternalApi;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.HttpMetric;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.Protocol;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.SdkChannelPool;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyClientLogger;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.metrics.MetricCollector;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.Validate;

@SdkInternalApi
/* loaded from: input_file:org/apache/flink/kinesis/shaded/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPool.class */
public class Http2MultiplexedChannelPool implements SdkChannelPool {
    private static final NettyClientLogger log = NettyClientLogger.getLogger(Http2MultiplexedChannelPool.class);
    private static final AttributeKey<MultiplexedChannelRecord> MULTIPLEXED_CHANNEL = NettyUtils.getOrCreateAttributeKey("org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.MULTIPLEXED_CHANNEL");
    private static final AttributeKey<Boolean> RELEASED = NettyUtils.getOrCreateAttributeKey("org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.RELEASED");
    private final ChannelPool connectionPool;
    private final EventLoopGroup eventLoopGroup;
    private final Set<MultiplexedChannelRecord> connections;
    private final Duration idleConnectionTimeout;
    private AtomicBoolean closed;

    /* JADX INFO: Access modifiers changed from: private */
    @ChannelHandler.Sharable
    /* loaded from: input_file:org/apache/flink/kinesis/shaded/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPool$ReleaseOnExceptionHandler.class */
    public static final class ReleaseOnExceptionHandler extends ChannelDuplexHandler {
        private static final ReleaseOnExceptionHandler INSTANCE = new ReleaseOnExceptionHandler();

        private ReleaseOnExceptionHandler() {
        }

        @Override // org.apache.flink.kinesis.shaded.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.kinesis.shaded.io.netty.channel.ChannelInboundHandler
        public void channelInactive(ChannelHandlerContext channelHandlerContext) {
            closeAndReleaseParent(channelHandlerContext, new ClosedChannelException());
        }

        @Override // org.apache.flink.kinesis.shaded.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.kinesis.shaded.io.netty.channel.ChannelHandlerAdapter, org.apache.flink.kinesis.shaded.io.netty.channel.ChannelHandler, org.apache.flink.kinesis.shaded.io.netty.channel.ChannelInboundHandler
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            if (th instanceof Http2ConnectionTerminatingException) {
                closeConnectionToNewRequests(channelHandlerContext, th);
            } else {
                closeAndReleaseParent(channelHandlerContext, th);
            }
        }

        void closeConnectionToNewRequests(ChannelHandlerContext channelHandlerContext, Throwable th) {
            MultiplexedChannelRecord multiplexedChannelRecord = (MultiplexedChannelRecord) channelHandlerContext.channel().attr(Http2MultiplexedChannelPool.MULTIPLEXED_CHANNEL).get();
            if (multiplexedChannelRecord != null) {
                multiplexedChannelRecord.closeToNewStreams();
            } else {
                closeAndReleaseParent(channelHandlerContext, th);
            }
        }

        private void closeAndReleaseParent(ChannelHandlerContext channelHandlerContext, Throwable th) {
            ((Http2MultiplexedChannelPool) channelHandlerContext.channel().attr(ChannelAttributeKey.HTTP2_MULTIPLEXED_CHANNEL_POOL).get()).closeAndReleaseParent(channelHandlerContext.channel(), th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Http2MultiplexedChannelPool(ChannelPool channelPool, EventLoopGroup eventLoopGroup, Duration duration) {
        this.closed = new AtomicBoolean(false);
        this.connectionPool = channelPool;
        this.eventLoopGroup = eventLoopGroup;
        this.connections = ConcurrentHashMap.newKeySet();
        this.idleConnectionTimeout = duration;
    }

    @SdkTestInternalApi
    Http2MultiplexedChannelPool(ChannelPool channelPool, EventLoopGroup eventLoopGroup, Set<MultiplexedChannelRecord> set, Duration duration) {
        this(channelPool, eventLoopGroup, duration);
        this.connections.addAll(set);
    }

    @Override // org.apache.flink.kinesis.shaded.io.netty.channel.pool.ChannelPool
    public Future<Channel> acquire() {
        return acquire(this.eventLoopGroup.next().newPromise());
    }

    @Override // org.apache.flink.kinesis.shaded.io.netty.channel.pool.ChannelPool
    public Future<Channel> acquire(Promise<Channel> promise) {
        if (this.closed.get()) {
            return promise.setFailure(new IOException("Channel pool is closed!"));
        }
        Iterator<MultiplexedChannelRecord> it = this.connections.iterator();
        while (it.hasNext()) {
            if (acquireStreamOnInitializedConnection(it.next(), promise)) {
                return promise;
            }
        }
        acquireStreamOnNewConnection(promise);
        return promise;
    }

    private void acquireStreamOnNewConnection(Promise<Channel> promise) {
        Future<Channel> acquire = this.connectionPool.acquire();
        acquire.addListener2(future -> {
            if (!acquire.isSuccess()) {
                promise.setFailure(acquire.cause());
                return;
            }
            Channel channel = (Channel) acquire.getNow();
            try {
                channel.attr(ChannelAttributeKey.HTTP2_MULTIPLEXED_CHANNEL_POOL).set(this);
                ((CompletableFuture) channel.attr(ChannelAttributeKey.PROTOCOL_FUTURE).get()).thenAccept(protocol -> {
                    acquireStreamOnFreshConnection(promise, channel, protocol);
                }).exceptionally(th -> {
                    return failAndCloseParent(promise, channel, th);
                });
            } catch (Throwable th2) {
                failAndCloseParent(promise, channel, th2);
            }
        });
    }

    private void acquireStreamOnFreshConnection(Promise<Channel> promise, Channel channel, Protocol protocol) {
        try {
            Long l = (Long) channel.attr(ChannelAttributeKey.MAX_CONCURRENT_STREAMS).get();
            Validate.isTrue(protocol == Protocol.HTTP2, "Protocol negotiated on connection (%s) was expected to be HTTP/2, but it was %s.", channel, Protocol.HTTP1_1);
            Validate.isTrue(l != null, "HTTP/2 was negotiated on the connection (%s), but the maximum number of streams was not initialized.", channel);
            Validate.isTrue(l.longValue() > 0, "Maximum streams were not positive on channel (%s).", channel);
            MultiplexedChannelRecord multiplexedChannelRecord = new MultiplexedChannelRecord(channel, l.longValue(), this.idleConnectionTimeout);
            channel.attr(MULTIPLEXED_CHANNEL).set(multiplexedChannelRecord);
            Promise<Channel> newPromise = channel.eventLoop().newPromise();
            if (acquireStreamOnInitializedConnection(multiplexedChannelRecord, newPromise)) {
                newPromise.addListener2(future -> {
                    if (newPromise.isSuccess()) {
                        cacheConnectionForFutureStreams((Channel) newPromise.getNow(), multiplexedChannelRecord, promise);
                    } else {
                        promise.setFailure(newPromise.cause());
                    }
                });
            } else {
                failAndCloseParent(promise, channel, new IOException("Connection was closed while creating a new stream."));
            }
        } catch (Throwable th) {
            failAndCloseParent(promise, channel, th);
        }
    }

    private void cacheConnectionForFutureStreams(Channel channel, MultiplexedChannelRecord multiplexedChannelRecord, Promise<Channel> promise) {
        Channel parent = channel.parent();
        parent.pipeline().addLast(ReleaseOnExceptionHandler.INSTANCE);
        this.connections.add(multiplexedChannelRecord);
        if (this.closed.get()) {
            failAndCloseParent(promise, parent, new IOException("Connection pool was closed while creating a new stream."));
        } else {
            promise.setSuccess(channel);
        }
    }

    private void tryExpandConnectionWindow(Channel channel) {
        NettyUtils.doInEventLoop(channel.eventLoop(), () -> {
            Http2Connection http2Connection = (Http2Connection) channel.attr(ChannelAttributeKey.HTTP2_CONNECTION).get();
            Integer num = (Integer) channel.attr(ChannelAttributeKey.HTTP2_INITIAL_WINDOW_SIZE).get();
            Validate.notNull(http2Connection, "http2Connection should not be null on channel " + channel, new Object[0]);
            Validate.notNull(http2Connection, "initialWindowSize should not be null on channel " + channel, new Object[0]);
            Http2Stream connectionStream = http2Connection.connectionStream();
            log.debug(channel, () -> {
                return "Expanding connection window size for " + channel + " by " + num;
            });
            try {
                http2Connection.local().flowController().incrementWindowSize(connectionStream, num.intValue());
            } catch (Http2Exception e) {
                log.warn(channel, () -> {
                    return "Failed to increment windowSize of connection " + channel;
                }, e);
            }
        });
    }

    private Void failAndCloseParent(Promise<Channel> promise, Channel channel, Throwable th) {
        log.debug(channel, () -> {
            return "Channel acquiring failed, closing connection " + channel;
        }, th);
        promise.setFailure(th);
        closeAndReleaseParent(channel);
        return null;
    }

    private boolean acquireStreamOnInitializedConnection(MultiplexedChannelRecord multiplexedChannelRecord, Promise<Channel> promise) {
        Promise<Channel> newPromise = multiplexedChannelRecord.getConnection().eventLoop().newPromise();
        if (!multiplexedChannelRecord.acquireStream(newPromise)) {
            return false;
        }
        newPromise.addListener2(future -> {
            try {
                if (!newPromise.isSuccess()) {
                    promise.setFailure(newPromise.cause());
                    return;
                }
                Channel channel = (Channel) newPromise.getNow();
                channel.attr(ChannelAttributeKey.HTTP2_MULTIPLEXED_CHANNEL_POOL).set(this);
                channel.attr(MULTIPLEXED_CHANNEL).set(multiplexedChannelRecord);
                promise.setSuccess(channel);
                tryExpandConnectionWindow(channel.parent());
            } catch (Exception e) {
                promise.setFailure(e);
            }
        });
        return true;
    }

    @Override // org.apache.flink.kinesis.shaded.io.netty.channel.pool.ChannelPool
    public Future<Void> release(Channel channel) {
        return release(channel, channel.eventLoop().newPromise());
    }

    @Override // org.apache.flink.kinesis.shaded.io.netty.channel.pool.ChannelPool
    public Future<Void> release(Channel channel, Promise<Void> promise) {
        if (channel.parent() == null) {
            closeAndReleaseParent(channel);
            return promise.setFailure(new IllegalArgumentException("Channel (" + channel + ") is not a child channel."));
        }
        Channel parent = channel.parent();
        MultiplexedChannelRecord multiplexedChannelRecord = (MultiplexedChannelRecord) parent.attr(MULTIPLEXED_CHANNEL).get();
        if (multiplexedChannelRecord != null) {
            multiplexedChannelRecord.closeAndReleaseChild(channel);
            return multiplexedChannelRecord.canBeClosedAndReleased() ? closeAndReleaseParent(parent, null, promise) : promise.setSuccess(null);
        }
        IOException iOException = new IOException("Channel (" + channel + ") is not associated with any channel records. It will be closed, but cannot be released within this pool.");
        NettyClientLogger nettyClientLogger = log;
        Objects.requireNonNull(iOException);
        nettyClientLogger.error(channel, iOException::getMessage);
        channel.close();
        return promise.setFailure(iOException);
    }

    private Future<Void> closeAndReleaseParent(Channel channel) {
        return closeAndReleaseParent(channel, null, channel.eventLoop().newPromise());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Future<Void> closeAndReleaseParent(Channel channel, Throwable th) {
        return closeAndReleaseParent(channel, th, channel.eventLoop().newPromise());
    }

    private Future<Void> closeAndReleaseParent(Channel channel, Throwable th, Promise<Void> promise) {
        if (channel.parent() != null) {
            IOException iOException = new IOException("Channel (" + channel + ") is not a parent channel. It will be closed, but cannot be released within this pool.");
            NettyClientLogger nettyClientLogger = log;
            Objects.requireNonNull(iOException);
            nettyClientLogger.error(channel, iOException::getMessage);
            channel.close();
            return promise.setFailure(iOException);
        }
        MultiplexedChannelRecord multiplexedChannelRecord = (MultiplexedChannelRecord) channel.attr(MULTIPLEXED_CHANNEL).get();
        if (multiplexedChannelRecord != null) {
            if (th == null) {
                multiplexedChannelRecord.closeChildChannels();
            } else {
                multiplexedChannelRecord.closeChildChannels(th);
            }
            this.connections.remove(multiplexedChannelRecord);
        }
        channel.close();
        return channel.attr(RELEASED).getAndSet(Boolean.TRUE) == null ? this.connectionPool.release(channel, promise) : promise.setSuccess(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleGoAway(Channel channel, int i, GoAwayException goAwayException) {
        log.debug(channel, () -> {
            return "Received GOAWAY on " + channel + " with lastStreamId of " + i;
        });
        try {
            MultiplexedChannelRecord multiplexedChannelRecord = (MultiplexedChannelRecord) channel.attr(MULTIPLEXED_CHANNEL).get();
            if (multiplexedChannelRecord != null) {
                multiplexedChannelRecord.handleGoAway(i, goAwayException);
            } else {
                closeAndReleaseParent(channel, goAwayException);
            }
        } catch (Exception e) {
            log.error(channel, () -> {
                return "Failed to handle GOAWAY frame on channel " + channel;
            }, e);
        }
    }

    @Override // org.apache.flink.kinesis.shaded.io.netty.channel.pool.ChannelPool, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            Future<?> doClose = doClose();
            try {
                if (!doClose.await(10L, TimeUnit.SECONDS)) {
                    throw new RuntimeException("Event loop didn't close after 10 seconds.");
                }
                Throwable cause = doClose.cause();
                if (cause != null) {
                    throw new RuntimeException("Failed to close channel pool.", cause);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    private Future<?> doClose() {
        EventLoop next = this.eventLoopGroup.next();
        Promise newPromise = next.newPromise();
        NettyUtils.doInEventLoop(next, () -> {
            Promise<Void> newPromise2 = next.newPromise();
            PromiseCombiner promiseCombiner = new PromiseCombiner(next);
            Iterator it = new ArrayList(this.connections).iterator();
            while (it.hasNext()) {
                promiseCombiner.add(closeAndReleaseParent(((MultiplexedChannelRecord) it.next()).getConnection()));
            }
            promiseCombiner.finish(newPromise2);
            newPromise2.addListener2(future -> {
                this.connectionPool.close();
                newPromise.setSuccess(null);
            });
        });
        return newPromise;
    }

    @Override // org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.SdkChannelPool
    public CompletableFuture<Void> collectChannelPoolMetrics(MetricCollector metricCollector) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        CompletableFuture<MultiplexedChannelRecord.Metrics> completableFuture2 = new CompletableFuture<>();
        accumulateMetrics(completableFuture2, (List) this.connections.stream().map((v0) -> {
            return v0.getMetrics();
        }).collect(Collectors.toList()));
        completableFuture2.whenComplete((metrics, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return;
            }
            try {
                metricCollector.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, Integer.valueOf(Math.toIntExact(metrics.getAvailableStreams())));
                completableFuture.complete(null);
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
            }
        });
        return completableFuture;
    }

    private void accumulateMetrics(CompletableFuture<MultiplexedChannelRecord.Metrics> completableFuture, List<CompletableFuture<MultiplexedChannelRecord.Metrics>> list) {
        accumulateMetrics(completableFuture, list, new MultiplexedChannelRecord.Metrics(), 0);
    }

    private void accumulateMetrics(CompletableFuture<MultiplexedChannelRecord.Metrics> completableFuture, List<CompletableFuture<MultiplexedChannelRecord.Metrics>> list, MultiplexedChannelRecord.Metrics metrics, int i) {
        if (i >= list.size()) {
            completableFuture.complete(metrics);
        } else {
            list.get(i).whenComplete((metrics2, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                } else {
                    metrics.add(metrics2);
                    accumulateMetrics(completableFuture, list, metrics, i + 1);
                }
            });
        }
    }
}
