/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.node.etl.load.loader.db.interceptor.operation;

import com.alibaba.otter.node.common.config.ConfigClientService;
import com.alibaba.otter.node.etl.common.db.dialect.DbDialect;
import com.alibaba.otter.node.etl.load.loader.db.context.DbLoadContext;
import com.alibaba.otter.node.etl.load.loader.interceptor.AbstractLoadInterceptor;
import com.alibaba.otter.shared.common.model.config.channel.Channel;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.Identity;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;

/**
 * Load interceptor that brackets every load transaction with a write to a "mark" table.
 *
 * <p>When the channel has more than one pipeline, or the pipeline is configured with a
 * channel info string, {@link #transactionBegin} runs the subclass-supplied update statement
 * against one row of the mark table (binding the row id, the channel id and optionally the
 * channel info) and {@link #transactionEnd} runs the matching clear statement against the
 * same row. The row id is handed from begin to end through a {@link ThreadLocal}, so both
 * callbacks must run on the same thread.
 *
 * <p>NOTE(review): the mark row presumably lets the opposite direction of a bidirectional
 * sync recognise and skip its own changes - confirm against the parser/selector side.
 */
public abstract class AbstractOperationInterceptor extends AbstractLoadInterceptor<DbLoadContext, EventData> {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Number of rows (ids {@code 0 .. GLOBAL_THREAD_COUNT - 1}) the mark table is seeded with. */
    protected static final int GLOBAL_THREAD_COUNT = 1000;
    /** Upper bound of the row ids handed out by {@link #currentId()}. */
    protected static final int INNER_THREAD_COUNT = 300;

    /** {0} = mark table, {1} = highest expected row id. */
    protected static final String checkDataSql = "SELECT COUNT(*) FROM {0} WHERE id BETWEEN 0 AND {1}";
    /** {0} = mark table. */
    protected static final String deleteDataSql = "DELETE FROM {0}";

    /** Statement template used to set the mark (and to seed rows); {0} = table, {1} = mark column. */
    protected String updateSql;
    /** Variant of {@link #updateSql} that also writes the info column ({2}). */
    protected String updateInfoSql;
    /** Resets the mark column of one row; {0} = table, {1} = mark column. */
    protected String clearSql = "UPDATE {0} SET {1} = 0 WHERE id = ? and {1} = ?";
    /** Resets mark and info columns of one row; {0} = table, {1} = mark column, {2} = info column. */
    protected String clearInfoSql = "UPDATE {0} SET {1} = 0 , {2} = null WHERE id = ? and {1} = ? and {2} = ?";

    // NOTE(review): these two settings are stored by the setters below but never read in this
    // class; the constants above are what actually drive the behaviour.
    protected int innerIdCount = INNER_THREAD_COUNT;
    protected int globalIdCount = GLOBAL_THREAD_COUNT;

    protected ConfigClientService configClientService;
    /** Templates whose mark table has already been verified/seeded by {@link #init}. */
    protected Set<JdbcTemplate> tableCheckStatus = Collections.synchronizedSet(new HashSet<JdbcTemplate>());
    /** Source of row ids; cycles through {@code 1 .. INNER_THREAD_COUNT}. */
    protected AtomicInteger THREAD_COUNTER = new AtomicInteger(0);
    /** Row id chosen in {@link #transactionBegin}, consumed in {@link #transactionEnd}. */
    protected ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>();

    /**
     * @param updateSql     statement template used to mark a row (and to seed the table)
     * @param updateInfoSql statement template used when a channel info value must be written too
     */
    protected AbstractOperationInterceptor(String updateSql, String updateInfoSql) {
        this.updateSql = updateSql;
        this.updateInfoSql = updateInfoSql;
    }

    /**
     * Ensures the mark table holds exactly {@link #GLOBAL_THREAD_COUNT} rows with ids
     * {@code 0 .. GLOBAL_THREAD_COUNT - 1}; if the count differs the table is emptied and
     * re-seeded through {@link #updateSql} with the mark column set to 0.
     */
    private void init(final JdbcTemplate jdbcTemplate, final String markTableName, final String markTableColumn) {
        // Pass the bound as a String: MessageFormat formats numeric arguments with the default
        // locale (grouping separators, localized digits), which must never leak into SQL text.
        String checkSql = MessageFormat.format(checkDataSql, markTableName, String.valueOf(GLOBAL_THREAD_COUNT - 1));
        int count = jdbcTemplate.queryForInt(checkSql);
        if (count == GLOBAL_THREAD_COUNT) {
            return;
        }

        if (logger.isInfoEnabled()) {
            logger.info("Interceptor: init " + markTableName + "'s data.");
        }

        TransactionTemplate transactionTemplate = new TransactionTemplate();
        transactionTemplate.setTransactionManager(new DataSourceTransactionManager(jdbcTemplate.getDataSource()));
        // Same value as the literal 4 used previously; the constant is inherited from
        // TransactionDefinition, so the seeding runs outside any surrounding transaction.
        transactionTemplate.setPropagationBehavior(TransactionTemplate.PROPAGATION_NOT_SUPPORTED);
        transactionTemplate.execute(new TransactionCallback() {

            public Object doInTransaction(TransactionStatus status) {
                jdbcTemplate.execute(MessageFormat.format(deleteDataSql, markTableName));
                String batchSql = MessageFormat.format(updateSql, markTableName, markTableColumn);
                jdbcTemplate.batchUpdate(batchSql, new BatchPreparedStatementSetter() {

                    public void setValues(PreparedStatement ps, int idx) throws SQLException {
                        ps.setInt(1, idx); // row id
                        ps.setInt(2, 0);   // mark column starts cleared
                    }

                    public int getBatchSize() {
                        return GLOBAL_THREAD_COUNT;
                    }
                });
                return null;
            }
        });

        if (logger.isInfoEnabled()) {
            logger.info("Interceptor: Init EROSA Client Data: " + updateSql);
        }
    }

