package org.apache.beam.runners.flink.translation.wrappers.streaming;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.runners.flink.translation.functions.AbstractFlinkCombineRunner;
import org.apache.beam.runners.flink.translation.functions.HashingFlinkCombineRunner;
import org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/PartialReduceBundleOperator.class */
/**
 * A {@link DoFnOperator} that buffers incoming {@code KV<K, InputT>} elements per key while a
 * bundle is open and, when the bundle finishes, combines each key's buffered elements through an
 * {@link AbstractFlinkCombineRunner.PartialFlinkCombiner} built from the configured combine
 * function. The combined {@code KV<K, AccumT>} values are emitted on the main output tag.
 *
 * <p>The in-memory buffer is mirrored into Flink operator list state (registered under the name
 * {@code "buffered-elements"}) on snapshot, and read back from it when the state is restored.
 *
 * @param <K> key type of the buffered elements
 * @param <InputT> value type of the incoming elements
 * @param <OutputT> output type of the combine function (only used in the combine function's type)
 * @param <AccumT> accumulator type emitted by this operator
 */
public class PartialReduceBundleOperator<K, InputT, OutputT, AccumT> extends DoFnOperator<KV<K, InputT>, KV<K, InputT>, KV<K, AccumT>> {
    /** Combine function handed to the partial combiner on every bundle finish. */
    private final CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn;

    /** Elements received since the last {@link #clearState()}, grouped by key. */
    private Multimap<K, WindowedValue<KV<K, InputT>>> state;

    /** Flink operator state backing {@link #state}; {@code null} until {@link #initializeState}. */
    private transient ListState<WindowedValue<KV<K, InputT>>> checkpointedState;

    /**
     * Creates the operator. All arguments except {@code globalCombineFn} are forwarded unchanged
     * to the {@link DoFnOperator} constructor; their exact meaning is defined there.
     *
     * @param globalCombineFn combine function used when a bundle is finished
     * @param str forwarded to the parent constructor (presumably the step name; confirm there)
     * @param coder coder for the windowed input elements, forwarded to the parent constructor
     * @param tupleTag tag of the main output, forwarded to the parent constructor
     * @param list further output tags, forwarded to the parent constructor
     * @param outputManagerFactory factory for the output manager, forwarded to the parent
     * @param windowingStrategy windowing strategy, forwarded to the parent constructor
     * @param map side-input views keyed by integer, forwarded to the parent constructor
     * @param collection side-input views, forwarded to the parent constructor
     * @param pipelineOptions pipeline options, forwarded to the parent constructor
     */
    public PartialReduceBundleOperator(CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> globalCombineFn, String str, Coder<WindowedValue<KV<K, InputT>>> coder, TupleTag<KV<K, AccumT>> tupleTag, List<TupleTag<?>> list, DoFnOperator.OutputManagerFactory<KV<K, AccumT>> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> map, Collection<PCollectionView<?>> collection, PipelineOptions pipelineOptions) {
        // The DoFn argument is null here; the operator supplies its DoFn through getDoFn() instead.
        super(null, str, coder, Collections.emptyMap(), tupleTag, list, outputManagerFactory, windowingStrategy, map, collection, pipelineOptions, null, null, DoFnSchemaInformation.create(), Collections.emptyMap());
        this.combineFn = globalCombineFn;
        this.state = ArrayListMultimap.create();
        this.checkpointedState = null;
    }

    /**
     * Resets the buffer, registers {@link #finishBundle()} as the bundle-finished callback and
     * then delegates to the parent's {@code open()}.
     *
     * @throws Exception if the parent's {@code open()} fails
     */
    @Override
    public void open() throws Exception {
        // NOTE(review): clearState() also clears checkpointedState when it is non-null. If
        // initializeState() has already run at this point, anything it restored is discarded
        // here - confirm that this is intended.
        clearState();
        setBundleFinishedCallback(this::finishBundle);
        super.open();
    }

    /**
     * Always requests element bundling. The method name (including its spelling) is dictated by
     * the parent class and must not be changed.
     *
     * @return {@code true}
     */
    @Override
    protected boolean shoudBundleElements() {
        return true;
    }

