/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.time.Duration;
import java.util.List;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.PaimonDataStreamScanProvider;
import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.flink.source.FlinkTableSource;
import org.apache.paimon.flink.source.WatermarkAlignUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Projection;

/**
 * Table source over a Paimon data table, built on top of {@link FlinkTableSource}.
 *
 * <p>The class decides which changelog mode the source reports, assembles a
 * {@link FlinkSourceBuilder} for scans (optionally decorating a pushed-down watermark
 * strategy with the table's watermark options), and creates a lookup runtime provider
 * backed by {@link FileStoreLookupFunction}.
 *
 * <p>Split statistics used for batch parallelism inference and for
 * {@link #reportStatistics()} are computed lazily and cached in {@link #splitStatistics}.
 */
public class DataTableSource extends FlinkTableSource {
    private final ObjectIdentifier tableIdentifier;
    /** {@code true} when the source runs in continuous (streaming) mode. */
    protected final boolean streaming;
    private final DynamicTableFactory.Context context;
    /** Optional log store factory; {@code null} when no log store is used. */
    @Nullable
    private final LogStoreTableFactory logStoreTableFactory;
    /** Watermark strategy supplied via the constructor or {@link #pushWatermark}; may be {@code null}. */
    @Nullable
    private WatermarkStrategy<RowData> watermarkStrategy;
    /** Lazily populated by {@link #scanSplitsForInference()}; {@code null} until the first scan. */
    protected SplitStatistics splitStatistics;

    /**
     * Creates a source without predicate, projection, limit or watermark strategy.
     *
     * @param tableIdentifier identifier handed to the {@link FlinkSourceBuilder}
     * @param table the table to read
     * @param streaming whether the source runs in continuous mode
     * @param context factory context forwarded to the log store factory
     * @param logStoreTableFactory optional log store factory, may be {@code null}
     */
    public DataTableSource(ObjectIdentifier tableIdentifier, Table table, boolean streaming, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory) {
        this(tableIdentifier, table, streaming, context, logStoreTableFactory, null, null, null, null);
    }

    /**
     * Creates a fully specified source.
     *
     * @param tableIdentifier identifier handed to the {@link FlinkSourceBuilder}
     * @param table the table to read
     * @param streaming whether the source runs in continuous mode
     * @param context factory context forwarded to the log store factory
     * @param logStoreTableFactory optional log store factory, may be {@code null}
     * @param predicate optional filter, may be {@code null}
     * @param projectFields optional projection, may be {@code null}
     * @param limit optional row limit, may be {@code null}
     * @param watermarkStrategy optional watermark strategy, may be {@code null}
     */
    public DataTableSource(ObjectIdentifier tableIdentifier, Table table, boolean streaming, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory, @Nullable Predicate predicate, @Nullable int[][] projectFields, @Nullable Long limit, @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
        super(table, predicate, projectFields, limit);
        this.tableIdentifier = tableIdentifier;
        this.streaming = streaming;
        this.context = context;
        this.logStoreTableFactory = logStoreTableFactory;
        // These three are also passed to the super constructor above; the explicit
        // assignments are kept as they appear in the original class.
        this.predicate = predicate;
        this.projectFields = projectFields;
        this.limit = limit;
        this.watermarkStrategy = watermarkStrategy;
    }

    /**
     * Reports the changelog mode produced by this source.
     *
     * <p>Non-streaming sources and append-only tables are insert-only, value-count tables
     * emit all change kinds, and keyed changelog tables are resolved from the table options.
     *
     * @return the changelog mode for the current table and execution mode
     * @throws UnsupportedOperationException if the table is of any other type while streaming
     */
    @Override
    public ChangelogMode getChangelogMode() {
        if (!streaming || table instanceof AppendOnlyFileStoreTable) {
            return ChangelogMode.insertOnly();
        }
        if (table instanceof ChangelogValueCountFileStoreTable) {
            return ChangelogMode.all();
        }
        if (!(table instanceof ChangelogWithKeyFileStoreTable)) {
            throw new UnsupportedOperationException("Unsupported Table subclass " + table.getClass().getName() + " for streaming mode.");
        }
        return changelogModeForKeyedTable(Options.fromMap(table.options()));
    }

