/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.translators;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.transformations.StubTransformation;
import org.apache.flink.util.Preconditions;

@Internal
public class StubTransformationTranslator<OUT>
extends SimpleTransformationTranslator<OUT, StubTransformation<OUT>> {
    @Override
    protected Collection<Integer> translateForBatchInternal(StubTransformation<OUT> transformation, TransformationTranslator.Context context) {
        return this.translateInternal(transformation, context);
    }

    @Override
    protected Collection<Integer> translateForStreamingInternal(StubTransformation<OUT> transformation, TransformationTranslator.Context context) {
        return this.translateInternal(transformation, context);
    }

    private Collection<Integer> translateInternal(StubTransformation<OUT> stubTransformation, TransformationTranslator.Context context) {
        Preconditions.checkNotNull(stubTransformation);
        Preconditions.checkNotNull((Object)context);
        Collection<Transformation<?>> upstreams = this.findUpstreams(stubTransformation, context.getSinkTransformations());
        List<Transformation<?>> inputs = this.extractInputs(upstreams, stubTransformation);
        return inputs.stream().flatMap(input -> context.transform((Transformation<?>)input).stream()).collect(Collectors.toList());
    }

    private List<Transformation<?>> extractInputs(Collection<Transformation<?>> upstreams, StubTransformation<OUT> stubTransformation) {
        List<Transformation<?>> inputs = upstreams.stream().map(stubTransformation.getInputAdjuster()).collect(Collectors.toList());
        for (Transformation transformation : inputs) {
            Preconditions.checkState((boolean)transformation.getOutputType().equals((Object)stubTransformation.getOutputType()), (String)"The output type of the input transformation does not match the expected output type of the StubTransformation. StubTransformation: %s, Input Transformation: %s, Expected Output Type: %s, Actual Output Type: %s", (Object[])new Object[]{stubTransformation, transformation, stubTransformation.getOutputType(), transformation.getOutputType()});
        }
        return inputs;
    }

    private Collection<Transformation<?>> findUpstreams(StubTransformation<?> stubTransformation, Collection<Transformation<?>> sinks) {
        Predicate<Transformation<?>> upstreamFinder = stubTransformation.getUpstreamFinder();
        HashMap<Integer, Transformation> upstreams = new HashMap<Integer, Transformation>();
        for (Transformation<?> sink : sinks) {
            for (Transformation transformation : sink.getTransitivePredecessors()) {
                if (!upstreamFinder.test(transformation)) continue;
                upstreams.put(transformation.getId(), transformation);
            }
        }
        if (upstreams.isEmpty()) {
            throw new IllegalStateException("No upstream transformation found for StubTransformation: " + stubTransformation);
        }
        return upstreams.values();
    }
}

