package org.springframework.integration.ip.tcp;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory;
import org.springframework.integration.ip.tcp.connection.ClientModeCapable;
import org.springframework.integration.ip.tcp.connection.ClientModeConnectionManager;
import org.springframework.integration.ip.tcp.connection.ConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnection;
import org.springframework.integration.ip.tcp.connection.TcpConnectionFailedCorrelationEvent;
import org.springframework.integration.ip.tcp.connection.TcpSender;
import org.springframework.integration.support.management.ManageableLifecycle;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/ip/tcp/TcpSendingMessageHandler.class */
public class TcpSendingMessageHandler extends AbstractMessageHandler implements TcpSender, ManageableLifecycle, ClientModeCapable {
    public static final long DEFAULT_RETRY_INTERVAL = 60000;
    private AbstractConnectionFactory clientConnectionFactory;
    private AbstractConnectionFactory serverConnectionFactory;
    private boolean isClientMode;
    private boolean isSingleUse;
    private volatile ScheduledFuture<?> scheduledFuture;
    private volatile ClientModeConnectionManager clientModeConnectionManager;
    private volatile boolean active;
    protected final Object lifecycleMonitor = new Object();
    private final Map<String, TcpConnection> connections = new ConcurrentHashMap();
    private long retryInterval = 60000;

    protected TcpConnection obtainConnection(Message<?> message) {
        Assert.notNull(this.clientConnectionFactory, "'clientConnectionFactory' cannot be null");
        try {
            return this.clientConnectionFactory.getConnection();
        } catch (Exception e) {
            this.logger.error(e, "Error creating connection");
            throw new MessageHandlingException(message, "Failed to obtain a connection in the [" + this + "]", e);
        }
    }

    public void handleMessageInternal(Message<?> message) {
        if (this.serverConnectionFactory != null) {
            handleMessageAsServer(message);
        } else {
            handleMessageAsClient(message);
        }
    }

