package org.apache.flink.streaming.connectors.kafka.v2;

import java.util.List;
import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.v2.common.SourceFunctionTableSource;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/v2/KafkaBaseTableSource.class */
public abstract class KafkaBaseTableSource extends SourceFunctionTableSource<GenericRow> {
    protected List<String> topic;
    protected String topicPattern;
    protected Properties properties;
    protected StartupMode startupMode;
    protected long startTimeStamp;
    protected boolean isFinite;
    protected BaseRowTypeInfo baseRowTypeInfo;

    public KafkaBaseTableSource(List<String> list, String str, Properties properties, StartupMode startupMode, long j, boolean z, BaseRowTypeInfo baseRowTypeInfo) {
        this.startupMode = StartupMode.GROUP_OFFSETS;
        this.startTimeStamp = -1L;
        this.isFinite = false;
        this.topic = list;
        this.topicPattern = str;
        this.properties = properties;
        this.startupMode = startupMode;
        this.startTimeStamp = j;
        this.isFinite = z;
        this.baseRowTypeInfo = baseRowTypeInfo;
    }

    @Override // org.apache.flink.streaming.connectors.kafka.v2.common.SourceFunctionTableSource
    public DataStream<GenericRow> getBoundedStream(StreamExecutionEnvironment streamExecutionEnvironment) {
        DataStreamSource addSource = streamExecutionEnvironment.addSource(getSourceFunction(), String.format("%s-%s", explainSource(), SourceFunctionTableSource.BATCH_TAG), getProducedType());
        int topicPartitionSize = getTopicPartitionSize();
        addSource.setParallelism(topicPartitionSize);
        addSource.getTransformation().setMaxParallelism(topicPartitionSize);
        return addSource;
    }

    @Override // org.apache.flink.streaming.connectors.kafka.v2.common.SourceFunctionTableSource
    public SourceFunction<GenericRow> getSourceFunction() {
        FlinkKafkaConsumerBase createKafkaConsumer = createKafkaConsumer();
        switch (this.startupMode) {
            case LATEST:
                createKafkaConsumer.setStartFromLatest();
                break;
            case EARLIEST:
                createKafkaConsumer.setStartFromEarliest();
                break;
            case GROUP_OFFSETS:
                createKafkaConsumer.setStartFromGroupOffsets();
                break;
        }
        createKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        return createKafkaConsumer;
    }

    public abstract FlinkKafkaConsumerBase createKafkaConsumer();

    public abstract int getTopicPartitionSize();
}
