package org.apache.storm.streams;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.storm.annotation.InterfaceStability;
import org.apache.storm.streams.operations.BiFunction;
import org.apache.storm.streams.operations.CombinerAggregator;
import org.apache.storm.streams.operations.Consumer;
import org.apache.storm.streams.operations.FlatMapFunction;
import org.apache.storm.streams.operations.Function;
import org.apache.storm.streams.operations.IdentityFunction;
import org.apache.storm.streams.operations.PairFlatMapFunction;
import org.apache.storm.streams.operations.PairFunction;
import org.apache.storm.streams.operations.Predicate;
import org.apache.storm.streams.operations.PrintConsumer;
import org.apache.storm.streams.operations.Reducer;
import org.apache.storm.streams.operations.aggregators.Count;
import org.apache.storm.streams.processors.AggregateProcessor;
import org.apache.storm.streams.processors.BranchProcessor;
import org.apache.storm.streams.processors.FilterProcessor;
import org.apache.storm.streams.processors.FlatMapProcessor;
import org.apache.storm.streams.processors.ForEachProcessor;
import org.apache.storm.streams.processors.MapProcessor;
import org.apache.storm.streams.processors.MergeAggregateProcessor;
import org.apache.storm.streams.processors.PeekProcessor;
import org.apache.storm.streams.processors.Processor;
import org.apache.storm.streams.processors.ReduceProcessor;
import org.apache.storm.streams.processors.StateQueryProcessor;
import org.apache.storm.streams.windowing.Window;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A handle on one output stream of a {@link Node} in the topology assembled by a
 * {@link StreamBuilder}.
 *
 * <p>Each operation builds a new node, registers it with the builder downstream of this stream's
 * node and, for non-terminal operations, returns a new stream wrapping that node. This class only
 * wires nodes together; the per-element behaviour lives in the processor classes it instantiates.
 *
 * @param <T> the type of the values flowing through this stream
 */
@InterfaceStability.Unstable
public class Stream<T> {
    /** Single-field schema named {@code "key"}. */
    protected static final Fields KEY = new Fields("key");
    /** Single-field schema named {@code "value"}; the output schema of most processors added here. */
    protected static final Fields VALUE = new Fields("value");
    /** Two-field schema {@code ("key", "value")} used by the operations that return a {@link PairStream}. */
    protected static final Fields KEY_VALUE = new Fields("key", "value");
    private static final Logger LOG = LoggerFactory.getLogger(Stream.class);

    /** Builder that owns the node graph this stream belongs to. */
    protected final StreamBuilder streamBuilder;
    /** Node whose output this stream represents. */
    protected final Node node;
    /** Id of the output stream of {@link #node} that this object refers to. */
    protected final String stream;

    /**
     * Wraps {@code node}, using the first id yielded by iterating {@code node.getOutputStreams()}
     * as this stream's id.
     *
     * @param streamBuilder the builder that owns {@code node}
     * @param node the node whose output this stream represents
     */
    public Stream(StreamBuilder streamBuilder, Node node) {
        this(streamBuilder, node, node.getOutputStreams().iterator().next());
    }

    /** Wraps {@code node} with an explicitly chosen output stream id. */
    private Stream(StreamBuilder streamBuilder, Node node, String streamId) {
        this.streamBuilder = streamBuilder;
        this.node = node;
        this.stream = streamId;
    }

    /**
     * Appends a {@link FilterProcessor} built from {@code predicate}.
     *
     * @param predicate the test handed to the filter processor
     * @return a stream over the newly added filter node
     */
    public Stream<T> filter(Predicate<? super T> predicate) {
        Node filterNode = addProcessorNode(new FilterProcessor<>(predicate), VALUE, true);
        return new Stream<>(streamBuilder, filterNode);
    }

    /**
     * Appends a {@link MapProcessor} built from {@code function}.
     *
     * @param function the mapping function handed to the processor
     * @param <R> the element type of the resulting stream
     * @return a stream over the newly added map node
     */
    public <R> Stream<R> map(Function<? super T, ? extends R> function) {
        Node mapNode = addProcessorNode(new MapProcessor<>(function), VALUE);
        return new Stream<>(streamBuilder, mapNode);
    }

    /**
     * Appends a {@link MapProcessor} built from {@code pairFunction}; the new node declares the
     * {@link #KEY_VALUE} output fields.
     *
     * @param pairFunction the function handed to the processor
     * @param <K> the key type of the resulting pair stream
     * @param <V> the value type of the resulting pair stream
     * @return a pair stream over the newly added node
     */
    public <K, V> PairStream<K, V> mapToPair(PairFunction<? super T, ? extends K, ? extends V> pairFunction) {
        Node mapNode = addProcessorNode(new MapProcessor<>(pairFunction), KEY_VALUE);
        return new PairStream<>(streamBuilder, mapNode);
    }

