package org.apache.storm.streams;

import java.util.ArrayList;
import java.util.Set;
import org.apache.storm.annotation.InterfaceStability;
import org.apache.storm.streams.operations.BiFunction;
import org.apache.storm.streams.operations.CombinerAggregator;
import org.apache.storm.streams.operations.Consumer;
import org.apache.storm.streams.operations.FlatMapFunction;
import org.apache.storm.streams.operations.Function;
import org.apache.storm.streams.operations.PairValueJoiner;
import org.apache.storm.streams.operations.Predicate;
import org.apache.storm.streams.operations.Reducer;
import org.apache.storm.streams.operations.StateUpdater;
import org.apache.storm.streams.operations.ValueJoiner;
import org.apache.storm.streams.operations.aggregators.Count;
import org.apache.storm.streams.processors.AggregateByKeyProcessor;
import org.apache.storm.streams.processors.CoGroupByKeyProcessor;
import org.apache.storm.streams.processors.FlatMapValuesProcessor;
import org.apache.storm.streams.processors.JoinProcessor;
import org.apache.storm.streams.processors.MapValuesProcessor;
import org.apache.storm.streams.processors.MergeAggregateByKeyProcessor;
import org.apache.storm.streams.processors.ReduceByKeyProcessor;
import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
import org.apache.storm.streams.windowing.Window;
import org.apache.storm.tuple.Fields;

@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/storm/streams/PairStream.class */
public class PairStream<K, V> extends Stream<Pair<K, V>> {

    /* loaded from: input_file:org/apache/storm/streams/PairStream$MergeValues.class */
    /**
     * Combiner that gathers every value it is given into a single {@link ArrayList}.
     *
     * <p>The accumulator doubles as the result: {@link #apply} appends in place and
     * {@link #result} hands the accumulator back unchanged. {@link #merge} builds a fresh list,
     * so neither input accumulator is modified by a merge.
     *
     * <p>No explicit {@code apply(Object, Object)} bridge is declared here: the compiler
     * generates that bridge automatically for the typed {@link #apply} override.
     *
     * @param <V> type of the collected values
     */
    private static class MergeValues<V> implements CombinerAggregator<V, ArrayList<V>, ArrayList<V>> {
        private MergeValues() {
        }

        /** Returns a new, empty accumulator. */
        @Override // org.apache.storm.streams.operations.CombinerAggregator
        public ArrayList<V> init() {
            return new ArrayList<>();
        }

        /** Appends {@code v} to {@code arrayList} in place and returns that same list. */
        @Override // org.apache.storm.streams.operations.CombinerAggregator
        public ArrayList<V> apply(ArrayList<V> arrayList, V v) {
            arrayList.add(v);
            return arrayList;
        }

        /** Returns a new list holding the elements of {@code arrayList} followed by {@code arrayList2}. */
        @Override // org.apache.storm.streams.operations.CombinerAggregator
        public ArrayList<V> merge(ArrayList<V> arrayList, ArrayList<V> arrayList2) {
            // Presize so the two bulk copies never trigger a resize.
            ArrayList<V> merged = new ArrayList<>(arrayList.size() + arrayList2.size());
            merged.addAll(arrayList);
            merged.addAll(arrayList2);
            return merged;
        }

