package org.apache.flink.table.planner.plan.rules.logical;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
import scala.Option;

/* loaded from: input_file:org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleBase.class */
public abstract class PushWatermarkIntoTableSourceScanRuleBase extends RelOptRule {

    /* loaded from: input_file:org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleBase$DefaultWatermarkGeneratorSupplier.class */
    private static class DefaultWatermarkGeneratorSupplier implements WatermarkGeneratorSupplier<RowData> {
        private static final long serialVersionUID = 1;
        private final Configuration configuration;
        private final GeneratedWatermarkGenerator generatedWatermarkGenerator;

        /* loaded from: input_file:org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleBase$DefaultWatermarkGeneratorSupplier$DefaultWatermarkGenerator.class */
        private static class DefaultWatermarkGenerator implements WatermarkGenerator<RowData> {
            private static final long serialVersionUID = 1;
            private final org.apache.flink.table.runtime.generated.WatermarkGenerator innerWatermarkGenerator;
            private Long currentWatermark = Long.MIN_VALUE;

            public DefaultWatermarkGenerator(org.apache.flink.table.runtime.generated.WatermarkGenerator watermarkGenerator) {
                this.innerWatermarkGenerator = watermarkGenerator;
            }

            public void onEvent(RowData rowData, long j, WatermarkOutput watermarkOutput) {
                try {
                    Long currentWatermark = this.innerWatermarkGenerator.currentWatermark(rowData);
                    if (currentWatermark != null) {
                        this.currentWatermark = currentWatermark;
                    }
                } catch (Exception e) {
                    throw new RuntimeException(String.format("Generated WatermarkGenerator fails to generate for row: %s.", rowData), e);
                }
            }

            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                watermarkOutput.emitWatermark(new Watermark(this.currentWatermark.longValue()));
            }
        }

        public DefaultWatermarkGeneratorSupplier(Configuration configuration, GeneratedWatermarkGenerator generatedWatermarkGenerator) {
            this.configuration = configuration;
            this.generatedWatermarkGenerator = generatedWatermarkGenerator;
        }

        public WatermarkGenerator<RowData> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            ArrayList arrayList = new ArrayList(Arrays.asList(this.generatedWatermarkGenerator.getReferences()));
            arrayList.add(context);
            org.apache.flink.table.runtime.generated.WatermarkGenerator watermarkGenerator = (org.apache.flink.table.runtime.generated.WatermarkGenerator) new GeneratedWatermarkGenerator(this.generatedWatermarkGenerator.getClassName(), this.generatedWatermarkGenerator.getCode(), arrayList.toArray()).newInstance(Thread.currentThread().getContextClassLoader());
            try {
                watermarkGenerator.open(this.configuration);
                return new DefaultWatermarkGenerator(watermarkGenerator);
            } catch (Exception e) {
                throw new RuntimeException("Fail to instantiate generated watermark generator.", e);
            }
        }
    }

    public PushWatermarkIntoTableSourceScanRuleBase(RelOptRuleOperand relOptRuleOperand, String str) {
        super(relOptRuleOperand, str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FlinkLogicalTableSourceScan getNewScan(FlinkLogicalWatermarkAssigner flinkLogicalWatermarkAssigner, RexNode rexNode, FlinkLogicalTableSourceScan flinkLogicalTableSourceScan, TableConfig tableConfig, boolean z) {
        GeneratedWatermarkGenerator generateWatermarkGenerator = WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(tableConfig, FlinkTypeFactory.toLogicalRowType(flinkLogicalTableSourceScan.getRowType()), rexNode, Option.apply("context"));
        Configuration configuration = tableConfig.getConfiguration();
        DefaultWatermarkGeneratorSupplier defaultWatermarkGeneratorSupplier = new DefaultWatermarkGeneratorSupplier(configuration, generateWatermarkGenerator);
        String format = String.format("watermark=[%s]", rexNode);
        WatermarkStrategy forGenerator = WatermarkStrategy.forGenerator(defaultWatermarkGeneratorSupplier);
        Duration duration = (Duration) configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
        if (!duration.isZero() && !duration.isNegative()) {
            forGenerator = forGenerator.withIdleness(duration);
            format = String.format("%s, idletimeout=[%s]", format, Long.valueOf(duration.toMillis()));
        }
        TableSourceTable tableSourceTable = (TableSourceTable) flinkLogicalTableSourceScan.getTable().unwrap(TableSourceTable.class);
        SupportsWatermarkPushDown copy = tableSourceTable.tableSource().copy();
        copy.applyWatermark(forGenerator);
        return FlinkLogicalTableSourceScan.create(flinkLogicalTableSourceScan.getCluster(), z ? tableSourceTable.copy((DynamicTableSource) copy, flinkLogicalWatermarkAssigner.getRowType(), new String[]{format}) : tableSourceTable.copy((DynamicTableSource) copy, flinkLogicalTableSourceScan.getRowType(), new String[]{format}));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean supportsWatermarkPushDown(FlinkLogicalTableSourceScan flinkLogicalTableSourceScan) {
        TableSourceTable tableSourceTable = (TableSourceTable) flinkLogicalTableSourceScan.getTable().unwrap(TableSourceTable.class);
        return tableSourceTable != null && (tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown);
    }
}
