package org.apache.flink.streaming.runtime.translators;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.GlobalCommitterTransform;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.GlobalCommitterOperator;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;

/**
 * Translates a {@link GlobalCommitterTransform} into a global repartitioning of its input stream
 * followed by a single {@link GlobalCommitterOperator} running with parallelism 1.
 *
 * <p>Batch and streaming translation share the same implementation; they differ only in the flag
 * handed to the operator.
 *
 * @param <CommT> type of the committables carried by the input {@link CommittableMessage}s
 */
@Internal
public class GlobalCommitterTransformationTranslator<CommT> implements TransformationTranslator<Void, GlobalCommitterTransform<CommT>> {

    /**
     * Translates the transformation for batch execution.
     *
     * @param globalCommitterTransform the transformation to translate
     * @param context translation context (not used by this translator)
     * @return ids of the global-partition transformation and of the global committer
     *     transformation, in that order
     */
    @Override // org.apache.flink.streaming.api.graph.TransformationTranslator
    public Collection<Integer> translateForBatch(GlobalCommitterTransform<CommT> globalCommitterTransform, TransformationTranslator.Context context) {
        return translateInternal(globalCommitterTransform, true);
    }

    /**
     * Translates the transformation for streaming execution.
     *
     * @param globalCommitterTransform the transformation to translate
     * @param context translation context (not used by this translator)
     * @return ids of the global-partition transformation and of the global committer
     *     transformation, in that order
     */
    @Override // org.apache.flink.streaming.api.graph.TransformationTranslator
    public Collection<Integer> translateForStreaming(GlobalCommitterTransform<CommT> globalCommitterTransform, TransformationTranslator.Context context) {
        return translateInternal(globalCommitterTransform, false);
    }

    /**
     * Builds the global committer sub-topology shared by batch and streaming translation.
     *
     * @param globalCommitterTransform the transformation to translate
     * @param batch {@code true} when translating for batch execution
     * @return ids of the global-partition transformation and of the global committer
     *     transformation, in that order
     */
    private Collection<Integer> translateInternal(GlobalCommitterTransform<CommT> globalCommitterTransform, boolean batch) {
        DataStream<CommittableMessage<CommT>> inputStream = globalCommitterTransform.getInputStream();

        // The flag is set for batch jobs, for jobs without checkpointing, and when a committer
        // operator already sits upstream of the input.
        // NOTE(review): presumably this is the operator's "commit on input" switch - confirm
        // against the GlobalCommitterOperator constructor.
        boolean commitOnInput =
                batch
                        || !inputStream.getExecutionEnvironment().getCheckpointConfig().isCheckpointingEnabled()
                        || hasUpstreamCommitter(inputStream);

        // Route every record through a global partitioning, then attach the global committer.
        DataStream<CommittableMessage<CommT>> global = inputStream.global();
        PhysicalTransformation<Void> committerTransformation =
                (PhysicalTransformation<Void>)
                        global.transform(
                                        StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME,
                                        Types.VOID,
                                        new GlobalCommitterOperator<>(
                                                globalCommitterTransform.getCommitterFactory(),
                                                globalCommitterTransform.getCommittableSerializer(),
                                                commitOnInput))
                                .getTransformation();
        committerTransformation.setChainingStrategy(ChainingStrategy.ALWAYS);
        committerTransformation.setParallelism(1);
        committerTransformation.setMaxParallelism(1);

        // Carry over user-provided name / uid / uid hash; unset (null) values keep the defaults.
        copySafely(committerTransformation::setName, globalCommitterTransform::getName);
        copySafely(committerTransformation::setUid, globalCommitterTransform::getUid);
        copySafely(committerTransformation::setUidHash, globalCommitterTransform::getUserProvidedNodeHash);

        return Arrays.asList(global.getId(), committerTransformation.getId());
    }

    /**
     * Passes the supplied value to the consumer unless it is {@code null}.
     *
     * @param consumer receiver of the value
     * @param supplier source of the value; queried exactly once
     * @param <T> type of the value
     */
    private static <T> void copySafely(Consumer<T> consumer, Supplier<T> supplier) {
        T value = supplier.get();
        if (value != null) {
            consumer.accept(value);
        }
    }

    /**
     * Searches the transformation graph upstream of the given stream, breadth-first, for a
     * {@link OneInputTransformation} whose operator factory is a {@link CommitterOperatorFactory}.
     *
     * <p>The search does not descend into the inputs of a transformation whose operator factory is
     * a {@link SinkWriterOperatorFactory}.
     *
     * @param dataStream stream whose transformation is the starting point of the search
     * @return {@code true} if a committer operator was found upstream, {@code false} otherwise
     */
    private static boolean hasUpstreamCommitter(DataStream<?> dataStream) {
        Transformation<?> start = dataStream.getTransformation();

        // Ids of transformations that were already queued, so shared inputs are visited once.
        HashSet<Integer> seenIds = new HashSet<>();
        ArrayDeque<Transformation<?>> pending = new ArrayDeque<>(Collections.singleton(start));
        while (!pending.isEmpty()) {
            Transformation<?> current = pending.poll();
            if (current instanceof OneInputTransformation) {
                StreamOperatorFactory<?> operatorFactory = ((OneInputTransformation<?, ?>) current).getOperatorFactory();
                if (operatorFactory instanceof CommitterOperatorFactory) {
                    return true;
                }
                if (operatorFactory instanceof SinkWriterOperatorFactory) {
                    // Don't look at the inputs of the writer.
                    // NOTE(review): presumably anything upstream of the writer belongs to a
                    // different part of the job and must not count - confirm.
                    continue;
                }
            }
            for (Transformation<?> input : current.getInputs()) {
                if (seenIds.add(input.getId())) {
                    pending.add(input);
                }
            }
        }
        return false;
    }
}
