/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.node.etl.select;

import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.node.common.statistics.StatisticsClientService;
import com.alibaba.otter.node.etl.common.jmx.StageAggregation;
import com.alibaba.otter.node.etl.common.pipe.PipeKey;
import com.alibaba.otter.node.etl.common.task.GlobalTask;
import com.alibaba.otter.node.etl.extract.SetlFuture;
import com.alibaba.otter.node.etl.select.exceptions.SelectException;
import com.alibaba.otter.node.etl.select.selector.Message;
import com.alibaba.otter.node.etl.select.selector.OtterSelector;
import com.alibaba.otter.node.etl.select.selector.OtterSelectorFactory;
import com.alibaba.otter.shared.arbitrate.ArbitrateEventService;
import com.alibaba.otter.shared.arbitrate.model.EtlEventData;
import com.alibaba.otter.shared.arbitrate.model.TerminEventData;
import com.alibaba.otter.shared.common.model.config.channel.Channel;
import com.alibaba.otter.shared.common.model.config.enums.StageType;
import com.alibaba.otter.shared.common.model.statistics.delay.DelayCount;
import com.alibaba.otter.shared.common.utils.lock.BooleanMutex;
import com.alibaba.otter.shared.etl.model.DbBatch;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.Identity;
import com.alibaba.otter.shared.etl.model.RowBatch;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

