package org.apache.paimon.flink.source;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.Projection;
import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
import org.apache.paimon.flink.source.operator.MonitorFunction;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/source/FlinkSourceBuilder.class */
/**
 * Fluent builder that turns a Paimon {@link Table} into a Flink {@link DataStream} of
 * {@link RowData}.
 *
 * <p>Configure the builder through the {@code with*} methods and finish with {@link #build()}.
 * An execution environment must be supplied via {@link #withEnv(StreamExecutionEnvironment)}
 * before building. The table options are snapshotted into an {@link Options} instance when the
 * builder is constructed.
 */
public class FlinkSourceBuilder {
    private final ObjectIdentifier tableIdentifier;
    private final Table table;
    private final Options conf;
    private boolean isContinuous = false;
    private StreamExecutionEnvironment env;

    @Nullable
    private int[][] projectedFields;

    @Nullable
    private Predicate predicate;

    @Nullable
    private LogSourceProvider logSourceProvider;

    @Nullable
    private Integer parallelism;

    @Nullable
    private Long limit;

    @Nullable
    private WatermarkStrategy<RowData> watermarkStrategy;

    @Nullable
    private DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo;

    /**
     * Creates a builder for the given table.
     *
     * @param tableIdentifier identifier whose summary string is used as the source name
     * @param table table to read; its options are copied into this builder's configuration
     */
    public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, Table table) {
        this.tableIdentifier = tableIdentifier;
        this.table = table;
        this.conf = Options.fromMap(table.options());
    }

    /**
     * Selects between the static source ({@code false}, the default) and the streaming sources
     * ({@code true}) when {@link #build()} is called.
     */
    public FlinkSourceBuilder withContinuousMode(boolean isContinuous) {
        this.isContinuous = isContinuous;
        return this;
    }

    /** Sets the execution environment the source is attached to; required by {@link #build()}. */
    public FlinkSourceBuilder withEnv(StreamExecutionEnvironment env) {
        this.env = env;
        return this;
    }

    /** Sets the projection handed to the read builder and applied to the produced row type. */
    public FlinkSourceBuilder withProjection(int[][] projectedFields) {
        this.projectedFields = projectedFields;
        return this;
    }

    /** Sets the filter predicate handed to the read builder. */
    public FlinkSourceBuilder withPredicate(Predicate predicate) {
        this.predicate = predicate;
        return this;
    }

    /**
     * Sets an optional row limit. A non-null limit is rejected by the monitor-function based
     * streaming source (see {@link #build()}).
     */
    public FlinkSourceBuilder withLimit(@Nullable Long limit) {
        this.limit = limit;
        return this;
    }

    /** Sets the provider used to create a log source when streaming reads are not file-only. */
    public FlinkSourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider) {
        this.logSourceProvider = logSourceProvider;
        return this;
    }

    /** Sets an optional explicit parallelism for the created source; {@code null} leaves it unset. */
    public FlinkSourceBuilder withParallelism(@Nullable Integer parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    /** Sets an optional watermark strategy; when {@code null}, no watermarks are generated. */
    public FlinkSourceBuilder withWatermarkStrategy(@Nullable WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
        return this;
    }

    /**
     * Enables dynamic partition filtering on the given fields.
     *
     * <p>A {@code null} or empty list is ignored and leaves any previously configured filtering
     * info untouched. Otherwise the table is checked to be a {@link FileStoreTable}.
     *
     * @param dynamicPartitionFilteringFields names of the fields to filter on, may be {@code null}
     */
    public FlinkSourceBuilder withDynamicPartitionFilteringFields(List<String> dynamicPartitionFilteringFields) {
        if (dynamicPartitionFilteringFields == null || dynamicPartitionFilteringFields.isEmpty()) {
            return this;
        }
        Preconditions.checkState(
                this.table instanceof FileStoreTable,
                "Only Paimon FileStoreTable supports dynamic filtering but get %s.",
                this.table.getClass().getName());
        this.dynamicPartitionFilteringInfo =
                new DynamicPartitionFilteringInfo(
                        ((FileStoreTable) this.table).schema().logicalPartitionType(),
                        dynamicPartitionFilteringFields);
        return this;
    }

    /** Creates a read builder carrying the configured projection, filter and (if set) limit. */
    private ReadBuilder createReadBuilder() {
        ReadBuilder readBuilder =
                this.table.newReadBuilder().withProjection(this.projectedFields).withFilter(this.predicate);
        if (this.limit != null) {
            // NOTE(review): Long -> int narrowing; a limit above Integer.MAX_VALUE would be
            // truncated here. Confirm whether callers bound the limit before changing this.
            readBuilder.withLimit(this.limit.intValue());
        }
        return readBuilder;
    }

    /** Bucket mode of the table when it is a {@link FileStoreTable}, otherwise {@code FIXED}. */
    private BucketMode resolveBucketMode() {
        return this.table instanceof FileStoreTable
                ? ((FileStoreTable) this.table).bucketMode()
                : BucketMode.FIXED;
    }

    /** Builds the static (non-continuous) file source stream. */
    private DataStream<RowData> buildStaticFileSource() {
        Options options = Options.fromMap(this.table.options());
        int splitBatchSize = ((Integer) options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE)).intValue();
        FlinkConnectorOptions.SplitAssignMode splitAssignMode =
                (FlinkConnectorOptions.SplitAssignMode) options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE);
        return toDataStream(
                new StaticFileStoreSource(
                        createReadBuilder(),
                        this.limit,
                        splitBatchSize,
                        splitAssignMode,
                        this.dynamicPartitionFilteringInfo));
    }

    /** Builds the continuous file source stream. */
    private DataStream<RowData> buildContinuousFileSource() {
        return toDataStream(
                new ContinuousFileStoreSource(
                        createReadBuilder(), this.table.options(), this.limit, resolveBucketMode()));
    }

    /** Validates the checkpoint configuration, then builds the aligned continuous file source stream. */
    private DataStream<RowData> buildAlignedContinuousFileSource() {
        assertStreamingConfigurationForAlignMode(this.env);
        return toDataStream(
                new AlignedContinuousFileStoreSource(
                        createReadBuilder(), this.table.options(), this.limit, resolveBucketMode()));
    }

    /**
     * Registers {@code source} with the environment, named after the table identifier, using the
     * configured watermark strategy (or none) and parallelism (if set).
     */
    private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
        WatermarkStrategy<RowData> effectiveStrategy =
                this.watermarkStrategy == null ? WatermarkStrategy.noWatermarks() : this.watermarkStrategy;
        DataStreamSource<RowData> stream =
                this.env.fromSource(
                        source,
                        effectiveStrategy,
                        this.tableIdentifier.asSummaryString(),
                        produceTypeInfo());
        if (this.parallelism != null) {
            stream.setParallelism(this.parallelism.intValue());
        }
        return stream;
    }

    /** Type information of the produced rows: the table row type, projected when a projection is set. */
    private TypeInformation<RowData> produceTypeInfo() {
        RowType rowType = LogicalTypeConversion.toLogicalType(this.table.rowType());
        LogicalType producedType =
                Optional.ofNullable(this.projectedFields)
                        .map(Projection::of)
                        .map(projection -> projection.project(rowType))
                        .orElse(rowType);
        return InternalTypeInfo.of(producedType);
    }

    /**
     * Builds the data stream for the configured table.
     *
     * <p>Selection order:
     * <ol>
     *   <li>not continuous: static file source;</li>
     *   <li>no log source provider, or the streaming read type is {@code FILE}: one of the
     *       file-based streaming sources;</li>
     *   <li>startup mode other than {@code LATEST_FULL}: the log source alone;</li>
     *   <li>otherwise: a hybrid source that starts from the table and continues with the log
     *       source.</li>
     * </ol>
     *
     * @return the created stream
     * @throws IllegalArgumentException if no execution environment was set
     */
    public DataStream<RowData> build() {
        if (this.env == null) {
            throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
        }
        if (!this.isContinuous) {
            return buildStaticFileSource();
        }
        TableScanUtils.streamingReadingValidate(this.table);

        if (this.logSourceProvider == null
                || CoreOptions.streamReadType(this.conf) == CoreOptions.StreamingReadMode.FILE) {
            return buildFileBasedStreamingSource();
        }
        if (CoreOptions.startupMode(this.conf) != CoreOptions.StartupMode.LATEST_FULL) {
            return toDataStream(this.logSourceProvider.mo192createSource(null));
        }
        return toDataStream(
                HybridSource.builder(
                                LogHybridSourceFactory.buildHybridFirstSource(
                                        this.table, this.projectedFields, this.predicate))
                        .addSource(
                                new LogHybridSourceFactory(this.logSourceProvider),
                                Boundedness.CONTINUOUS_UNBOUNDED)
                        .build());
    }

    /**
     * Picks the file-based streaming source: aligned when checkpoint alignment is enabled, the
     * monitor-function operator when a consumer id is configured with {@code EXACTLY_ONCE}
     * consistency, and the plain continuous file source otherwise.
     */
    private DataStream<RowData> buildFileBasedStreamingSource() {
        if (((Boolean) this.conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)).booleanValue()) {
            return buildAlignedContinuousFileSource();
        }
        boolean exactlyOnceConsumer =
                this.conf.contains(CoreOptions.CONSUMER_ID)
                        && this.conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE) == CoreOptions.ConsumerMode.EXACTLY_ONCE;
        return exactlyOnceConsumer ? buildContinuousStreamOperator() : buildContinuousFileSource();
    }

    /**
     * Builds the streaming source through {@link MonitorFunction}.
     *
     * @throws IllegalArgumentException if a limit was configured
     */
    private DataStream<RowData> buildContinuousStreamOperator() {
        if (this.limit != null) {
            throw new IllegalArgumentException("Cannot limit streaming source, please use batch execution mode.");
        }
        // The last argument tells MonitorFunction whether no watermark strategy was configured.
        DataStream<RowData> stream =
                MonitorFunction.buildSource(
                        this.env,
                        this.tableIdentifier.asSummaryString(),
                        produceTypeInfo(),
                        createReadBuilder(),
                        ((Duration) this.conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL)).toMillis(),
                        this.watermarkStrategy == null);
        if (this.parallelism != null) {
            stream.getTransformation().setParallelism(this.parallelism.intValue());
        }
        if (this.watermarkStrategy != null) {
            stream = stream.assignTimestampsAndWatermarks(this.watermarkStrategy);
        }
        return stream;
    }

    /**
     * Checks that the environment's checkpoint configuration is compatible with the aligned
     * source: checkpointing enabled, exactly one concurrent checkpoint, a checkpoint timeout
     * strictly greater than the source alignment timeout, unaligned checkpoints disabled, and
     * {@code EXACTLY_ONCE} checkpointing mode. Each check is delegated to
     * {@code Preconditions.checkArgument}.
     *
     * @param env environment whose checkpoint configuration is validated
     */
    public void assertStreamingConfigurationForAlignMode(StreamExecutionEnvironment env) {
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        Preconditions.checkArgument(
                checkpointConfig.isCheckpointingEnabled(),
                "The align mode of paimon source is only supported when checkpoint enabled. Please set "
                        + ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key()
                        + " larger than 0");
        Preconditions.checkArgument(
                checkpointConfig.getMaxConcurrentCheckpoints() == 1,
                "The align mode of paimon source supports at most one ongoing checkpoint at the same time. Please set "
                        + ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.key()
                        + " to 1");
        Preconditions.checkArgument(
                checkpointConfig.getCheckpointTimeout()
                        > ((Duration) this.conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT)).toMillis(),
                "The align mode of paimon source requires that the timeout of checkpoint is greater than the timeout of the source's snapshot alignment. Please increase "
                        + ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()
                        + " or decrease "
                        + FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT.key());
        Preconditions.checkArgument(
                !checkpointConfig.isUnalignedCheckpointsEnabled(),
                "The align mode of paimon source currently does not support unaligned checkpoints. Please set "
                        + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
                        + " to false.");
        Preconditions.checkArgument(
                checkpointConfig.getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE,
                "The align mode of paimon source currently only supports EXACTLY_ONCE checkpoint mode. Please set "
                        + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
                        + " to exactly-once");
    }
}
