package org.apache.storm.trident.planner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.graph.DefaultDirectedGraph;
import org.apache.storm.shade.org.jgrapht.graph.DirectedSubgraph;
import org.apache.storm.shade.org.jgrapht.traverse.TopologicalOrderIterator;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.trident.planner.processor.TridentContext;
import org.apache.storm.trident.topology.BatchInfo;
import org.apache.storm.trident.topology.ITridentBatchBolt;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.trident.tuple.TridentTupleView;
import org.apache.storm.trident.util.IndexedEdge;
import org.apache.storm.trident.util.TridentUtils;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

/* loaded from: input_file:org/apache/storm/trident/planner/SubtopologyBolt.class */
public class SubtopologyBolt implements ITridentBatchBolt {
    private static final long serialVersionUID = 1475508603138688412L;
    final DirectedGraph<Node, IndexedEdge> graph;
    final Set<Node> nodes;
    final Map<String, InitialReceiver> roots = new HashMap();
    final Map<Node, TridentTuple.Factory> outputFactories = new HashMap();
    final Map<String, List<TridentProcessor>> myTopologicallyOrdered = new HashMap();
    final Map<Node, String> batchGroups;

    /* loaded from: input_file:org/apache/storm/trident/planner/SubtopologyBolt$InitialReceiver.class */
    protected static class InitialReceiver {
        List<TridentProcessor> receivers = new ArrayList();
        TridentTupleView.RootFactory factory;
        TridentTupleView.ProjectionFactory project;
        String stream;

        public InitialReceiver(String str, Fields fields) {
            this.stream = str;
            this.factory = new TridentTupleView.RootFactory(fields);
            ArrayList arrayList = new ArrayList(fields.toList());
            arrayList.remove(0);
            this.project = new TridentTupleView.ProjectionFactory(this.factory, new Fields(arrayList));
        }

        public void receive(ProcessorContext processorContext, Tuple tuple) {
            TridentTuple create = this.project.create(this.factory.create(tuple));
            Iterator<TridentProcessor> it = this.receivers.iterator();
            while (it.hasNext()) {
                it.next().execute(processorContext, this.stream, create);
            }
        }

        public void addReceiver(TridentProcessor tridentProcessor) {
            this.receivers.add(tridentProcessor);
        }

        public TridentTuple.Factory getOutputFactory() {
            return this.project;
        }
    }

    public SubtopologyBolt(DefaultDirectedGraph<Node, IndexedEdge> defaultDirectedGraph, Set<Node> set, Map<Node, String> map) {
        this.nodes = set;
        this.graph = (DirectedGraph) defaultDirectedGraph.clone();
        this.batchGroups = copyAndOnlyKeep(map, set);
        HashSet hashSet = new HashSet();
        for (IndexedEdge indexedEdge : this.graph.edgeSet()) {
            Node node = (Node) this.graph.getEdgeSource(indexedEdge);
            Node node2 = (Node) this.graph.getEdgeTarget(indexedEdge);
            if (this.nodes.contains(node) || this.nodes.contains(node2)) {
                hashSet.add(node);
                hashSet.add(node2);
            }
        }
        HashSet hashSet2 = new HashSet(this.graph.vertexSet());
        hashSet2.removeAll(hashSet);
        this.graph.removeAllVertices(hashSet2);
    }

    private static Map<Node, String> copyAndOnlyKeep(Map<Node, String> map, Set<Node> set) {
        HashMap hashMap = new HashMap(set.size());
        for (Map.Entry<Node, String> entry : map.entrySet()) {
            if (set.contains(entry.getKey())) {
                hashMap.put(entry.getKey(), entry.getValue());
            }
        }
        return hashMap;
    }