    /**
     * Appends a {@link FlatMapProcessor} built from {@code flatMapFunction}.
     *
     * @param flatMapFunction the function handed to the processor
     * @param <R> the element type of the resulting stream
     * @return a stream over the newly added flat-map node
     */
    public <R> Stream<R> flatMap(FlatMapFunction<? super T, ? extends R> flatMapFunction) {
        Node flatMapNode = addProcessorNode(new FlatMapProcessor<>(flatMapFunction), VALUE);
        return new Stream<>(streamBuilder, flatMapNode);
    }

    /**
     * Appends a {@link FlatMapProcessor} built from {@code pairFlatMapFunction}; the new node
     * declares the {@link #KEY_VALUE} output fields.
     *
     * @param pairFlatMapFunction the function handed to the processor
     * @param <K> the key type of the resulting pair stream
     * @param <V> the value type of the resulting pair stream
     * @return a pair stream over the newly added node
     */
    public <K, V> PairStream<K, V> flatMapToPair(PairFlatMapFunction<? super T, ? extends K, ? extends V> pairFlatMapFunction) {
        Node flatMapNode = addProcessorNode(new FlatMapProcessor<>(pairFlatMapFunction), KEY_VALUE);
        return new PairStream<>(streamBuilder, flatMapNode);
    }

    /**
     * Appends a {@link WindowNode} for {@code window} that reads this stream and keeps this node's
     * output fields.
     *
     * @param window the window configuration
     * @return a stream over the newly added window node
     */
    public Stream<T> window(Window<?, ?> window) {
        Node windowNode = addNode(new WindowNode(window, stream, node.getOutputFields()));
        return new Stream<>(streamBuilder, windowNode);
    }

    /**
     * Terminal operation: appends a {@link ForEachProcessor} built from {@code consumer}. The new
     * node declares no output fields.
     *
     * @param consumer the action handed to the processor
     */
    public void forEach(Consumer<? super T> consumer) {
        addProcessorNode(new ForEachProcessor<>(consumer), new Fields(new String[0]));
    }

    /**
     * Appends a {@link PeekProcessor} built from {@code consumer}; the new node keeps this node's
     * output fields.
     *
     * @param consumer the action handed to the processor
     * @return a stream over the newly added peek node
     */
    public Stream<T> peek(Consumer<? super T> consumer) {
        Node peekNode = addProcessorNode(new PeekProcessor<>(consumer), node.getOutputFields(), true);
        return new Stream<>(streamBuilder, peekNode);
    }

    /**
     * Aggregates this stream with {@code combinerAggregator}. The node layout depends on this
     * node's parallelism and windowing; see {@link #combine(CombinerAggregator)}.
     *
     * @param combinerAggregator the aggregator to apply
     * @param <A> the accumulator type
     * @param <R> the result type
     * @return a stream over the node that produces the aggregate
     */
    public <A, R> Stream<R> aggregate(CombinerAggregator<? super T, A, ? extends R> combinerAggregator) {
        return combine(combinerAggregator);
    }

    /**
     * Aggregates this stream with the aggregator returned by
     * {@code CombinerAggregator.of(r, biFunction, biFunction2)}.
     *
     * @param r first argument to {@code CombinerAggregator.of}; presumably the initial value
     *     (confirm against {@link CombinerAggregator})
     * @param biFunction second argument; presumably folds one element into the running result
     * @param biFunction2 third argument; presumably merges two partial results
     * @param <R> the result type
     * @return a stream over the node that produces the aggregate
     */
    public <R> Stream<R> aggregate(R r, BiFunction<? super R, ? super T, ? extends R> biFunction, BiFunction<? super R, ? super R, ? extends R> biFunction2) {
        return combine(CombinerAggregator.of(r, biFunction, biFunction2));
    }

    /**
     * Aggregates this stream with a {@link Count} aggregator.
     *
     * @return a stream over the node that produces the count
     */
    public Stream<Long> count() {
        return aggregate(new Count<>());
    }

    /**
     * Reduces this stream with {@code reducer}. The node layout depends on this node's parallelism
     * and windowing; see {@link #combine(Reducer)}.
     *
     * @param reducer the reduction to apply
     * @return a stream over the node that produces the reduced value
     */
    public Stream<T> reduce(Reducer<T> reducer) {
        return combine(reducer);
    }

