/*
 * Decompiled with CFR 0.152.
 */
package com.dtp.adapter.rocketmq;

import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import com.dtp.adapter.common.AbstractDtpAdapter;
import com.dtp.common.ApplicationContextHolder;
import com.dtp.common.properties.DtpProperties;
import com.dtp.common.util.ReflectionUtil;
import com.dtp.core.support.ExecutorWrapper;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.collections.MapUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binding.BindingService;

public class StreamRocketMqDtpAdapter
extends AbstractDtpAdapter {
    private static final Logger log = LoggerFactory.getLogger(StreamRocketMqDtpAdapter.class);
    private static final String NAME = "rocketMqTp";
    private static final String CONSUME_EXECUTOR_FIELD_NAME = "consumeExecutor";

    public void refresh(DtpProperties dtpProperties) {
        this.refresh(NAME, dtpProperties.getRocketMqTp(), dtpProperties.getPlatforms());
    }

    protected void initialize() {
        super.initialize();
        Map beans = ApplicationContextHolder.getBeansOfType(BindingService.class);
        if (MapUtils.isEmpty((Map)beans)) {
            log.warn("Cannot find beans of type BindingService.");
            return;
        }
        beans.forEach((bindingServiceName, bindingService) -> {
            Map consumerBindings = (Map)ReflectionUtil.getFieldValue(BindingService.class, (String)"consumerBindings", (Object)bindingService);
            if (MapUtils.isEmpty((Map)consumerBindings)) {
                return;
            }
            consumerBindings.forEach((bindingName, messageChannelBinders) -> {
                Binding messageChannelBinder = (Binding)messageChannelBinders.get(0);
                Class<?> messageChannelBinderClass = messageChannelBinder.getClass();
                RocketMQInboundChannelAdapter lifecycle = (RocketMQInboundChannelAdapter)ReflectionUtil.getFieldValue(messageChannelBinderClass, (String)"lifecycle", (Object)messageChannelBinder);
                DefaultMQPushConsumer pushConsumer = (DefaultMQPushConsumer)ReflectionUtil.getFieldValue(RocketMQInboundChannelAdapter.class, (String)"pushConsumer", (Object)lifecycle);
                DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = (DefaultMQPushConsumerImpl)ReflectionUtil.getFieldValue(DefaultMQPushConsumer.class, (String)"defaultMQPushConsumerImpl", (Object)pushConsumer);
                if (defaultMQPushConsumerImpl == null || defaultMQPushConsumerImpl.getConsumeMessageService() == null) {
                    return;
                }
                ConsumeMessageService consumeMessageService = defaultMQPushConsumerImpl.getConsumeMessageService();
                ThreadPoolExecutor executor = null;
                if (consumeMessageService instanceof ConsumeMessageConcurrentlyService) {
                    executor = (ThreadPoolExecutor)ReflectionUtil.getFieldValue(ConsumeMessageConcurrentlyService.class, (String)CONSUME_EXECUTOR_FIELD_NAME, (Object)consumeMessageService);
                } else if (consumeMessageService instanceof ConsumeMessageOrderlyService) {
                    executor = (ThreadPoolExecutor)ReflectionUtil.getFieldValue(ConsumeMessageOrderlyService.class, (String)CONSUME_EXECUTOR_FIELD_NAME, (Object)consumeMessageService);
                }
                if (Objects.nonNull(executor)) {
                    String destination = (String)ReflectionUtil.getFieldValue(messageChannelBinderClass, (String)"name", (Object)messageChannelBinder);
                    String group = (String)ReflectionUtil.getFieldValue(messageChannelBinderClass, (String)"group", (Object)messageChannelBinder);
                    String key = group + "#" + destination;
                    ExecutorWrapper executorWrapper = new ExecutorWrapper(key, (Executor)executor);
                    this.initNotifyItems(key, executorWrapper);
                    this.executors.put(key, executorWrapper);
                }
            });
        });
        log.info("DynamicTp adapter, rocketMq consumer executors init end, executors: {}", (Object)this.executors);
    }
}

