package org.apache.beam.runners.direct;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItemCoder;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformReplacements;
import org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation;
import org.apache.beam.repackaged.direct_java.runners.core.construction.ReplacementOutputs;
import org.apache.beam.repackaged.direct_java.runners.core.construction.SplittableParDo;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;

@VisibleForTesting
/* loaded from: input_file:org/apache/beam/runners/direct/ParDoMultiOverrideFactory.class */
public class ParDoMultiOverrideFactory<InputT, OutputT> implements PTransformOverrideFactory<PCollection<? extends InputT>, PCollectionTuple, PTransform<PCollection<? extends InputT>, PCollectionTuple>> {
    static final String DIRECT_STATEFUL_PAR_DO_URN = "beam:directrunner:transforms:stateful_pardo:v1";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ParDoMultiOverrideFactory$GbkThenStatefulParDo.class */
    public static class GbkThenStatefulParDo<K, InputT, OutputT> extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
        private final transient DoFn<KV<K, InputT>, OutputT> doFn;
        private final TupleTagList additionalOutputTags;
        private final TupleTag<OutputT> mainOutputTag;
        private final List<PCollectionView<?>> sideInputs;
        private final DoFnSchemaInformation doFnSchemaInformation;
        private final Map<String, PCollectionView<?>> sideInputMapping;

        public GbkThenStatefulParDo(DoFn<KV<K, InputT>, OutputT> doFn, TupleTag<OutputT> tupleTag, TupleTagList tupleTagList, List<PCollectionView<?>> list, DoFnSchemaInformation doFnSchemaInformation, Map<String, PCollectionView<?>> map) {
            this.doFn = doFn;
            this.additionalOutputTags = tupleTagList;
            this.mainOutputTag = tupleTag;
            this.sideInputs = list;
            this.doFnSchemaInformation = doFnSchemaInformation;
            this.sideInputMapping = map;
        }

        public Map<TupleTag<?>, PValue> getAdditionalInputs() {
            return PCollectionViews.toAdditionalInputs(this.sideInputs);
        }