    /**
     * Appends a {@link PartitionNode} with parallelism {@code i}, unless this node already has that
     * parallelism.
     *
     * @param i the requested parallelism; must be at least 1
     * @return this stream when the parallelism is unchanged, otherwise a stream over the new
     *     partition node
     * @throws IllegalArgumentException if {@code i < 1}
     */
    public Stream<T> repartition(int i) {
        if (i < 1) {
            throw new IllegalArgumentException("Parallelism should be >= 1");
        }
        if (node.getParallelism() == i) {
            LOG.debug("Node's current parallelism {}, new parallelism {}", node.getParallelism(), i);
            return this;
        }
        Node partitionNode = addNode(node, new PartitionNode(stream, node.getOutputFields()), i);
        return new Stream<>(streamBuilder, partitionNode);
    }

    /**
     * Splits this stream into one child stream per predicate.
     *
     * <p>A single {@link BranchProcessor} node is appended. For every predicate an identity
     * {@link MapProcessor} child is created, connected to the branch node through a dedicated
     * output stream, and the predicate is registered on the branch processor under that stream's
     * id. How an element is matched against the predicates is decided by {@link BranchProcessor}.
     *
     * @param predicateArr the predicates, one per desired branch
     * @return the child streams in predicate order; an empty array (and no new nodes) when no
     *     predicates are given
     */
    // "unchecked" covers the heap-pollution warning of the generic varargs parameter and the
    // generic array creation below; the array is only ever filled with Stream<T> instances.
    @SuppressWarnings("unchecked")
    public Stream<T>[] branch(Predicate<? super T>... predicateArr) {
        List<Stream<T>> childStreams = new ArrayList<>();
        if (predicateArr.length > 0) {
            BranchProcessor<T> branchProcessor = new BranchProcessor<>();
            Node branchNode = addProcessorNode(branchProcessor, VALUE);
            for (Predicate<? super T> predicate : predicateArr) {
                ProcessorNode child = makeProcessorNode(new MapProcessor<>(new IdentityFunction<>()), node.getOutputFields());
                // Derive the connecting stream's id from the child's own (unique) output stream id.
                String branchStream = child.getOutputStreams().iterator().next() + "-branch";
                branchNode.addOutputStream(branchStream);
                addNode(branchNode, child, branchStream);
                childStreams.add(new Stream<>(streamBuilder, child));
                branchProcessor.addPredicate(predicate, branchStream);
            }
        }
        return childStreams.toArray((Stream<T>[]) new Stream[childStreams.size()]);
    }

    /** Terminal operation: equivalent to {@code forEach(new PrintConsumer<>())}. */
    public void print() {
        forEach(new PrintConsumer<T>());
    }

    /**
     * Terminal operation: sends this stream to {@code iRichBolt} with parallelism 1.
     *
     * @param iRichBolt the sink bolt
     */
    public void to(IRichBolt iRichBolt) {
        to(iRichBolt, 1);
    }

    /**
     * Terminal operation: sends this stream to {@code iRichBolt}.
     *
     * @param iRichBolt the sink bolt
     * @param i the parallelism assigned to the sink node
     */
    public void to(IRichBolt iRichBolt, int i) {
        addSinkNode(new SinkNode(iRichBolt), i);
    }

    /**
     * Terminal operation: sends this stream to {@code iBasicBolt} with parallelism 1.
     *
     * @param iBasicBolt the sink bolt
     */
    public void to(IBasicBolt iBasicBolt) {
        to(iBasicBolt, 1);
    }

    /**
     * Terminal operation: sends this stream to {@code iBasicBolt}.
     *
     * @param iBasicBolt the sink bolt
     * @param i the parallelism assigned to the sink node
     */
    public void to(IBasicBolt iBasicBolt, int i) {
        addSinkNode(new SinkNode(iBasicBolt), i);
    }

    /**
     * Partitions this stream by the {@link #VALUE} field at the current parallelism, then appends a
     * {@link StateQueryProcessor} for {@code streamState}.
     *
     * @param streamState the state to query
     * @param <V> the value type held in the state
     * @return a pair stream over the newly added state-query node
     */
    public <V> PairStream<T, V> stateQuery(StreamState<T, V> streamState) {
        // Fields-partition first; presumably so that equal values reach the task holding their state.
        Stream<T> partitioned = partitionBy(VALUE, node.getParallelism());
        Node queryNode = partitioned.addProcessorNode(new StateQueryProcessor<>(streamState), KEY_VALUE);
        return new PairStream<>(streamBuilder, queryNode);
    }

