package org.apache.flink.table.planner.plan.rules.logical;

import java.util.Collections;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexLiteral;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.legacy.sources.LimitableTableSource;
import org.apache.flink.table.legacy.sources.TableSource;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalLegacyTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort;
import org.apache.flink.table.planner.plan.rules.logical.ImmutablePushLimitIntoLegacyTableSourceScanRule;
import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
import org.immutables.value.Value;

/**
 * Planner rule that pushes the row limit of a {@link FlinkLogicalSort} into the legacy table
 * source of the {@link FlinkLogicalLegacyTableSourceScan} directly below it.
 *
 * <p>The rule fires only when the sort has no collation and has a {@code fetch} (i.e. it is a
 * pure limit), and the scanned table's source implements {@link LimitableTableSource} and has not
 * had a limit pushed down yet. The sort node itself is kept on top of the rewritten scan.
 */
@Internal
@Value.Enclosing
public class PushLimitIntoLegacyTableSourceScanRule
        extends RelRule<
                PushLimitIntoLegacyTableSourceScanRule
                        .PushLimitIntoLegacyTableSourceScanRuleConfig> {

    /** Shared rule instance built from the default configuration. */
    public static final PushLimitIntoLegacyTableSourceScanRule INSTANCE =
            PushLimitIntoLegacyTableSourceScanRuleConfig.DEFAULT.toRule();

    /** Rule configuration: matches a logical sort whose only input is a legacy source scan. */
    @Value.Immutable(singleton = false)
    public interface PushLimitIntoLegacyTableSourceScanRuleConfig extends RelRule.Config {
        PushLimitIntoLegacyTableSourceScanRuleConfig DEFAULT =
                ImmutablePushLimitIntoLegacyTableSourceScanRule
                        .PushLimitIntoLegacyTableSourceScanRuleConfig.builder()
                        .operandSupplier(
                                // Distinct parameter names: a nested lambda must not redeclare
                                // a parameter of the enclosing lambda.
                                sortOperand ->
                                        sortOperand
                                                .operand(FlinkLogicalSort.class)
                                                .oneInput(
                                                        scanOperand ->
                                                                scanOperand
                                                                        .operand(
                                                                                FlinkLogicalLegacyTableSourceScan
                                                                                        .class)
                                                                        .noInputs()))
                        .description("PushLimitIntoLegacyTableSourceScanRule")
                        .build()
                        .as(PushLimitIntoLegacyTableSourceScanRuleConfig.class);

        @Override
        default PushLimitIntoLegacyTableSourceScanRule toRule() {
            return new PushLimitIntoLegacyTableSourceScanRule(this);
        }
    }

    /**
     * Creates the rule.
     *
     * @param config rule configuration handed to {@link RelRule}
     */
    protected PushLimitIntoLegacyTableSourceScanRule(
            PushLimitIntoLegacyTableSourceScanRuleConfig config) {
        super(config);
    }

    /**
     * Decides whether the limit of the matched sort can be pushed into the scanned source.
     *
     * @param call the rule call; {@code rel(0)} is the sort, {@code rel(1)} the scan
     * @return {@code true} if the sort is a pure limit (no collation, fetch present) and the
     *     scanned table wraps a {@link LimitableTableSource} without a pushed-down limit
     */
    @Override
    public boolean matches(RelOptRuleCall call) {
        Sort sort = call.rel(0);
        boolean onlyLimit =
                sort.getCollation().getFieldCollations().isEmpty() && sort.fetch != null;
        if (!onlyLimit) {
            return false;
        }

        LegacyTableSourceTable<?> table =
                call.rel(1).getTable().unwrap(LegacyTableSourceTable.class);
        if (table == null) {
            // The scanned table is not backed by a legacy table source.
            return false;
        }

        TableSource<?> source = table.tableSource();
        return source instanceof LimitableTableSource
                && !((LimitableTableSource<?>) source).isLimitPushedDown();
    }

    /**
     * Replaces the scan below the sort with a scan over the limited table source.
     *
     * @param call the rule call; {@code rel(0)} is the sort, {@code rel(1)} the scan
     * @throws TableException if the new source reports a pushed-down limit but its
     *     {@code explainSource()} output is identical to that of the original source
     */
    @Override
    public void onMatch(RelOptRuleCall call) {
        Sort sort = call.rel(0);
        FlinkLogicalLegacyTableSourceScan scan = call.rel(1);
        LegacyTableSourceTable<?> oldTable =
                scan.getTable().unwrap(LegacyTableSourceTable.class);

        int offset = sort.offset == null ? 0 : RexLiteral.intValue(sort.offset);
        // The source must produce offset + fetch rows. Widen to long before adding so that
        // two large int values cannot wrap around to a negative limit.
        long limit = (long) offset + RexLiteral.intValue(sort.fetch);

        LegacyTableSourceTable<?> newTable = applyLimit(limit, oldTable);
        FlinkLogicalLegacyTableSourceScan newScan = scan.copy(scan.getTraitSet(), newTable);

        TableSource<?> newSource = newTable.tableSource();
        TableSource<?> oldSource = oldTable.tableSource();
        if (((LimitableTableSource<?>) newSource).isLimitPushedDown()
                && newSource.explainSource().equals(oldSource.explainSource())) {
            throw new TableException("Failed to push limit into table source! table source with pushdown capability must override and change explainSource() API to explain the pushdown applied!");
        }

        // The sort is kept above the new scan rather than removed.
        // NOTE(review): presumably because a source need not enforce the limit exactly - confirm.
        call.transformTo(sort.copy(sort.getTraitSet(), Collections.singletonList(newScan)));
    }

    /**
     * Applies {@code limit} to the table's source and returns a copy of the table that wraps the
     * limited source, with its row-count statistic capped at the limit.
     *
     * @param limit maximum number of rows the source should produce
     * @param relOptTable table to limit; must unwrap to a {@link LegacyTableSourceTable} whose
     *     source is a {@link LimitableTableSource}
     * @return the copied table with the limited source and updated statistics
     */
    // Raw LegacyTableSourceTable is kept on purpose: the return type is part of the existing
    // signature and copy(...) is invoked on the raw receiver.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private LegacyTableSourceTable applyLimit(long limit, FlinkPreparingTableBase relOptTable) {
        LegacyTableSourceTable table = relOptTable.unwrap(LegacyTableSourceTable.class);
        LimitableTableSource<?> limitableSource = (LimitableTableSource<?>) table.tableSource();
        TableSource<?> limitedSource = limitableSource.applyLimit(limit);

        FlinkStatistic statistic = relOptTable.getStatistic();
        // After the push-down the table cannot hold more rows than the limit.
        long newRowCount =
                statistic.getRowCount() != null
                        ? Math.min(limit, statistic.getRowCount().longValue())
                        : limit;
        FlinkStatistic newStatistic =
                FlinkStatistic.builder()
                        .statistic(statistic)
                        .tableStats(new TableStats(newRowCount))
                        .build();
        return table.copy(limitedSource, newStatistic);
    }
}
