package com.aliyun.openservices.ons.api.exactlyonce.manager.impl;

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.exactlyonce.aop.model.MQTxContext;
import com.aliyun.openservices.ons.api.exactlyonce.aop.model.MQTxRecord;
import com.aliyun.openservices.ons.api.exactlyonce.datasource.DataSourceConfig;
import com.aliyun.openservices.ons.api.exactlyonce.datasource.core.MQTxConnection;
import com.aliyun.openservices.ons.api.exactlyonce.manager.TransactionManager;
import com.aliyun.openservices.ons.api.exactlyonce.manager.datebase.LoadRecordDo;
import com.aliyun.openservices.ons.api.exactlyonce.manager.util.DBAccessUtil;
import com.aliyun.openservices.ons.api.exactlyonce.manager.util.LogUtil;
import com.aliyun.openservices.ons.api.exactlyonce.manager.util.OffsetUtil;
import com.aliyun.openservices.ons.api.exactlyonce.manager.util.TxContextUtil;
import com.aliyun.openservices.ons.api.impl.util.ClientLoggerUtil;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.UtilAll;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.utils.ThreadUtils;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import com.aliyun.openservices.shade.io.netty.util.internal.ConcurrentSet;
import com.aliyun.openservices.shade.org.apache.commons.lang3.RandomUtils;
import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
import com.aliyun.openservices.shade.org.apache.commons.lang3.time.DateUtils;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/aliyun/openservices/ons/api/exactlyonce/manager/impl/TxRecordManagerImpl.class */
public class TxRecordManagerImpl {
    private static final InternalLogger LOGGER = ClientLoggerUtil.getClientLogger();
    private ScheduledThreadPoolExecutor txRecordCheckExecutor = new ScheduledThreadPoolExecutor(2, ThreadUtils.newGenericThreadFactory("txRecordCheckExecutor", false));
    private int refreshInterval;
    private static final String LOG_SPLITOR = ",";
    private static final int TIME_EXPIRED = 14400;
    private static final int SECOND_OF_HOUR = 3600;
    private static final int SECOND_OF_DAY = 86400;
    private static final int MAX_UNACTIVE_PROCESS_TIME = 1200000;
    private static final int MAX_SCANACKED_PROCESS_TIME = 9000;
    private static final int MAX_DELETEACKED_PROCESS_TIME = 9000;

    public TxRecordManagerImpl(Properties properties) {
        this.refreshInterval = 10000;
        String property = properties.getProperty(PropertyKeyConst.EXACTLYONCE_RM_REFRESHINTERVAL);
        if (UtilAll.isBlank(property)) {
            return;
        }
        try {
            this.refreshInterval = Integer.parseInt(property);
        } catch (NumberFormatException e) {
        }
    }