    /** Returns the node whose output this stream represents. */
    public Node getNode() {
        return this.node;
    }

    /**
     * Registers {@code node2} with the builder downstream of {@code node}.
     *
     * @param node the existing upstream node
     * @param node2 the node being added
     * @param i the parallelism passed to the builder
     * @return whatever {@code StreamBuilder.addNode} returns for this registration
     */
    public Node addNode(Node node, Node node2, int i) {
        return streamBuilder.addNode(node, node2, i);
    }

    /** Registers {@code node} with the builder downstream of this stream's node. */
    Node addNode(Node node) {
        return addNode(this.node, node);
    }

    /** Registers {@code child} downstream of {@code parent}. */
    private Node addNode(Node parent, Node child) {
        return streamBuilder.addNode(parent, child);
    }

    /** Registers {@code child} downstream of {@code parent}, connected through {@code streamId}. */
    private Node addNode(Node parent, Node child, String streamId) {
        return streamBuilder.addNode(parent, child, streamId);
    }

    /** Registers {@code child} downstream of {@code parent} through {@code streamId} with the given parallelism. */
    private Node addNode(Node parent, Node child, String streamId, int parallelism) {
        return streamBuilder.addNode(parent, child, streamId, parallelism);
    }

    /**
     * Wraps {@code processor} in a new {@link ProcessorNode} (boolean flag {@code false}) and
     * registers it downstream of this stream's node.
     */
    Node addProcessorNode(Processor<?> processor, Fields fields) {
        return addNode(makeProcessorNode(processor, fields));
    }

    /**
     * Wraps {@code processor} in a new {@link ProcessorNode} and registers it downstream of this
     * stream's node.
     *
     * @param processor the processor to wrap
     * @param fields the output fields declared by the new node
     * @param z flag forwarded to the {@link ProcessorNode} constructor
     *     (NOTE(review): looks like a "preserves key" marker; confirm against ProcessorNode)
     * @return whatever {@code StreamBuilder.addNode} returns for this registration
     */
    public Node addProcessorNode(Processor<?> processor, Fields fields, boolean z) {
        return addNode(makeProcessorNode(processor, fields, z));
    }

    /** Returns the id of the output stream this object refers to. */
    String getStream() {
        return this.stream;
    }

    /** Creates, without registering it, a {@link ProcessorNode} whose boolean flag is {@code false}. */
    private ProcessorNode makeProcessorNode(Processor<?> processor, Fields outputFields) {
        return makeProcessorNode(processor, outputFields, false);
    }

    /**
     * Creates, without registering it, a {@link ProcessorNode} with a freshly generated unique
     * output stream id.
     *
     * @param processor the processor to wrap
     * @param fields the output fields declared by the new node
     * @param z flag forwarded to the {@link ProcessorNode} constructor
     * @return the new, not yet registered node
     */
    public ProcessorNode makeProcessorNode(Processor<?> processor, Fields fields, boolean z) {
        String streamId = UniqueIdGen.getInstance().getUniqueStreamId();
        return new ProcessorNode(processor, streamId, fields, z);
    }

    /**
     * Gives {@code sinkNode} a unique bolt id and the requested parallelism, then registers it
     * downstream of this stream's node.
     */
    private void addSinkNode(SinkNode sinkNode, int parallelism) {
        sinkNode.setComponentId(UniqueIdGen.getInstance().getUniqueBoltId());
        sinkNode.setParallelism(parallelism);
        if (node instanceof SpoutNode) {
            // Sinks attached directly to a spout are wired through the "default" stream id.
            addNode(node, sinkNode, "default", parallelism);
        } else {
            addNode(node, sinkNode, parallelism);
        }
    }

    /** Appends a {@link PartitionNode} configured with {@code GroupingInfo.global()}. */
    private Stream<T> global() {
        Node partitionNode = addNode(new PartitionNode(stream, node.getOutputFields(), GroupingInfo.global()));
        return new Stream<>(streamBuilder, partitionNode);
    }

    /**
     * Appends a {@link PartitionNode} configured with {@code GroupingInfo.fields(fields)}.
     *
     * @param fields the fields to group on
     * @param i the parallelism passed to the builder for the partition node
     * @return a stream over the newly added partition node
     */
    public Stream<T> partitionBy(Fields fields, int i) {
        Node partitionNode = addNode(node, new PartitionNode(stream, node.getOutputFields(), GroupingInfo.fields(fields)), i);
        return new Stream<>(streamBuilder, partitionNode);
    }

