/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.MapFnRunners;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.PrecombineGroupingTable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

public class CombineRunners {
    /**
     * Builds the per-element function for the merge-accumulators stage of a combine.
     *
     * <p>The {@link Combine.CombineFn} is deserialized once from the {@code CombinePayload} found in
     * the transform's spec and is then captured by the returned function. For every input
     * {@code KV} the function keeps the key and replaces the iterable of accumulators with the
     * result of {@code combineFn.mergeAccumulators(...)}.
     *
     * @param pTransformId id of the transform; not read by this method
     * @param pTransform transform whose spec payload is parsed as a {@code CombinePayload}
     * @return a function from {@code KV<KeyT, Iterable<AccumT>>} to {@code KV<KeyT, AccumT>}
     * @throws IOException propagated from parsing the spec payload
     */
    static <KeyT, AccumT> ThrowingFunction<KV<KeyT, Iterable<AccumT>>, KV<KeyT, AccumT>> createMergeAccumulatorsMapFunction(
            String pTransformId, RunnerApi.PTransform pTransform) throws IOException {
        RunnerApi.CombinePayload combinePayload =
                RunnerApi.CombinePayload.parseFrom(pTransform.getSpec().getPayload());
        // The serialized form carries no static type information, so this cast cannot be checked.
        @SuppressWarnings("unchecked")
        Combine.CombineFn<?, AccumT, ?> combineFn =
                (Combine.CombineFn<?, AccumT, ?>) SerializableUtils.deserializeFromByteArray(
                        combinePayload.getCombineFn().getPayload().toByteArray(), "CombineFn");
        return (KV<KeyT, Iterable<AccumT>> input) ->
                KV.of(input.getKey(), combineFn.mergeAccumulators(input.getValue()));
    }

    /**
     * Builds the per-element function for the extract-outputs stage of a combine.
     *
     * <p>The {@link Combine.CombineFn} is deserialized once from the {@code CombinePayload} found in
     * the transform's spec and is then captured by the returned function. For every input
     * {@code KV} the function keeps the key and replaces the accumulator with the result of
     * {@code combineFn.extractOutput(...)}.
     *
     * @param pTransformId id of the transform; not read by this method
     * @param pTransform transform whose spec payload is parsed as a {@code CombinePayload}
     * @return a function from {@code KV<KeyT, AccumT>} to {@code KV<KeyT, OutputT>}
     * @throws IOException propagated from parsing the spec payload
     */
    static <KeyT, AccumT, OutputT> ThrowingFunction<KV<KeyT, AccumT>, KV<KeyT, OutputT>> createExtractOutputsMapFunction(
            String pTransformId, RunnerApi.PTransform pTransform) throws IOException {
        RunnerApi.CombinePayload combinePayload =
                RunnerApi.CombinePayload.parseFrom(pTransform.getSpec().getPayload());
        // The serialized form carries no static type information, so this cast cannot be checked.
        @SuppressWarnings("unchecked")
        Combine.CombineFn<?, AccumT, OutputT> combineFn =
                (Combine.CombineFn<?, AccumT, OutputT>) SerializableUtils.deserializeFromByteArray(
                        combinePayload.getCombineFn().getPayload().toByteArray(), "CombineFn");
        return (KV<KeyT, AccumT> input) ->
                KV.of(input.getKey(), combineFn.extractOutput(input.getValue()));
    }

    /**
     * Builds the per-element function that turns a single input value into an accumulator.
     *
     * <p>The {@link Combine.CombineFn} is deserialized once from the {@code CombinePayload} found in
     * the transform's spec and is then captured by the returned function. For every input
     * {@code KV} the function keeps the key, creates a new accumulator with
     * {@code combineFn.createAccumulator()}, and adds the input value to it with
     * {@code combineFn.addInput(...)}.
     *
     * @param pTransformId id of the transform; not read by this method
     * @param pTransform transform whose spec payload is parsed as a {@code CombinePayload}
     * @return a function from {@code KV<KeyT, InputT>} to {@code KV<KeyT, AccumT>}
     * @throws IOException propagated from parsing the spec payload
     */
    static <KeyT, InputT, AccumT> ThrowingFunction<KV<KeyT, InputT>, KV<KeyT, AccumT>> createConvertToAccumulatorsMapFunction(
            String pTransformId, RunnerApi.PTransform pTransform) throws IOException {
        RunnerApi.CombinePayload combinePayload =
                RunnerApi.CombinePayload.parseFrom(pTransform.getSpec().getPayload());
        // The serialized form carries no static type information, so this cast cannot be checked.
        @SuppressWarnings("unchecked")
        Combine.CombineFn<InputT, AccumT, ?> combineFn =
                (Combine.CombineFn<InputT, AccumT, ?>) SerializableUtils.deserializeFromByteArray(
                        combinePayload.getCombineFn().getPayload().toByteArray(), "CombineFn");
        return (KV<KeyT, InputT> input) ->
                KV.of(
                        input.getKey(),
                        combineFn.addInput(combineFn.createAccumulator(), input.getValue()));
    }

