package org.apache.flink.connector.kafka.sink;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
import org.apache.flink.connector.kafka.lineage.LineageUtil;
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/connector/kafka/sink/KafkaSink.class */
public class KafkaSink<IN> implements LineageVertexProvider, TwoPhaseCommittingStatefulSink<IN, KafkaWriterState, KafkaCommittable>, SupportsPostCommitTopology<KafkaCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
    private final DeliveryGuarantee deliveryGuarantee;
    private final KafkaRecordSerializationSchema<IN> recordSerializer;
    private final Properties kafkaProducerConfig;
    private final String transactionalIdPrefix;
    private final TransactionNamingStrategy transactionNamingStrategy;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaSink(DeliveryGuarantee deliveryGuarantee, Properties properties, String str, KafkaRecordSerializationSchema<IN> kafkaRecordSerializationSchema, TransactionNamingStrategy transactionNamingStrategy) {
        this.deliveryGuarantee = deliveryGuarantee;
        this.kafkaProducerConfig = properties;
        this.transactionalIdPrefix = str;
        this.recordSerializer = kafkaRecordSerializationSchema;
        this.transactionNamingStrategy = transactionNamingStrategy;
    }

    public static <IN> KafkaSinkBuilder<IN> builder() {
        return new KafkaSinkBuilder<>();
    }

    @Internal
    public Committer<KafkaCommittable> createCommitter(CommitterInitContext committerInitContext) {
        return new KafkaCommitter(this.kafkaProducerConfig, this.transactionalIdPrefix, committerInitContext.getTaskInfo().getIndexOfThisSubtask(), committerInitContext.getTaskInfo().getAttemptNumber(), this.transactionNamingStrategy == TransactionNamingStrategy.POOLING, FlinkKafkaInternalProducer::new);
    }

    @Internal
    public SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() {
        return new KafkaCommittableSerializer();
    }

    @Override // org.apache.flink.connector.kafka.sink.TwoPhaseCommittingStatefulSink
    @Internal
    /* renamed from: createWriter */
    public KafkaWriter<IN> mo17createWriter(WriterInitContext writerInitContext) throws IOException {
        return mo16restoreWriter(writerInitContext, (Collection<KafkaWriterState>) Collections.emptyList());
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.connector.kafka.sink.TwoPhaseCommittingStatefulSink
    @Internal
    /* renamed from: restoreWriter */
    public KafkaWriter<IN> mo16restoreWriter(WriterInitContext writerInitContext, Collection<KafkaWriterState> collection) {
        KafkaWriter<IN> exactlyOnceKafkaWriter = this.deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE ? new ExactlyOnceKafkaWriter(this.deliveryGuarantee, this.kafkaProducerConfig, this.transactionalIdPrefix, writerInitContext, this.recordSerializer, writerInitContext.asSerializationSchemaInitializationContext(), this.transactionNamingStrategy.getAbortImpl(), this.transactionNamingStrategy.getImpl(), collection) : new KafkaWriter<>(this.deliveryGuarantee, this.kafkaProducerConfig, writerInitContext, this.recordSerializer, writerInitContext.asSerializationSchemaInitializationContext());
        exactlyOnceKafkaWriter.initialize();
        return exactlyOnceKafkaWriter;
    }

    @Internal
    public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {
        return new KafkaWriterStateSerializer();
    }

    public void addPostCommitTopology(DataStream<CommittableMessage<KafkaCommittable>> dataStream) {
        if (this.deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE || this.transactionalIdPrefix == null) {
            return;
        }
        Transformation transformation = dataStream.getTransformation();
        while (true) {
            Transformation transformation2 = transformation;
            if (!(transformation2.getOutputType() instanceof CommittableMessageTypeInfo) || transformation2.getCoLocationGroupKey() != null) {
                return;
            }
            transformation2.setCoLocationGroupKey(this.transactionalIdPrefix);
            transformation = (Transformation) transformation2.getInputs().get(0);
        }
    }

    @VisibleForTesting
    protected Properties getKafkaProducerConfig() {
        return this.kafkaProducerConfig;
    }

    public LineageVertex getLineageVertex() {
        if (!(this.recordSerializer instanceof KafkaDatasetFacetProvider)) {
            LOG.info("recordSerializer does not implement KafkaDatasetFacetProvider: {}", this.recordSerializer);
            return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
        }
        Optional<KafkaDatasetFacet> kafkaDatasetFacet = ((KafkaDatasetFacetProvider) this.recordSerializer).getKafkaDatasetFacet();
        if (!kafkaDatasetFacet.isPresent()) {
            LOG.info("Provider did not return kafka dataset facet");
            return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
        }
        kafkaDatasetFacet.get().setProperties(this.kafkaProducerConfig);
        String namespaceOf = LineageUtil.namespaceOf(this.kafkaProducerConfig);
        Optional<TypeDatasetFacet> empty = Optional.empty();
        if (this.recordSerializer instanceof TypeDatasetFacetProvider) {
            empty = ((TypeDatasetFacetProvider) this.recordSerializer).getTypeDatasetFacet();
        }
        return empty.isPresent() ? LineageUtil.sourceLineageVertexOf(Collections.singleton(LineageUtil.datasetOf(namespaceOf, kafkaDatasetFacet.get(), empty.get()))) : LineageUtil.sourceLineageVertexOf(Collections.singleton(LineageUtil.datasetOf(namespaceOf, kafkaDatasetFacet.get())));
    }

    @Override // org.apache.flink.connector.kafka.sink.TwoPhaseCommittingStatefulSink
    @Internal
    /* renamed from: restoreWriter, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StatefulSinkWriter mo16restoreWriter(WriterInitContext writerInitContext, Collection collection) throws IOException {
        return mo16restoreWriter(writerInitContext, (Collection<KafkaWriterState>) collection);
    }
}
