package org.apache.beam.runners.flink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.adapter.FlinkKey;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToFlinkKeyKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.PartialReduceBundleOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingAggregationsTranslators.class */
public class FlinkStreamingAggregationsTranslators {

    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingAggregationsTranslators$ConcatenateAsIterable.class */
    public static class ConcatenateAsIterable<T> extends Combine.CombineFn<T, Iterable<T>, Iterable<T>> {
        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public Iterable<T> m17createAccumulator() {
            return new ArrayList();
        }

        public Iterable<T> addInput(Iterable<T> iterable, T t) {
            ArrayList newArrayList = Lists.newArrayList(iterable);
            newArrayList.add(t);
            return newArrayList;
        }

        /* renamed from: mergeAccumulators, reason: merged with bridge method [inline-methods] */
        public Iterable<T> m16mergeAccumulators(Iterable<Iterable<T>> iterable) {
            return Iterables.concat(iterable);
        }

        public Iterable<T> extractOutput(Iterable<T> iterable) {
            return iterable;
        }

        public Coder<Iterable<T>> getAccumulatorCoder(CoderRegistry coderRegistry, Coder<T> coder) {
            return IterableCoder.of(coder);
        }

        public Coder<Iterable<T>> getDefaultOutputCoder(CoderRegistry coderRegistry, Coder<T> coder) {
            return IterableCoder.of(coder);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public /* bridge */ /* synthetic */ Object addInput(Object obj, Object obj2) {
            return addInput((Iterable<Iterable<T>>) obj, (Iterable<T>) obj2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingAggregationsTranslators$FlattenIterable.class */
    public static class FlattenIterable<K, InputT> implements FlatMapFunction<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>, WindowedValue<KV<K, Iterable<InputT>>>> {
        private FlattenIterable() {
        }

        public void flatMap(WindowedValue<KV<K, Iterable<Iterable<InputT>>>> windowedValue, Collector<WindowedValue<KV<K, Iterable<InputT>>>> collector) throws Exception {
            collector.collect(windowedValue.withValue(KV.of(((KV) windowedValue.getValue()).getKey(), Iterables.concat((Iterable) ((KV) windowedValue.getValue()).getValue()))));
        }
    }

    /**
     * Adapts {@code globalCombineFn} into a combine function whose inputs are themselves
     * accumulators.
     *
     * <p>The returned function delegates accumulator creation, merging and output extraction to
     * {@code globalCombineFn}. Its {@code addInput} treats the incoming value as an accumulator and
     * merges it with the current one through the delegate's {@code mergeAccumulators}. Its
     * {@code getAccumulatorCoder} ignores the coder it is handed and asks the delegate using the
     * captured {@code coder} instead.
     *
     * @param globalCombineFn the combine function to adapt; must be a {@link Combine.CombineFn} or a
     *     {@link CombineWithContext.CombineFnWithContext}
     * @param coder coder passed to the delegate when it is asked for its accumulator coder
     * @return the adapted combine function
     * @throws IllegalArgumentException if {@code globalCombineFn} is of neither supported kind
     */
    private static <InputT, OutputT> CombineFnBase.GlobalCombineFn<Object, Object, OutputT> toFinalFlinkCombineFn(final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> globalCombineFn, final Coder<InputT> coder) {
        final boolean isPlainCombineFn = globalCombineFn instanceof Combine.CombineFn;
        final boolean isContextCombineFn = globalCombineFn instanceof CombineWithContext.CombineFnWithContext;
        if (!isPlainCombineFn && !isContextCombineFn) {
            throw new IllegalArgumentException("Unsupported CombineFn implementation: " + globalCombineFn.getClass());
        }

        if (isPlainCombineFn) {
            return new Combine.CombineFn<Object, Object, OutputT>() {
                @SuppressWarnings({"unchecked", "rawtypes"})
                final Combine.CombineFn<InputT, Object, OutputT> fn = (Combine.CombineFn) globalCombineFn;

                @Override
                public Object createAccumulator() {
                    return fn.createAccumulator();
                }

                @Override
                public Coder<Object> getAccumulatorCoder(CoderRegistry registry, Coder<Object> ignoredInputCoder) throws CannotProvideCoderException {
                    // Deliberately uses the captured coder, not the one supplied by the caller.
                    return fn.getAccumulatorCoder(registry, coder);
                }

                @Override
                public Object addInput(Object accumulator, Object incomingAccumulator) {
                    // The "input" is itself an accumulator, so adding it is a two-way merge.
                    return fn.mergeAccumulators(ImmutableList.of(accumulator, incomingAccumulator));
                }

                @Override
                public Object mergeAccumulators(Iterable<Object> accumulators) {
                    return fn.mergeAccumulators(accumulators);
                }

                @Override
                public OutputT extractOutput(Object accumulator) {
                    return fn.extractOutput(accumulator);
                }
            };
        }

        return new CombineWithContext.CombineFnWithContext<Object, Object, OutputT>() {
            @SuppressWarnings({"unchecked", "rawtypes"})
            final CombineWithContext.CombineFnWithContext<InputT, Object, OutputT> fn = (CombineWithContext.CombineFnWithContext) globalCombineFn;

            @Override
            public Object createAccumulator(CombineWithContext.Context context) {
                return fn.createAccumulator(context);
            }

            @Override
            public Coder<Object> getAccumulatorCoder(CoderRegistry registry, Coder<Object> ignoredInputCoder) throws CannotProvideCoderException {
                // Deliberately uses the captured coder, not the one supplied by the caller.
                return fn.getAccumulatorCoder(registry, coder);
            }

            @Override
            public Object addInput(Object accumulator, Object incomingAccumulator, CombineWithContext.Context context) {
                // The "input" is itself an accumulator, so adding it is a two-way merge.
                return fn.mergeAccumulators(ImmutableList.of(accumulator, incomingAccumulator), context);
            }

            @Override
            public Object mergeAccumulators(Iterable<Object> accumulators, CombineWithContext.Context context) {
                return fn.mergeAccumulators(accumulators, context);
            }

            @Override
            public OutputT extractOutput(Object accumulator, CombineWithContext.Context context) {
                return fn.extractOutput(accumulator, context);
            }
        };
    }

    /**
     * Builds the {@link WindowDoFnOperator} that runs {@code systemReduceFn} for the transform
     * currently being translated.
     *
     * <p>The operator's input coder is a windowed {@link SingletonKeyedWorkItemCoder} built from the
     * key and value coders of {@code kvCoder} and the window coder of the transform's input. Its only
     * output tag is {@code "main output"}, and it is keyed with a {@link WorkItemKeySelector} over the
     * key coder.
     *
     * @param flinkStreamingTranslationContext translation context supplying the input collection,
     *     the transform name and the pipeline options
     * @param pTransform the transform being translated
     * @param kvCoder coder of the key/value pairs fed into the reduce function
     * @param coder windowed coder of the operator's main output
     * @param systemReduceFn the reduce function the operator applies
     * @param map side-input tag mapping, passed through to the operator
     * @param list side inputs, passed through to the operator
     * @return the configured operator
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static <K, InputAccumT, OutputAccumT, InputT, OutputT> WindowDoFnOperator<K, InputAccumT, OutputAccumT> getWindowedAggregateDoFnOperator(FlinkStreamingTranslationContext flinkStreamingTranslationContext, PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> pTransform, KvCoder<K, InputAccumT> kvCoder, Coder<WindowedValue<KV<K, OutputAccumT>>> coder, SystemReduceFn<K, InputAccumT, ?, OutputAccumT, BoundedWindow> systemReduceFn, Map<Integer, PCollectionView<?>> map, List<PCollectionView<?>> list) {
        String stepName = FlinkStreamingTransformTranslators.getCurrentTransformName(flinkStreamingTranslationContext);
        TupleTag<KV<K, OutputAccumT>> mainOutputTag = new TupleTag<>("main output");
        // Kept raw: the strategy's type arguments are not needed by anything below.
        WindowingStrategy strategy = flinkStreamingTranslationContext.getInput(pTransform).getWindowingStrategy();
        SerializablePipelineOptions serializedOptions = new SerializablePipelineOptions(flinkStreamingTranslationContext.getPipelineOptions());
        Coder<K> keyCoder = kvCoder.getKeyCoder();

        // Work items are encoded per key, then wrapped with the window coder of the input.
        Coder workItemCoder = SingletonKeyedWorkItemCoder.of(keyCoder, kvCoder.getValueCoder(), strategy.getWindowFn().windowCoder());
        Coder windowedWorkItemCoder = WindowedValue.getFullCoder(workItemCoder, strategy.getWindowFn().windowCoder());

        DoFnOperator.MultiOutputOutputManagerFactory<KV<K, OutputAccumT>> outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory<>(mainOutputTag, coder, serializedOptions);

        return new WindowDoFnOperator<>(
                systemReduceFn,
                stepName,
                windowedWorkItemCoder,
                mainOutputTag,
                Collections.emptyList(),
                outputManagerFactory,
                strategy,
                map,
                list,
                flinkStreamingTranslationContext.getPipelineOptions(),
                keyCoder,
                new WorkItemKeySelector<>(keyCoder));
    }

    /**
     * Builds the {@link WindowDoFnOperator} for a combine function by first turning it into a
     * combining {@link SystemReduceFn} and then delegating to the {@code SystemReduceFn} overload.
     *
     * @param flinkStreamingTranslationContext translation context; also supplies the coder registry
     *     through the pipeline of the transform's input
     * @param pTransform the transform being translated
     * @param kvCoder coder of the key/value pairs fed into the combine function
     * @param coder windowed coder of the operator's main output
     * @param globalCombineFn the combine function to apply
     * @param map side-input tag mapping, passed through to the operator
     * @param list side inputs, passed through to the operator
     * @return the configured operator
     */
    public static <K, InputAccumT, OutputAccumT, InputT, OutputT> WindowDoFnOperator<K, InputAccumT, OutputAccumT> getWindowedAggregateDoFnOperator(FlinkStreamingTranslationContext flinkStreamingTranslationContext, PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> pTransform, KvCoder<K, InputAccumT> kvCoder, Coder<WindowedValue<KV<K, OutputAccumT>>> coder, CombineFnBase.GlobalCombineFn<? super InputAccumT, ?, OutputAccumT> globalCombineFn, Map<Integer, PCollectionView<?>> map, List<PCollectionView<?>> list) {
        Coder<K> keyCoder = kvCoder.getKeyCoder();
        CoderRegistry registry = flinkStreamingTranslationContext.getInput(pTransform).getPipeline().getCoderRegistry();
        SystemReduceFn<K, InputAccumT, ?, OutputAccumT, BoundedWindow> reduceFn =
                SystemReduceFn.combining(keyCoder, AppliedCombineFn.withInputCoder(globalCombineFn, registry, kvCoder));
        return getWindowedAggregateDoFnOperator(flinkStreamingTranslationContext, pTransform, kvCoder, coder, reduceFn, map, list);
    }

    /**
     * Wires a two-stage per-key combine into the Flink job: a {@link PartialReduceBundleOperator}
     * named {@code "Combine: " + <transform name>}, then a {@code keyBy}, then the supplied
     * {@code windowDoFnOperator} named after the transform.
     *
     * <p>Without side inputs both stages are attached with {@code DataStream.transform} and given a
     * uid equal to their name. With side inputs each stage is attached as a
     * {@link TwoInputTransformation} whose second input is the broadcast side-input stream; no uid is
     * set on that path.
     *
     * @param flinkStreamingTranslationContext translation context
     * @param pCollection the keyed input collection; its coder must be a {@link KvCoder}
     * @param map side-input tag mapping handed to the partial operator
     * @param list side inputs; an empty list selects the single-input wiring
     * @param coder windowed coder of the partial stage's output
     * @param globalCombineFn combine function applied by the partial stage
     * @param windowDoFnOperator operator forming the final stage
     * @param typeInformation type information of the final stage's output
     * @return the stream produced by the final stage
     * @throws ClassCastException if the coder of {@code pCollection} is not a {@link KvCoder}
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static <K, InputT, AccumT, OutputT> SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> getBatchCombinePerKeyOperator(FlinkStreamingTranslationContext flinkStreamingTranslationContext, PCollection<KV<K, InputT>> pCollection, Map<Integer, PCollectionView<?>> map, List<PCollectionView<?>> list, Coder<WindowedValue<KV<K, AccumT>>> coder, CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> globalCombineFn, WindowDoFnOperator<K, AccumT, OutputT> windowDoFnOperator, TypeInformation<WindowedValue<KV<K, OutputT>>> typeInformation) {
        String fullName = FlinkStreamingTransformTranslators.getCurrentTransformName(flinkStreamingTranslationContext);
        DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = flinkStreamingTranslationContext.getInputDataStream(pCollection);
        // PCollection#getCoder is declared to return Coder, so the downcast has to be explicit.
        KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) pCollection.getCoder();
        SerializablePipelineOptions serializedOptions = new SerializablePipelineOptions(flinkStreamingTranslationContext.getPipelineOptions());
        TupleTag<KV<K, AccumT>> mainOutputTag = new TupleTag<>("main output");
        String partialName = "Combine: " + fullName;
        KvToFlinkKeyKeySelector<K, AccumT> accumKeySelector = new KvToFlinkKeyKeySelector<>(inputKvCoder.getKeyCoder());
        CoderTypeInformation<WindowedValue<KV<K, AccumT>>> partialTypeInfo = new CoderTypeInformation<>(coder, flinkStreamingTranslationContext.getPipelineOptions());

        // Left raw on purpose: the operator's generic parameter list is not visible from this file.
        PartialReduceBundleOperator partialOperator = new PartialReduceBundleOperator(
                globalCombineFn,
                fullName,
                flinkStreamingTranslationContext.getWindowedInputCoder(pCollection),
                mainOutputTag,
                Collections.emptyList(),
                new DoFnOperator.MultiOutputOutputManagerFactory<>(mainOutputTag, coder, serializedOptions),
                pCollection.getWindowingStrategy(),
                map,
                list,
                flinkStreamingTranslationContext.getPipelineOptions());

        if (list.isEmpty()) {
            return inputDataStream
                    .transform(partialName, partialTypeInfo, partialOperator)
                    .uid(partialName)
                    .name(partialName)
                    .keyBy(accumKeySelector)
                    .transform(fullName, typeInformation, windowDoFnOperator)
                    .uid(fullName)
                    .name(fullName);
        }

        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs = FlinkStreamingTransformTranslators.transformSideInputs(list, flinkStreamingTranslationContext);
        DataStream<RawUnionValue> sideInputStream = transformedSideInputs.f1;

        TwoInputTransformation partialTransformation = new TwoInputTransformation(
                inputDataStream.getTransformation(),
                sideInputStream.broadcast().getTransformation(),
                partialName,
                partialOperator,
                partialTypeInfo,
                inputDataStream.getParallelism());
        // Anonymous subclass: presumably needed because the constructor is not publicly accessible.
        SingleOutputStreamOperator<WindowedValue<KV<K, AccumT>>> partialStream =
                new SingleOutputStreamOperator<WindowedValue<KV<K, AccumT>>>(inputDataStream.getExecutionEnvironment(), partialTransformation) {
                };
        inputDataStream.getExecutionEnvironment().addOperator(partialTransformation);

        return buildTwoInputStream(partialStream.keyBy(accumKeySelector), sideInputStream, fullName, windowDoFnOperator, typeInformation);
    }

    /**
     * Translates a group-by-key as a per-key combine followed by a flattening step.
     *
     * <p>Values are first combined per key with {@link ConcatenateAsIterable}. The windowed operator
     * from {@code getWindowedAccumulateDoFnOperator} then emits {@code Iterable<Iterable<InputT>>}
     * per key. A final flat-map named {@code "concatenate"} uses {@link FlattenIterable} to collapse
     * that into {@code Iterable<InputT>}. No side inputs are used.
     *
     * @param flinkStreamingTranslationContext translation context
     * @param pTransform the group-by-key transform being translated; the coder of its input must be
     *     a {@link KvCoder}
     * @return the stream of grouped, windowed key/value pairs
     * @throws ClassCastException if the input coder is not a {@link KvCoder}
     */
    public static <K, InputT> SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> batchGroupByKey(FlinkStreamingTranslationContext flinkStreamingTranslationContext, PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> pTransform) {
        Map<Integer, PCollectionView<?>> sideInputTagMapping = new HashMap<>();
        List<PCollectionView<?>> sideInputs = Collections.emptyList();

        PCollection<KV<K, InputT>> input = flinkStreamingTranslationContext.getInput(pTransform);
        // PCollection#getCoder is declared to return Coder, so the downcast has to be explicit.
        KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
        SerializablePipelineOptions serializedOptions = new SerializablePipelineOptions(flinkStreamingTranslationContext.getPipelineOptions());
        TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo = flinkStreamingTranslationContext.getTypeInfo(flinkStreamingTranslationContext.getOutput(pTransform));

        Coder<K> keyCoder = inputKvCoder.getKeyCoder();
        Coder<? extends BoundedWindow> windowCoder = input.getWindowingStrategy().getWindowFn().windowCoder();

        // Accumulator of the partial stage: one Iterable<InputT> per key.
        Coder<Iterable<InputT>> accumulatorCoder = IterableCoder.of(inputKvCoder.getValueCoder());
        KvCoder<K, Iterable<InputT>> accumKvCoder = KvCoder.of(keyCoder, accumulatorCoder);
        Coder<WindowedValue<KV<K, Iterable<InputT>>>> windowedAccumCoder = WindowedValue.getFullCoder(accumKvCoder, windowCoder);

        // Output of the final stage: the accumulators gathered per key, i.e. an iterable of iterables.
        Coder<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>> windowedNestedCoder =
                WindowedValue.getFullCoder(KvCoder.of(keyCoder, IterableCoder.of(accumulatorCoder)), windowCoder);
        TypeInformation<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>> nestedTypeInfo = new CoderTypeInformation<>(windowedNestedCoder, serializedOptions);

        WindowDoFnOperator<K, Iterable<InputT>, Iterable<Iterable<InputT>>> finalOperator =
                getWindowedAccumulateDoFnOperator(flinkStreamingTranslationContext, pTransform, accumKvCoder, windowedNestedCoder, sideInputTagMapping, sideInputs);

        return getBatchCombinePerKeyOperator(
                        flinkStreamingTranslationContext,
                        input,
                        sideInputTagMapping,
                        sideInputs,
                        windowedAccumCoder,
                        new ConcatenateAsIterable<InputT>(),
                        finalOperator,
                        nestedTypeInfo)
                .flatMap(new FlattenIterable<K, InputT>(), outputTypeInfo)
                .name("concatenate");
    }

    /**
     * Builds a {@link WindowDoFnOperator} around a buffering {@link SystemReduceFn}, so that the
     * operator outputs the buffered {@code Iterable<InputT>} values of each key as an
     * {@code Iterable<Iterable<InputT>>}.
     *
     * @param flinkStreamingTranslationContext translation context
     * @param pTransform the transform being translated
     * @param kvCoder coder of the key / {@code Iterable<InputT>} pairs entering the operator
     * @param coder windowed coder of the operator's output
     * @param map side-input tag mapping, passed through to the operator
     * @param list side inputs, passed through to the operator
     * @return the configured operator
     */
    private static <InputT, K> WindowDoFnOperator<K, Iterable<InputT>, Iterable<Iterable<InputT>>> getWindowedAccumulateDoFnOperator(FlinkStreamingTranslationContext flinkStreamingTranslationContext, PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> pTransform, KvCoder<K, Iterable<InputT>> kvCoder, Coder<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>> coder, Map<Integer, PCollectionView<?>> map, List<PCollectionView<?>> list) {
        Coder<Iterable<InputT>> bufferedValueCoder = kvCoder.getValueCoder();
        SystemReduceFn<K, Iterable<InputT>, Iterable<Iterable<InputT>>, Iterable<Iterable<InputT>>, BoundedWindow> bufferingFn =
                SystemReduceFn.buffering(bufferedValueCoder);
        return getWindowedAggregateDoFnOperator(flinkStreamingTranslationContext, pTransform, kvCoder, coder, bufferingFn, map, list);
    }

    /**
     * Translates a per-key combine into a partial stage that applies {@code globalCombineFn} and a
     * final windowed stage that merges the resulting accumulators.
     *
     * <p>The accumulator coder is obtained from {@code globalCombineFn} using the coder registry of
     * the input's pipeline and the input value coder. The final stage runs the accumulator-merging
     * adapter produced by {@code toFinalFlinkCombineFn}.
     *
     * @param flinkStreamingTranslationContext translation context
     * @param pTransform the combine transform being translated; the coder of its input must be a
     *     {@link KvCoder}
     * @param globalCombineFn the combine function
     * @param map side-input tag mapping
     * @param list side inputs
     * @return the stream of combined, windowed key/value pairs
     * @throws RuntimeException wrapping a {@link CannotProvideCoderException} if no accumulator
     *     coder can be provided
     * @throws ClassCastException if the input coder is not a {@link KvCoder}
     */
    public static <K, InputT, AccumT, OutputT> SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> batchCombinePerKey(FlinkStreamingTranslationContext flinkStreamingTranslationContext, PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> pTransform, CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn, Map<Integer, PCollectionView<?>> map, List<PCollectionView<?>> list) {
        PCollection<KV<K, InputT>> input = flinkStreamingTranslationContext.getInput(pTransform);
        // PCollection#getCoder is declared to return Coder, so the downcast has to be explicit.
        KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
        TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo = flinkStreamingTranslationContext.getTypeInfo(flinkStreamingTranslationContext.getOutput(pTransform));
        // Despite the accessor's name, this is the windowed coder of the transform's OUTPUT.
        Coder<WindowedValue<KV<K, OutputT>>> windowedOutputCoder = flinkStreamingTranslationContext.getWindowedInputCoder(flinkStreamingTranslationContext.getOutput(pTransform));

        Coder<AccumT> accumulatorCoder;
        try {
            accumulatorCoder = globalCombineFn.getAccumulatorCoder(input.getPipeline().getCoderRegistry(), inputKvCoder.getValueCoder());
        } catch (CannotProvideCoderException e) {
            throw new RuntimeException(e);
        }

        KvCoder<K, AccumT> accumKvCoder = KvCoder.of(inputKvCoder.getKeyCoder(), accumulatorCoder);
        Coder<WindowedValue<KV<K, AccumT>>> windowedAccumCoder = WindowedValue.getFullCoder(accumKvCoder, input.getWindowingStrategy().getWindowFn().windowCoder());

        // The final stage receives accumulators as its input, hence the adapted combine function.
        CombineFnBase.GlobalCombineFn<Object, Object, OutputT> finalCombineFn = toFinalFlinkCombineFn(globalCombineFn, inputKvCoder.getValueCoder());
        WindowDoFnOperator<K, AccumT, OutputT> finalOperator =
                getWindowedAggregateDoFnOperator(flinkStreamingTranslationContext, pTransform, accumKvCoder, windowedOutputCoder, finalCombineFn, map, list);

        return getBatchCombinePerKeyOperator(flinkStreamingTranslationContext, input, map, list, windowedAccumCoder, globalCombineFn, finalOperator, outputTypeInfo);
    }

    /**
     * Attaches {@code windowDoFnOperator} as a two-input operator whose first input is
     * {@code keyedStream} and whose second input is the broadcast form of {@code dataStream}.
     *
     * <p>Key type and key selector for state come from {@code keyedStream}; the second input gets
     * no key selector. The transformation uses the parallelism of {@code keyedStream} and is
     * registered with its execution environment.
     *
     * @param keyedStream the keyed main input
     * @param dataStream the side-input stream, broadcast before being connected
     * @param str name given to the transformation
     * @param windowDoFnOperator the operator to run
     * @param typeInformation type information of the operator's output
     * @return a stream handle over the new transformation
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static <K, InputT, OutputT> SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> buildTwoInputStream(KeyedStream<WindowedValue<KV<K, InputT>>, FlinkKey> keyedStream, DataStream<RawUnionValue> dataStream, String str, WindowDoFnOperator<K, InputT, OutputT> windowDoFnOperator, TypeInformation<WindowedValue<KV<K, OutputT>>> typeInformation) {
        TwoInputTransformation connected = new TwoInputTransformation(
                keyedStream.getTransformation(),
                dataStream.broadcast().getTransformation(),
                str,
                windowDoFnOperator,
                typeInformation,
                keyedStream.getParallelism());
        connected.setStateKeyType(keyedStream.getKeyType());
        // Only the first (keyed) input carries a key selector.
        connected.setStateKeySelectors(keyedStream.getKeySelector(), (KeySelector) null);

        // Anonymous subclass: presumably needed because the constructor is not publicly accessible.
        SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> result =
                new SingleOutputStreamOperator(keyedStream.getExecutionEnvironment(), connected) {
                };
        keyedStream.getExecutionEnvironment().addOperator(connected);
        return result;
    }

    /**
     * Convenience form of {@code batchCombinePerKey} for combines without side inputs: it supplies
     * a new empty tag mapping and an empty side-input list.
     *
     * @param flinkStreamingTranslationContext translation context
     * @param pTransform the combine transform being translated
     * @param globalCombineFn the combine function
     * @return the stream of combined, windowed key/value pairs
     */
    public static <K, InputT, AccumT, OutputT> SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> batchCombinePerKeyNoSideInputs(FlinkStreamingTranslationContext flinkStreamingTranslationContext, PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> pTransform, CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn) {
        Map<Integer, PCollectionView<?>> noSideInputTags = new HashMap<>();
        List<PCollectionView<?>> noSideInputs = Collections.emptyList();
        return batchCombinePerKey(flinkStreamingTranslationContext, pTransform, globalCombineFn, noSideInputTags, noSideInputs);
    }
}
