/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.runtime.stream;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.log.LogStreamProcessor;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager;
import org.nuxeo.lib.stream.log.kafka.KafkaLogManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.CodecService;
import org.nuxeo.runtime.kafka.KafkaConfigService;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentInstance;
import org.nuxeo.runtime.model.ComponentManager;
import org.nuxeo.runtime.model.DefaultComponent;
import org.nuxeo.runtime.stream.LogConfigDescriptor;
import org.nuxeo.runtime.stream.StreamProcessorDescriptor;
import org.nuxeo.runtime.stream.StreamService;

public class StreamServiceImpl
extends DefaultComponent
implements StreamService {
    public static final String NUXEO_STREAM_DIR_PROP = "nuxeo.stream.chronicle.dir";
    public static final String NUXEO_STREAM_RET_DURATION_PROP = "nuxeo.stream.chronicle.retention.duration";
    protected static final String LOG_CONFIG_XP = "logConfig";
    protected static final String STREAM_PROCESSOR_XP = "streamProcessor";
    private static final Log log = LogFactory.getLog(StreamServiceImpl.class);
    protected final Map<String, LogConfigDescriptor> configs = new HashMap<String, LogConfigDescriptor>();
    protected final Map<String, LogManager> managers = new HashMap<String, LogManager>();
    protected final Map<String, StreamProcessor> processors = new HashMap<String, StreamProcessor>();
    protected final Map<String, StreamProcessorDescriptor> processorDescriptors = new HashMap<String, StreamProcessorDescriptor>();

    public int getApplicationStartedOrder() {
        return -590;
    }

    @Override
    public LogManager getLogManager(String name) {
        if (!this.managers.containsKey(name)) {
            if (!this.configs.containsKey(name)) {
                throw new IllegalArgumentException("Unknown logConfig: " + name);
            }
            LogConfigDescriptor config = this.configs.get(name);
            if (config.isKafkaLog()) {
                this.managers.put(name, this.createKafkaLogManager(config));
            } else {
                this.managers.put(name, this.createChronicleLogManager(config));
            }
        }
        return this.managers.get(name);
    }

    protected LogManager createKafkaLogManager(LogConfigDescriptor config) {
        String kafkaConfig = config.getOption("kafkaConfig", "default");
        KafkaConfigService service = (KafkaConfigService)Framework.getService(KafkaConfigService.class);
        return new KafkaLogManager(service.getTopicPrefix(kafkaConfig), service.getProducerProperties(kafkaConfig), service.getConsumerProperties(kafkaConfig));
    }

    protected LogManager createChronicleLogManager(LogConfigDescriptor config) {
        String basePath = config.getOption("basePath", null);
        String directory = config.getOption("directory", config.getName());
        Path path = this.getChroniclePath(basePath, directory);
        String retention = this.getChronicleRetention(config.getOption("retention", null));
        return new ChronicleLogManager(path, retention);
    }

    protected String getChronicleRetention(String retention) {
        return retention != null ? retention : Framework.getProperty((String)NUXEO_STREAM_RET_DURATION_PROP, (String)"4d");
    }

    protected Path getChroniclePath(String basePath, String name) {
        if (basePath != null) {
            return Paths.get(basePath, name).toAbsolutePath();
        }
        basePath = Framework.getProperty((String)NUXEO_STREAM_DIR_PROP);
        if (basePath != null) {
            return Paths.get(basePath, name).toAbsolutePath();
        }
        basePath = Framework.getProperty((String)"nuxeo.data.dir");
        if (basePath != null) {
            return Paths.get(basePath, "stream", name).toAbsolutePath();
        }
        return Paths.get(Framework.getRuntime().getHome().getAbsolutePath(), "data", "stream", name).toAbsolutePath();
    }

    protected void createStreamIfNotExists(String name, LogConfigDescriptor config) {
        if (config.getLogsToCreate().isEmpty()) {
            return;
        }
        LogManager manager = this.getLogManager(name);
        config.getLogsToCreate().forEach((stream, size) -> {
            log.info((Object)("Create if not exists stream: " + stream + " with manager: " + name));
            manager.createIfNotExists(stream, size.intValue());
        });
    }

    public void start(ComponentContext context) {
        super.start(context);
        this.configs.forEach(this::createStreamIfNotExists);
        this.processorDescriptors.forEach(this::initProcessor);
        new ComponentsLifeCycleListener().install();
    }

    protected void initProcessor(String name, StreamProcessorDescriptor descriptor) {
        if (this.processors.containsKey(name)) {
            log.error((Object)("Processor already initialized: " + name));
            return;
        }
        log.info((Object)("Init Stream processor: " + name + " with manager: " + descriptor.config));
        LogManager manager = this.getLogManager(descriptor.config);
        Topology topology = descriptor.getTopology();
        LogStreamProcessor streamProcessor = new LogStreamProcessor(manager);
        Settings settings = descriptor.getSettings((CodecService)Framework.getService(CodecService.class));
        if (log.isDebugEnabled()) {
            log.debug((Object)("Starting computation topology: " + name + "\n" + topology.toPlantuml(settings)));
        }
        streamProcessor.init(topology, settings);
        this.processors.put(name, (StreamProcessor)streamProcessor);
    }

    public void stop(ComponentContext context) throws InterruptedException {
        super.stop(context);
        this.stopComputations();
        this.closeLogManagers();
    }

    protected void startComputations() {
        this.processorDescriptors.keySet().forEach(name -> {
            StreamProcessor manager = this.processors.get(name);
            if (manager != null) {
                manager.start();
            }
        });
    }

    protected void stopComputations() {
        this.processors.forEach((name, manager) -> manager.stop(Duration.ofSeconds(1L)));
        this.processors.clear();
    }

    protected void closeLogManagers() {
        this.managers.values().stream().filter(Objects::nonNull).forEach(LogManager::close);
        this.managers.clear();
    }

    public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        if (extensionPoint.equals(LOG_CONFIG_XP)) {
            LogConfigDescriptor descriptor = (LogConfigDescriptor)contribution;
            this.configs.put(descriptor.name, descriptor);
            log.debug((Object)String.format("Register logConfig: %s", descriptor.name));
        } else if (extensionPoint.equals(STREAM_PROCESSOR_XP)) {
            StreamProcessorDescriptor descriptor = (StreamProcessorDescriptor)contribution;
            this.processorDescriptors.put(descriptor.name, descriptor);
            log.debug((Object)String.format("Register Stream StreamProcessorTopologyProcessor: %s", descriptor.name));
        }
    }

    public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) {
        if (extensionPoint.equals(LOG_CONFIG_XP)) {
            LogConfigDescriptor descriptor = (LogConfigDescriptor)contribution;
            this.configs.remove(descriptor.name);
            log.debug((Object)String.format("Unregister logConfig: %s", descriptor.name));
        } else if (extensionPoint.equals(STREAM_PROCESSOR_XP)) {
            StreamProcessorDescriptor descriptor = (StreamProcessorDescriptor)contribution;
            this.processorDescriptors.remove(descriptor.name);
            log.debug((Object)String.format("Unregister Stream StreamProcessorTopologyProcessor: %s", descriptor.name));
        }
    }

    protected class ComponentsLifeCycleListener
    implements ComponentManager.Listener {
        protected ComponentsLifeCycleListener() {
        }

        public void afterStart(ComponentManager mgr, boolean isResume) {
            StreamServiceImpl.this.startComputations();
        }

        public void beforeStop(ComponentManager mgr, boolean isStandby) {
            StreamServiceImpl.this.stopComputations();
            Framework.getRuntime().getComponentManager().removeListener((ComponentManager.Listener)this);
        }
    }
}