    public void start() {
        this.txRecordCheckExecutor.scheduleAtFixedRate(new Runnable() { // from class: com.aliyun.openservices.ons.api.exactlyonce.manager.impl.TxRecordManagerImpl.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    long currentTimeMillis = System.currentTimeMillis();
                    TxRecordManagerImpl.this.refreshExpiredTxRecord();
                    LogUtil.info(TxRecordManagerImpl.LOGGER, "scan expired record use:{} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                } catch (Throwable th) {
                    LogUtil.error(TxRecordManagerImpl.LOGGER, "scan active record fail, err:{}", th.getMessage());
                }
            }
        }, 0L, this.refreshInterval, TimeUnit.MILLISECONDS);
        this.txRecordCheckExecutor.scheduleAtFixedRate(new Runnable() { // from class: com.aliyun.openservices.ons.api.exactlyonce.manager.impl.TxRecordManagerImpl.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    long currentTimeMillis = System.currentTimeMillis();
                    TxRecordManagerImpl.this.deleteUnActiveConsumeRecord();
                    LogUtil.info(TxRecordManagerImpl.LOGGER, "delete unactive record use:{} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                } catch (Throwable th) {
                    LogUtil.error(TxRecordManagerImpl.LOGGER, "delete unactive record fail, err:{}", th.getMessage());
                }
            }
        }, getInitialDelay(), 86400L, TimeUnit.SECONDS);
        LogUtil.info(LOGGER, "start TxRecordManager...");
    }

    public void stop() {
        ThreadUtils.shutdownGracefully(this.txRecordCheckExecutor, 10000L, TimeUnit.MILLISECONDS);
    }

    private long getInitialDelay() {
        long fragmentInSeconds = DateUtils.getFragmentInSeconds(Calendar.getInstance(), 5);
        return fragmentInSeconds < 14400 ? (14400 - fragmentInSeconds) + RandomUtils.nextInt(0, SECOND_OF_HOUR) : (86400 - fragmentInSeconds) + 14400 + RandomUtils.nextInt(0, SECOND_OF_HOUR);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void refreshExpiredTxRecord() {
        long currentTimeMillis = System.currentTimeMillis();
        ConcurrentMap<DataSourceConfig, ConcurrentSet<String>> consumerSessionMap = TransactionManager.getConsumerSessionMap();
        LogUtil.info(LOGGER, "consume session map:{}", StringUtils.join(consumerSessionMap, LOG_SPLITOR));
        ArrayList<DataSourceConfig> arrayList = new ArrayList(consumerSessionMap.keySet());
        Collections.shuffle(arrayList);
        for (DataSourceConfig dataSourceConfig : arrayList) {
            ConcurrentSet<String> concurrentSet = consumerSessionMap.get(dataSourceConfig);
            if (concurrentSet != null && !concurrentSet.isEmpty()) {
                ArrayList<String> arrayList2 = new ArrayList(concurrentSet);
                Collections.shuffle(arrayList2);
                for (String str : arrayList2) {
                    if (System.currentTimeMillis() - currentTimeMillis > 9000) {
                        return;
                    }
                    try {
                        List<MessageQueue> currentConsumeQueue = OffsetUtil.getCurrentConsumeQueue(str);
                        if (currentConsumeQueue != null && !currentConsumeQueue.isEmpty()) {
                            deleteExpiredRecord(dataSourceConfig, currentConsumeQueue, str);
                        }
                    } catch (Exception e) {
                        LogUtil.error(LOGGER, "refresh expired record fail, consumerGroup:{}, err:{}", str, e.getMessage());
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void deleteUnActiveConsumeRecord() {
        long currentTimeMillis = System.currentTimeMillis();
        long j = currentTimeMillis - 259200000;
        ArrayList<DataSourceConfig> arrayList = new ArrayList(TransactionManager.getConsumerSessionMap().keySet());
        Collections.shuffle(arrayList);
        for (DataSourceConfig dataSourceConfig : arrayList) {
            while (System.currentTimeMillis() - currentTimeMillis <= 1200000) {
                List<Long> queryExpiredRecord = DBAccessUtil.queryExpiredRecord(dataSourceConfig, Long.valueOf(j), 200);
                if (queryExpiredRecord != null && !queryExpiredRecord.isEmpty()) {
                    try {
                        DBAccessUtil.deleteRecordById(dataSourceConfig, queryExpiredRecord);
                    } catch (Exception e) {
                        LogUtil.error(LOGGER, "delete expire record fail, dataSource:{}, count:{}, err:{}", dataSourceConfig, Integer.valueOf(queryExpiredRecord.size()), e.getMessage());
                    }
                }
            }
            return;
        }
    }

    private void deleteExpiredRecord(DataSourceConfig dataSourceConfig, List<MessageQueue> list, String str) {
        long currentTimeMillis = System.currentTimeMillis();
        Collections.shuffle(list);
        LogUtil.info(LOGGER, "start delete expired, config:{}, consumerGroup:{}, mqList:{}", dataSourceConfig, str, StringUtils.join(list));
        for (MessageQueue messageQueue : list) {
            Long mQSafeOffset = OffsetUtil.getMQSafeOffset(messageQueue, str);
            if (mQSafeOffset != null) {
                LoadRecordDo loadRecordDo = new LoadRecordDo(messageQueue, str, mQSafeOffset, Long.valueOf(System.currentTimeMillis() - 180000), 200);
                while (System.currentTimeMillis() - currentTimeMillis <= 9000) {
                    List<Long> queryAckedRecord = DBAccessUtil.queryAckedRecord(dataSourceConfig, loadRecordDo);
                    if (queryAckedRecord != null && !queryAckedRecord.isEmpty()) {
                        try {
                            DBAccessUtil.deleteRecordById(dataSourceConfig, queryAckedRecord);
                        } catch (Exception e) {
                            LogUtil.error(LOGGER, "delete record directly fail, config:{}, id:{}, err:{}", dataSourceConfig, StringUtils.join(queryAckedRecord, LOG_SPLITOR), e.getMessage());
                        }
                        if (queryAckedRecord.size() < 50) {
                            break;
                        }
                    }
                }
                return;
            }
        }
    }

    public void flushTxRecord(MQTxConnection mQTxConnection, MQTxContext mQTxContext) throws Exception {
        if (mQTxConnection == null) {
            throw new Exception("Connection is null");
        }
        MQTxRecord buildTxRecord = TxContextUtil.buildTxRecord(mQTxContext);
        DataSourceConfig dataSourceConfig = mQTxConnection.getTxDateSource().getDataSourceConfig();
        buildTxRecord.setDataSourceConfig(dataSourceConfig);
        DBAccessUtil.insertTxRecord(mQTxConnection.getTargetConnection(), dataSourceConfig, buildTxRecord);
    }
}
