package com.alibaba.otter.canal.server;

import com.alibaba.otter.canal.connector.core.config.MQProperties;
import com.alibaba.otter.canal.connector.core.producer.MQDestination;
import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
import com.alibaba.otter.canal.connector.core.util.Callback;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalMQConfig;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/alibaba/otter/canal/server/CanalMQStarter.class */
public class CanalMQStarter {
    private ExecutorService executorService;
    private CanalMQProducer canalMQProducer;
    private MQProperties mqProperties;
    private CanalServerWithEmbedded canalServer;
    private static final Logger logger = LoggerFactory.getLogger(CanalMQStarter.class);
    private static Thread shutdownThread = null;
    private volatile boolean running = false;
    private Map<String, CanalMQRunnable> canalMQWorks = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/otter/canal/server/CanalMQStarter$CanalMQRunnable.class */
    public class CanalMQRunnable implements Runnable {
        private String destination;
        private AtomicBoolean running = new AtomicBoolean(true);

        CanalMQRunnable(String str) {
            this.destination = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            CanalMQStarter.this.worker(this.destination, this.running);
        }

        public void stop() {
            this.running.set(false);
        }
    }

    public CanalMQStarter(CanalMQProducer canalMQProducer) {
        this.canalMQProducer = canalMQProducer;
    }

    public synchronized void start(String str) {
        try {
            if (this.running) {
                return;
            }
            this.mqProperties = this.canalMQProducer.getMqProperties();
            if (this.mqProperties.isFilterTransactionEntry()) {
                System.setProperty("canal.instance.filter.transaction.entry", "true");
            }
            this.canalServer = CanalServerWithEmbedded.instance();
            this.executorService = Executors.newCachedThreadPool();
            logger.info("## start the MQ workers.");
            for (String str2 : StringUtils.split(str, ",")) {
                String trim = str2.trim();
                CanalMQRunnable canalMQRunnable = new CanalMQRunnable(trim);
                this.canalMQWorks.put(trim, canalMQRunnable);
                this.executorService.execute(canalMQRunnable);
            }
            this.running = true;
            logger.info("## the MQ workers is running now ......");
            shutdownThread = new Thread(() -> {
                try {
                    try {
                        logger.info("## stop the MQ workers");
                        this.running = false;
                        this.executorService.shutdown();
                        this.canalMQProducer.stop();
                        logger.info("## canal MQ is down.");
                    } catch (Throwable th) {
                        logger.warn("##something goes wrong when stopping MQ workers:", th);
                        logger.info("## canal MQ is down.");
                    }
                } catch (Throwable th2) {
                    logger.info("## canal MQ is down.");
                    throw th2;
                }
            });
            Runtime.getRuntime().addShutdownHook(shutdownThread);
        } catch (Throwable th) {
            logger.error("## Something goes wrong when starting up the canal MQ workers:", th);
        }
    }

    public synchronized void destroy() {
        this.running = false;
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        if (this.canalMQProducer != null) {
            this.canalMQProducer.stop();
        }
        if (shutdownThread != null) {
            Runtime.getRuntime().removeShutdownHook(shutdownThread);
            shutdownThread = null;
        }
    }

    public synchronized void startDestination(String str) {
        CanalInstance canalInstance = this.canalServer.getCanalInstances().get(str);
        if (canalInstance != null) {
            stopDestination(str);
            CanalMQRunnable canalMQRunnable = new CanalMQRunnable(str);
            this.canalMQWorks.put(canalInstance.getDestination(), canalMQRunnable);
            this.executorService.execute(canalMQRunnable);
            logger.info("## Start the MQ work of destination:" + str);
        }
    }

    public synchronized void stopDestination(String str) {
        CanalMQRunnable canalMQRunnable = this.canalMQWorks.get(str);
        if (canalMQRunnable != null) {
            canalMQRunnable.stop();
            this.canalMQWorks.remove(str);
            logger.info("## Stop the MQ work of destination:" + str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void worker(String str, AtomicBoolean atomicBoolean) {
        while (true) {
            if (this.running && atomicBoolean.get()) {
                break;
            } else {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                }
            }
        }
        logger.info("## start the MQ producer: {}.", str);
        MDC.put("destination", str);
        final ClientIdentity clientIdentity = new ClientIdentity(str, (short) 1001, "");
        while (this.running && atomicBoolean.get()) {
            try {
                CanalInstance canalInstance = this.canalServer.getCanalInstances().get(str);
                if (canalInstance == null) {
                    try {
                        Thread.sleep(3000L);
                    } catch (InterruptedException e2) {
                    }
                } else {
                    MQDestination mQDestination = new MQDestination();
                    mQDestination.setCanalDestination(str);
                    CanalMQConfig mqConfig = canalInstance.getMqConfig();
                    mQDestination.setTopic(mqConfig.getTopic());
                    mQDestination.setPartition(mqConfig.getPartition());
                    mQDestination.setDynamicTopic(mqConfig.getDynamicTopic());
                    mQDestination.setPartitionsNum(mqConfig.getPartitionsNum());
                    mQDestination.setPartitionHash(mqConfig.getPartitionHash());
                    mQDestination.setDynamicTopicPartitionNum(mqConfig.getDynamicTopicPartitionNum());
                    this.canalServer.subscribe(clientIdentity);
                    logger.info("## the MQ producer: {} is running now ......", str);
                    Integer fetchTimeout = this.mqProperties.getFetchTimeout();
                    Integer batchSize = this.mqProperties.getBatchSize();
                    while (this.running && atomicBoolean.get()) {
                        Message withoutAck = (fetchTimeout == null || fetchTimeout.intValue() <= 0) ? this.canalServer.getWithoutAck(clientIdentity, batchSize.intValue()) : this.canalServer.getWithoutAck(clientIdentity, batchSize.intValue(), Long.valueOf(fetchTimeout.longValue()), TimeUnit.MILLISECONDS);
                        final long id = withoutAck.getId();
                        try {
                            int size = withoutAck.isRaw() ? withoutAck.getRawEntries().size() : withoutAck.getEntries().size();
                            if (id == -1 || size == 0) {
                                try {
                                    Thread.sleep(100L);
                                } catch (InterruptedException e3) {
                                }
                            } else {
                                this.canalMQProducer.send(mQDestination, withoutAck, new Callback() { // from class: com.alibaba.otter.canal.server.CanalMQStarter.1
                                    public void commit() {
                                        CanalMQStarter.this.canalServer.ack(clientIdentity, id);
                                    }

                                    public void rollback() {
                                        CanalMQStarter.this.canalServer.rollback(clientIdentity, Long.valueOf(id));
                                    }
                                });
                            }
                        } catch (Exception e4) {
                            logger.error(e4.getMessage(), e4);
                        }
                    }
                }
            } catch (Exception e5) {
                logger.error("process error!", e5);
            }
        }
    }
}