        /** Returns the accumulator itself as the final result. */
        @Override // org.apache.storm.streams.operations.CombinerAggregator
        public ArrayList<V> result(ArrayList<V> arrayList) {
            return arrayList;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a pair stream on top of {@code node} and flags that node as emitting pairs.
     *
     * @param streamBuilder builder forwarded to the {@link Stream} constructor
     * @param node node forwarded to the {@link Stream} constructor; it is mutated through
     *     {@code setEmitsPair(true)}
     */
    public PairStream(StreamBuilder streamBuilder, Node node) {
        super(streamBuilder, node);
        // Record on the node itself that it emits pairs.
        node.setEmitsPair(true);
    }

    /**
     * Returns a new stream backed by a {@link MapValuesProcessor} built from {@code function}.
     *
     * @param function function handed to the {@link MapValuesProcessor}
     * @param <R> value type of the returned stream
     * @return a pair stream over the newly added processor node
     */
    public <R> PairStream<K, R> mapValues(Function<? super V, ? extends R> function) {
        MapValuesProcessor valueMapper = new MapValuesProcessor(function);
        Node mappedNode = addProcessorNode(valueMapper, KEY_VALUE, true);
        return new PairStream<>(streamBuilder, mappedNode);
    }

    /**
     * Returns a new stream backed by a {@link FlatMapValuesProcessor} built from
     * {@code flatMapFunction}.
     *
     * @param flatMapFunction function handed to the {@link FlatMapValuesProcessor}
     * @param <R> value type of the returned stream
     * @return a pair stream over the newly added processor node
     */
    public <R> PairStream<K, R> flatMapValues(FlatMapFunction<? super V, ? extends R> flatMapFunction) {
        FlatMapValuesProcessor valueFlatMapper = new FlatMapValuesProcessor(flatMapFunction);
        Node flatMappedNode = addProcessorNode(valueFlatMapper, KEY_VALUE, true);
        return new PairStream<>(streamBuilder, flatMappedNode);
    }

    /**
     * Aggregates by key using an aggregator assembled from a value and two functions.
     *
     * <p>The three arguments are forwarded, in order, to {@code CombinerAggregator.of}; the
     * aggregator it returns is then applied through {@code combineByKey}.
     *
     * @param r first argument to {@code CombinerAggregator.of} (presumably the initial aggregate;
     *     confirm against that factory)
     * @param biFunction takes an aggregate and a value of this stream and yields an aggregate
     * @param biFunction2 takes two aggregates and yields an aggregate
     * @param <R> aggregate type, which is also the value type of the returned stream
     * @return the stream produced by {@code combineByKey}
     */
    public <R> PairStream<K, R> aggregateByKey(R r, BiFunction<? super R, ? super V, ? extends R> biFunction, BiFunction<? super R, ? super R, ? extends R> biFunction2) {
        return combineByKey(CombinerAggregator.of(r, biFunction, biFunction2));
    }

    /**
     * Aggregates by key with the supplied aggregator; a public entry point that delegates
     * straight to {@code combineByKey}.
     *
     * @param combinerAggregator aggregator forwarded to {@code combineByKey}
     * @param <A> accumulator type of the aggregator
     * @param <R> result type of the aggregator and value type of the returned stream
     * @return the stream produced by {@code combineByKey}
     */
    public <A, R> PairStream<K, R> aggregateByKey(CombinerAggregator<? super V, A, ? extends R> combinerAggregator) {
        return combineByKey(combinerAggregator);
    }

    /**
     * Aggregates by key with a {@link Count} aggregator.
     *
     * @return the stream produced by {@link #aggregateByKey(CombinerAggregator)}, viewed as a
     *     stream of {@code Long} values
     */
    public PairStream<K, Long> countByKey() {
        Count counter = new Count();
        return (PairStream<K, Long>) aggregateByKey(counter);
    }

    /**
     * Reduces by key with the supplied reducer; delegates to the {@link Reducer} overload of
     * {@code combineByKey}.
     *
     * @param reducer reducer forwarded to {@code combineByKey}
     * @return the stream produced by {@code combineByKey}
     */
    public PairStream<K, V> reduceByKey(Reducer<V> reducer) {
        return combineByKey(reducer);
    }

    /**
     * Partitions by key when {@code partitionByKey} deems it necessary, then aggregates each
     * partition with a {@code MergeValues} combiner, which collects values into a list.
     *
     * @return the aggregated stream, viewed as a stream of {@code Iterable<V>} values
     */
    public PairStream<K, Iterable<V>> groupByKey() {
        PairStream<K, V> keyed = partitionByKey();
        CombinerAggregator valueCollector = new MergeValues();
        return (PairStream<K, Iterable<V>>) keyed.aggregatePartition(valueCollector);
    }

    /**
     * Same as {@link #groupByKey()} but with {@code window} applied between the key partitioning
     * and the {@code MergeValues} aggregation.
     *
     * @param window window forwarded to {@link #window(Window)}
     * @return the aggregated stream, viewed as a stream of {@code Iterable<V>} values
     */
    public PairStream<K, Iterable<V>> groupByKeyAndWindow(Window<?, ?> window) {
        PairStream<K, V> windowed = partitionByKey().window(window);
        CombinerAggregator valueCollector = new MergeValues();
        return (PairStream<K, Iterable<V>>) windowed.aggregatePartition(valueCollector);
    }

    /**
     * Partitions by key when needed, applies {@code window}, then reduces by key.
     *
     * @param reducer reducer forwarded to {@link #reduceByKey(Reducer)}
     * @param window window forwarded to {@link #window(Window)}
     * @return the stream returned by {@link #reduceByKey(Reducer)} on the windowed stream
     */
    public PairStream<K, V> reduceByKeyAndWindow(Reducer<V> reducer, Window<?, ?> window) {
        PairStream<K, V> keyed = partitionByKey();
        PairStream<K, V> windowed = keyed.window(window);
        return windowed.reduceByKey(reducer);
    }

    /**
     * Delegates to {@link Stream#peek} and re-wraps the result so callers keep a
     * {@code PairStream}.
     *
     * @param consumer consumer forwarded to the superclass
     * @return a pair stream over the builder and node of the stream returned by the superclass
     */
    @Override // org.apache.storm.streams.Stream
    public PairStream<K, V> peek(Consumer<? super Pair<K, V>> consumer) {
        Stream<Pair<K, V>> peeked = super.peek(consumer);
        return toPairStream(peeked);
    }

    /**
     * Delegates to {@link Stream#filter} and re-wraps the result so callers keep a
     * {@code PairStream}.
     *
     * @param predicate predicate forwarded to the superclass
     * @return a pair stream over the builder and node of the stream returned by the superclass
     */
    @Override // org.apache.storm.streams.Stream
    public PairStream<K, V> filter(Predicate<? super Pair<K, V>> predicate) {
        Stream<Pair<K, V>> filtered = super.filter(predicate);
        return toPairStream(filtered);
    }

    /**
     * Joins with {@code pairStream} using a {@link PairValueJoiner}; see the two-argument
     * {@code join} overload for how the join is wired.
     *
     * @param pairStream the other stream
     * @param <V1> value type of the other stream
     * @return the joined stream, viewed as a stream of {@code Pair<V, V1>} values
     */
    public <V1> PairStream<K, Pair<V, V1>> join(PairStream<K, V1> pairStream) {
        PairValueJoiner pairJoiner = new PairValueJoiner();
        return (PairStream<K, Pair<V, V1>>) join(pairStream, pairJoiner);
    }

    /**
     * Partitions both streams by key (when {@code partitionByKey} deems it necessary) and joins
     * them with join types {@code (INNER, INNER)}.
     *
     * @param pairStream the other stream
     * @param valueJoiner joiner forwarded to {@code joinPartition}
     * @param <R> value type of the joined stream
     * @param <V1> value type of the other stream
     * @return the stream returned by {@code joinPartition}
     */
    public <R, V1> PairStream<K, R> join(PairStream<K, V1> pairStream, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
        PairStream<K, V> left = partitionByKey();
        PairStream<K, V1> right = pairStream.partitionByKey();
        return left.joinPartition(right, valueJoiner, JoinProcessor.JoinType.INNER, JoinProcessor.JoinType.INNER);
    }

    /**
     * Left-outer-joins with {@code pairStream} using a {@link PairValueJoiner}; see the
     * two-argument {@code leftOuterJoin} overload for how the join is wired.
     *
     * @param pairStream the other stream
     * @param <V1> value type of the other stream
     * @return the joined stream, viewed as a stream of {@code Pair<V, V1>} values
     */
    public <V1> PairStream<K, Pair<V, V1>> leftOuterJoin(PairStream<K, V1> pairStream) {
        PairValueJoiner pairJoiner = new PairValueJoiner();
        return (PairStream<K, Pair<V, V1>>) leftOuterJoin(pairStream, pairJoiner);
    }

    /**
     * Partitions both streams by key (when {@code partitionByKey} deems it necessary) and joins
     * them with join types {@code (OUTER, INNER)}.
     *
     * @param pairStream the other stream
     * @param valueJoiner joiner forwarded to {@code joinPartition}
     * @param <R> value type of the joined stream
     * @param <V1> value type of the other stream
     * @return the stream returned by {@code joinPartition}
     */
    public <R, V1> PairStream<K, R> leftOuterJoin(PairStream<K, V1> pairStream, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
        PairStream<K, V> left = partitionByKey();
        PairStream<K, V1> right = pairStream.partitionByKey();
        return left.joinPartition(right, valueJoiner, JoinProcessor.JoinType.OUTER, JoinProcessor.JoinType.INNER);
    }

    /**
     * Right-outer-joins with {@code pairStream} using a {@link PairValueJoiner}; see the
     * two-argument {@code rightOuterJoin} overload for how the join is wired.
     *
     * @param pairStream the other stream
     * @param <V1> value type of the other stream
     * @return the joined stream, viewed as a stream of {@code Pair<V, V1>} values
     */
    public <V1> PairStream<K, Pair<V, V1>> rightOuterJoin(PairStream<K, V1> pairStream) {
        PairValueJoiner pairJoiner = new PairValueJoiner();
        return (PairStream<K, Pair<V, V1>>) rightOuterJoin(pairStream, pairJoiner);
    }

    /**
     * Partitions both streams by key (when {@code partitionByKey} deems it necessary) and joins
     * them with join types {@code (INNER, OUTER)}.
     *
     * @param pairStream the other stream
     * @param valueJoiner joiner forwarded to {@code joinPartition}
     * @param <R> value type of the joined stream
     * @param <V1> value type of the other stream
     * @return the stream returned by {@code joinPartition}
     */
    public <R, V1> PairStream<K, R> rightOuterJoin(PairStream<K, V1> pairStream, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
        PairStream<K, V> left = partitionByKey();
        PairStream<K, V1> right = pairStream.partitionByKey();
        return left.joinPartition(right, valueJoiner, JoinProcessor.JoinType.INNER, JoinProcessor.JoinType.OUTER);
    }

    /**
     * Full-outer-joins with {@code pairStream} using a {@link PairValueJoiner}; see the
     * two-argument {@code fullOuterJoin} overload for how the join is wired.
     *
     * @param pairStream the other stream
     * @param <V1> value type of the other stream
     * @return the joined stream, viewed as a stream of {@code Pair<V, V1>} values
     */
    public <V1> PairStream<K, Pair<V, V1>> fullOuterJoin(PairStream<K, V1> pairStream) {
        PairValueJoiner pairJoiner = new PairValueJoiner();
        return (PairStream<K, Pair<V, V1>>) fullOuterJoin(pairStream, pairJoiner);
    }

    /**
     * Partitions both streams by key (when {@code partitionByKey} deems it necessary) and joins
     * them with join types {@code (OUTER, OUTER)}.
     *
     * @param pairStream the other stream
     * @param valueJoiner joiner forwarded to {@code joinPartition}
     * @param <R> value type of the joined stream
     * @param <V1> value type of the other stream
     * @return the stream returned by {@code joinPartition}
     */
    public <R, V1> PairStream<K, R> fullOuterJoin(PairStream<K, V1> pairStream, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
        PairStream<K, V> left = partitionByKey();
        PairStream<K, V1> right = pairStream.partitionByKey();
        return left.joinPartition(right, valueJoiner, JoinProcessor.JoinType.OUTER, JoinProcessor.JoinType.OUTER);
    }

    /**
     * Delegates to {@link Stream#window} and re-wraps the result so callers keep a
     * {@code PairStream}.
     *
     * @param window window forwarded to the superclass
     * @return a pair stream over the builder and node of the stream returned by the superclass
     */
    @Override // org.apache.storm.streams.Stream
    public PairStream<K, V> window(Window<?, ?> window) {
        Stream<Pair<K, V>> windowed = super.window(window);
        return toPairStream(windowed);
    }

    /**
     * Delegates to {@link Stream#repartition} and re-wraps the result so callers keep a
     * {@code PairStream}.
     *
     * @param i value forwarded to the superclass
     * @return a pair stream over the builder and node of the stream returned by the superclass
     */
    @Override // org.apache.storm.streams.Stream
    public PairStream<K, V> repartition(int i) {
        Stream<Pair<K, V>> repartitioned = super.repartition(i);
        return toPairStream(repartitioned);
    }

    /**
     * Delegates to {@link Stream#branch} and re-wraps every returned stream as a
     * {@code PairStream}, preserving order.
     *
     * @param predicateArr predicates forwarded to the superclass
     * @return one pair stream per stream returned by the superclass
     */
    @Override // org.apache.storm.streams.Stream
    @SuppressWarnings("unchecked") // generic array creation; only PairStream<K, V> elements are stored
    public PairStream<K, V>[] branch(Predicate<? super Pair<K, V>>... predicateArr) {
        Stream<Pair<K, V>>[] branches = super.branch(predicateArr);
        PairStream<K, V>[] pairBranches = new PairStream[branches.length];
        for (int i = 0; i < branches.length; i++) {
            pairBranches[i] = toPairStream(branches[i]);
        }
        return pairBranches;
    }

    /**
     * Builds a {@link StateUpdater} from the two arguments via {@code StateUpdater.of} and
     * delegates to {@link #updateStateByKey(StateUpdater)}.
     *
     * @param r first argument to {@code StateUpdater.of} (presumably the initial state; confirm
     *     against that factory)
     * @param biFunction takes a state and a value of this stream and yields a state
     * @param <R> state type
     * @return the stream state returned by {@link #updateStateByKey(StateUpdater)}
     */
    public <R> StreamState<K, R> updateStateByKey(R r, BiFunction<? super R, ? super V, ? extends R> biFunction) {
        return updateStateByKey(StateUpdater.of(r, biFunction));
    }

    /**
     * Partitions this stream on {@code KEY} and adds the state-update processor on the
     * partitioned stream.
     *
     * <p>Unlike the join and group operations, this always partitions: it calls
     * {@code partitionBy(KEY)} directly rather than {@code partitionByKey()}.
     *
     * @param stateUpdater updater forwarded to {@code updateStateByKeyPartition}
     * @param <R> state type
     * @return the stream state returned by {@code updateStateByKeyPartition}
     */
    public <R> StreamState<K, R> updateStateByKey(StateUpdater<? super V, ? extends R> stateUpdater) {
        PairStream<K, V> keyed = partitionBy(KEY);
        return keyed.updateStateByKeyPartition(stateUpdater);
    }

    /**
     * Co-groups this stream with {@code pairStream} by key.
     *
     * <p>Both sides go through {@code partitionByKey()} before being wired into the co-group
     * processor, mirroring what every {@code join} variant in this class does. Previously only
     * this stream was partitioned, so the other stream reached the processor without any
     * key-based partitioning.
     *
     * @param pairStream the other stream
     * @param <V1> value type of the other stream
     * @return the stream returned by {@code coGroupByKeyPartition}
     */
    public <V1> PairStream<K, Pair<Iterable<V>, Iterable<V1>>> coGroupByKey(PairStream<K, V1> pairStream) {
        PairStream<K, V> left = partitionByKey();
        PairStream<K, V1> right = pairStream.partitionByKey();
        // No cast: the result type parameter is inferred from the return type.
        return left.coGroupByKeyPartition(right);
    }

    /**
     * Adds an {@link UpdateStateByKeyProcessor} built from {@code stateUpdater} and wraps the
     * resulting pair stream in a {@link StreamState}.
     *
     * @param stateUpdater updater handed to the processor
     * @param <R> state type
     * @return a stream state over the newly added processor node
     */
    private <R> StreamState<K, R> updateStateByKeyPartition(StateUpdater<? super V, ? extends R> stateUpdater) {
        UpdateStateByKeyProcessor stateProcessor = new UpdateStateByKeyProcessor(stateUpdater);
        Node stateNode = addProcessorNode(stateProcessor, KEY_VALUE, true);
        PairStream<K, R> stateStream = new PairStream<>(streamBuilder, stateNode);
        return new StreamState<>(stateStream);
    }

    /**
     * Adds a {@link JoinProcessor} fed by this stream and by {@code pairStream}.
     *
     * <p>The processor node is first added under this stream; the other stream's node is then
     * attached to it with {@code addNode}, passing the processor node's own parallelism.
     *
     * @param pairStream the other stream
     * @param valueJoiner joiner handed to the processor
     * @param joinType first join type handed to the processor
     * @param joinType2 second join type handed to the processor
     * @param <R> value type of the joined stream
     * @param <V1> value type of the other stream
     * @return a pair stream over the join processor node
     */
    private <R, V1> PairStream<K, R> joinPartition(PairStream<K, V1> pairStream, ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner, JoinProcessor.JoinType joinType, JoinProcessor.JoinType joinType2) {
        JoinProcessor joiner = new JoinProcessor(stream, pairStream.stream, valueJoiner, joinType, joinType2);
        Node joinNode = addProcessorNode(joiner, KEY_VALUE, true);
        addNode(pairStream.getNode(), joinNode, joinNode.getParallelism());
        return new PairStream<>(streamBuilder, joinNode);
    }

    /**
     * Adds a {@link CoGroupByKeyProcessor} fed by this stream and by {@code pairStream}.
     *
     * <p>The processor node is first added under this stream; the other stream's node is then
     * attached to it with {@code addNode}, passing the processor node's own parallelism.
     *
     * @param pairStream the other stream
     * @param <R> value type of the returned stream
     * @param <V1> value type of the other stream
     * @return a pair stream over the co-group processor node
     */
    private <R, V1> PairStream<K, R> coGroupByKeyPartition(PairStream<K, V1> pairStream) {
        CoGroupByKeyProcessor coGrouper = new CoGroupByKeyProcessor(stream, pairStream.stream);
        Node coGroupNode = addProcessorNode(coGrouper, KEY_VALUE, true);
        addNode(pairStream.getNode(), coGroupNode, coGroupNode.getParallelism());
        return new PairStream<>(streamBuilder, coGroupNode);
    }

    /**
     * Returns this stream partitioned on {@code KEY}, or this stream itself when
     * {@link #shouldPartitionByKey()} reports that no partitioning is needed.
     */
    private PairStream<K, V> partitionByKey() {
        if (shouldPartitionByKey()) {
            return partitionBy(KEY);
        }
        return this;
    }

    /**
     * Decides whether this stream has to be partitioned on {@code KEY}.
     *
     * @return {@code false} when the node's parallelism is 1, or when the node is a
     *     {@link ProcessorNode} that preserves the key and whose grouping fields equal
     *     {@code KEY}; {@code true} otherwise
     */
    private boolean shouldPartitionByKey() {
        if (node.getParallelism() == 1) {
            return false;
        }
        if (!(node instanceof ProcessorNode)) {
            return true;
        }
        ProcessorNode upstream = (ProcessorNode) node;
        Fields groupedOn = null;
        if (upstream.getGroupingInfo() != null) {
            groupedOn = upstream.getGroupingInfo().getFields();
        }
        boolean alreadyKeyed = upstream.isPreservesKey() && groupedOn != null && groupedOn.equals(KEY);
        return !alreadyKeyed;
    }

    /**
     * Partitions this stream on {@code fields}, passing the current node's parallelism to the
     * two-argument {@code partitionBy}, and re-wraps the result as a pair stream.
     *
     * @param fields fields forwarded to the two-argument {@code partitionBy}
     * @return a pair stream over the partitioned stream's builder and node
     */
    private PairStream<K, V> partitionBy(Fields fields) {
        Stream<Pair<K, V>> partitioned = partitionBy(fields, node.parallelism);
        return toPairStream(partitioned);
    }

    /**
     * Wraps the builder and node of {@code stream} in a new {@code PairStream}.
     *
     * @param stream stream whose {@code streamBuilder} and {@code node} are reused
     * @return a new pair stream over the same builder and node
     */
    private PairStream<K, V> toPairStream(Stream<Pair<K, V>> stream) {
        StreamBuilder builder = stream.streamBuilder;
        Node underlying = stream.node;
        return new PairStream<>(builder, underlying);
    }

    /**
     * Adds an {@link AggregateByKeyProcessor} built from {@code combinerAggregator} using the
     * processor's single-argument constructor.
     *
     * @param combinerAggregator aggregator handed to the processor
     * @param <A> accumulator type of the aggregator
     * @param <R> value type of the returned stream
     * @return a pair stream over the newly added processor node
     */
    private <A, R> PairStream<K, R> aggregatePartition(CombinerAggregator<? super V, A, ? extends R> combinerAggregator) {
        AggregateByKeyProcessor aggregateProcessor = new AggregateByKeyProcessor(combinerAggregator);
        Node aggregateNode = addProcessorNode(aggregateProcessor, KEY_VALUE, true);
        return new PairStream<>(streamBuilder, aggregateNode);
    }

    /**
     * Adds an {@link AggregateByKeyProcessor} built from {@code combinerAggregator} with the
     * processor's boolean constructor flag set to {@code true}. The returned stream is typed on
     * the accumulator type {@code A} rather than on the aggregator's result type.
     *
     * @param combinerAggregator aggregator handed to the processor
     * @param <A> accumulator type of the aggregator and value type of the returned stream
     * @return a pair stream over the newly added processor node
     */
    private <A> PairStream<K, A> combinePartition(CombinerAggregator<? super V, A, ?> combinerAggregator) {
        AggregateByKeyProcessor partialProcessor = new AggregateByKeyProcessor(combinerAggregator, true);
        Node partialNode = addProcessorNode(partialProcessor, KEY_VALUE, true);
        return new PairStream<>(streamBuilder, partialNode);
    }

    /**
     * Adds a {@link MergeAggregateByKeyProcessor} built from {@code combinerAggregator}. This
     * stream's value type {@code V} is the aggregator's accumulator type here.
     *
     * @param combinerAggregator aggregator handed to the processor
     * @param <R> value type of the returned stream
     * @return a pair stream over the newly added processor node
     */
    private <R> PairStream<K, R> merge(CombinerAggregator<?, V, ? extends R> combinerAggregator) {
        MergeAggregateByKeyProcessor mergeProcessor = new MergeAggregateByKeyProcessor(combinerAggregator);
        Node mergeNode = addProcessorNode(mergeProcessor, KEY_VALUE, true);
        return new PairStream<>(streamBuilder, mergeNode);
    }

    /**
     * Adds a {@link ReduceByKeyProcessor} built from {@code reducer}.
     *
     * @param reducer reducer handed to the processor
     * @return a pair stream over the newly added processor node
     */
    private PairStream<K, V> reducePartition(Reducer<V> reducer) {
        ReduceByKeyProcessor reduceProcessor = new ReduceByKeyProcessor(reducer);
        Node reduceNode = addProcessorNode(reduceProcessor, KEY_VALUE, true);
        return new PairStream<>(streamBuilder, reduceNode);
    }

    /**
     * Aggregates by key, choosing where the aggregation runs based on the current node.
     *
     * <ul>
     *   <li>No key partitioning needed: aggregate directly on this stream.</li>
     *   <li>Current node is a windowed {@link ProcessorNode}: {@code combinePartition} on this
     *       stream, partition on {@code KEY}, then {@code merge}.</li>
     *   <li>Current node is a {@link WindowNode} whose parents are all windowed: insert an
     *       {@link AggregateByKeyProcessor} (boolean flag {@code true}) after each parent,
     *       partition on {@code KEY}, then {@code merge}.</li>
     *   <li>Anything else: partition on {@code KEY}, then aggregate.</li>
     * </ul>
     *
     * @param combinerAggregator aggregator used at every stage
     * @param <A> accumulator type of the aggregator
     * @param <R> value type of the returned stream
     * @return the aggregated stream
     */
    private <A, R> PairStream<K, R> combineByKey(CombinerAggregator<? super V, A, ? extends R> combinerAggregator) {
        if (!shouldPartitionByKey()) {
            return aggregatePartition((CombinerAggregator) combinerAggregator);
        }
        if (node instanceof ProcessorNode) {
            if (node.isWindowed()) {
                return combinePartition((CombinerAggregator) combinerAggregator)
                        .partitionBy(KEY)
                        .merge((CombinerAggregator<?, A, ? extends R>) combinerAggregator);
            }
        } else if (node instanceof WindowNode) {
            Set<Node> upstream = node.getParents();
            boolean everyParentWindowed = upstream.stream().allMatch(parent -> parent.isWindowed());
            if (everyParentWindowed) {
                upstream.forEach(parent -> streamBuilder.insert(
                        parent,
                        makeProcessorNode(new AggregateByKeyProcessor(combinerAggregator, true), KEY_VALUE, true)));
                return partitionBy(KEY).merge((CombinerAggregator) combinerAggregator);
            }
        }
        return partitionBy(KEY).aggregatePartition((CombinerAggregator) combinerAggregator);
    }

    /**
     * Reduces by key, choosing where the reduction runs based on the current node.
     *
     * <ul>
     *   <li>No key partitioning needed: reduce directly on this stream.</li>
     *   <li>Current node is a windowed {@link ProcessorNode}: reduce on this stream, partition
     *       on {@code KEY}, then reduce again.</li>
     *   <li>Current node is a {@link WindowNode}: insert a {@link ReduceByKeyProcessor} after
     *       each windowed parent, then partition on {@code KEY} and reduce.</li>
     *   <li>Anything else: partition on {@code KEY} and reduce.</li>
     * </ul>
     *
     * @param reducer reducer used at every stage
     * @return the reduced stream
     */
    private PairStream<K, V> combineByKey(Reducer<V> reducer) {
        if (shouldPartitionByKey()) {
            if (node instanceof ProcessorNode) {
                if (node.isWindowed()) {
                    PairStream<K, V> locallyReduced = reducePartition(reducer);
                    return locallyReduced.partitionBy(KEY).reducePartition(reducer);
                }
            } else if (node instanceof WindowNode) {
                for (Node parent : node.getParents()) {
                    if (!parent.isWindowed()) {
                        continue;
                    }
                    ReduceByKeyProcessor localReducer = new ReduceByKeyProcessor(reducer);
                    streamBuilder.insert(parent, makeProcessorNode(localReducer, KEY_VALUE, true));
                }
            }
            return partitionBy(KEY).reducePartition(reducer);
        }
        return reducePartition(reducer);
    }

    // Raw-typed overload that the decompiler tagged as a compiler-generated bridge (see the
    // bridge/synthetic markers). It only forwards to window(...) with the argument cast to
    // Window<?, ?>, presumably to reach the typed override that returns PairStream<K, V>.
    // NOTE(review): hand-written source would normally not declare this method - confirm before
    // recompiling this file.
    @Override // org.apache.storm.streams.Stream
    public /* bridge */ /* synthetic */ Stream window(Window window) {
        return window((Window<?, ?>) window);
    }
}
