package org.springframework.integration.stomp.inbound;

import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.Lifecycle;
import org.springframework.core.log.LogMessage;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.HeaderMapper;
import org.springframework.integration.stomp.StompSessionManager;
import org.springframework.integration.stomp.event.StompReceiptEvent;
import org.springframework.integration.stomp.support.StompHeaderMapper;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

@IntegrationManagedResource
@ManagedResource
/* loaded from: input_file:org/springframework/integration/stomp/inbound/StompInboundChannelAdapter.class */
public class StompInboundChannelAdapter extends MessageProducerSupport implements ApplicationEventPublisherAware {
    private final StompSessionManager stompSessionManager;
    private ApplicationEventPublisher applicationEventPublisher;
    private volatile StompSession stompSession;
    private final StompSessionHandler stompSessionHandler = new IntegrationInboundStompSessionHandler();
    private final Set<String> destinations = new LinkedHashSet();
    private final Map<String, StompSession.Subscription> subscriptions = new HashMap();
    private final Lock destinationLock = new ReentrantLock();
    private Class<?> payloadType = String.class;
    private HeaderMapper<StompHeaders> headerMapper = new StompHeaderMapper();

    /* loaded from: input_file:org/springframework/integration/stomp/inbound/StompInboundChannelAdapter$IntegrationInboundStompSessionHandler.class */
    private class IntegrationInboundStompSessionHandler extends StompSessionHandlerAdapter {
        private IntegrationInboundStompSessionHandler() {
        }

        public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
            StompInboundChannelAdapter.this.stompSession = stompSession;
            Set<String> set = StompInboundChannelAdapter.this.destinations;
            StompInboundChannelAdapter stompInboundChannelAdapter = StompInboundChannelAdapter.this;
            set.forEach(stompInboundChannelAdapter::subscribeDestination);
        }

        public void handleException(StompSession stompSession, StompCommand stompCommand, StompHeaders stompHeaders, byte[] bArr, Throwable th) {
            String str = "STOMP Frame handling error in the [" + StompInboundChannelAdapter.this + "]";
            MessageChannel errorChannel = StompInboundChannelAdapter.this.getErrorChannel();
            if (errorChannel == null) {
                StompInboundChannelAdapter.this.logger.error(th, str);
                return;
            }
            StompHeaderAccessor create = StompHeaderAccessor.create(stompCommand);
            create.copyHeaders(StompInboundChannelAdapter.this.headerMapper.toHeaders(stompHeaders));
            StompInboundChannelAdapter.this.getMessagingTemplate().send(errorChannel, new ErrorMessage(IntegrationUtils.wrapInHandlingExceptionIfNecessary(MessageBuilder.createMessage(bArr, create.getMessageHeaders()), () -> {
                return str;
            }, th)));
        }