    /** Returns {@code true} when this node's parallelism is greater than one. */
    private boolean shouldPartition() {
        return node.getParallelism() > 1;
    }

    /**
     * Appends an {@link AggregateProcessor} constructed with its boolean argument set to
     * {@code true}; the result is typed as a stream of accumulators.
     * NOTE(review): the flag presumably makes the processor emit the accumulator instead of the
     * final result; confirm against AggregateProcessor.
     */
    private <A> Stream<A> combinePartition(CombinerAggregator<? super T, A, ?> aggregator) {
        Node partialNode = addProcessorNode(new AggregateProcessor<>(aggregator, true), VALUE, true);
        return new Stream<>(streamBuilder, partialNode);
    }

    /** Appends a {@link MergeAggregateProcessor}; this stream's elements are the accumulators to merge. */
    private <R> Stream<R> merge(CombinerAggregator<?, T, ? extends R> aggregator) {
        Node mergeNode = addProcessorNode(new MergeAggregateProcessor<>(aggregator), VALUE);
        return new Stream<>(streamBuilder, mergeNode);
    }

    /** Appends an {@link AggregateProcessor} built with its single-argument constructor. */
    private <A, R> Stream<R> aggregatePartition(CombinerAggregator<? super T, A, ? extends R> aggregator) {
        Node aggregateNode = addProcessorNode(new AggregateProcessor<>(aggregator), VALUE);
        return new Stream<>(streamBuilder, aggregateNode);
    }

    /** Appends a {@link ReduceProcessor} for {@code reducer}. */
    private Stream<T> reducePartition(Reducer<T> reducer) {
        Node reduceNode = addProcessorNode(new ReduceProcessor<>(reducer), VALUE);
        return new Stream<>(streamBuilder, reduceNode);
    }

    /**
     * Lays out the nodes for an aggregation.
     *
     * <ul>
     *   <li>Parallelism of 1: a single aggregate node is appended.</li>
     *   <li>Windowed {@link ProcessorNode}: partial aggregate, then global partition, then merge.</li>
     *   <li>{@link WindowNode} whose parents are all windowed: a partial-aggregate node is inserted
     *       at each parent via {@code StreamBuilder.insert}, then global partition, then merge.</li>
     *   <li>Anything else: global partition, then a single aggregate node.</li>
     * </ul>
     */
    private <A, R> Stream<R> combine(CombinerAggregator<? super T, A, ? extends R> aggregator) {
        if (!shouldPartition()) {
            return aggregatePartition(aggregator);
        }
        if (node instanceof ProcessorNode) {
            if (node.isWindowed()) {
                return combinePartition(aggregator).global().merge(aggregator);
            }
        } else if (node instanceof WindowNode) {
            Set<Node> parents = node.getParents();
            if (parents.stream().allMatch(Node::isWindowed)) {
                parents.forEach(parent -> streamBuilder.insert(
                        parent, makeProcessorNode(new AggregateProcessor<>(aggregator, true), VALUE, true)));
                // The inserted nodes change what flows through this stream from T to the
                // accumulator type A; the static type cannot express that, hence the cast.
                @SuppressWarnings("unchecked")
                Stream<A> partials = (Stream<A>) global();
                return partials.merge(aggregator);
            }
        }
        return global().aggregatePartition(aggregator);
    }

    /**
     * Lays out the nodes for a reduction.
     *
     * <ul>
     *   <li>Parallelism of 1: a single reduce node is appended.</li>
     *   <li>Windowed {@link ProcessorNode}: reduce, then global partition, then reduce again.</li>
     *   <li>{@link WindowNode}: a reduce node is inserted at each windowed parent via
     *       {@code StreamBuilder.insert}, then global partition, then reduce.</li>
     *   <li>Anything else: global partition, then reduce.</li>
     * </ul>
     */
    private Stream<T> combine(Reducer<T> reducer) {
        if (!shouldPartition()) {
            return reducePartition(reducer);
        }
        if (node instanceof ProcessorNode) {
            if (node.isWindowed()) {
                return reducePartition(reducer).global().reducePartition(reducer);
            }
        } else if (node instanceof WindowNode) {
            for (Node parent : node.getParents()) {
                if (parent.isWindowed()) {
                    streamBuilder.insert(parent, makeProcessorNode(new ReduceProcessor<>(reducer), VALUE));
                }
            }
        }
        return global().reducePartition(reducer);
    }
}
