package org.apache.flink.streaming.runtime.translators;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkOperatorFactory;
import org.apache.flink.streaming.util.graph.StreamGraphUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Translates a {@link SinkTransformation} into operators of the {@link StreamGraph}.
 *
 * <p>Every translation adds one writer operator that is connected to the stream nodes of the
 * transformation's single input. When the sink requires it (see {@link #internalTranslate}), a
 * second committer operator with parallelism 1 and max parallelism 1 is appended after the
 * writer.
 *
 * @param <InputT> input element type of the sink
 * @param <CommT> committable type of the sink
 * @param <WriterStateT> writer state type of the sink
 * @param <GlobalCommT> global committable type of the sink
 */
@Internal
public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCommT>
        implements TransformationTranslator<
                Object, SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT>> {

    protected static final Logger LOG = LoggerFactory.getLogger(SinkTransformationTranslator.class);

    /**
     * Parallelism value this translator treats as "not explicitly set on the transformation".
     * NOTE(review): presumably mirrors {@code ExecutionConfig.PARALLELISM_DEFAULT} - confirm.
     */
    private static final int PARALLELISM_NOT_SET = -1;

    /**
     * Translates the sink for batch execution.
     *
     * @param sinkTransformation the sink transformation to translate
     * @param context translation context giving access to the stream graph
     * @return always an empty collection
     * @throws FlinkRuntimeException if adding the operators fails with an {@link IOException}
     */
    @Override
    public Collection<Integer> translateForBatch(
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            TransformationTranslator.Context context) {
        return translate(sinkTransformation, true, context);
    }

    /**
     * Translates the sink for streaming execution.
     *
     * @param sinkTransformation the sink transformation to translate
     * @param context translation context giving access to the stream graph
     * @return always an empty collection
     * @throws FlinkRuntimeException if adding the operators fails with an {@link IOException}
     */
    @Override
    public Collection<Integer> translateForStreaming(
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            TransformationTranslator.Context context) {
        return translate(sinkTransformation, false, context);
    }

    /** Shared entry point of both execution modes; they differ only in the {@code batch} flag. */
    private Collection<Integer> translate(
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            boolean batch,
            TransformationTranslator.Context context) {
        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), sinkTransformation);
        final int parallelism = getParallelism(sinkTransformation, context);
        try {
            internalTranslate(sinkTransformation, parallelism, batch, context);
        } catch (IOException e) {
            throw new FlinkRuntimeException(
                    "Could not add the Committer or GlobalCommitter to the stream graph.", e);
        }
        return Collections.emptyList();
    }

    /**
     * Adds the writer operator and, if needed, the committer operator.
     *
     * <p>A committer operator is needed when the sink exposes a global committable serializer, or
     * when running in batch mode and the sink exposes a committable serializer. The uid of the
     * transformation has already been validated by the caller.
     */
    private void internalTranslate(
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            int parallelism,
            boolean batch,
            TransformationTranslator.Context context)
            throws IOException {
        final Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
        final boolean needsCommitterOperator =
                (batch && sink.getCommittableSerializer().isPresent())
                        || sink.getGlobalCommittableSerializer().isPresent();

        final int writerId =
                addWriterAndCommitter(
                        sinkTransformation, parallelism, batch, needsCommitterOperator, context);

        if (needsCommitterOperator) {
            addGlobalCommitter(writerId, sinkTransformation, batch, context);
        }
    }

    /**
     * Adds the operator built by {@link SinkOperatorFactory} and wires it to the input's nodes.
     *
     * @param shouldEmit forwarded to the operator factory; also selects the operator name
     * @return the stream node id of the added operator
     */
    private int addWriterAndCommitter(
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            int parallelism,
            boolean batch,
            boolean shouldEmit,
            TransformationTranslator.Context context) {
        final Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
        Preconditions.checkState(sinkTransformation.getInputs().size() == 1);

        final Transformation<?> input = sinkTransformation.getInputs().get(0);
        // The wildcard is captured by the generic helper below, so no unchecked cast is required.
        final TypeInformation<?> inputTypeInfo = input.getOutputType();

        final StreamOperatorFactory<byte[]> writerFactory =
                new SinkOperatorFactory<>(sink, batch, shouldEmit);
        final ChainingStrategy chainingStrategy = sinkTransformation.getChainingStrategy();
        if (chainingStrategy != null) {
            writerFactory.setChainingStrategy(chainingStrategy);
        }

        // The "Writer" suffix is only used in batch mode when a committer operator follows.
        final String namePattern = (batch && shouldEmit) ? "Sink %s Writer" : "Sink %s";

        return addOperatorToStreamGraph(
                writerFactory,
                context.getStreamNodeIds(input),
                inputTypeInfo,
                TypeInformation.of(byte[].class),
                String.format(namePattern, sinkTransformation.getName()),
                sinkTransformation.getUid(),
                parallelism,
                sinkTransformation.getMaxParallelism(),
                sinkTransformation,
                context);
    }

    /**
     * Adds the operator built by {@link CommitterOperatorFactory} behind the node {@code inputId},
     * using parallelism 1 and max parallelism 1.
     *
     * <p>Its name follows a mode-dependent pattern; its uid is the same pattern applied to the
     * transformation's uid, or {@code null} when the transformation has no uid.
     */
    private void addGlobalCommitter(
            int inputId,
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            boolean batch,
            TransformationTranslator.Context context) {
        final Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
        final String namePattern = batch ? "Sink %s Committer" : "Sink %s Global Committer";
        final String transformationUid = sinkTransformation.getUid();
        final String committerUid =
                transformationUid == null ? null : String.format(namePattern, transformationUid);

        addOperatorToStreamGraph(
                new CommitterOperatorFactory<>(sink, batch),
                Collections.singletonList(inputId),
                TypeInformation.of(byte[].class),
                null,
                String.format(namePattern, sinkTransformation.getName()),
                committerUid,
                1,
                1,
                sinkTransformation,
                context);
    }

    /**
     * Returns the parallelism set on the transformation, or the parallelism of the stream graph's
     * execution config when the transformation carries {@link #PARALLELISM_NOT_SET}.
     */
    private int getParallelism(
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            TransformationTranslator.Context context) {
        final int configured = sinkTransformation.getParallelism();
        if (configured != PARALLELISM_NOT_SET) {
            return configured;
        }
        return context.getStreamGraph().getExecutionConfig().getParallelism();
    }

    /**
     * Registers a new operator in the stream graph and connects every given input node to it.
     *
     * @param operatorFactory factory of the operator to add
     * @param inputs ids of the stream nodes that feed the new operator
     * @param inTypeInfo type information of the operator's input
     * @param outTypeInfo type information of the operator's output
     * @param name display name of the operator
     * @param uid transformation uid to assign, or {@code null} to assign none
     * @param parallelism parallelism of the new node
     * @param maxParallelism max parallelism of the new node
     * @param sinkTransformation source of the co-location group key and buffer timeout settings
     * @param context translation context
     * @return the id of the newly added stream node
     */
    private <IN, OUT> int addOperatorToStreamGraph(
            StreamOperatorFactory<OUT> operatorFactory,
            Collection<Integer> inputs,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String name,
            @Nullable String uid,
            int parallelism,
            int maxParallelism,
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            TransformationTranslator.Context context) {
        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int nodeId = Transformation.getNewNodeId();

        streamGraph.addOperator(
                nodeId,
                slotSharingGroup,
                sinkTransformation.getCoLocationGroupKey(),
                operatorFactory,
                inTypeInfo,
                outTypeInfo,
                name);
        streamGraph.setParallelism(nodeId, parallelism);
        streamGraph.setMaxParallelism(nodeId, maxParallelism);
        StreamGraphUtils.configureBufferTimeout(
                streamGraph, nodeId, sinkTransformation, context.getDefaultBufferTimeout());

        if (uid != null) {
            streamGraph.setTransformationUID(nodeId, uid);
        }

        for (int inputId : inputs) {
            streamGraph.addEdge(inputId, nodeId, 0);
        }
        return nodeId;
    }
}