    /**
     * Builds the per-element function that combines all grouped values of a key in one step.
     *
     * <p>The {@link Combine.CombineFn} is deserialized once from the {@code CombinePayload} found in
     * the transform's spec and is then captured by the returned function. For every input
     * {@code KV} the function keeps the key and replaces the iterable of values with the result of
     * {@code combineFn.apply(...)}.
     *
     * @param pTransformId id of the transform; not read by this method
     * @param pTransform transform whose spec payload is parsed as a {@code CombinePayload}
     * @return a function from {@code KV<KeyT, Iterable<InputT>>} to {@code KV<KeyT, OutputT>}
     * @throws IOException propagated from parsing the spec payload
     */
    static <KeyT, InputT, AccumT, OutputT> ThrowingFunction<KV<KeyT, Iterable<InputT>>, KV<KeyT, OutputT>> createCombineGroupedValuesMapFunction(
            String pTransformId, RunnerApi.PTransform pTransform) throws IOException {
        RunnerApi.CombinePayload combinePayload =
                RunnerApi.CombinePayload.parseFrom(pTransform.getSpec().getPayload());
        // The serialized form carries no static type information, so this cast cannot be checked.
        @SuppressWarnings("unchecked")
        Combine.CombineFn<InputT, AccumT, OutputT> combineFn =
                (Combine.CombineFn<InputT, AccumT, OutputT>) SerializableUtils.deserializeFromByteArray(
                        combinePayload.getCombineFn().getPayload().toByteArray(), "CombineFn");
        return (KV<KeyT, Iterable<InputT>> input) ->
                KV.of(input.getKey(), combineFn.apply(input.getValue()));
    }

    @VisibleForTesting
    public static class PrecombineFactory<@UnknownKeyFor KeyT, @UnknownKeyFor InputT, @UnknownKeyFor AccumT>
    implements PTransformRunnerFactory<PrecombineRunner<KeyT, InputT, AccumT>> {
        @Override
        public @UnknownKeyFor @NonNull @Initialized PrecombineRunner<KeyT, InputT, AccumT> createRunnerForPTransform(@UnknownKeyFor @NonNull @Initialized PTransformRunnerFactory.Context context) throws @UnknownKeyFor @NonNull @Initialized IOException {
            RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(RunnerApi.Components.newBuilder().putAllCoders(context.getCoders()).putAllWindowingStrategies(context.getWindowingStrategies()).build());
            String mainInputTag = (String)Iterables.getOnlyElement(context.getPTransform().getInputsMap().keySet());
            RunnerApi.PCollection mainInput = context.getPCollections().get(context.getPTransform().getInputsOrThrow(mainInputTag));
            Coder<?> uncastInputCoder = rehydratedComponents.getCoder(mainInput.getCoderId());
            boolean isGloballyWindowed = rehydratedComponents.getWindowingStrategy(mainInput.getWindowingStrategyId()).getWindowFn().equals(new GlobalWindows());
            KvCoder inputCoder = uncastInputCoder instanceof WindowedValue.WindowedValueCoder ? (KvCoder)((WindowedValue.WindowedValueCoder)uncastInputCoder).getValueCoder() : (KvCoder)rehydratedComponents.getCoder(mainInput.getCoderId());
            Coder keyCoder = inputCoder.getKeyCoder();
            RunnerApi.CombinePayload combinePayload = RunnerApi.CombinePayload.parseFrom((ByteString)context.getPTransform().getSpec().getPayload());
            Combine.CombineFn combineFn = (Combine.CombineFn)SerializableUtils.deserializeFromByteArray((byte[])combinePayload.getCombineFn().getPayload().toByteArray(), (String)"CombineFn");
            FnDataReceiver consumer = context.getPCollectionConsumer((String)Iterables.getOnlyElement(context.getPTransform().getOutputsMap().values()));
            PrecombineRunner runner = new PrecombineRunner(context.getPipelineOptions(), context.getPTransformId(), context.getBundleCacheSupplier(), combineFn, consumer, keyCoder, isGloballyWindowed);
            context.addStartBundleFunction(runner::startBundle);
            context.addPCollectionConsumer((String)Iterables.getOnlyElement(context.getPTransform().getInputsMap().values()), runner::processElement);
            context.addFinishBundleFunction(runner::finishBundle);
            return runner;
        }
    }

