package org.apache.flink.networking;

import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.flink.shaded.testutils.org.jboss.netty.bootstrap.ClientBootstrap;
import org.apache.flink.shaded.testutils.org.jboss.netty.buffer.ChannelBuffer;
import org.apache.flink.shaded.testutils.org.jboss.netty.buffer.ChannelBuffers;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.Channel;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.ChannelFuture;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.ChannelStateEvent;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.ExceptionEvent;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.MessageEvent;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/networking/NetworkFailureHandler.class */
public class NetworkFailureHandler extends SimpleChannelUpstreamHandler {
    private static final Logger LOG = LoggerFactory.getLogger(NetworkFailureHandler.class);
    private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler";
    private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap();
    private final Consumer<NetworkFailureHandler> onClose;
    private final ClientSocketChannelFactory channelFactory;
    private final String remoteHost;
    private final int remotePort;
    private final AtomicBoolean blocked;

    /* loaded from: input_file:org/apache/flink/networking/NetworkFailureHandler$TargetChannelHandler.class */
    private static class TargetChannelHandler extends SimpleChannelUpstreamHandler {
        private final Channel sourceChannel;
        private final AtomicBoolean blocked;

        TargetChannelHandler(Channel channel, AtomicBoolean atomicBoolean) {
            this.sourceChannel = channel;
            this.blocked = atomicBoolean;
        }

        @Override // org.apache.flink.shaded.testutils.org.jboss.netty.channel.SimpleChannelUpstreamHandler
        public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
            if (this.blocked.get()) {
                return;
            }
            this.sourceChannel.write((ChannelBuffer) messageEvent.getMessage());
        }

        @Override // org.apache.flink.shaded.testutils.org.jboss.netty.channel.SimpleChannelUpstreamHandler
        public void channelClosed(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
            NetworkFailureHandler.closeOnFlush(this.sourceChannel);
        }

        @Override // org.apache.flink.shaded.testutils.org.jboss.netty.channel.SimpleChannelUpstreamHandler
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
            NetworkFailureHandler.LOG.error("Closing communication channel because of an exception", exceptionEvent.getCause());
            NetworkFailureHandler.closeOnFlush(exceptionEvent.getChannel());
        }
    }

    public NetworkFailureHandler(AtomicBoolean atomicBoolean, Consumer<NetworkFailureHandler> consumer, ClientSocketChannelFactory clientSocketChannelFactory, String str, int i) {
        this.blocked = atomicBoolean;
        this.onClose = consumer;
        this.channelFactory = clientSocketChannelFactory;
        this.remoteHost = str;
        this.remotePort = i;
    }

    static void closeOnFlush(Channel channel) {
        if (channel.isConnected()) {
            channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }

    public void closeConnections() {
        Iterator<Map.Entry<Channel, Channel>> it = this.sourceToTargetChannels.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getKey().close();
        }
    }

    @Override // org.apache.flink.shaded.testutils.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void channelOpen(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        Channel channel = channelStateEvent.getChannel();
        channel.setReadable(false);
        boolean z = this.blocked.get();
        LOG.debug("Attempt to open proxy channel from [{}] to [{}:{}] in state [blocked = {}]", new Object[]{channel.getLocalAddress(), this.remoteHost, Integer.valueOf(this.remotePort), Boolean.valueOf(z)});
        if (z) {
            channel.close();
            return;
        }
        ClientBootstrap clientBootstrap = new ClientBootstrap(this.channelFactory);
        clientBootstrap.getPipeline().addLast(TARGET_CHANNEL_HANDLER_NAME, new TargetChannelHandler(channelStateEvent.getChannel(), this.blocked));
        ChannelFuture connect = clientBootstrap.connect(new InetSocketAddress(this.remoteHost, this.remotePort));
        this.sourceToTargetChannels.put(channel, connect.getChannel());
        connect.addListener(channelFuture -> {
            if (channelFuture.isSuccess()) {
                channel.setReadable(true);
            } else {
                channel.close();
            }
        });
    }

    @Override // org.apache.flink.shaded.testutils.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        if (this.blocked.get()) {
            return;
        }
        ChannelBuffer channelBuffer = (ChannelBuffer) messageEvent.getMessage();
        Channel channel = this.sourceToTargetChannels.get(messageEvent.getChannel());
        if (channel == null) {
            throw new IllegalStateException("Could not find a target channel for the source channel");
        }
        channel.write(channelBuffer);
    }

    @Override // org.apache.flink.shaded.testutils.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void channelClosed(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        Channel channel = this.sourceToTargetChannels.get(channelStateEvent.getChannel());
        if (channel == null) {
            return;
        }
        closeOnFlush(channel);
        this.sourceToTargetChannels.remove(channelStateEvent.getChannel());
        this.onClose.accept(this);
    }

    @Override // org.apache.flink.shaded.testutils.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
        LOG.error("Closing communication channel because of an exception", exceptionEvent.getCause());
        closeOnFlush(exceptionEvent.getChannel());
    }
}