    private void handleMessageAsServer(Message<?> message) {
        String str = (String) message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class);
        TcpConnection tcpConnection = null;
        if (str != null) {
            tcpConnection = this.connections.get(str);
        }
        try {
            if (tcpConnection == null) {
                this.logger.error(() -> {
                    return "Unable to find outbound socket for " + message;
                });
                MessageHandlingException messageHandlingException = new MessageHandlingException(message, "Unable to find outbound socket in the [" + this + "]");
                publishNoConnectionEvent(messageHandlingException, str);
                throw messageHandlingException;
            }
            try {
                tcpConnection.send(message);
                if (this.isSingleUse) {
                    tcpConnection.close();
                }
            } catch (Exception e) {
                this.logger.error(e, "Error sending message");
                tcpConnection.close();
                throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(message, () -> {
                    return "Error sending message in the [" + this + "]";
                }, e);
            }
        } catch (Throwable th) {
            if (this.isSingleUse) {
                tcpConnection.close();
            }
            throw th;
        }
    }

    private void handleMessageAsClient(Message<?> message) {
        TcpConnection tcpConnection = null;
        try {
            try {
                tcpConnection = doWrite(message);
                if (tcpConnection != null && this.isSingleUse && this.clientConnectionFactory.getListener() == null) {
                    tcpConnection.close();
                }
            } catch (MessageHandlingException e) {
                if (!(e.getCause() instanceof IOException)) {
                    throw e;
                }
                this.logger.debug(e, "Fail on first write attempt");
                TcpConnection doWrite = doWrite(message);
                if (doWrite != null && this.isSingleUse && this.clientConnectionFactory.getListener() == null) {
                    doWrite.close();
                }
            }
        } catch (Throwable th) {
            if (tcpConnection != null && this.isSingleUse && this.clientConnectionFactory.getListener() == null) {
                tcpConnection.close();
            }
            throw th;
        }
    }

    protected TcpConnection doWrite(Message<?> message) {
        TcpConnection tcpConnection = null;
        try {
            tcpConnection = obtainConnection(message);
            this.logger.debug(() -> {
                return "Got Connection " + tcpConnection.getConnectionId();
            });
            tcpConnection.send(message);
            return tcpConnection;
        } catch (Exception e) {
            String connectionId = tcpConnection != null ? tcpConnection.getConnectionId() : null;
            throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(message, () -> {
                return "Failed to handle message in the [" + this + "] using " + connectionId;
            }, e);
        }
    }

    private void publishNoConnectionEvent(MessageHandlingException messageHandlingException, String str) {
        ApplicationEventPublisher applicationEventPublisher = (this.serverConnectionFactory != null ? this.serverConnectionFactory : this.clientConnectionFactory).getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new TcpConnectionFailedCorrelationEvent(this, str, messageHandlingException));
        }
    }

    public void setConnectionFactory(AbstractConnectionFactory abstractConnectionFactory) {
        if (abstractConnectionFactory instanceof AbstractClientConnectionFactory) {
            this.clientConnectionFactory = abstractConnectionFactory;
        } else {
            this.serverConnectionFactory = abstractConnectionFactory;
            abstractConnectionFactory.registerSender(this);
        }
        this.isSingleUse = abstractConnectionFactory.isSingleUse();
    }

    @Override // org.springframework.integration.ip.tcp.connection.TcpSender
    public void addNewConnection(TcpConnection tcpConnection) {
        this.connections.put(tcpConnection.getConnectionId(), tcpConnection);
    }

    @Override // org.springframework.integration.ip.tcp.connection.TcpSender
    public void removeDeadConnection(TcpConnection tcpConnection) {
        this.connections.remove(tcpConnection.getConnectionId());
    }

    public String getComponentType() {
        return "ip:tcp-outbound-channel-adapter";
    }

    protected void onInit() {
        super.onInit();
        if (this.isClientMode) {
            Assert.notNull(this.clientConnectionFactory, "For client-mode, connection factory must be type='client'");
            Assert.isTrue(!this.clientConnectionFactory.isSingleUse(), "For client-mode, connection factory must have single-use='false'");
        }
    }

    public void start() {
        synchronized (this.lifecycleMonitor) {
            if (!this.active) {
                this.active = true;
                if (this.clientConnectionFactory != null) {
                    this.clientConnectionFactory.start();
                }
                if (this.serverConnectionFactory != null) {
                    this.serverConnectionFactory.start();
                }
                if (this.isClientMode) {
                    Assert.notNull(this.clientConnectionFactory, "For client-mode, connection factory must be type='client'");
                    ClientModeConnectionManager clientModeConnectionManager = new ClientModeConnectionManager(this.clientConnectionFactory);
                    this.clientModeConnectionManager = clientModeConnectionManager;
                    TaskScheduler taskScheduler = getTaskScheduler();
                    Assert.state(taskScheduler != null, "Client mode requires a task scheduler");
                    this.scheduledFuture = taskScheduler.scheduleAtFixedRate(clientModeConnectionManager, Duration.ofMillis(this.retryInterval));
                }
            }
        }
    }

    public void stop() {
        synchronized (this.lifecycleMonitor) {
            if (this.active) {
                this.active = false;
                if (this.scheduledFuture != null) {
                    this.scheduledFuture.cancel(true);
                }
                this.clientModeConnectionManager = null;
                if (this.clientConnectionFactory != null) {
                    this.clientConnectionFactory.stop();
                }
                if (this.serverConnectionFactory != null) {
                    this.serverConnectionFactory.stop();
                }
            }
        }
    }

    public boolean isRunning() {
        return this.active;
    }

    protected ConnectionFactory getClientConnectionFactory() {
        return this.clientConnectionFactory;
    }

    protected ConnectionFactory getServerConnectionFactory() {
        return this.serverConnectionFactory;
    }

    protected Map<String, TcpConnection> getConnections() {
        return this.connections;
    }

    @Override // org.springframework.integration.ip.tcp.connection.ClientModeCapable
    public boolean isClientMode() {
        return this.isClientMode;
    }

    public void setClientMode(boolean z) {
        this.isClientMode = z;
    }

    public long getRetryInterval() {
        return this.retryInterval;
    }

    public void setRetryInterval(long j) {
        this.retryInterval = j;
    }

    @Override // org.springframework.integration.ip.tcp.connection.ClientModeCapable
    public boolean isClientModeConnected() {
        if (!this.isClientMode || this.clientModeConnectionManager == null) {
            return false;
        }
        return this.clientModeConnectionManager.isConnected();
    }

    @Override // org.springframework.integration.ip.tcp.connection.ClientModeCapable
    public void retryConnection() {
        if (this.active && this.isClientMode && this.clientModeConnectionManager != null) {
            this.clientModeConnectionManager.run();
        }
    }
}
