/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.runtime.stream;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.nuxeo.common.xmap.annotation.XNode;
import org.nuxeo.common.xmap.annotation.XNodeList;
import org.nuxeo.common.xmap.annotation.XNodeMap;
import org.nuxeo.common.xmap.annotation.XObject;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.codec.CodecService;
import org.nuxeo.runtime.stream.StreamProcessorTopology;

@XObject(value="streamProcessor")
public class StreamProcessorDescriptor {
    public static final Integer DEFAULT_CONCURRENCY = 4;
    @XNode(value="@name")
    public String name;
    @XNode(value="@logConfig")
    public String config;
    @XNode(value="@class")
    public Class<? extends StreamProcessorTopology> klass;
    @XNode(value="@defaultConcurrency")
    public Integer defaultConcurrency = DEFAULT_CONCURRENCY;
    @XNode(value="@defaultPartitions")
    public Integer defaultPartitions = DEFAULT_CONCURRENCY;
    @XNode(value="@defaultCodec")
    public String defaultCodec;
    @XNodeMap(value="option", key="@name", type=HashMap.class, componentType=String.class)
    public Map<String, String> options = new HashMap<String, String>();
    @XNodeList(value="computation", type=ArrayList.class, componentType=ComputationDescriptor.class)
    public List<ComputationDescriptor> computations = new ArrayList<ComputationDescriptor>(0);
    @XNodeList(value="stream", type=ArrayList.class, componentType=StreamDescriptor.class)
    public List<StreamDescriptor> streams = new ArrayList<StreamDescriptor>(0);

    public String getName() {
        return this.name;
    }

    public Settings getSettings(CodecService codecService) {
        Settings settings = new Settings(this.defaultConcurrency.intValue(), this.defaultPartitions.intValue(), this.getDefaultCodec(codecService));
        this.computations.forEach(comp -> settings.setConcurrency(comp.name, comp.concurrency.intValue()));
        this.streams.forEach(stream -> settings.setPartitions(stream.name, stream.partitions.intValue()));
        this.streams.stream().filter(stream -> Objects.nonNull(stream.codec)).forEach(stream -> settings.setCodec(stream.name, codecService.getCodec(stream.codec, Record.class)));
        return settings;
    }

    public Codec<Record> getDefaultCodec(CodecService codecService) {
        if (this.defaultCodec == null) {
            return null;
        }
        return codecService.getCodec(this.defaultCodec, Record.class);
    }

    public Topology getTopology() {
        try {
            return this.klass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]).getTopology(this.options);
        }
        catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new StreamRuntimeException("Can not create topology for processor: " + this.name, (Throwable)e);
        }
    }

    @XObject(value="stream")
    public static class StreamDescriptor {
        @XNode(value="@name")
        public String name;
        @XNode(value="@partitions")
        public Integer partitions = DEFAULT_CONCURRENCY;
        @XNode(value="@codec")
        public String codec;
    }

    @XObject(value="computation")
    public static class ComputationDescriptor {
        @XNode(value="@name")
        public String name;
        @XNode(value="@concurrency")
        public Integer concurrency = DEFAULT_CONCURRENCY;
    }
}