    /**
     * Resolves the streaming changelog mode of a keyed changelog table from its options.
     * The checks are evaluated in order and the first match wins.
     */
    private ChangelogMode changelogModeForKeyedTable(Options tableOptions) {
        if (new CoreOptions(tableOptions).mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) {
            return ChangelogMode.insertOnly();
        }
        if (tableOptions.get(CoreOptions.LOG_SCAN_REMOVE_NORMALIZE)) {
            return ChangelogMode.all();
        }
        boolean withoutLogStore = logStoreTableFactory == null;
        if (withoutLogStore && tableOptions.get(CoreOptions.CHANGELOG_PRODUCER) != CoreOptions.ChangelogProducer.NONE) {
            return ChangelogMode.all();
        }
        boolean transactionalLog = tableOptions.get(CoreOptions.LOG_CONSISTENCY) == CoreOptions.LogConsistency.TRANSACTIONAL;
        if (transactionalLog && tableOptions.get(CoreOptions.LOG_CHANGELOG_MODE) == CoreOptions.LogChangelogMode.ALL) {
            return ChangelogMode.all();
        }
        return ChangelogMode.upsert();
    }

    /**
     * Builds the scan runtime provider.
     *
     * <p>A log source provider is created when a log store factory is present, the
     * watermark strategy (if any) is decorated according to the table options, and the
     * actual data stream is produced lazily through {@link #configureSource}.
     *
     * @param scanContext planner-provided scan context
     * @return a {@link PaimonDataStreamScanProvider} whose first argument is {@code !streaming}
     */
    @Override
    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        LogSourceProvider logProvider = logStoreTableFactory == null
                ? null
                : logStoreTableFactory.createSourceProvider(context, (DynamicTableSource.Context)scanContext, projectFields);

        Options tableOptions = Options.fromMap(table.options());
        WatermarkStrategy<RowData> effectiveStrategy = this.watermarkStrategy;
        if (effectiveStrategy != null) {
            effectiveStrategy = applyWatermarkOptions(effectiveStrategy, tableOptions);
        }