        public PCollectionTuple expand(PCollection<KV<K, InputT>> pCollection) {
            WindowingStrategy windowingStrategy = pCollection.getWindowingStrategy();
            Preconditions.checkState(pCollection.getCoder() instanceof KvCoder, "Input to a %s using state requires a %s, but the coder was %s", ParDo.class.getSimpleName(), KvCoder.class.getSimpleName(), pCollection.getCoder());
            KvCoder coder = pCollection.getCoder();
            Coder keyCoder = coder.getKeyCoder();
            Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
            return pCollection.apply("Reify timestamps", ParDo.of(new ReifyWindowedValueFn())).setCoder(KvCoder.of(keyCoder, WindowedValue.getFullCoder(coder, windowCoder))).apply(Window.configure().triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes().withAllowedLateness(windowingStrategy.getAllowedLateness()).withTimestampCombiner(TimestampCombiner.EARLIEST)).apply("Group by key", GroupByKey.create()).apply("To KeyedWorkItem", ParDo.of(new ToKeyedWorkItem())).setCoder(KeyedWorkItemCoder.of(keyCoder, coder, windowCoder)).setWindowingStrategyInternal(windowingStrategy).apply("Stateful ParDo", new StatefulParDo(this.doFn, this.mainOutputTag, this.additionalOutputTags, this.sideInputs, this.doFnSchemaInformation, this.sideInputMapping));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ParDoMultiOverrideFactory$ReifyWindowedValueFn.class */
    public static class ReifyWindowedValueFn<K, V> extends DoFn<KV<K, V>, KV<K, WindowedValue<KV<K, V>>>> {
        ReifyWindowedValueFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, V>, KV<K, WindowedValue<KV<K, V>>>>.ProcessContext processContext, BoundedWindow boundedWindow) {
            processContext.output(KV.of(((KV) processContext.element()).getKey(), WindowedValue.of((KV) processContext.element(), processContext.timestamp(), boundedWindow, processContext.pane())));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ParDoMultiOverrideFactory$StatefulParDo.class */
    public static class StatefulParDo<K, InputT, OutputT> extends PTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> {
        private final transient DoFn<KV<K, InputT>, OutputT> doFn;
        private final TupleTagList additionalOutputTags;
        private final TupleTag<OutputT> mainOutputTag;
        private final List<PCollectionView<?>> sideInputs;
        private final DoFnSchemaInformation doFnSchemaInformation;
        private final Map<String, PCollectionView<?>> sideInputMapping;

        public StatefulParDo(DoFn<KV<K, InputT>, OutputT> doFn, TupleTag<OutputT> tupleTag, TupleTagList tupleTagList, List<PCollectionView<?>> list, DoFnSchemaInformation doFnSchemaInformation, Map<String, PCollectionView<?>> map) {
            this.doFn = doFn;
            this.mainOutputTag = tupleTag;
            this.additionalOutputTags = tupleTagList;
            this.sideInputs = list;
            this.doFnSchemaInformation = doFnSchemaInformation;
            this.sideInputMapping = map;
        }

        public DoFn<KV<K, InputT>, OutputT> getDoFn() {
            return this.doFn;
        }

        public TupleTag<OutputT> getMainOutputTag() {
            return this.mainOutputTag;
        }

        public List<PCollectionView<?>> getSideInputs() {
            return this.sideInputs;
        }

        public TupleTagList getAdditionalOutputTags() {
            return this.additionalOutputTags;
        }

        public DoFnSchemaInformation getSchemaInformation() {
            return this.doFnSchemaInformation;
        }

        public Map<String, PCollectionView<?>> getSideInputMapping() {
            return this.sideInputMapping;
        }

        public Map<TupleTag<?>, PValue> getAdditionalInputs() {
            return PCollectionViews.toAdditionalInputs(this.sideInputs);
        }

        public PCollectionTuple expand(PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>> pCollection) {
            return PCollectionTuple.ofPrimitiveOutputsInternal(pCollection.getPipeline(), TupleTagList.of(getMainOutputTag()).and(getAdditionalOutputTags().getAll()), Collections.emptyMap(), pCollection.getWindowingStrategy(), pCollection.isBounded());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ParDoMultiOverrideFactory$ToKeyedWorkItem.class */
    public static class ToKeyedWorkItem<K, V> extends DoFn<KV<K, Iterable<WindowedValue<KV<K, V>>>>, KeyedWorkItem<K, KV<K, V>>> {
        ToKeyedWorkItem() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, Iterable<WindowedValue<KV<K, V>>>>, KeyedWorkItem<K, KV<K, V>>>.ProcessContext processContext, BoundedWindow boundedWindow) {
            processContext.output(KeyedWorkItems.elementsWorkItem(((KV) processContext.element()).getKey(), (Iterable) ((KV) processContext.element()).getValue()));
        }
    }

    public PTransformOverrideFactory.PTransformReplacement<PCollection<? extends InputT>, PCollectionTuple> getReplacementTransform(AppliedPTransform<PCollection<? extends InputT>, PCollectionTuple, PTransform<PCollection<? extends InputT>, PCollectionTuple>> appliedPTransform) {
        try {
            return PTransformOverrideFactory.PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(appliedPTransform), getReplacementForApplication(appliedPTransform));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementForApplication(AppliedPTransform<PCollection<? extends InputT>, PCollectionTuple, PTransform<PCollection<? extends InputT>, PCollectionTuple>> appliedPTransform) throws IOException {
        DoFn<?, ?> doFn = ParDoTranslation.getDoFn((AppliedPTransform<?, ?, ?>) appliedPTransform);
        DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
        return signature.processElement().isSplittable() ? SplittableParDo.forAppliedParDo(appliedPTransform) : (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) ? new GbkThenStatefulParDo(doFn, ParDoTranslation.getMainOutputTag((AppliedPTransform<?, ?, ?>) appliedPTransform), ParDoTranslation.getAdditionalOutputTags(appliedPTransform), ParDoTranslation.getSideInputs(appliedPTransform), ParDoTranslation.getSchemaInformation((AppliedPTransform<?, ?, ?>) appliedPTransform), ParDoTranslation.getSideInputMapping((AppliedPTransform<?, ?, ?>) appliedPTransform)) : appliedPTransform.getTransform();
    }

    public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> map, PCollectionTuple pCollectionTuple) {
        return ReplacementOutputs.tagged(map, pCollectionTuple);
    }

    public /* bridge */ /* synthetic */ Map mapOutputs(Map map, POutput pOutput) {
        return mapOutputs((Map<TupleTag<?>, PValue>) map, (PCollectionTuple) pOutput);
    }
}
