/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.node.etl.select.selector.canal;

import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.common.alarm.CanalAlarmHandler;
import com.alibaba.otter.canal.extend.communication.CanalConfigClient;
import com.alibaba.otter.canal.extend.ha.MediaHAController;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.CanalHASwitchable;
import com.alibaba.otter.canal.parse.ha.CanalHAController;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.sink.AbstractCanalEventSink;
import com.alibaba.otter.canal.sink.CanalEventDownStreamHandler;
import com.alibaba.otter.canal.sink.CanalEventSink;
import com.alibaba.otter.node.common.config.ConfigClientService;
import com.alibaba.otter.node.etl.OtterContextLocator;
import com.alibaba.otter.node.etl.select.exceptions.SelectException;
import com.alibaba.otter.node.etl.select.selector.Message;
import com.alibaba.otter.node.etl.select.selector.MessageDumper;
import com.alibaba.otter.node.etl.select.selector.MessageParser;
import com.alibaba.otter.node.etl.select.selector.OtterSelector;
import com.alibaba.otter.node.etl.select.selector.canal.CanalFilterSupport;
import com.alibaba.otter.node.etl.select.selector.canal.OtterAlarmHandler;
import com.alibaba.otter.node.etl.select.selector.canal.OtterDownStreamHandler;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.etl.model.EventData;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