public class SelectTask
extends GlobalTask {
    // True once startup() has completed; cleared by stopup(false). run() branches on it to decide
    // between "check main-stem ownership" and "start up".
    private volatile boolean isStart = false;
    // Injected through setStatisticsClientService(); receives the DelayCount objects built by
    // sendDelayStat() and sendDelayReset().
    private StatisticsClientService statisticsClientService;
    // Injected through setOtterSelectorFactory(); startup() asks it for the selector of this pipeline.
    private OtterSelectorFactory otterSelectorFactory;
    // Selector obtained in startup(); used for selector()/ack()/rollback()/lastEntryTime() and
    // stopped in stopup()/shutdown().
    private OtterSelector<Message> otterSelector;
    // Two-thread pool created in startup() that hosts the ProcessTermin and ProcessSelect loops;
    // shut down with shutdownNow() in stopup()/shutdown().
    private ExecutorService executor;
    // Bounded hand-off queue (capacity 50): processSelect() puts one BatchTermin per selected batch,
    // the ProcessTermin worker takes them in the same order.
    private BlockingQueue<BatchTermin> batchBuffer = new LinkedBlockingQueue<BatchTermin>(50);
    // When true, processSelect() and processTermin() call checkContinueWork(). Nothing in this class
    // assigns it after this initializer.
    private boolean needCheck = false;
    // Gate for the select loop: set to true by ack() and by the ProcessTermin worker when the buffer is
    // empty, set to false by startup() and notifyRollback(). processSelect() calls get() on it before
    // selecting (presumably blocking until the value is true — confirm against BooleanMutex).
    private BooleanMutex canStartSelector = new BooleanMutex(Boolean.valueOf(false));
    // Incremented by notifyRollback(); processSelect() compares the value before and after awaiting
    // the select event to detect that a rollback happened in between.
    private AtomicInteger rversion = new AtomicInteger(0);
    // Timestamp (System.currentTimeMillis() scale) of the last report sent by sendDelayReset(), which
    // sends at most one report per 60000 ms.
    private long lastResetTime = new Date().getTime();

    /**
     * Creates the select task for one pipeline.
     *
     * @param pipelineId id of the pipeline, passed unchanged to the {@code GlobalTask} constructor
     */
    public SelectTask(Long pipelineId) {
        super(pipelineId);
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public void run() {
        MDC.put((String)"otter", (String)String.valueOf(this.pipelineId));
        block8: while (true) {
            while (this.running) {
                try {
                    if (this.isStart) {
                        boolean working = this.arbitrateEventService.mainStemEvent().check(this.pipelineId);
                        if (!working) {
                            this.stopup(false);
                        }
                        LockSupport.parkNanos(5000000000L);
                        continue block8;
                    }
                    this.startup();
                }
                catch (Throwable e) {
                    if (this.isInterrupt(e)) {
                        this.logger.info("INFO ## select is interrupt", e);
                        this.arbitrateEventService.mainStemEvent().release(this.pipelineId);
                        return;
                    }
                    try {
                        this.logger.warn("WARN ## select is failed.", e);
                        this.sendRollbackTermin((long)this.pipelineId, e);
                        try {
                            Thread.sleep(10000L);
                        }
                        catch (InterruptedException interruptedException) {}
                        continue block8;
                    }
                    catch (Throwable throwable) {
                        throw throwable;
                        return;
                    }
                }
            }
        }
        finally {
            this.arbitrateEventService.mainStemEvent().release(this.pipelineId);
        }
    }

    /**
     * Waits for the main-stem of this pipeline, then brings up the selector and both worker loops.
     *
     * <p>If awaiting the main-stem fails for any reason, the failure is logged (info when
     * {@code isInterrupt} recognises it, warn otherwise), the main-stem is released and the method
     * returns without starting anything; {@code isStart} stays false in that case.
     *
     * <p>On success a fresh two-thread pool is created, the selector is fetched from the factory and
     * started, the select gate is closed, the ProcessTermin and ProcessSelect loops are submitted (in
     * that order) and {@code isStart} becomes true.
     *
     * @throws InterruptedException declared by the signature; the await failure path above is handled
     *         locally
     */
    private void startup() throws InterruptedException {
        try {
            arbitrateEventService.mainStemEvent().await(pipelineId);
        } catch (Throwable failure) {
            boolean interrupted = isInterrupt(failure);
            if (!interrupted) {
                logger.warn("WARN ## this node is crashed.", failure);
            } else {
                logger.info("INFO ## this node is interrupt", failure);
            }
            arbitrateEventService.mainStemEvent().release(pipelineId);
            return;
        }

        // Worker pool: one thread for the termin loop, one for the select loop.
        executor = Executors.newFixedThreadPool(2);

        otterSelector = otterSelectorFactory.getSelector(pipelineId);
        otterSelector.start();

        // Keep the select loop gated until the termin loop opens it.
        canStartSelector.set(Boolean.FALSE);

        startProcessTermin();
        startProcessSelect();
        isStart = true;
    }

    /**
     * Stops the worker pool and the selector if the task is currently started; does nothing otherwise.
     *
     * @param needInterrut when true, an {@link InterruptedException} is thrown after the pool and the
     *        selector have been stopped. Note that {@code isStart} is left unchanged (still true) on
     *        that path; it is only cleared when this flag is false.
     * @throws InterruptedException when {@code needInterrut} is true and the task was started
     */
    private synchronized void stopup(boolean needInterrut) throws InterruptedException {
        if (!isStart) {
            return;
        }

        if (executor != null) {
            executor.shutdownNow();
        }

        boolean selectorRunning = otterSelector != null && otterSelector.isStart();
        if (selectorRunning) {
            otterSelector.stop();
        }

        if (needInterrut) {
            throw new InterruptedException();
        }
        isStart = false;
    }

    /**
     * Submits the select loop to {@code executor}.
     *
     * <p>The worker tags the logging context with the pipeline id under the "otter" MDC key, renames its
     * thread to the "ProcessSelect" task name for the duration of {@link #processSelect()}, and restores
     * the previous thread name and removes the MDC key afterwards.
     */
    private void startProcessSelect() {
        Runnable selectLoop = new Runnable() {

            @Override
            public void run() {
                MDC.put("otter", String.valueOf(SelectTask.this.pipelineId));
                final Thread self = Thread.currentThread();
                final String previousName = self.getName();
                self.setName(SelectTask.this.createTaskName(SelectTask.this.pipelineId, "ProcessSelect"));
                try {
                    SelectTask.this.processSelect();
                } finally {
                    self.setName(previousName);
                    MDC.remove("otter");
                }
            }
        };
        this.executor.submit(selectLoop);
    }

    /**
     * Select loop: repeatedly pulls a message from the selector, registers it in {@code batchBuffer}
     * for the termin worker, and hands non-empty messages to a worker task that ships the rows to the
     * next node.
     *
     * <p>The loop runs while {@code running} is true. A failure recognised by {@code isInterrupt} ends
     * the loop; any other failure is logged, reported through {@code sendRollbackTermin}, and the loop
     * continues with the next iteration.
     *
     * <p>NOTE(review): {@code Message<Message>} and the raw {@code List eventData} iterated as
     * {@code EventData} look like decompiler artifacts of the generic types — confirm against the
     * original declarations of {@code Message} and {@code OtterSelector}.
     */
    private void processSelect() {
        while (this.running) {
            try {
                // Wait on the select gate (presumably blocks until the termin worker or ack() sets it to true).
                this.canStartSelector.get();
                if (this.needCheck) {
                    this.checkContinueWork();
                }
                this.arbitrateEventService.toolEvent().waitForPermit(this.pipelineId);
                Message<Message> gotMessage = this.otterSelector.selector();
                // Snapshot of the rollback counter taken right after the message was obtained.
                int startVersion = this.rversion.get();
                if (!this.canStartSelector.state()) {
                    // The gate was closed while selecting: discard this message via rollback and start over.
                    this.rollback(gotMessage.getId());
                    continue;
                }
                if (CollectionUtils.isEmpty(gotMessage.getDatas())) {
                    // Empty batch: queue it with needWait=false so the termin worker handles it
                    // without awaiting a termin event.
                    this.batchBuffer.put(new BatchTermin(gotMessage.getId(), false));
                    continue;
                }
                final EtlEventData etlEventData = this.arbitrateEventService.selectEvent().await(this.pipelineId);
                if (this.rversion.get() != startVersion) {
                    // notifyRollback() ran while we were awaiting the select event: drop the message
                    // obtained above, wait on the gate again, pause 10 seconds and select a new message.
                    this.logger.warn("rollback happend , should skip this data and get new message.");
                    this.canStartSelector.get();
                    Thread.sleep(10000L);
                    this.arbitrateEventService.toolEvent().waitForPermit(this.pipelineId);
                    gotMessage = this.otterSelector.selector();
                }
                final Message<Message> message = gotMessage;
                // Register the batch (needWait defaults to true) before the worker task is submitted.
                BatchTermin batchTermin = new BatchTermin(message.getId(), etlEventData.getProcessId());
                this.batchBuffer.put(batchTermin);
                Runnable task = new Runnable(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void run() {
                        boolean profiling = SelectTask.this.isProfiling();
                        Long profilingStartTime = null;
                        if (profiling) {
                            profilingStartTime = System.currentTimeMillis();
                        }
                        MDC.put((String)"otter", (String)String.valueOf(SelectTask.this.pipelineId));
                        String currentName = Thread.currentThread().getName();
                        Thread.currentThread().setName(SelectTask.this.createTaskName(SelectTask.this.pipelineId, "SelectWorker"));
                        try {
                            SelectTask.this.pipeline = SelectTask.this.configClientService.findPipeline(SelectTask.this.pipelineId);
                            List eventData = message.getDatas();
                            // Default to the event's start time; overridden by the execute time of the
                            // first row when the message carries data.
                            long startTime = etlEventData.getStartTime();
                            if (!CollectionUtils.isEmpty(eventData)) {
                                startTime = ((EventData)eventData.get(0)).getExecuteTime();
                            }
                            Channel channel = SelectTask.this.configClientService.findChannelByPipelineId(SelectTask.this.pipelineId);
                            // Build one RowBatch identified by (channel, pipeline, process) and merge every row into it.
                            RowBatch rowBatch = new RowBatch();
                            Identity identity = new Identity();
                            identity.setChannelId(channel.getId().longValue());
                            identity.setPipelineId(SelectTask.this.pipelineId.longValue());
                            identity.setProcessId(etlEventData.getProcessId().longValue());
                            rowBatch.setIdentity(identity);
                            for (EventData data : eventData) {
                                rowBatch.merge(data);
                            }
                            // Hand the batch to the pipe for the next node and record the returned keys
                            // and batch statistics on the event.
                            long nextNodeId = etlEventData.getNextNid();
                            List<PipeKey> pipeKeys = SelectTask.this.rowDataPipeDelegate.put(new DbBatch(rowBatch), nextNodeId);
                            etlEventData.setDesc(pipeKeys);
                            etlEventData.setNumber(Long.valueOf(eventData.size()));
                            etlEventData.setFirstTime(Long.valueOf(startTime));
                            etlEventData.setBatchId(message.getId());
                            if (profiling) {
                                Long profilingEndTime = System.currentTimeMillis();
                                SelectTask.this.stageAggregationCollector.push(SelectTask.this.pipelineId, StageType.SELECT, new StageAggregation.AggregationItem(profilingStartTime, profilingEndTime));
                            }
                            // Signal completion of the select stage for this event.
                            SelectTask.this.arbitrateEventService.selectEvent().single(etlEventData);
                        }
                        catch (Throwable e) {
                            // Non-interrupt failures are reported as a rollback termin; interrupts are only logged.
                            if (!SelectTask.this.isInterrupt(e)) {
                                SelectTask.this.logger.error(String.format("[%s] selectwork executor is error! data:%s", SelectTask.this.pipelineId, etlEventData), e);
                                SelectTask.this.sendRollbackTermin(SelectTask.this.pipelineId, e);
                            } else {
                                SelectTask.this.logger.info(String.format("[%s] selectwork executor is interrrupt! data:%s", SelectTask.this.pipelineId, etlEventData), e);
                            }
                        }
                        finally {
                            Thread.currentThread().setName(currentName);
                            MDC.remove((String)"otter");
                        }
                    }
                };
                // The worker task runs on executorService (a member not declared in this class,
                // presumably inherited from GlobalTask), not on the local two-thread `executor`.
                SetlFuture extractFuture = new SetlFuture(StageType.SELECT, etlEventData.getProcessId(), this.pendingFuture, task);
                this.executorService.execute(extractFuture);
            }
            catch (Throwable e) {
                if (!this.isInterrupt(e)) {
                    this.logger.error(String.format("[%s] selectTask is error!", this.pipelineId), e);
                    this.sendRollbackTermin((long)this.pipelineId, e);
                    continue;
                }
                this.logger.info(String.format("[%s] selectTask is interrrupt!", this.pipelineId), e);
                return;
            }
        }
    }

    private void startProcessTermin() {
        this.executor.submit(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Unable to fully structure code
             * Enabled force condition propagation
             * Lifted jumps to return sites
             */
            @Override
            public void run() {
                MDC.put((String)"otter", (String)String.valueOf(SelectTask.access$2500(SelectTask.this)));
                currentName = Thread.currentThread().getName();
                Thread.currentThread().setName(SelectTask.access$2700(SelectTask.this, SelectTask.access$2600(SelectTask.this), "ProcessTermin"));
                try {
                    lastStatus = true;
                    while (SelectTask.access$2800(SelectTask.this)) {
                        try {
                            lastStatus = true;
                            SelectTask.access$3000(SelectTask.this).terminEvent().exhaust(SelectTask.access$2900(SelectTask.this));
                            SelectTask.access$3100(SelectTask.this).clear();
                            while (SelectTask.access$3200(SelectTask.this)) {
                                if (SelectTask.access$3100(SelectTask.this).size() == 0) {
                                    if (!SelectTask.access$3300(SelectTask.this).state()) {
                                        SelectTask.access$3400(SelectTask.this).rollback();
                                    }
                                    lastStatus = true;
                                    SelectTask.access$3300(SelectTask.this).set(Boolean.valueOf(true));
                                }
                                batch = (BatchTermin)SelectTask.access$3100(SelectTask.this).take();
                                SelectTask.access$3500(SelectTask.this).info("start process termin : {}", (Object)batch.toString());
                                if (batch.isNeedWait()) {
                                    lastStatus = SelectTask.access$3600(SelectTask.this, lastStatus, batch.getBatchId(), batch.getProcessId());
                                } else if (lastStatus) {
                                    SelectTask.access$3700(SelectTask.this, batch.getBatchId());
                                    SelectTask.access$3900(SelectTask.this, SelectTask.access$3800(SelectTask.this));
                                } else {
                                    SelectTask.access$4000(SelectTask.this, batch.getBatchId());
                                }
                                SelectTask.access$4100(SelectTask.this).info("end process termin : {}  result : {}", (Object)batch.toString(), (Object)lastStatus);
                            }
                            ** GOTO lbl48
                        }
                        catch (CanalException e) {
                            SelectTask.access$4300(SelectTask.this).info(String.format("[%s] ProcessTermin has an error! retry...", new Object[]{SelectTask.access$4200(SelectTask.this)}), (Throwable)e);
                            SelectTask.access$4400(SelectTask.this);
                            ** GOTO lbl48
                        }
                        catch (SelectException e) {
                            SelectTask.access$4600(SelectTask.this).info(String.format("[%s] ProcessTermin has an error! retry...", new Object[]{SelectTask.access$4500(SelectTask.this)}), (Throwable)e);
                            SelectTask.access$4400(SelectTask.this);
                            ** GOTO lbl48
                        }
                        catch (Throwable e) {
                            if (SelectTask.access$4700(SelectTask.this, e)) {
                                SelectTask.access$4900(SelectTask.this).info(String.format("[%s] ProcessTermin is interrupted!", new Object[]{SelectTask.access$4800(SelectTask.this)}), e);
                                Thread.currentThread().setName(currentName);
                                MDC.remove((String)"otter");
                                return;
                            }
                            try {
                                SelectTask.access$5100(SelectTask.this).error(String.format("[%s] ProcessTermin is error!", new Object[]{SelectTask.access$5000(SelectTask.this)}), e);
                                SelectTask.access$4400(SelectTask.this);
                                SelectTask.access$5300(SelectTask.this, SelectTask.access$5200(SelectTask.this), e);
lbl48:
                                // 4 sources

                                try {
                                    Thread.sleep(30000L);
                                }
                                catch (InterruptedException var3_7) {}
                            }
                            catch (Throwable var4_8) {
                                throw var4_8;
                                return;
                            }
                        }
                    }
                }
                finally {
                    Thread.currentThread().setName(currentName);
                    MDC.remove((String)"otter");
                }
            }
        });
    }

    /**
     * Awaits the termin event that belongs to one batch and acks or rolls the batch back accordingly.
     *
     * <p>Up to 30 termin events are awaited. An event is rejected (followed by a 1 second pause) when
     * it carries no batch id and its process id differs from {@code processId}, or when it carries a
     * batch id that differs from {@code batchId}; a value of -1 for the respective local id disables
     * that comparison. If the last awaited event was still rejected, the corresponding
     * {@code SelectException} is thrown.
     *
     * @param lastStatus result of the previously processed batch (true = normal)
     * @param batchId id of the batch being finished
     * @param processId id of the process that handled the batch
     * @return whether the accepted termin event is of a normal type
     * @throws InterruptedException if interrupted while awaiting, sleeping, or when
     *         {@link #checkContinueWork()} stops the task
     * @throws SelectException if no matching termin event arrives, or if the previous batch was rolled
     *         back but this one reports normal
     */
    private boolean processTermin(boolean lastStatus, Long batchId, Long processId) throws InterruptedException {
        TerminEventData terminData = null;
        SelectException mismatch = null;

        for (int attempt = 0; attempt < 30; attempt++) {
            terminData = arbitrateEventService.terminEvent().await(pipelineId);
            Long terminBatchId = terminData.getBatchId();
            Long terminProcessId = terminData.getProcessId();

            if (terminBatchId == null && processId != -1L && !processId.equals(terminProcessId)) {
                mismatch = new SelectException("unmatched processId, SelectTask batchId = " + batchId + " processId = " + processId + " and Termin Event: " + terminData.toString());
            } else if (terminBatchId != null && batchId != -1L && !batchId.equals(terminBatchId)) {
                mismatch = new SelectException("unmatched terminId, SelectTask batchId = " + batchId + " processId = " + processId + " and Termin Event: " + terminData.toString());
            } else {
                // Event accepted.
                mismatch = null;
                break;
            }
            Thread.sleep(1000L);
        }

        if (mismatch != null) {
            throw mismatch;
        }

        if (needCheck) {
            checkContinueWork();
        }

        final boolean status = terminData.getType().isNormal();
        if (status && !lastStatus) {
            // A normal result may not follow a rollback within the same pass.
            throw new SelectException(String.format("last status is rollback , but now [batchId:%d , processId:%d] is ack", batchId, terminData.getProcessId()));
        }

        if (!terminData.getType().isNormal()) {
            rollback(batchId);
        } else {
            ack(batchId);
            sendDelayStat(pipelineId, terminData.getEndTime(), terminData.getFirstTime());
        }

        arbitrateEventService.terminEvent().ack(terminData);
        return status;
    }

    /**
     * Closes the select gate, bumps the rollback version and rolls the selector back.
     *
     * @param batchId id of the batch that triggered the rollback; not used by the body, the selector's
     *        no-argument rollback is invoked
     */
    private void rollback(Long batchId) {
        notifyRollback();
        otterSelector.rollback();
    }

    /**
     * Opens the select gate and acknowledges a batch on the selector.
     *
     * @param batchId id of the batch to acknowledge
     */
    private void ack(Long batchId) {
        canStartSelector.set(Boolean.TRUE);
        otterSelector.ack(batchId);
    }

    /**
     * Records that a rollback happened: closes the select gate first, then increments the rollback
     * version that {@link #processSelect()} compares against.
     */
    private void notifyRollback() {
        canStartSelector.set(Boolean.FALSE);
        rversion.incrementAndGet();
    }

    /**
     * Verifies through the main-stem event that this node should keep working on the pipeline.
     *
     * <p>When the check fails a warning is logged and {@code stopup(true)} is called.
     *
     * @throws InterruptedException propagated from {@code stopup(true)}, which throws it after stopping
     *         a started task
     */
    private void checkContinueWork() throws InterruptedException {
        if (arbitrateEventService.mainStemEvent().check(pipelineId)) {
            return;
        }
        logger.warn("mainstem is not run in this node");
        stopup(true);
    }

    /**
     * Shuts the task down: delegates to the superclass first, then stops the local worker pool (if one
     * was created) and the selector (if one exists and reports itself as started).
     */
    @Override
    public void shutdown() {
        super.shutdown();

        if (executor != null) {
            executor.shutdownNow();
        }

        if (otterSelector != null) {
            if (otterSelector.isStart()) {
                otterSelector.stop();
            }
        }
    }

    /**
     * Reports a delay statistic for a finished batch through {@code sendResetDelayCount}.
     *
     * <p>The reported number is always 0. The time is set to {@code endTime - startTime} only when both
     * values are present; otherwise it is left unset.
     *
     * @param pipelineId pipeline the statistic belongs to
     * @param endTime end time taken from the termin event, may be null
     * @param startTime first time taken from the termin event, may be null
     */
    private void sendDelayStat(long pipelineId, Long endTime, Long startTime) {
        final DelayCount stat = new DelayCount();
        stat.setPipelineId(Long.valueOf(pipelineId));
        stat.setNumber(Long.valueOf(0L));

        boolean hasBothTimes = endTime != null && startTime != null;
        if (hasBothTimes) {
            long elapsed = endTime.longValue() - startTime.longValue();
            stat.setTime(Long.valueOf(elapsed));
        }

        statisticsClientService.sendResetDelayCount(stat);
    }

    /**
     * Reports the current delay relative to the selector's last entry time, at most once per 60000 ms.
     *
     * <p>When more than 60000 ms have passed since {@code lastResetTime}, that field is updated to the
     * current time and a {@code DelayCount} with number 0 and time
     * {@code now - otterSelector.lastEntryTime()} is sent; otherwise nothing happens.
     *
     * @param pipelineId pipeline the statistic belongs to
     */
    private void sendDelayReset(long pipelineId) {
        final long now = System.currentTimeMillis();
        if (now - lastResetTime <= 60000L) {
            return;
        }
        lastResetTime = now;

        final DelayCount stat = new DelayCount();
        stat.setPipelineId(Long.valueOf(pipelineId));
        stat.setNumber(Long.valueOf(0L));
        stat.setTime(Long.valueOf(now - otterSelector.lastEntryTime()));
        statisticsClientService.sendResetDelayCount(stat);
    }

    /**
     * Injects the factory that {@code startup()} uses to obtain the selector for this pipeline.
     *
     * @param otterSelectorFactory factory to use
     */
    public void setOtterSelectorFactory(OtterSelectorFactory otterSelectorFactory) {
        this.otterSelectorFactory = otterSelectorFactory;
    }

    /**
     * Injects the statistics client that receives the delay reports built by {@code sendDelayStat()}
     * and {@code sendDelayReset()}.
     *
     * @param statisticsClientService client to use
     */
    public void setStatisticsClientService(StatisticsClientService statisticsClientService) {
        this.statisticsClientService = statisticsClientService;
    }

    // ---------------------------------------------------------------------------------------------
    // Synthetic accessors (marked "synthetic" by the decompiler). Each one is a static bridge that
    // exposes a single member of a SelectTask instance (x0): a field read (pipelineId, running,
    // arbitrateEventService, batchBuffer, canStartSelector, otterSelector, logger) or a method call
    // (createTaskName, processTermin, ack, sendDelayReset, rollback, notifyRollback, isInterrupt,
    // sendRollbackTermin). Within this file they are referenced only from the anonymous Runnable in
    // startProcessTermin(). They carry no logic of their own.
    // ---------------------------------------------------------------------------------------------
    static /* synthetic */ Long access$2500(SelectTask x0) {
        return x0.pipelineId;
    }

    static /* synthetic */ Long access$2600(SelectTask x0) {
        return x0.pipelineId;
    }

    static /* synthetic */ String access$2700(SelectTask x0, long x1, String x2) {
        return x0.createTaskName(x1, x2);
    }

    static /* synthetic */ boolean access$2800(SelectTask x0) {
        return x0.running;
    }

    static /* synthetic */ Long access$2900(SelectTask x0) {
        return x0.pipelineId;
    }

    static /* synthetic */ ArbitrateEventService access$3000(SelectTask x0) {
        return x0.arbitrateEventService;
    }

    static /* synthetic */ BlockingQueue access$3100(SelectTask x0) {
        return x0.batchBuffer;
    }

    static /* synthetic */ boolean access$3200(SelectTask x0) {
        return x0.running;
    }

    static /* synthetic */ BooleanMutex access$3300(SelectTask x0) {
        return x0.canStartSelector;
    }

    static /* synthetic */ OtterSelector access$3400(SelectTask x0) {
        return x0.otterSelector;
    }

    static /* synthetic */ Logger access$3500(SelectTask x0) {
        return x0.logger;
    }

    static /* synthetic */ boolean access$3600(SelectTask x0, boolean x1, Long x2, Long x3) throws InterruptedException {
        return x0.processTermin(x1, x2, x3);
    }

    static /* synthetic */ void access$3700(SelectTask x0, Long x1) {
        x0.ack(x1);
    }

    static /* synthetic */ Long access$3800(SelectTask x0) {
        return x0.pipelineId;
    }

    static /* synthetic */ void access$3900(SelectTask x0, long x1) {
        x0.sendDelayReset(x1);
    }

    static /* synthetic */ void access$4000(SelectTask x0, Long x1) {
        x0.rollback(x1);
    }

    static /* synthetic */ Logger access$4100(SelectTask x0) {
        return x0.logger;
    }

    static /* synthetic */ Long access$4200(SelectTask x0) {
        return x0.pipelineId;
    }

    static /* synthetic */ Logger access$4300(SelectTask x0) {
        return x0.logger;
    }

    static /* synthetic */ void access$4400(SelectTask x0) {
        x0.notifyRollback();
    }

    static /* synthetic */ Long access$4500(SelectTask x0) {
        return x0.pipelineId;
    }

    static /* synthetic */ Logger access$4600(SelectTask x0) {
        return x0.logger;
    }

    static /* synthetic */ boolean access$4700(SelectTask x0, Throwable x1) {
        return x0.isInterrupt(x1);
    }

    static /* synthetic */ Long access$4800(SelectTask x0) {
        return x0.pipelineId;
    }

    static /* synthetic */ Logger access$4900(SelectTask x0) {
        return x0.logger;
    }

    static /* synthetic */ Long access$5000(SelectTask x0) {
        return x0.pipelineId;
    }

    static /* synthetic */ Logger access$5100(SelectTask x0) {
        return x0.logger;
    }

    static /* synthetic */ Long access$5200(SelectTask x0) {
        return x0.pipelineId;
    }

    static /* synthetic */ void access$5300(SelectTask x0, long x1, Throwable x2) {
        x0.sendRollbackTermin(x1, x2);
    }

    /**
     * Queue entry describing one selected batch that still has to be acknowledged or rolled back.
     *
     * <p>{@code needWait} tells the consumer whether a termin event must be awaited for this batch
     * before it is finished. A process id of -1 is used when the batch has no associated process.
     */
    public static class BatchTermin {
        private Long batchId;
        private Long processId;
        private boolean needWait;

        /**
         * Creates an entry with every attribute given explicitly.
         *
         * @param batchId id of the selected batch
         * @param processId id of the process handling the batch
         * @param needWait whether a termin event has to be awaited for this batch
         */
        public BatchTermin(Long batchId, Long processId, boolean needWait) {
            this.batchId = batchId;
            this.processId = processId;
            this.needWait = needWait;
        }

        /**
         * Creates an entry that waits for its termin event ({@code needWait} is true).
         *
         * @param batchId id of the selected batch
         * @param processId id of the process handling the batch
         */
        public BatchTermin(Long batchId, Long processId) {
            this(batchId, processId, true);
        }

        /**
         * Creates an entry without an associated process (process id -1).
         *
         * @param batchId id of the selected batch
         * @param needWait whether a termin event has to be awaited for this batch
         */
        public BatchTermin(Long batchId, boolean needWait) {
            this(batchId, Long.valueOf(-1L), needWait);
        }

        /** @return id of the selected batch */
        public Long getBatchId() {
            return batchId;
        }

        /** @param batchId new batch id */
        public void setBatchId(Long batchId) {
            this.batchId = batchId;
        }

        /** @return id of the process handling the batch, -1 when there is none */
        public Long getProcessId() {
            return processId;
        }

        /** @param processId new process id */
        public void setProcessId(Long processId) {
            this.processId = processId;
        }

        /** @return whether a termin event has to be awaited for this batch */
        public boolean isNeedWait() {
            return needWait;
        }

        /** @param needWait new value of the wait flag */
        public void setNeedWait(boolean needWait) {
            this.needWait = needWait;
        }

        /** @return textual form listing batch id, wait flag and process id, in that order */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("BatchTermin [batchId=");
            text.append(batchId);
            text.append(", needWait=").append(needWait);
            text.append(", processId=").append(processId);
            return text.append("]").toString();
        }
    }
}

