package com.ververica.cdc.composer.flink.translator;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.sink.DataSink;
import com.ververica.cdc.common.sink.FlinkSinkProvider;
import com.ververica.cdc.composer.definition.SinkDef;
import com.ververica.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory;
import java.lang.invoke.SerializedLambda;
import java.util.Objects;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;

/**
 * Attaches the sink side of a pipeline to a {@link DataStream} of {@link Event}s.
 *
 * <p>The translator takes the {@link Sink} exposed by a {@link DataSink}'s event sink provider
 * and wires it into the stream: an optional pre-write topology, a writer operator built from
 * {@link DataSinkWriterOperatorFactory}, and, for {@link TwoPhaseCommittingSink}s, an optional
 * pre-commit topology, a committer operator and an optional post-commit topology.
 */
@Internal
public class DataSinkTranslator {
    private static final String SINK_WRITER_PREFIX = "Sink Writer: ";
    private static final String SINK_COMMITTER_PREFIX = "Sink Committer: ";

    /**
     * Adds the sink described by {@code sinkDef} / {@code dataSink} to {@code dataStream}.
     *
     * @param sinkDef definition used to derive the operator name of the sink
     * @param dataStream stream of events to be written
     * @param dataSink sink whose event sink provider supplies the Flink {@link Sink}
     * @param operatorID operator id handed to the {@link DataSinkWriterOperatorFactory}
     */
    public void translate(SinkDef sinkDef, DataStream<Event> dataStream, DataSink dataSink, OperatorID operatorID) {
        FlinkSinkProvider eventSinkProvider = dataSink.getEventSinkProvider();
        String sinkName = generateSinkName(sinkDef);
        // The instanceof test also rejects a null provider. When it fails, no sink is attached
        // to the stream; this method intentionally keeps that (silent) behavior.
        if (eventSinkProvider instanceof FlinkSinkProvider) {
            sinkTo(dataStream, eventSinkProvider.getSink(), sinkName, operatorID);
        }
    }

    /**
     * Wires {@code sink} into {@code input}.
     *
     * @param input stream of events to be written
     * @param sink Flink sink to attach
     * @param sinkName name appended to the writer / committer operator name prefixes
     * @param operatorID operator id handed to the {@link DataSinkWriterOperatorFactory}
     */
    private void sinkTo(DataStream<Event> input, Sink<Event> sink, String sinkName, OperatorID operatorID) {
        DataStream<Event> stream = input;
        // Pre-write topology: the sink may insert its own operators in front of the writer.
        if (sink instanceof WithPreWriteTopology) {
            stream = ((WithPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
        }

        if (sink instanceof TwoPhaseCommittingSink) {
            addCommittingTopology(sink, stream, sinkName, operatorID);
        } else {
            // The writer must consume the output of the pre-write topology (if any), not the
            // raw input; otherwise the pre-write operators would be bypassed.
            stream.transform(
                    SINK_WRITER_PREFIX + sinkName,
                    CommittableMessageTypeInfo.noOutput(),
                    new DataSinkWriterOperatorFactory<>(sink, operatorID));
        }
    }

    /**
     * Adds the writer, the committer and the optional pre-/post-commit topologies of a
     * {@link TwoPhaseCommittingSink}.
     *
     * @param sink sink to attach; the caller has verified it is a {@link TwoPhaseCommittingSink}
     * @param dataStream stream feeding the writer (already past any pre-write topology)
     * @param sinkName name appended to the writer / committer operator name prefixes
     * @param operatorID operator id handed to the {@link DataSinkWriterOperatorFactory}
     * @param <CommT> committable type of the sink
     */
    // The committable type is erased at this point, so the casts to the parameterized sink
    // interfaces cannot be checked; they are safe as long as the sink implements its
    // interfaces with one consistent committable type.
    @SuppressWarnings("unchecked")
    private <CommT> void addCommittingTopology(
            Sink<Event> sink, DataStream<Event> dataStream, String sinkName, OperatorID operatorID) {
        TwoPhaseCommittingSink<Event, CommT> committingSink =
                (TwoPhaseCommittingSink<Event, CommT>) sink;
        // The bound method reference is serializable; the compiler emits the matching lambda
        // deserialization hook for this class on its own.
        TypeInformation<CommittableMessage<CommT>> typeInformation =
                CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);

        DataStream<CommittableMessage<CommT>> written =
                dataStream.transform(
                        SINK_WRITER_PREFIX + sinkName,
                        typeInformation,
                        new DataSinkWriterOperatorFactory<>(sink, operatorID));

        DataStream<CommittableMessage<CommT>> preCommitted = written;
        if (sink instanceof WithPreCommitTopology) {
            preCommitted =
                    ((WithPreCommitTopology<Event, CommT>) sink).addPreCommitTopology(written);
        }

        // NOTE(review): the two hard-coded flags are presumably isBatchMode=false and
        // isCheckpointingEnabled=true -- confirm against CommitterOperatorFactory in the
        // Flink version in use.
        SingleOutputStreamOperator<CommittableMessage<CommT>> committed =
                preCommitted.transform(
                        SINK_COMMITTER_PREFIX + sinkName,
                        typeInformation,
                        new CommitterOperatorFactory<>(committingSink, false, true));

        if (sink instanceof WithPostCommitTopology) {
            ((WithPostCommitTopology<Event, CommT>) sink).addPostCommitTopology(committed);
        }
    }

    /**
     * Returns the name configured in {@code sinkDef}, or a default derived from the sink type
     * when no name is set.
     */
    private String generateSinkName(SinkDef sinkDef) {
        return sinkDef.getName().orElse(String.format("Flink CDC Event Sink: %s", sinkDef.getType()));
    }
}
