/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.server.embedded;

import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.SecurityUtil;
import com.alibaba.otter.canal.protocol.position.Position;
import com.alibaba.otter.canal.protocol.position.PositionRange;
import com.alibaba.otter.canal.server.CanalServer;
import com.alibaba.otter.canal.server.CanalService;
import com.alibaba.otter.canal.server.exception.CanalServerException;
import com.alibaba.otter.canal.spi.CanalMetricsProvider;
import com.alibaba.otter.canal.spi.CanalMetricsService;
import com.alibaba.otter.canal.spi.NopCanalMetricsService;
import com.alibaba.otter.canal.store.CanalEventStore;
import com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer;
import com.alibaba.otter.canal.store.model.Event;
import com.alibaba.otter.canal.store.model.Events;
import com.google.common.collect.Maps;
import com.google.common.collect.MigrateMap;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

public class CanalServerWithEmbedded
extends AbstractCanalLifeCycle
implements CanalServer,
CanalService {
    // Class-wide SLF4J logger.
    private static final Logger logger = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
    // Destination name -> instance; assigned in start() from MigrateMap.makeComputingMap, null before that.
    private Map<String, CanalInstance> canalInstances;
    // Invoked by the map's computing function to produce the instance for a destination.
    private CanalInstanceGenerator canalInstanceGenerator;
    // Port handed to the metrics service via setServerPort(...) in start().
    private int metricsPort;
    // Metrics service in use; stays the NOP implementation unless loadCanalMetrics() finds a provider.
    private CanalMetricsService metrics = NopCanalMetricsService.NOP;
    // Expected user name for auth(...); when empty, any user name is accepted.
    private String user;
    // Expected password (hex string, decoded with SecurityUtil.hexStr2Bytes in auth(...)); when empty, password checking is skipped.
    private String passwd;

    /**
     * Returns the shared server instance held by {@code SingletonHolder}.
     *
     * @return the instance stored in {@code SingletonHolder.CANAL_SERVER_WITH_EMBEDDED}
     */
    public static CanalServerWithEmbedded instance() {
        return SingletonHolder.CANAL_SERVER_WITH_EMBEDDED;
    }

    /**
     * Starts the embedded server; does nothing when {@code isStart()} already reports true.
     * <p>
     * After delegating to {@code super.start()}, this resolves the metrics service, passes it the
     * configured port, initializes it, and finally creates the destination map through
     * {@code MigrateMap.makeComputingMap}, whose function delegates to the configured generator.
     */
    @Override
    public void start() {
        if (this.isStart()) {
            return;
        }
        super.start();
        this.loadCanalMetrics();
        this.metrics.setServerPort(this.metricsPort);
        this.metrics.initialize();
        // The generator field is read on each computation, not captured here.
        this.canalInstances = MigrateMap.makeComputingMap(name -> this.canalInstanceGenerator.generate(name));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops the server: delegates to {@code super.stop()}, stops every instance in the map that
     * reports itself as started, then terminates the metrics service.
     * <p>
     * An exception raised while stopping one instance is logged and does not prevent the remaining
     * instances from being processed.
     */
    @Override
    public void stop() {
        super.stop();
        for (Map.Entry<String, CanalInstance> entry : this.canalInstances.entrySet()) {
            try {
                CanalInstance instance = entry.getValue();
                if (instance.isStart()) {
                    try {
                        String destination = entry.getKey();
                        // Expose the destination to log output for the duration of the stop call.
                        MDC.put("destination", destination);
                        instance.stop();
                        logger.info("stop CanalInstances[{}] successfully", destination);
                    } finally {
                        MDC.remove("destination");
                    }
                }
            } catch (Exception e) {
                logger.error(String.format("stop CanalInstance[%s] has an error", entry.getKey()), e);
            }
        }
        this.metrics.terminate();
    }

    /**
     * Checks client credentials against the configured user and password.
     * <p>
     * The user name is accepted when no user is configured or when it equals the configured one.
     * When no password is configured the check succeeds immediately; an empty client password is
     * rejected otherwise. Both passwords are decoded with {@code SecurityUtil.hexStr2Bytes} and
     * compared through {@code SecurityUtil.scrambleServerAuth} together with {@code seed}.
     *
     * @param user   user name supplied by the client
     * @param passwd password string supplied by the client (decoded as hex)
     * @param seed   seed bytes passed to {@code scrambleServerAuth}
     * @return {@code true} when the credentials are accepted; {@code false} otherwise, including
     *         when a {@link NoSuchAlgorithmException} is raised during verification
     */
    public boolean auth(String user, String passwd, byte[] seed) {
        boolean userAccepted = StringUtils.isEmpty(this.user) || StringUtils.equals(this.user, user);
        if (!userAccepted) {
            return false;
        }
        if (StringUtils.isEmpty(this.passwd)) {
            return true;
        }
        if (StringUtils.isEmpty(passwd)) {
            return false;
        }
        try {
            byte[] clientBytes = SecurityUtil.hexStr2Bytes(passwd);
            byte[] configuredBytes = SecurityUtil.hexStr2Bytes(this.passwd);
            return SecurityUtil.scrambleServerAuth(clientBytes, configuredBytes, seed);
        } catch (NoSuchAlgorithmException e) {
            return false;
        }
    }

    /**
     * Starts the instance mapped to {@code destination} unless it already reports itself as started.
     * <p>
     * The instance is registered with the metrics service first (only when that service is running)
     * and then started. The {@code destination} MDC key is set for the duration of the call.
     *
     * @param destination name of the instance to start
     */
    public void start(String destination) {
        CanalInstance canalInstance = this.canalInstances.get(destination);
        if (canalInstance.isStart()) {
            return;
        }
        try {
            MDC.put("destination", destination);
            if (this.metrics.isRunning()) {
                this.metrics.register(canalInstance);
            }
            canalInstance.start();
            logger.info("start CanalInstances[{}] successfully", destination);
        } finally {
            MDC.remove("destination");
        }
    }

    /**
     * Removes the instance mapped to {@code destination} and, if one was present and started,
     * stops it and unregisters it from the metrics service (only when that service is running).
     * <p>
     * The mapping is removed even when the instance turns out not to be started.
     *
     * @param destination name of the instance to stop
     */
    public void stop(String destination) {
        CanalInstance canalInstance = this.canalInstances.remove(destination);
        if (canalInstance == null || !canalInstance.isStart()) {
            return;
        }
        try {
            MDC.put("destination", destination);
            canalInstance.stop();
            if (this.metrics.isRunning()) {
                this.metrics.unregister(canalInstance);
            }
            logger.info("stop CanalInstances[{}] successfully", destination);
        } finally {
            MDC.remove("destination");
        }
    }

    /**
     * Reports whether an instance for {@code destination} is present in the map and started.
     *
     * @param destination name of the instance to check
     * @return {@code true} only when the map contains the key and the mapped instance is started
     */
    public boolean isStart(String destination) {
        // Check the key first so that an unknown destination is answered without calling get().
        if (!this.canalInstances.containsKey(destination)) {
            return false;
        }
        return this.canalInstances.get(destination).isStart();
    }

    /**
     * Subscribes a client to its destination.
     * <p>
     * Starts the instance's meta manager if needed, records the subscription, and makes sure a
     * cursor exists: when none is stored, the event store's first position (if any) is saved as the
     * cursor. Finally the instance is notified through {@code subscribeChange}.
     *
     * @param clientIdentity the subscribing client
     * @throws CanalServerException if the destination has not been started
     */
    @Override
    public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
        String destination = clientIdentity.getDestination();
        this.checkStart(destination);
        CanalInstance canalInstance = this.canalInstances.get(destination);
        if (!canalInstance.getMetaManager().isStart()) {
            canalInstance.getMetaManager().start();
        }
        canalInstance.getMetaManager().subscribe(clientIdentity);
        Position cursor = canalInstance.getMetaManager().getCursor(clientIdentity);
        if (cursor != null) {
            logger.info("subscribe successfully, {} use last cursor position:{} ", clientIdentity, cursor);
        } else {
            // No stored cursor: fall back to the store's first position, which may itself be null.
            cursor = canalInstance.getEventStore().getFirstPosition();
            if (cursor != null) {
                canalInstance.getMetaManager().updateCursor(clientIdentity, cursor);
            }
            logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, cursor);
        }
        canalInstance.subscribeChange(clientIdentity);
    }

    /**
     * Removes the client's subscription from the meta manager of its destination's instance.
     * <p>
     * Unlike {@code subscribe}, this method does not call {@code checkStart} first.
     *
     * @param clientIdentity the client to unsubscribe
     * @throws CanalServerException declared by the interface; not thrown directly by this method
     */
    @Override
    public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
        String destination = clientIdentity.getDestination();
        this.canalInstances.get(destination).getMetaManager().unsubscribe(clientIdentity);
        logger.info("unsubscribe successfully, {}", clientIdentity);
    }

    /**
     * Lists the subscriptions the meta manager holds for {@code destination}.
     *
     * @param destination name of the instance to query
     * @return the result of {@code listAllSubscribeInfo(destination)} on that instance's meta manager
     * @throws CanalServerException declared for callers; not thrown directly by this method
     */
    public List<ClientIdentity> listAllSubscribe(String destination) throws CanalServerException {
        return this.canalInstances.get(destination).getMetaManager().listAllSubscribeInfo(destination);
    }

    /**
     * Fetches a batch and acknowledges it immediately, without a timeout.
     * <p>
     * Delegates to {@code get(clientIdentity, batchSize, null, null)}; with a {@code null} timeout
     * the events are read through the store's {@code tryGet}.
     *
     * @param clientIdentity the requesting client
     * @param batchSize      batch size passed through to the event store
     * @return the fetched message
     * @throws CanalServerException propagated from the four-argument overload
     */
    @Override
    public Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
        return this.get(clientIdentity, batchSize, null, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Fetches a batch of events for the client and acknowledges it before returning.
     * <p>
     * The work runs while holding the monitor of the destination's instance. If the meta manager
     * still holds a batch for this client, the call is rejected. When the store yields no events,
     * a message with id {@code -1} and an empty entry list is returned and no batch is recorded.
     *
     * @param clientIdentity the requesting client
     * @param batchSize      batch size passed through to the event store
     * @param timeout        {@code null} for {@code tryGet}; otherwise forwarded to {@code getEvents}
     * @param unit           time unit of {@code timeout}
     * @return the fetched message; entries are raw or parsed depending on {@code isRaw}
     * @throws CanalServerException if the destination is not started, the client is not subscribed,
     *                              or a previous batch is still recorded for the client
     */
    @Override
    public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException {
        this.checkStart(clientIdentity.getDestination());
        this.checkSubscribe(clientIdentity);
        CanalInstance canalInstance = this.canalInstances.get(clientIdentity.getDestination());
        synchronized (canalInstance) {
            PositionRange pending = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
            if (pending != null) {
                throw new CanalServerException(String.format("clientId:%s has last batch:[%s] isn't ack , maybe loss data", clientIdentity.getClientId(), pending));
            }
            Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
            Events<Event> events = this.getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
            if (CollectionUtils.isEmpty(events.getEvents())) {
                logger.debug("get successfully, clientId:{} batchSize:{} but result is null", clientIdentity.getClientId(), batchSize);
                // Empty result: return without recording a batch.
                return new Message(-1L, true, new ArrayList());
            }
            Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
            boolean raw = this.isRaw(canalInstance.getEventStore());
            List entries;
            if (raw) {
                entries = events.getEvents().stream().map(Event::getRawEntry).collect(Collectors.toList());
            } else {
                entries = events.getEvents().stream().map(Event::getEntry).collect(Collectors.toList());
            }
            if (logger.isInfoEnabled()) {
                logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
                    clientIdentity.getClientId(), batchSize, entries.size(), batchId, events.getPositionRange());
            }
            // Acknowledge right away; this variant does not leave the batch pending.
            this.ack(clientIdentity, batchId);
            return new Message(batchId.longValue(), raw, entries);
        }
    }

    /**
     * Fetches a batch without acknowledging it and without a timeout.
     * <p>
     * Delegates to {@code getWithoutAck(clientIdentity, batchSize, null, null)}; with a {@code null}
     * timeout the events are read through the store's {@code tryGet}.
     *
     * @param clientIdentity the requesting client
     * @param batchSize      batch size passed through to the event store
     * @return the fetched message
     * @throws CanalServerException propagated from the four-argument overload
     */
    @Override
    public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
        return this.getWithoutAck(clientIdentity, batchSize, null, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Fetches a batch of events for the client and records it as a batch, leaving the
     * acknowledgement to a later {@code ack} call.
     * <p>
     * The work runs while holding the monitor of the destination's instance. Reading starts at the
     * start of the latest recorded batch when one exists; otherwise at the client's cursor, or at
     * the store's first position when no cursor is stored. When the store yields no events, a
     * message with id {@code -1} and an empty entry list is returned and no batch is recorded.
     *
     * @param clientIdentity the requesting client
     * @param batchSize      batch size passed through to the event store
     * @param timeout        {@code null} for {@code tryGet}; otherwise forwarded to {@code getEvents}
     * @param unit           time unit of {@code timeout}
     * @return the fetched message; entries are raw or parsed depending on {@code isRaw}
     * @throws CanalServerException if the destination is not started or the client is not subscribed
     */
    @Override
    public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException {
        this.checkStart(clientIdentity.getDestination());
        this.checkSubscribe(clientIdentity);
        CanalInstance canalInstance = this.canalInstances.get(clientIdentity.getDestination());
        synchronized (canalInstance) {
            PositionRange latestBatch = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
            Position start;
            if (latestBatch != null) {
                start = latestBatch.getStart();
            } else {
                start = canalInstance.getMetaManager().getCursor(clientIdentity);
                if (start == null) {
                    start = canalInstance.getEventStore().getFirstPosition();
                }
            }
            Events<Event> events = this.getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
            if (CollectionUtils.isEmpty(events.getEvents())) {
                // Empty result: return without recording a batch.
                return new Message(-1L, true, new ArrayList());
            }
            Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
            boolean raw = this.isRaw(canalInstance.getEventStore());
            List entries;
            if (raw) {
                entries = events.getEvents().stream().map(Event::getRawEntry).collect(Collectors.toList());
            } else {
                entries = events.getEvents().stream().map(Event::getEntry).collect(Collectors.toList());
            }
            if (logger.isInfoEnabled()) {
                logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",
                    clientIdentity.getClientId(), batchSize, entries.size(), batchId, events.getPositionRange());
            }
            return new Message(batchId.longValue(), raw, entries);
        }
    }

    /**
     * Returns the ids of all batches the meta manager holds for the client, in ascending order.
     *
     * @param clientIdentity the client whose batches are listed
     * @return a new sorted list built from the key set of {@code listAllBatchs(clientIdentity)}
     * @throws CanalServerException if the destination is not started or the client is not subscribed
     */
    public List<Long> listBatchIds(ClientIdentity clientIdentity) throws CanalServerException {
        String destination = clientIdentity.getDestination();
        this.checkStart(destination);
        this.checkSubscribe(clientIdentity);
        Map batches = this.canalInstances.get(destination).getMetaManager().listAllBatchs(clientIdentity);
        List<Long> batchIds = new ArrayList<Long>(batches.keySet());
        Collections.sort(batchIds);
        return batchIds;
    }

    /**
     * Acknowledges a batch: removes it from the meta manager, advances the client's cursor to the
     * batch's ack position when one is present, and acknowledges the batch's end position and end
     * sequence on the event store.
     *
     * @param clientIdentity the acknowledging client
     * @param batchId        id of the batch to acknowledge
     * @throws CanalServerException if the destination is not started, the client is not
     *                              subscribed, or no batch with this id exists
     */
    @Override
    public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
        this.checkStart(clientIdentity.getDestination());
        this.checkSubscribe(clientIdentity);
        CanalInstance canalInstance = this.canalInstances.get(clientIdentity.getDestination());
        PositionRange removed = canalInstance.getMetaManager().removeBatch(clientIdentity, Long.valueOf(batchId));
        if (removed == null) {
            throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check", clientIdentity.getClientId(), batchId));
        }
        if (removed.getAck() != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, removed.getAck());
            if (logger.isInfoEnabled()) {
                logger.info("ack successfully, clientId:{} batchId:{} position:{}", clientIdentity.getClientId(), batchId, removed);
            }
        }
        // The store is acknowledged regardless of whether the cursor was updated above.
        canalInstance.getEventStore().ack(removed.getEnd(), removed.getEndSeq());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Rolls back every batch recorded for the client.
     * <p>
     * Returns silently when the client has no subscription. Otherwise, while holding the
     * instance's monitor, clears all of the client's batches and rolls back the event store.
     *
     * @param clientIdentity the client whose batches are discarded
     * @throws CanalServerException if the destination is not started
     */
    @Override
    public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
        this.checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = this.canalInstances.get(clientIdentity.getDestination());
        if (!canalInstance.getMetaManager().hasSubscribe(clientIdentity)) {
            return;
        }
        synchronized (canalInstance) {
            canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
            canalInstance.getEventStore().rollback();
            logger.info("rollback successfully, clientId:{}", clientIdentity.getClientId());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Rolls back a single batch of the client.
     * <p>
     * Returns silently when the client has no subscription. Otherwise, while holding the
     * instance's monitor, removes the batch from the meta manager and rolls back the event store.
     *
     * @param clientIdentity the client owning the batch
     * @param batchId        id of the batch to roll back
     * @throws CanalServerException if the destination is not started or the batch does not exist
     */
    @Override
    public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
        this.checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = this.canalInstances.get(clientIdentity.getDestination());
        if (!canalInstance.getMetaManager().hasSubscribe(clientIdentity)) {
            return;
        }
        synchronized (canalInstance) {
            PositionRange removed = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId);
            if (removed == null) {
                throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check", clientIdentity.getClientId(), batchId));
            }
            canalInstance.getEventStore().rollback();
            logger.info("rollback successfully, clientId:{} batchId:{} position:{}", clientIdentity.getClientId(), batchId, removed);
        }
    }

    /**
     * Returns a copy of the destination-to-instance map.
     *
     * @return a new map created by {@code Maps.newHashMap} from the current contents; changes to
     *         the returned map do not affect this server's own map
     */
    public Map<String, CanalInstance> getCanalInstances() {
        return Maps.newHashMap(this.canalInstances);
    }

    /**
     * Reads events from the store, choosing the store call from {@code timeout}.
     * <ul>
     *   <li>{@code timeout == null}: {@code eventStore.tryGet(start, batchSize)}; exceptions from
     *       this call are not wrapped.</li>
     *   <li>{@code timeout <= 0}: {@code eventStore.get(start, batchSize)}.</li>
     *   <li>otherwise: {@code eventStore.get(start, batchSize, timeout, unit)}.</li>
     * </ul>
     *
     * @param eventStore store to read from
     * @param start      position to start reading at
     * @param batchSize  batch size passed to the store
     * @param timeout    selects the store call as described above
     * @param unit       time unit of {@code timeout}; only used when {@code timeout > 0}
     * @return the events returned by the store
     * @throws CanalServerException wrapping any exception raised by the two {@code get} variants
     */
    private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout, TimeUnit unit) {
        if (timeout == null) {
            return eventStore.tryGet(start, batchSize);
        }
        try {
            if (timeout <= 0L) {
                return eventStore.get(start, batchSize);
            }
            return eventStore.get(start, batchSize, timeout.longValue(), unit);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Wrapping would otherwise lose the interruption: the flag is cleared when
                // InterruptedException is thrown, so re-assert it for code further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new CanalServerException(e);
        }
    }

    /**
     * Verifies that the meta manager of the client's destination reports a subscription for it.
     *
     * @param clientIdentity the client to check
     * @throws CanalServerException if the client has no subscription
     */
    private void checkSubscribe(ClientIdentity clientIdentity) {
        boolean subscribed = this.canalInstances.get(clientIdentity.getDestination())
            .getMetaManager()
            .hasSubscribe(clientIdentity);
        if (subscribed) {
            return;
        }
        throw new CanalServerException(String.format("ClientIdentity:%s should subscribe first", clientIdentity.toString()));
    }

    /**
     * Verifies that {@code isStart(destination)} holds.
     *
     * @param destination name of the instance to check
     * @throws CanalServerException if the destination is not started
     */
    private void checkStart(String destination) {
        if (this.isStart(destination)) {
            return;
        }
        throw new CanalServerException(String.format("destination:%s should start first", destination));
    }

    /**
     * Looks up {@code CanalMetricsProvider} implementations through {@link ServiceLoader} and
     * installs the service of the first one found.
     * <p>
     * When none is found, the current metrics service is left unchanged. When several are found,
     * a warning listing each provider class is logged and the first one is used.
     */
    private void loadCanalMetrics() {
        List<CanalMetricsProvider> found = new ArrayList<CanalMetricsProvider>();
        ServiceLoader.load(CanalMetricsProvider.class).forEach(found::add);
        if (found.isEmpty()) {
            return;
        }
        if (found.size() > 1) {
            logger.warn("Found more than one CanalMetricsProvider, use the first one.");
            for (CanalMetricsProvider candidate : found) {
                logger.warn("Found CanalMetricsProvider: {}.", candidate.getClass().getName());
            }
        }
        this.metrics = found.get(0).getService();
    }

    /**
     * Determines whether entries from the store are handed out in raw form.
     *
     * @param eventStore the store to inspect
     * @return the store's own {@code isRaw()} value when it is a {@code MemoryEventStoreWithBuffer};
     *         {@code true} for any other store
     */
    private boolean isRaw(CanalEventStore eventStore) {
        return !(eventStore instanceof MemoryEventStoreWithBuffer)
            || ((MemoryEventStoreWithBuffer) eventStore).isRaw();
    }

    /**
     * Sets the generator the destination map's computing function calls to produce an instance.
     *
     * @param canalInstanceGenerator the generator to use
     */
    public void setCanalInstanceGenerator(CanalInstanceGenerator canalInstanceGenerator) {
        this.canalInstanceGenerator = canalInstanceGenerator;
    }

    /**
     * Sets the port that {@code start()} passes to the metrics service via {@code setServerPort}.
     *
     * @param metricsPort the metrics port
     */
    public void setMetricsPort(int metricsPort) {
        this.metricsPort = metricsPort;
    }

    /**
     * Sets the user name that {@code auth} compares against; when empty, any user name is accepted.
     *
     * @param user the expected user name
     */
    public void setUser(String user) {
        this.user = user;
    }

    /**
     * Sets the password that {@code auth} verifies against; when empty, password checking is skipped.
     *
     * @param passwd the expected password, decoded by {@code auth} with {@code SecurityUtil.hexStr2Bytes}
     */
    public void setPasswd(String passwd) {
        this.passwd = passwd;
    }

    /**
     * Holds the single {@code CanalServerWithEmbedded} instance returned by {@code instance()}.
     */
    private static class SingletonHolder {
        // Created once, when this holder class is initialized.
        private static final CanalServerWithEmbedded CANAL_SERVER_WITH_EMBEDDED = new CanalServerWithEmbedded();

        // Not meant to be instantiated.
        private SingletonHolder() {
        }
    }
}