    @Override // org.apache.storm.trident.topology.ITridentBatchBolt
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector) {
        int size = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
        for (Node node : this.nodes) {
            if (node.stateInfo != null) {
                topologyContext.setTaskData(node.stateInfo.id, node.stateInfo.spec.stateFactory.makeState(map, topologyContext, topologyContext.getThisTaskIndex(), size));
            }
        }
        TopologicalOrderIterator topologicalOrderIterator = new TopologicalOrderIterator(new DirectedSubgraph(this.graph, this.nodes, (Set) null));
        int i = 0;
        while (topologicalOrderIterator.hasNext()) {
            Node node2 = (Node) topologicalOrderIterator.next();
            if (node2 instanceof ProcessorNode) {
                ProcessorNode processorNode = (ProcessorNode) node2;
                String str = this.batchGroups.get(node2);
                if (!this.myTopologicallyOrdered.containsKey(str)) {
                    this.myTopologicallyOrdered.put(str, new ArrayList());
                }
                this.myTopologicallyOrdered.get(str).add(processorNode.processor);
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                for (Node node3 : TridentUtils.getParents(this.graph, node2)) {
                    arrayList.add(node3.streamId);
                    if (this.nodes.contains(node3)) {
                        arrayList2.add(this.outputFactories.get(node3));
                    } else {
                        if (!this.roots.containsKey(node3.streamId)) {
                            this.roots.put(node3.streamId, new InitialReceiver(node3.streamId, getSourceOutputFields(topologyContext, node3.streamId)));
                        }
                        this.roots.get(node3.streamId).addReceiver(processorNode.processor);
                        arrayList2.add(this.roots.get(node3.streamId).getOutputFactory());
                    }
                }
                ArrayList arrayList3 = new ArrayList();
                boolean z = false;
                for (Node node4 : TridentUtils.getChildren(this.graph, node2)) {
                    if (this.nodes.contains(node4)) {
                        arrayList3.add(((ProcessorNode) node4).processor);
                    } else {
                        z = true;
                    }
                }
                if (z) {
                    arrayList3.add(new BridgeReceiver(batchOutputCollector));
                }
                processorNode.processor.prepare(map, topologyContext, new TridentContext(processorNode.selfOutFields, arrayList2, arrayList, arrayList3, processorNode.streamId, i, batchOutputCollector));
                this.outputFactories.put(node2, processorNode.processor.getOutputFactory());
            }
            i++;
        }
    }

    private Fields getSourceOutputFields(TopologyContext topologyContext, String str) {
        for (GlobalStreamId globalStreamId : topologyContext.getThisSources().keySet()) {
            if (globalStreamId.get_streamId().equals(str)) {
                return topologyContext.getComponentOutputFields(globalStreamId);
            }
        }
        throw new RuntimeException("Could not find fields for source stream " + str);
    }

    @Override // org.apache.storm.trident.topology.ITridentBatchBolt
    public void execute(BatchInfo batchInfo, Tuple tuple) {
        InitialReceiver initialReceiver = this.roots.get(tuple.getSourceStreamId());
        if (initialReceiver == null) {
            throw new RuntimeException("Received unexpected tuple " + tuple.toString());
        }
        initialReceiver.receive((ProcessorContext) batchInfo.state, tuple);
    }

    @Override // org.apache.storm.trident.topology.ITridentBatchBolt
    public void finishBatch(BatchInfo batchInfo) {
        Iterator<TridentProcessor> it = this.myTopologicallyOrdered.get(batchInfo.batchGroup).iterator();
        while (it.hasNext()) {
            it.next().finishBatch((ProcessorContext) batchInfo.state);
        }
    }

    @Override // org.apache.storm.trident.topology.ITridentBatchBolt
    public Object initBatchState(String str, Object obj) {
        ProcessorContext processorContext = new ProcessorContext(obj, new Object[this.nodes.size()]);
        Iterator<TridentProcessor> it = this.myTopologicallyOrdered.get(str).iterator();
        while (it.hasNext()) {
            it.next().startBatch(processorContext);
        }
        return processorContext;
    }

    @Override // org.apache.storm.trident.topology.ITridentBatchBolt
    public void cleanup() {
        Iterator<String> it = this.myTopologicallyOrdered.keySet().iterator();
        while (it.hasNext()) {
            Iterator<TridentProcessor> it2 = this.myTopologicallyOrdered.get(it.next()).iterator();
            while (it2.hasNext()) {
                it2.next().cleanup();
            }
        }
    }

    @Override // org.apache.storm.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        for (Node node : this.nodes) {
            outputFieldsDeclarer.declareStream(node.streamId, TridentUtils.fieldsConcat(new Fields("$batchId"), node.allOutputFields));
        }
    }

    @Override // org.apache.storm.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