        FlinkSourceBuilder sourceBuilder = new FlinkSourceBuilder(tableIdentifier, table)
                .withContinuousMode(streaming)
                .withLogSourceProvider(logProvider)
                .withProjection(projectFields)
                .withPredicate(predicate)
                .withLimit(limit)
                .withWatermarkStrategy(effectiveStrategy);
        return new PaimonDataStreamScanProvider(!streaming, env -> configureSource(sourceBuilder, (StreamExecutionEnvironment)env));
    }

    /**
     * Decorates a non-null watermark strategy with the emit strategy, idle timeout and
     * alignment settings found in the table options, applied in that order.
     */
    private WatermarkStrategy<RowData> applyWatermarkOptions(WatermarkStrategy<RowData> strategy, Options tableOptions) {
        WatermarkStrategy<RowData> decorated = strategy;

        FlinkConnectorOptions.WatermarkEmitStrategy emitStrategy = tableOptions.get(FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY);
        if (emitStrategy == FlinkConnectorOptions.WatermarkEmitStrategy.ON_EVENT) {
            // NOTE(review): the decompiled source constructs WatermarkStrategy directly here.
            // This looks like a decompiler artifact standing in for a concrete wrapper class;
            // confirm against the original sources before relying on this line.
            decorated = new WatermarkStrategy<RowData>(decorated);
        }

        Duration idleTimeout = tableOptions.get(FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT);
        if (idleTimeout != null) {
            decorated = decorated.withIdleness(idleTimeout);
        }

        String alignGroup = tableOptions.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP);
        if (alignGroup != null) {
            try {
                decorated = WatermarkAlignUtils.withWatermarkAlignment(decorated, alignGroup, tableOptions.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT), tableOptions.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
            }
            catch (NoSuchMethodError error) {
                // The alignment call is missing at runtime; surface a readable message and keep the cause.
                throw new RuntimeException("Flink 1.14 does not support watermark alignment, please check your Flink version.", error);
            }
        }
        return decorated;
    }

    /**
     * Finishes the source builder with a parallelism and the execution environment.
     *
     * <p>An explicitly configured scan parallelism wins. Otherwise, when inference is
     * enabled, streaming sources use the configured bucket option and batch sources derive
     * the value from the planned splits. The parallelism may remain {@code null}.
     */
    private DataStream<RowData> configureSource(FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
        Options tableOptions = Options.fromMap(table.options());
        Integer parallelism = tableOptions.get(FlinkConnectorOptions.SCAN_PARALLELISM);
        boolean shouldInfer = parallelism == null && tableOptions.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM);
        if (shouldInfer) {
            if (streaming) {
                parallelism = tableOptions.get(CoreOptions.BUCKET);
            } else {
                parallelism = inferBatchParallelism();
            }
        }
        return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
    }

    /**
     * Infers the batch parallelism as the number of planned splits, capped by a positive
     * limit when one is set, and never lower than one.
     */
    private int inferBatchParallelism() {
        scanSplitsForInference();
        int inferred = splitStatistics.splitNumber();
        if (limit != null && limit > 0L) {
            // Saturate the long limit into the int range before comparing.
            int cappedLimit = (int) Math.min(limit, (long) Integer.MAX_VALUE);
            inferred = Math.min(inferred, cappedLimit);
        }
        return Math.max(1, inferred);
    }

    /** Plans the splits with the current predicate once and caches their statistics. */
    private void scanSplitsForInference() {
        if (splitStatistics != null) {
            return;
        }
        List<Split> plannedSplits = table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
        splitStatistics = new SplitStatistics(plannedSplits);
    }

    /**
     * Creates a new source with the same identifier, table, mode, context, log store
     * factory, predicate, projection, limit and watermark strategy. Cached split
     * statistics are not carried over.
     */
    @Override
    public DataTableSource copy() {
        return new DataTableSource(tableIdentifier, table, streaming, context, logStoreTableFactory, predicate, projectFields, limit, watermarkStrategy);
    }

    /** Stores the watermark strategy to be applied when the scan provider is built. */
    @Override
    public void pushWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    /**
     * Builds the lookup runtime provider for lookup joins.
     *
     * @param context lookup context supplying the join keys
     * @return provider created by {@link LookupRuntimeProviderFactory}
     * @throws RuntimeException if a limit has been pushed into this source
     */
    @Override
    public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext context) {
        if (limit != null) {
            throw new RuntimeException("Limit push down should not happen in Lookup source, but it is " + limit);
        }

        // Without a pushed projection, every field of the table row type is read.
        int[] projectedIndexes;
        if (projectFields == null) {
            projectedIndexes = IntStream.range(0, table.rowType().getFieldCount()).toArray();
        } else {
            projectedIndexes = Projection.of(projectFields).toTopLevelIndexes();
        }
        int[] joinKeyIndexes = Projection.of(context.getKeys()).toTopLevelIndexes();

        Options tableOptions = new Options(table.options());
        boolean asyncEnabled = tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC);
        int asyncThreads = tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER);

        FileStoreLookupFunction lookupFunction = new FileStoreLookupFunction(table, projectedIndexes, joinKeyIndexes, predicate);
        return LookupRuntimeProviderFactory.create(lookupFunction, asyncEnabled, asyncThreads);
    }

    /**
     * Reports table statistics: unknown while streaming, otherwise the total row count of
     * the planned splits.
     */
    @Override
    public TableStats reportStatistics() {
        if (streaming) {
            return TableStats.UNKNOWN;
        }
        scanSplitsForInference();
        return new TableStats(splitStatistics.totalRowCount());
    }

    /** Returns the fixed summary string of this source. */
    @Override
    public String asSummaryString() {
        return "Paimon-DataSource";
    }

    /** Immutable summary of a list of planned splits: their count and summed row count. */
    protected static class SplitStatistics {
        private final int splitNumber;
        private final long totalRowCount;

        private SplitStatistics(List<Split> splits) {
            this.splitNumber = splits.size();
            long rowSum = 0L;
            for (Split split : splits) {
                rowSum += split.rowCount();
            }
            this.totalRowCount = rowSum;
        }

        /** @return number of splits summarized */
        public int splitNumber() {
            return splitNumber;
        }

        /** @return sum of the row counts of all summarized splits */
        public long totalRowCount() {
            return totalRowCount;
        }
    }
}