    /**
     * Combines the buffered elements of every key and emits the results on the main output, then
     * clears the buffer. Any exception is rethrown wrapped in a {@link RuntimeException}.
     */
    private void finishBundle() {
        try {
            // Merging session windows use the sorting runner; everything else the hashing runner.
            final AbstractFlinkCombineRunner<K, InputT, AccumT, AccumT, BoundedWindow> reduceRunner;
            if (this.windowingStrategy.needsMerge() && this.windowingStrategy.getWindowFn() instanceof Sessions) {
                reduceRunner = new SortingFlinkCombineRunner<>();
            } else {
                reduceRunner = new HashingFlinkCombineRunner<>();
            }

            // The parent keeps the strategy with wildcard type arguments, so the cast cannot be
            // checked by the compiler.
            @SuppressWarnings("unchecked")
            final WindowingStrategy<Object, BoundedWindow> typedStrategy = (WindowingStrategy<Object, BoundedWindow>) this.windowingStrategy;

            final Collector<WindowedValue<KV<K, AccumT>>> mainOutput = new Collector<WindowedValue<KV<K, AccumT>>>() {
                @Override
                public void collect(WindowedValue<KV<K, AccumT>> windowedValue) {
                    PartialReduceBundleOperator.this.outputManager.output(PartialReduceBundleOperator.this.mainOutputTag, windowedValue);
                }

                @Override
                public void close() {
                    // Nothing to release.
                }
            };

            for (Map.Entry<K, Collection<WindowedValue<KV<K, InputT>>>> perKey : this.state.asMap().entrySet()) {
                // A fresh combiner per key, as before.
                reduceRunner.combine(new AbstractFlinkCombineRunner.PartialFlinkCombiner<>(this.combineFn), typedStrategy, this.sideInputReader, this.serializedOptions.get(), perKey.getValue(), mainOutput);
            }
            clearState();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Replaces the buffer with an empty one and clears the Flink list state if it exists. */
    private void clearState() {
        this.state = ArrayListMultimap.create();
        if (this.checkpointedState != null) {
            this.checkpointedState.clear();
        }
    }

    /**
     * Registers the {@code "buffered-elements"} operator list state and, if the context reports a
     * restore, copies every stored element back into the in-memory buffer under its key.
     *
     * @param stateInitializationContext Flink context giving access to the operator state store
     * @throws Exception if the parent initialisation or the state access fails
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        ListStateDescriptor<WindowedValue<KV<K, InputT>>> descriptor = new ListStateDescriptor<>("buffered-elements", new CoderTypeSerializer<>(this.windowedInputCoder, this.serializedOptions));
        this.checkpointedState = stateInitializationContext.getOperatorStateStore().getListState(descriptor);
        if (!stateInitializationContext.isRestored() || this.checkpointedState == null) {
            return;
        }
        for (WindowedValue<KV<K, InputT>> restored : this.checkpointedState.get()) {
            this.state.put(restored.getValue().getKey(), restored);
        }
    }

    /**
     * Lets the parent snapshot first and then overwrites the list state with a copy of whatever is
     * in the buffer at that point.
     *
     * @param stateSnapshotContext Flink snapshot context, passed through to the parent
     * @throws Exception if the parent snapshot or the state update fails
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        if (this.checkpointedState != null) {
            this.checkpointedState.update(new ArrayList<>(this.state.values()));
        }
    }

    /**
     * Returns the DoFn run by this operator: it emits nothing itself and only appends each
     * element, wrapped with its timestamp, window and pane, to the per-key buffer.
     *
     * @return a new buffering DoFn bound to this operator instance
     */
    @Override
    protected DoFn<KV<K, InputT>, KV<K, AccumT>> getDoFn() {
        return new DoFn<KV<K, InputT>, KV<K, AccumT>>() {
            @DoFn.ProcessElement
            public void processElement(DoFn<KV<K, InputT>, KV<K, AccumT>>.ProcessContext processContext, BoundedWindow boundedWindow) throws Exception {
                // Fail on a null element before anything is added to the buffer.
                KV<K, InputT> element = Objects.requireNonNull(processContext.element());
                K key = element.getKey();
                PartialReduceBundleOperator.this.state.put(key, WindowedValue.of(element, processContext.timestamp(), boundedWindow, processContext.pane()));
            }
        };
    }
}