    private static class PrecombineRunner<@UnknownKeyFor KeyT, @UnknownKeyFor InputT, @UnknownKeyFor AccumT> {
        private final @UnknownKeyFor @NonNull @Initialized PipelineOptions options;
        private final @UnknownKeyFor @NonNull @Initialized String ptransformId;
        private final /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> bundleCache;
        private final /*
         * Issues handling annotations - annotations may be inaccurate
         */
        // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized Combine.CombineFn<InputT, AccumT, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> combineFn;
        private final @UnknownKeyFor @NonNull @Initialized FnDataReceiver<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<KeyT, AccumT>>> output;
        private final @UnknownKeyFor @NonNull @Initialized Coder<KeyT> keyCoder;
        private @UnknownKeyFor @NonNull @Initialized PrecombineGroupingTable<KeyT, InputT, AccumT> groupingTable;
        private @UnknownKeyFor @NonNull @Initialized boolean isGloballyWindowed;

        PrecombineRunner(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized String ptransformId, /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> bundleCache, /*
         * Issues handling annotations - annotations may be inaccurate
         */
        // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized Combine.CombineFn<InputT, AccumT, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> combineFn, @UnknownKeyFor @NonNull @Initialized FnDataReceiver<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<KeyT, AccumT>>> output, @UnknownKeyFor @NonNull @Initialized Coder<KeyT> keyCoder) {
            this(options, ptransformId, bundleCache, combineFn, output, keyCoder, false);
        }

        PrecombineRunner(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized String ptransformId, /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Supplier<@UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> bundleCache, /*
         * Issues handling annotations - annotations may be inaccurate
         */
        // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized Combine.CombineFn<InputT, AccumT, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> combineFn, @UnknownKeyFor @NonNull @Initialized FnDataReceiver<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<KeyT, AccumT>>> output, @UnknownKeyFor @NonNull @Initialized Coder<KeyT> keyCoder, @UnknownKeyFor @NonNull @Initialized boolean isGloballyWindowed) {
            this.options = options;
            this.ptransformId = ptransformId;
            this.bundleCache = bundleCache;
            this.combineFn = combineFn;
            this.output = output;
            this.keyCoder = keyCoder;
            this.isGloballyWindowed = isGloballyWindowed;
        }

        void startBundle() {
            this.groupingTable = PrecombineGroupingTable.combiningAndSampling(this.options, Caches.subCache(this.bundleCache.get(), this.ptransformId, new Object[0]), this.combineFn, this.keyCoder, 0.001, this.isGloballyWindowed);
        }

        void processElement(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<KeyT, InputT>> elem) throws @UnknownKeyFor @NonNull @Initialized Exception {
            this.groupingTable.put(elem, this.output::accept);
        }

        void finishBundle() throws @UnknownKeyFor @NonNull @Initialized Exception {
            this.groupingTable.flush(this.output::accept);
            this.groupingTable = null;
        }
    }

    /**
     * Registers the runner factories for the combine transform URNs handled by
     * {@link CombineRunners}.
     */
    @AutoService(PTransformRunnerFactory.Registrar.class)
    public static class Registrar
            implements PTransformRunnerFactory.Registrar {
        /**
         * Returns an immutable map from transform URN to the factory that builds its runner.
         *
         * <p>The precombine URN maps to a {@link PrecombineFactory}; the remaining four URNs map to
         * value-map factories built from the corresponding {@code create...MapFunction} methods of
         * {@link CombineRunners}.
         *
         * @return the URN-to-factory map
         */
        @Override
        // The raw PTransformRunnerFactory value type is part of the overridden signature.
        @SuppressWarnings("rawtypes")
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of(
                    "beam:transform:combine_per_key_precombine:v1",
                    new PrecombineFactory<>(),
                    "beam:transform:combine_per_key_merge_accumulators:v1",
                    MapFnRunners.forValueMapFnFactory(CombineRunners::createMergeAccumulatorsMapFunction),
                    "beam:transform:combine_per_key_extract_outputs:v1",
                    MapFnRunners.forValueMapFnFactory(CombineRunners::createExtractOutputsMapFunction),
                    "beam:transform:combine_per_key_convert_to_accumulators:v1",
                    MapFnRunners.forValueMapFnFactory(CombineRunners::createConvertToAccumulatorsMapFunction),
                    "beam:transform:combine_grouped_values:v1",
                    MapFnRunners.forValueMapFnFactory(CombineRunners::createCombineGroupedValuesMapFunction));
        }
    }
}