    /**
     * Marks a row of the mark table before the batch is applied and remembers the row id for
     * {@link #transactionEnd}. Does nothing unless the channel has several pipelines or a
     * channel info string is configured.
     */
    @Override
    public void transactionBegin(DbLoadContext context, List<EventData> currentDatas, DbDialect dialect) {
        boolean needInfo = StringUtils.isNotEmpty(context.getPipeline().getParameters().getChannelInfo());
        if (context.getChannel().getPipelines().size() > 1 || needInfo) {
            String hint = firstHint(currentDatas);
            String sql = needInfo ? updateInfoSql : updateSql;
            // Drop any stale id first; the new id is only published once the mark was written.
            threadLocal.remove();
            int threadId = currentId();
            updateMark(context, dialect, threadId, sql, needInfo, hint);
            threadLocal.set(threadId);
        }
    }

    /**
     * Clears the row marked by {@link #transactionBegin} on this thread. If no row id is bound
     * to the current thread there is nothing to clear, so a warning is logged and the call
     * returns instead of failing while unboxing a {@code null} id.
     */
    @Override
    public void transactionEnd(DbLoadContext context, List<EventData> currentDatas, DbDialect dialect) {
        boolean needInfo = StringUtils.isNotEmpty(context.getPipeline().getParameters().getChannelInfo());
        if (context.getChannel().getPipelines().size() > 1 || needInfo) {
            try {
                Integer threadId = threadLocal.get();
                if (threadId == null) {
                    logger.warn("## no mark id bound to current thread, skip clearing mark for [{}]",
                        context.getIdentity());
                    return;
                }
                String hint = firstHint(currentDatas);
                String sql = needInfo ? clearInfoSql : clearSql;
                updateMark(context, dialect, threadId.intValue(), sql, needInfo, hint);
            } finally {
                // Always release the id, even when the clear statement fails.
                threadLocal.remove();
            }
        }
    }

    /** Returns the hint of the first record, or {@code null} when the batch is null or empty. */
    private static String firstHint(List<EventData> currentDatas) {
        if (currentDatas == null || currentDatas.isEmpty()) {
            return null;
        }
        return currentDatas.get(0).getHint();
    }

    /**
     * Runs {@code sql} (one of the update/clear templates) against the pipeline's mark table.
     *
     * @param threadId row id bound to the first statement parameter
     * @param sql      MessageFormat template; {0} table, {1} mark column, {2} info column
     * @param needInfo whether the info column / channel info value take part in the statement
     * @param hint     optional text prepended verbatim to the statement, may be {@code null}
     */
    private void updateMark(DbLoadContext context, DbDialect dialect, int threadId, String sql, boolean needInfo,
                            String hint) {
        Identity identity = context.getIdentity();
        Channel channel = context.getChannel();
        String markTableName = context.getPipeline().getParameters().getSystemSchema() + "."
                               + context.getPipeline().getParameters().getSystemMarkTable();
        String markTableColumn = context.getPipeline().getParameters().getSystemMarkTableColumn();
        JdbcTemplate jdbcTemplate = dialect.getJdbcTemplate();

        // Verify/seed the mark table once per JdbcTemplate; the lock keeps concurrent loaders
        // on the same template from seeding it twice.
        synchronized (jdbcTemplate) {
            if (!tableCheckStatus.contains(jdbcTemplate)) {
                init(jdbcTemplate, markTableName, markTableColumn);
                tableCheckStatus.add(jdbcTemplate);
            }
        }

        String esql;
        Object[] args;
        if (needInfo) {
            String infoColumn = context.getPipeline().getParameters().getSystemMarkTableInfo();
            String info = context.getPipeline().getParameters().getChannelInfo();
            esql = MessageFormat.format(sql, markTableName, markTableColumn, infoColumn);
            args = new Object[] { threadId, channel.getId(), info };
        } else {
            esql = MessageFormat.format(sql, markTableName, markTableColumn);
            args = new Object[] { threadId, channel.getId() };
        }
        if (hint != null) {
            esql = hint + esql;
        }

        int affectedCount = jdbcTemplate.update(esql, args);
        if (affectedCount <= 0) {
            logger.warn("## update {} failed by [{}]", markTableName, threadId);
        } else {
            // Parameterized logging needs no level guard (the old guard tested INFO for a DEBUG call).
            logger.debug("Interceptor For [{}]", identity);
        }
    }

    /**
     * Hands out the next mark-table row id, cycling through {@code 1 .. INNER_THREAD_COUNT}.
     * Synchronized so the wrap-around check and the increment happen as one step.
     */
    private synchronized int currentId() {
        if (THREAD_COUNTER.get() == INNER_THREAD_COUNT) {
            THREAD_COUNTER.set(0);
        }
        return THREAD_COUNTER.incrementAndGet();
    }

    public void setInnerIdCount(int innerIdCount) {
        this.innerIdCount = innerIdCount;
    }

    public void setGlobalIdCount(int globalIdCount) {
        this.globalIdCount = globalIdCount;
    }

    public void setConfigClientService(ConfigClientService configClientService) {
        this.configClientService = configClientService;
    }
}