        public void handleTransportError(StompSession stompSession, Throwable th) {
            StompInboundChannelAdapter.this.stompSession = null;
        }
    }

    public StompInboundChannelAdapter(StompSessionManager stompSessionManager, String... strArr) {
        Assert.notNull(stompSessionManager, "'stompSessionManager' is required.");
        if (strArr != null) {
            for (String str : strArr) {
                Assert.hasText(str, "'destinations' must not have empty strings.");
                this.destinations.add(str);
            }
        }
        this.stompSessionManager = stompSessionManager;
    }

    public void setPayloadType(Class<?> cls) {
        Assert.notNull(cls, "'payloadType' must not be null.");
        this.payloadType = cls;
    }

    public void setHeaderMapper(HeaderMapper<StompHeaders> headerMapper) {
        Assert.notNull(headerMapper, "'headerMapper' must not be null.");
        this.headerMapper = headerMapper;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    @ManagedAttribute
    public String[] getDestinations() {
        this.destinationLock.lock();
        try {
            return (String[]) this.destinations.toArray(new String[0]);
        } finally {
            this.destinationLock.unlock();
        }
    }

    @ManagedOperation
    public void addDestination(String... strArr) {
        Assert.notNull(strArr, "'destination' cannot be null");
        this.destinationLock.lock();
        try {
            Stream stream = Arrays.stream(strArr);
            Set<String> set = this.destinations;
            Objects.requireNonNull(set);
            stream.filter((v1) -> {
                return r1.add(v1);
            }).forEach(str -> {
                this.logger.debug(LogMessage.format("Subscribe to destination '%s'.", str));
                subscribeDestination(str);
            });
        } finally {
            this.destinationLock.unlock();
        }
    }

    @ManagedOperation
    public void removeDestination(String... strArr) {
        Assert.notNull(strArr, "'destination' cannot be null");
        this.destinationLock.lock();
        try {
            Stream stream = Arrays.stream(strArr);
            Set<String> set = this.destinations;
            Objects.requireNonNull(set);
            stream.filter((v1) -> {
                return r1.remove(v1);
            }).forEach(str -> {
                this.logger.debug(LogMessage.format("Removed '%s' from subscriptions.", str));
                StompSession.Subscription subscription = this.subscriptions.get(str);
                if (subscription != null) {
                    subscription.unsubscribe();
                } else {
                    this.logger.debug(LogMessage.format("No subscription for destination '%s'.", str));
                }
            });
        } finally {
            this.destinationLock.unlock();
        }
    }

    public String getComponentType() {
        return "stomp:inbound-channel-adapter";
    }

    protected void doStart() {
        if (this.stompSessionManager instanceof Lifecycle) {
            this.stompSessionManager.start();
        }
        this.stompSessionManager.connect(this.stompSessionHandler);
    }

    protected void doStop() {
        this.stompSessionManager.disconnect(this.stompSessionHandler);
        try {
            Iterator<StompSession.Subscription> it = this.subscriptions.values().iterator();
            while (it.hasNext()) {
                it.next().unsubscribe();
            }
        } catch (Exception e) {
            this.logger.warn(e, "The exception during unsubscribing.");
        }
        this.subscriptions.clear();
    }

    private void subscribeDestination(String str) {
        if (this.stompSession == null) {
            this.logger.warn(() -> {
                return "The StompInboundChannelAdapter [" + getComponentName() + "] is not connected to StompSession. Check the state of [" + this.stompSessionManager + "]";
            });
            return;
        }
        StompSession.Subscription subscribe = this.stompSession.subscribe(str, new StompFrameHandler() { // from class: org.springframework.integration.stomp.inbound.StompInboundChannelAdapter.1FrameHandler
            public Type getPayloadType(StompHeaders stompHeaders) {
                return StompInboundChannelAdapter.this.payloadType;
            }

            public void handleFrame(StompHeaders stompHeaders, @Nullable Object obj) {
                if (obj == null) {
                    StompInboundChannelAdapter.this.logger.info("No body in STOMP frame: nothing to produce.");
                } else {
                    StompInboundChannelAdapter.this.sendMessage(obj instanceof Message ? (Message) obj : StompInboundChannelAdapter.this.getMessageBuilderFactory().withPayload(obj).copyHeaders(StompInboundChannelAdapter.this.headerMapper.toHeaders(stompHeaders)).build());
                }
            }
        });
        if (this.stompSessionManager.isAutoReceiptEnabled()) {
            ApplicationEventPublisher applicationEventPublisher = this.applicationEventPublisher;
            if (applicationEventPublisher != null) {
                subscribe.addReceiptTask(() -> {
                    applicationEventPublisher.publishEvent(new StompReceiptEvent(this, str, subscribe.getReceiptId(), StompCommand.SUBSCRIBE, false));
                });
            }
            subscribe.addReceiptLostTask(() -> {
                if (applicationEventPublisher != null) {
                    applicationEventPublisher.publishEvent(new StompReceiptEvent(this, str, subscribe.getReceiptId(), StompCommand.SUBSCRIBE, true));
                } else {
                    this.logger.error(() -> {
                        return "The receipt [" + subscribe.getReceiptId() + "] is lost for [" + subscribe.getSubscriptionId() + "] on destination [" + str + "]";
                    });
                }
            });
        }
        this.subscriptions.put(str, subscribe);
    }
}