public class CanalEmbedSelector
implements OtterSelector {
    private static final Logger logger = LoggerFactory.getLogger(CanalEmbedSelector.class);
    private static final String SEP = SystemUtils.LINE_SEPARATOR;
    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final int maxEmptyTimes = 10;
    private int logSplitSize = 50;
    private boolean dump = true;
    private boolean dumpDetail = true;
    private Long pipelineId;
    private CanalServerWithEmbedded canalServer;
    private ClientIdentity clientIdentity;
    private MessageParser messageParser;
    private ConfigClientService configClientService;
    private OtterDownStreamHandler handler;
    private String destination;
    private String filter;
    private int batchSize = 10000;
    private long batchTimeout = -1L;
    private boolean ddlSync = true;
    private boolean filterTableError = false;
    private CanalConfigClient canalConfigClient;
    private volatile boolean running = false;
    private volatile long lastEntryTime = 0L;

    public CanalEmbedSelector(Long pipelineId) {
        this.pipelineId = pipelineId;
        this.canalServer = new CanalServerWithEmbedded();
    }

    @Override
    public boolean isStart() {
        return this.running;
    }

    @Override
    public void start() {
        if (this.running) {
            return;
        }
        Pipeline pipeline = this.configClientService.findPipeline(this.pipelineId);
        this.filter = CanalFilterSupport.makeFilterExpression(pipeline);
        this.destination = pipeline.getParameters().getDestinationName();
        this.batchSize = pipeline.getParameters().getMainstemBatchsize();
        this.batchTimeout = pipeline.getParameters().getBatchTimeout();
        this.ddlSync = pipeline.getParameters().getDdlSync();
        final boolean syncFull = pipeline.getParameters().getSyncMode().isRow() || pipeline.getParameters().isEnableRemedy() != false;
        this.filterTableError = pipeline.getParameters().getSkipSelectException();
        if (pipeline.getParameters().getDumpSelector() != null) {
            this.dump = pipeline.getParameters().getDumpSelector();
        }
        if (pipeline.getParameters().getDumpSelectorDetail() != null) {
            this.dumpDetail = pipeline.getParameters().getDumpSelectorDetail();
        }
        this.canalServer.setCanalInstanceGenerator(new CanalInstanceGenerator(){

            public CanalInstance generate(String destination) {
                Canal canal = CanalEmbedSelector.this.canalConfigClient.findCanal(destination);
                OtterAlarmHandler otterAlarmHandler = new OtterAlarmHandler();
                otterAlarmHandler.setPipelineId(CanalEmbedSelector.this.pipelineId);
                OtterContextLocator.autowire((Object)otterAlarmHandler);
                long slaveId = 10000L;
                if (canal.getCanalParameter().getSlaveId() != null) {
                    slaveId = canal.getCanalParameter().getSlaveId();
                }
                canal.getCanalParameter().setSlaveId(Long.valueOf(slaveId + CanalEmbedSelector.this.pipelineId));
                canal.getCanalParameter().setDdlIsolation(Boolean.valueOf(CanalEmbedSelector.this.ddlSync));
                canal.getCanalParameter().setFilterTableError(Boolean.valueOf(CanalEmbedSelector.this.filterTableError));
                canal.getCanalParameter().setMemoryStorageRawEntry(Boolean.valueOf(false));
                CanalInstanceWithManager instance = new CanalInstanceWithManager(canal, CanalEmbedSelector.this.filter){

                    protected CanalHAController initHaController() {
                        CanalParameter.HAMode haMode = this.parameters.getHaMode();
                        if (haMode.isMedia()) {
                            return new MediaHAController(this.parameters.getMediaGroup(), this.parameters.getDbUsername(), this.parameters.getDbPassword(), this.parameters.getDefaultDatabaseName());
                        }
                        return super.initHaController();
                    }

                    protected void startEventParserInternal(CanalEventParser parser, boolean isGroup) {
                        super.startEventParserInternal(parser, isGroup);
                        if (this.eventParser instanceof MysqlEventParser) {
                            ((MysqlEventParser)this.eventParser).setSupportBinlogFormats("ROW");
                            if (syncFull) {
                                ((MysqlEventParser)this.eventParser).setSupportBinlogImages("FULL");
                            } else {
                                ((MysqlEventParser)this.eventParser).setSupportBinlogImages("FULL,MINIMAL");
                            }
                            MysqlEventParser mysqlEventParser = (MysqlEventParser)this.eventParser;
                            mysqlEventParser.setParallel(false);
                            CanalHAController haController = mysqlEventParser.getHaController();
                            if (haController instanceof MediaHAController) {
                                if (isGroup) {
                                    throw new CanalException("not support group database use media HA");
                                }
                                ((MediaHAController)haController).setCanalHASwitchable((CanalHASwitchable)mysqlEventParser);
                            }
                            if (!haController.isStart()) {
                                haController.start();
                            }
                            if (haController instanceof MediaHAController) {
                                AuthenticationInfo authenticationInfo = ((MediaHAController)haController).getAvailableAuthenticationInfo();
                                ((MysqlEventParser)this.eventParser).setMasterInfo(authenticationInfo);
                            }
                        }
                    }
                };
                instance.setAlarmHandler((CanalAlarmHandler)otterAlarmHandler);
                CanalEventSink eventSink = instance.getEventSink();
                if (eventSink instanceof AbstractCanalEventSink) {
                    CanalEmbedSelector.this.handler = new OtterDownStreamHandler();
                    CanalEmbedSelector.this.handler.setPipelineId(CanalEmbedSelector.this.pipelineId);
                    CanalEmbedSelector.this.handler.setDetectingIntervalInSeconds(canal.getCanalParameter().getDetectingIntervalInSeconds());
                    OtterContextLocator.autowire((Object)CanalEmbedSelector.this.handler);
                    ((AbstractCanalEventSink)eventSink).addHandler((CanalEventDownStreamHandler)CanalEmbedSelector.this.handler, 0);
                    CanalEmbedSelector.this.handler.start();
                }
                return instance;
            }
        });
        this.canalServer.start();
        this.canalServer.start(this.destination);
        this.clientIdentity = new ClientIdentity(this.destination, pipeline.getParameters().getMainstemClientId().shortValue(), this.filter);
        this.canalServer.subscribe(this.clientIdentity);
        this.running = true;
    }

    @Override
    public void stop() {
        if (!this.running) {
            return;
        }
        this.running = false;
        try {
            this.handler.stop();
        }
        catch (Exception e) {
            logger.warn("failed destory handler", (Throwable)e);
        }
        this.handler = null;
        this.canalServer.stop(this.destination);
        this.canalServer.stop();
    }

    public Message<EventData> selector() throws InterruptedException {
        long lastEntryTime;
        int emptyTimes = 0;
        com.alibaba.otter.canal.protocol.Message message = null;
        if (this.batchTimeout < 0L) {
            while (this.running && ((message = this.canalServer.getWithoutAck(this.clientIdentity, this.batchSize)) == null || message.getId() == -1L)) {
                this.applyWait(emptyTimes++);
            }
            if (!this.running) {
                throw new InterruptedException();
            }
        } else {
            while (this.running && ((message = this.canalServer.getWithoutAck(this.clientIdentity, this.batchSize, Long.valueOf(this.batchTimeout), TimeUnit.MILLISECONDS)) == null || message.getId() == -1L)) {
            }
            if (!this.running) {
                throw new InterruptedException();
            }
        }
        ArrayList<CanalEntry.Entry> entries = null;
        if (message.isRaw()) {
            entries = new ArrayList<CanalEntry.Entry>(message.getRawEntries().size());
            for (ByteString entry : message.getRawEntries()) {
                try {
                    entries.add(CanalEntry.Entry.parseFrom((ByteString)entry));
                }
                catch (InvalidProtocolBufferException e) {
                    throw new SelectException(e);
                }
            }
        } else {
            entries = message.getEntries();
        }
        List<EventData> eventDatas = this.messageParser.parse(this.pipelineId, entries);
        Message<EventData> result = new Message<EventData>(message.getId(), eventDatas);
        if (!CollectionUtils.isEmpty(entries) && (lastEntryTime = ((CanalEntry.Entry)entries.get(entries.size() - 1)).getHeader().getExecuteTime()) > 0L) {
            this.lastEntryTime = lastEntryTime;
        }
        if (this.dump && logger.isInfoEnabled()) {
            String startPosition = null;
            String endPosition = null;
            if (!CollectionUtils.isEmpty(entries)) {
                startPosition = this.buildPositionForDump((CanalEntry.Entry)entries.get(0));
                endPosition = this.buildPositionForDump((CanalEntry.Entry)entries.get(entries.size() - 1));
            }
            this.dumpMessages(result, startPosition, endPosition, entries.size());
        }
        return result;
    }

    @Override
    public void rollback(Long batchId) {
        this.canalServer.rollback(this.clientIdentity, batchId);
    }

    @Override
    public void rollback() {
        this.canalServer.rollback(this.clientIdentity);
    }

    @Override
    public void ack(Long batchId) {
        this.canalServer.ack(this.clientIdentity, batchId.longValue());
    }

    @Override
    public List<Long> unAckBatchs() {
        return this.canalServer.listBatchIds(this.clientIdentity);
    }

    @Override
    public Long lastEntryTime() {
        return this.lastEntryTime;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void dumpMessages(Message message, String startPosition, String endPosition, int total) {
        try {
            MDC.put((String)"select", (String)String.valueOf(this.pipelineId));
            logger.info(SEP + "****************************************************" + SEP);
            logger.info(MessageDumper.dumpMessageInfo(message, startPosition, endPosition, total));
            logger.info("****************************************************" + SEP);
            if (this.dumpDetail) {
                this.dumpEventDatas(message.getDatas());
                logger.info("****************************************************" + SEP);
            }
        }
        finally {
            MDC.remove((String)"select");
        }
    }

    private void dumpEventDatas(List<EventData> eventDatas) {
        int size = eventDatas.size();
        int index = 0;
        do {
            if (index + this.logSplitSize >= size) {
                logger.info(MessageDumper.dumpEventDatas(eventDatas.subList(index, size)));
                continue;
            }
            logger.info(MessageDumper.dumpEventDatas(eventDatas.subList(index, index + this.logSplitSize)));
        } while ((index += this.logSplitSize) < size);
    }

    private void applyWait(int emptyTimes) {
        int newEmptyTimes;
        int n = newEmptyTimes = emptyTimes > 10 ? 10 : emptyTimes;
        if (emptyTimes <= 3) {
            Thread.yield();
        } else {
            LockSupport.parkNanos(1000000L * (long)newEmptyTimes);
        }
    }

    private String buildPositionForDump(CanalEntry.Entry entry) {
        long time = entry.getHeader().getExecuteTime();
        Date date = new Date(time);
        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        return entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":" + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
    }

    public void setMessageParser(MessageParser messageParser) {
        this.messageParser = messageParser;
    }

    public void setConfigClientService(ConfigClientService configClientService) {
        this.configClientService = configClientService;
    }

    public void setCanalConfigClient(CanalConfigClient canalConfigClient) {
        this.canalConfigClient = canalConfigClient;
    }

    public void setDump(boolean dump) {
        this.dump = dump;
    }
}

