/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ons.api.exactlyonce.manager.impl;

import com.aliyun.openservices.ons.api.exactlyonce.aop.model.MQTxContext;
import com.aliyun.openservices.ons.api.exactlyonce.aop.model.MQTxRecord;
import com.aliyun.openservices.ons.api.exactlyonce.datasource.DataSourceConfig;
import com.aliyun.openservices.ons.api.exactlyonce.datasource.core.MQTxConnection;
import com.aliyun.openservices.ons.api.exactlyonce.manager.TransactionManager;
import com.aliyun.openservices.ons.api.exactlyonce.manager.datebase.LoadRecordDo;
import com.aliyun.openservices.ons.api.exactlyonce.manager.util.DBAccessUtil;
import com.aliyun.openservices.ons.api.exactlyonce.manager.util.LogUtil;
import com.aliyun.openservices.ons.api.exactlyonce.manager.util.OffsetUtil;
import com.aliyun.openservices.ons.api.exactlyonce.manager.util.TxContextUtil;
import com.aliyun.openservices.ons.api.impl.util.ClientLoggerUtil;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.UtilAll;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.utils.ThreadUtils;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import com.aliyun.openservices.shade.io.netty.util.internal.ConcurrentSet;
import com.aliyun.openservices.shade.org.apache.commons.lang3.RandomUtils;
import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateUtils;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TxRecordManagerImpl {
    private static final InternalLogger LOGGER = ClientLoggerUtil.getClientLogger();
    private ScheduledThreadPoolExecutor txRecordCheckExecutor = new ScheduledThreadPoolExecutor(2, ThreadUtils.newGenericThreadFactory("txRecordCheckExecutor", false));
    private int refreshInterval = 10000;
    private static final String LOG_SPLITOR = ",";
    private static final int TIME_EXPIRED = 14400;
    private static final int SECOND_OF_HOUR = 3600;
    private static final int SECOND_OF_DAY = 86400;
    private static final int MAX_UNACTIVE_PROCESS_TIME = 1200000;
    private static final int MAX_SCANACKED_PROCESS_TIME = 9000;
    private static final int MAX_DELETEACKED_PROCESS_TIME = 9000;

    public TxRecordManagerImpl(Properties properties) {
        String refreshIntervalStr = properties.getProperty("exactlyOnceRmRefreshInterval");
        if (!UtilAll.isBlank(refreshIntervalStr)) {
            try {
                this.refreshInterval = Integer.parseInt(refreshIntervalStr);
            }
            catch (NumberFormatException numberFormatException) {
                // empty catch block
            }
        }
    }

    public void start() {
        this.txRecordCheckExecutor.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    long before = System.currentTimeMillis();
                    TxRecordManagerImpl.this.refreshExpiredTxRecord();
                    LogUtil.info(LOGGER, "scan expired record use:{} ms", (Object)(System.currentTimeMillis() - before));
                }
                catch (Throwable e) {
                    LogUtil.error(LOGGER, "scan active record fail, err:{}", (Object)e.getMessage());
                }
            }
        }, 0L, this.refreshInterval, TimeUnit.MILLISECONDS);
        this.txRecordCheckExecutor.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    long before = System.currentTimeMillis();
                    TxRecordManagerImpl.this.deleteUnActiveConsumeRecord();
                    LogUtil.info(LOGGER, "delete unactive record use:{} ms", (Object)(System.currentTimeMillis() - before));
                }
                catch (Throwable e) {
                    LogUtil.error(LOGGER, "delete unactive record fail, err:{}", (Object)e.getMessage());
                }
            }
        }, this.getInitialDelay(), 86400L, TimeUnit.SECONDS);
        LogUtil.info(LOGGER, "start TxRecordManager...");
    }

    public void stop() {
        ThreadUtils.shutdownGracefully(this.txRecordCheckExecutor, 10000L, TimeUnit.MILLISECONDS);
    }

    private long getInitialDelay() {
        long timePast = 14400L;
        long pastSecond = DateUtils.getFragmentInSeconds(Calendar.getInstance(), 5);
        long initDelay = pastSecond < timePast ? timePast - pastSecond + (long)RandomUtils.nextInt(0, 3600) : 86400L - pastSecond + timePast + (long)RandomUtils.nextInt(0, 3600);
        return initDelay;
    }

    private void refreshExpiredTxRecord() {
        long begin = System.currentTimeMillis();
        ConcurrentMap<DataSourceConfig, ConcurrentSet<String>> consumeSessionMap = TransactionManager.getConsumerSessionMap();
        LogUtil.info(LOGGER, "consume session map:{}", (Object)StringUtils.join(consumeSessionMap, LOG_SPLITOR));
        ArrayList dataSourceConfigs = new ArrayList(consumeSessionMap.keySet());
        Collections.shuffle(dataSourceConfigs);
        for (DataSourceConfig config : dataSourceConfigs) {
            ConcurrentSet cidSet = (ConcurrentSet)consumeSessionMap.get(config);
            if (cidSet == null || cidSet.isEmpty()) continue;
            ArrayList cidList = new ArrayList(cidSet);
            Collections.shuffle(cidList);
            for (String cid : cidList) {
                if (System.currentTimeMillis() - begin > 9000L) {
                    return;
                }
                try {
                    List<MessageQueue> currentQueue = OffsetUtil.getCurrentConsumeQueue(cid);
                    if (currentQueue == null || currentQueue.isEmpty()) continue;
                    this.deleteExpiredRecord(config, currentQueue, cid);
                }
                catch (Exception e) {
                    LogUtil.error(LOGGER, "refresh expired record fail, consumerGroup:{}, err:{}", (Object)cid, (Object)e.getMessage());
                }
            }
        }
    }

    private void deleteUnActiveConsumeRecord() {
        long begin = System.currentTimeMillis();
        long expiredTime = begin - 259200000L;
        ArrayList dataSourceConfigs = new ArrayList(TransactionManager.getConsumerSessionMap().keySet());
        Collections.shuffle(dataSourceConfigs);
        block2: for (DataSourceConfig config : dataSourceConfigs) {
            while (System.currentTimeMillis() - begin <= 1200000L) {
                List<Long> recordList = DBAccessUtil.queryExpiredRecord(config, expiredTime, 200);
                if (recordList == null || recordList.isEmpty()) continue block2;
                try {
                    DBAccessUtil.deleteRecordById(config, recordList);
                }
                catch (Exception e) {
                    LogUtil.error(LOGGER, "delete expire record fail, dataSource:{}, count:{}, err:{}", config, recordList.size(), e.getMessage());
                    continue block2;
                }
            }
            return;
        }
    }

    private void deleteExpiredRecord(DataSourceConfig config, List<MessageQueue> mqList, String consumerGroup) {
        long begin = System.currentTimeMillis();
        Collections.shuffle(mqList);
        LogUtil.info(LOGGER, "start delete expired, config:{}, consumerGroup:{}, mqList:{}", config, consumerGroup, StringUtils.join(mqList));
        block2: for (MessageQueue mq : mqList) {
            List<Long> txRecordList;
            Long offset = OffsetUtil.getMQSafeOffset(mq, consumerGroup);
            if (offset == null) continue;
            long checkTime = System.currentTimeMillis() - 180000L;
            LoadRecordDo loadRecordDo = new LoadRecordDo(mq, consumerGroup, offset, checkTime, 200);
            do {
                if (System.currentTimeMillis() - begin > 9000L) {
                    return;
                }
                txRecordList = DBAccessUtil.queryAckedRecord(config, loadRecordDo);
                if (txRecordList == null || txRecordList.isEmpty()) continue block2;
                try {
                    DBAccessUtil.deleteRecordById(config, txRecordList);
                }
                catch (Exception e) {
                    LogUtil.error(LOGGER, "delete record directly fail, config:{}, id:{}, err:{}", config, StringUtils.join(txRecordList, LOG_SPLITOR), e.getMessage());
                }
            } while (txRecordList.size() >= 50);
        }
    }

    public void flushTxRecord(MQTxConnection connection, MQTxContext context) throws Exception {
        if (connection == null) {
            throw new Exception("Connection is null");
        }
        MQTxRecord record = TxContextUtil.buildTxRecord(context);
        DataSourceConfig config = connection.getTxDateSource().getDataSourceConfig();
        record.setDataSourceConfig(config);
        DBAccessUtil.insertTxRecord(connection.getTargetConnection(), config, record);
    }
}

