package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.collector.ListenableCollector;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedCollector;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
import org.apache.flink.table.runtime.types.PlannerTypeUtils;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.class */
/**
 * Base exec node for joining an input against a temporal table through that table's lookup
 * function.
 *
 * <p>The node has exactly one input. Depending on whether {@code asyncLookupOptions} is set, the
 * join is executed by an asynchronous operator ({@link AsyncWaitOperatorFactory}) or by a
 * synchronous {@link ProcessOperator}. A third, stateful synchronous variant is delegated to
 * subclasses through {@link #createSyncLookupJoinWithState}.
 *
 * <p>Only {@link FlinkJoinType#INNER} and {@link FlinkJoinType#LEFT} joins with at least one
 * lookup key are accepted; anything else is rejected with a {@link TableException} when the
 * transformation is created.
 */
public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
    public static final String LOOKUP_JOIN_TRANSFORMATION = "lookup-join";
    public static final String LOOKUP_JOIN_MATERIALIZE_TRANSFORMATION = "lookup-join-materialize";
    public static final String FIELD_NAME_JOIN_TYPE = "joinType";
    public static final String FIELD_NAME_JOIN_CONDITION = "joinCondition";
    public static final String FIELD_NAME_TEMPORAL_TABLE = "temporalTable";
    public static final String FIELD_NAME_LOOKUP_KEYS = "lookupKeys";
    public static final String FIELD_NAME_PROJECTION_ON_TEMPORAL_TABLE = "projectionOnTemporalTable";
    public static final String FIELD_NAME_FILTER_ON_TEMPORAL_TABLE = "filterOnTemporalTable";
    public static final String FIELD_NAME_INPUT_CHANGELOG_MODE = "inputChangelogMode";
    public static final String FIELD_NAME_ASYNC_OPTIONS = "asyncOptions";
    public static final String FIELD_NAME_RETRY_OPTIONS = "retryOptions";

    /** Join type; only INNER and LEFT pass {@link #validate(RelOptTable)}. */
    @JsonProperty(FIELD_NAME_JOIN_TYPE)
    private final FlinkJoinType joinType;

    /**
     * Lookup keys, keyed by the field index in the temporal table. Wrapped as an unmodifiable
     * view in the constructor.
     */
    @JsonProperty(FIELD_NAME_LOOKUP_KEYS)
    private final Map<Integer, LookupJoinUtil.LookupKey> lookupKeys;

    /** Spec from which the temporal table is resolved at translation time. */
    @JsonProperty(FIELD_NAME_TEMPORAL_TABLE)
    private final TemporalTableSourceSpec temporalTableSourceSpec;

    /** Optional projection applied to rows of the temporal table; {@code null} if absent. */
    @Nullable
    @JsonProperty(FIELD_NAME_PROJECTION_ON_TEMPORAL_TABLE)
    private final List<RexNode> projectionOnTemporalTable;

    /** Optional filter handed to the generated calc function together with the projection. */
    @Nullable
    @JsonProperty(FIELD_NAME_FILTER_ON_TEMPORAL_TABLE)
    private final RexNode filterOnTemporalTable;

    /** Optional join condition handed to the generated collector / result future. */
    @Nullable
    @JsonProperty(FIELD_NAME_JOIN_CONDITION)
    private final RexNode joinCondition;

    /** Changelog mode of the input; checked when the stateful sync join is requested. */
    @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE)
    private final ChangelogMode inputChangelogMode;

    /** Async options; a non-null value switches the node to the asynchronous join. */
    @JsonInclude(JsonInclude.Include.NON_NULL)
    @Nullable
    @JsonProperty(FIELD_NAME_ASYNC_OPTIONS)
    private final LookupJoinUtil.AsyncLookupOptions asyncLookupOptions;

    /** Retry options; when non-null they are converted to a retry strategy for the lookup. */
    @JsonInclude(JsonInclude.Include.NON_NULL)
    @Nullable
    @JsonProperty(FIELD_NAME_RETRY_OPTIONS)
    private final LookupJoinUtil.RetryLookupOptions retryOptions;

    /**
     * Creates the node.
     *
     * @param i forwarded to the {@link ExecNodeBase} constructor (first argument)
     * @param execNodeContext forwarded to the {@link ExecNodeBase} constructor
     * @param readableConfig forwarded to the {@link ExecNodeBase} constructor
     * @param flinkJoinType join type, must not be {@code null}
     * @param rexNode join condition, may be {@code null}
     * @param temporalTableSourceSpec spec of the temporal table, must not be {@code null}
     * @param map lookup keys, must not be {@code null}
     * @param list projection on the temporal table, may be {@code null}
     * @param rexNode2 filter on the temporal table, may be {@code null}
     * @param asyncLookupOptions async options, {@code null} selects the synchronous join
     * @param retryLookupOptions retry options, may be {@code null}
     * @param changelogMode changelog mode of the input
     * @param list2 input properties; must contain exactly one element
     * @param rowType forwarded to the {@link ExecNodeBase} constructor
     * @param str forwarded to the {@link ExecNodeBase} constructor
     */
    public CommonExecLookupJoin(
            int i,
            ExecNodeContext execNodeContext,
            ReadableConfig readableConfig,
            FlinkJoinType flinkJoinType,
            @Nullable RexNode rexNode,
            TemporalTableSourceSpec temporalTableSourceSpec,
            Map<Integer, LookupJoinUtil.LookupKey> map,
            @Nullable List<RexNode> list,
            @Nullable RexNode rexNode2,
            @Nullable LookupJoinUtil.AsyncLookupOptions asyncLookupOptions,
            @Nullable LookupJoinUtil.RetryLookupOptions retryLookupOptions,
            ChangelogMode changelogMode,
            List<InputProperty> list2,
            RowType rowType,
            String str) {
        super(i, execNodeContext, readableConfig, list2, rowType, str);
        Preconditions.checkArgument(list2.size() == 1);
        this.joinType = Preconditions.checkNotNull(flinkJoinType);
        this.joinCondition = rexNode;
        this.lookupKeys = Collections.unmodifiableMap(Preconditions.checkNotNull(map));
        this.temporalTableSourceSpec = Preconditions.checkNotNull(temporalTableSourceSpec);
        this.projectionOnTemporalTable = list;
        this.filterOnTemporalTable = rexNode2;
        this.inputChangelogMode = changelogMode;
        this.asyncLookupOptions = asyncLookupOptions;
        this.retryOptions = retryLookupOptions;
    }

    /** Returns the spec of the temporal table this node joins against. */
    public TemporalTableSourceSpec getTemporalTableSourceSpec() {
        return this.temporalTableSourceSpec;
    }

    /**
     * Validates the node and builds the transformation that performs the lookup join.
     *
     * @param plannerBase planner used to resolve the temporal table, class loader, rel builder
     *     and execution config
     * @param execNodeConfig config handed to code generation and to the lookup function
     * @param z when {@code true} the join is built by {@link #createSyncLookupJoinWithState};
     *     this path asserts that async lookup is off and the input is not insert-only
     * @param z2 forwarded unchanged as the last argument of {@link
     *     #createSyncLookupJoinWithState}; its meaning is defined by the subclass
     * @return the join transformation
     * @throws TableException if the table source, the join type or the lookup keys are not
     *     supported
     */
    @SuppressWarnings("unchecked")
    public Transformation<RowData> createJoinTransformation(
            PlannerBase plannerBase, ExecNodeConfig execNodeConfig, boolean z, boolean z2) {
        RelOptTable temporalTable =
                this.temporalTableSourceSpec.getTemporalTable(
                        plannerBase.getFlinkContext(),
                        ShortcutUtils.unwrapTypeFactory(plannerBase));
        validate(temporalTable);

        final ExecEdge inputEdge = getInputEdges().get(0);
        final RowType inputRowType = (RowType) inputEdge.getOutputType();
        final RowType tableSourceRowType =
                FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());
        final RowType resultRowType = (RowType) getOutputType();
        validateLookupKeyType(this.lookupKeys, inputRowType, tableSourceRowType);

        final boolean asyncEnabled = this.asyncLookupOptions != null;
        final ClassLoader classLoader = plannerBase.getFlinkContext().getClassLoader();
        UserDefinedFunction lookupFunction =
                LookupJoinUtil.getLookupFunction(
                        temporalTable,
                        this.lookupKeys.keySet(),
                        classLoader,
                        asyncEnabled,
                        this.retryOptions != null ? this.retryOptions.toRetryStrategy() : null);
        UserDefinedFunctionHelper.prepareInstance(execNodeConfig, lookupFunction);

        final boolean leftOuterJoin = this.joinType == FlinkJoinType.LEFT;
        if (asyncEnabled) {
            assert lookupFunction instanceof AsyncTableFunction;
        }

        // translateToPlan is declared with a wildcard; this node's input produces RowData.
        Transformation<RowData> inputTransformation =
                (Transformation<RowData>) inputEdge.translateToPlan(plannerBase);

        if (z) {
            // The stateful variant is synchronous only and is not meant for insert-only input.
            assert !asyncEnabled && !this.inputChangelogMode.containsOnly(RowKind.INSERT);
            return createSyncLookupJoinWithState(
                    inputTransformation,
                    temporalTable,
                    execNodeConfig,
                    classLoader,
                    this.lookupKeys,
                    (TableFunction<?>) lookupFunction,
                    plannerBase.createRelBuilder(),
                    inputRowType,
                    tableSourceRowType,
                    resultRowType,
                    leftOuterJoin,
                    plannerBase.getExecEnv().getConfig().isObjectReuseEnabled(),
                    z2);
        }

        final StreamOperatorFactory<RowData> operatorFactory;
        if (asyncEnabled) {
            operatorFactory =
                    createAsyncLookupJoin(
                            temporalTable,
                            execNodeConfig,
                            classLoader,
                            this.lookupKeys,
                            (AsyncTableFunction<Object>) lookupFunction,
                            plannerBase.createRelBuilder(),
                            inputRowType,
                            tableSourceRowType,
                            resultRowType,
                            leftOuterJoin,
                            this.asyncLookupOptions);
        } else {
            operatorFactory =
                    createSyncLookupJoin(
                            temporalTable,
                            execNodeConfig,
                            classLoader,
                            this.lookupKeys,
                            (TableFunction<?>) lookupFunction,
                            plannerBase.createRelBuilder(),
                            inputRowType,
                            tableSourceRowType,
                            resultRowType,
                            leftOuterJoin,
                            plannerBase.getExecEnv().getConfig().isObjectReuseEnabled());
        }

        return ExecNodeUtil.createOneInputTransformation(
                inputTransformation,
                createTransformationMeta(LOOKUP_JOIN_TRANSFORMATION, execNodeConfig),
                operatorFactory,
                InternalTypeInfo.of(resultRowType),
                inputTransformation.getParallelism(),
                false);
    }

    /**
     * Builds the stateful synchronous lookup join. Called by {@link #createJoinTransformation}
     * when its {@code z} flag is set.
     *
     * @param transformation the translated input transformation
     * @param relOptTable the resolved temporal table
     * @param execNodeConfig node config
     * @param classLoader class loader taken from the planner's Flink context
     * @param map the lookup keys of this node
     * @param tableFunction the synchronous lookup function
     * @param relBuilder a rel builder created by the planner
     * @param rowType output type of the input edge
     * @param rowType2 logical row type of the temporal table
     * @param rowType3 output type of this node
     * @param z {@code true} if the join type is {@link FlinkJoinType#LEFT}
     * @param z2 whether object reuse is enabled in the execution config
     * @param z3 the {@code z2} flag received by {@link #createJoinTransformation}
     * @return the join transformation
     */
    protected abstract Transformation<RowData> createSyncLookupJoinWithState(Transformation<RowData> transformation, RelOptTable relOptTable, ExecNodeConfig execNodeConfig, ClassLoader classLoader, Map<Integer, LookupJoinUtil.LookupKey> map, TableFunction<?> tableFunction, RelBuilder relBuilder, RowType rowType, RowType rowType2, RowType rowType3, boolean z, boolean z2, boolean z3);

    /**
     * Checks that every field-reference lookup key joins two fields of interoperable types.
     *
     * @param map lookup keys, keyed by field index in the temporal table
     * @param rowType row type of the input; indexed by the key's referenced field index
     * @param rowType2 row type of the temporal table; indexed by the map key
     * @throws TableException listing all mismatching pairs if at least one pair is not
     *     interoperable
     */
    protected void validateLookupKeyType(
            Map<Integer, LookupJoinUtil.LookupKey> map, RowType rowType, RowType rowType2) {
        final List<String> incompatibleConditions = new LinkedList<>();
        for (Map.Entry<Integer, LookupJoinUtil.LookupKey> entry : map.entrySet()) {
            // Only keys that reference an input field are type-checked here.
            if (!(entry.getValue() instanceof LookupJoinUtil.FieldRefLookupKey)) {
                continue;
            }
            final int rightIndex = entry.getKey();
            final int leftIndex = ((LookupJoinUtil.FieldRefLookupKey) entry.getValue()).index;
            final LogicalType leftType = rowType.getTypeAt(leftIndex);
            final LogicalType rightType = rowType2.getTypeAt(rightIndex);
            if (!PlannerTypeUtils.isInteroperable(leftType, rightType)) {
                incompatibleConditions.add(
                        String.format(
                                "%s[%s]=%s[%s]",
                                rowType.getFieldNames().get(leftIndex),
                                leftType,
                                rowType2.getFieldNames().get(rightIndex),
                                rightType));
            }
        }
        if (!incompatibleConditions.isEmpty()) {
            throw new TableException("Temporal table join requires equivalent condition of the same type, but the condition is " + StringUtils.join(incompatibleConditions, ","));
        }
    }

    /**
     * Builds the operator factory for the asynchronous lookup join.
     *
     * <p>A calc-aware runner is used when a projection on the temporal table is present,
     * otherwise the plain async runner. Timeout, buffer capacity and output mode come from
     * {@code asyncLookupOptions}.
     */
    @SuppressWarnings("unchecked")
    private StreamOperatorFactory<RowData> createAsyncLookupJoin(
            RelOptTable relOptTable,
            ExecNodeConfig execNodeConfig,
            ClassLoader classLoader,
            Map<Integer, LookupJoinUtil.LookupKey> map,
            AsyncTableFunction<Object> asyncTableFunction,
            RelBuilder relBuilder,
            RowType rowType,
            RowType rowType2,
            RowType rowType3,
            boolean z,
            LookupJoinUtil.AsyncLookupOptions asyncLookupOptions) {
        LookupJoinCodeGenerator.GeneratedTableFunctionWithDataType<AsyncFunction<RowData, Object>>
                generatedFetcher =
                        LookupJoinCodeGenerator.generateAsyncLookupFunction(
                                execNodeConfig,
                                classLoader,
                                ShortcutUtils.unwrapContext(relBuilder)
                                        .getCatalogManager()
                                        .getDataTypeFactory(),
                                rowType,
                                rowType2,
                                rowType3,
                                map,
                                LookupJoinUtil.getOrderedLookupKeys(map.keySet()),
                                asyncTableFunction,
                                StringUtils.join(relOptTable.getQualifiedName(), "."));

        RelDataType projectionOutputRelDataType = getProjectionOutputRelDataType(relBuilder);
        RowType rightRowType = getRightOutputRowType(projectionOutputRelDataType, rowType2);

        GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture =
                LookupJoinCodeGenerator.generateTableAsyncCollector(
                        execNodeConfig,
                        classLoader,
                        "TableFunctionResultFuture",
                        rowType,
                        rightRowType,
                        JavaScalaConversionUtil.toScala(Optional.ofNullable(this.joinCondition)));

        DataStructureConverter<?, ?> fetcherConverter =
                DataStructureConverters.getConverter(generatedFetcher.dataType());

        // Both runners share a common supertype, so the local is typed by the async function
        // interface rather than by one concrete runner class.
        final AsyncFunction<RowData, RowData> asyncFunc;
        if (this.projectionOnTemporalTable != null) {
            asyncFunc =
                    new AsyncLookupJoinWithCalcRunner(
                            generatedFetcher.tableFunc(),
                            (DataStructureConverter<RowData, Object>) fetcherConverter,
                            LookupJoinCodeGenerator.generateCalcMapFunction(
                                    execNodeConfig,
                                    classLoader,
                                    JavaScalaConversionUtil.toScala(this.projectionOnTemporalTable),
                                    this.filterOnTemporalTable,
                                    projectionOutputRelDataType,
                                    rowType2),
                            generatedResultFuture,
                            InternalSerializers.create(rightRowType),
                            z,
                            asyncLookupOptions.asyncBufferCapacity);
        } else {
            asyncFunc =
                    new AsyncLookupJoinRunner(
                            generatedFetcher.tableFunc(),
                            (DataStructureConverter<RowData, Object>) fetcherConverter,
                            generatedResultFuture,
                            InternalSerializers.create(rightRowType),
                            z,
                            asyncLookupOptions.asyncBufferCapacity);
        }

        return new AsyncWaitOperatorFactory<>(
                asyncFunc,
                asyncLookupOptions.asyncTimeout,
                asyncLookupOptions.asyncBufferCapacity,
                asyncLookupOptions.asyncOutputMode);
    }

    /**
     * Builds the operator factory for the synchronous lookup join by wrapping the function from
     * {@link #createSyncLookupJoinFunction} in a {@link ProcessOperator}.
     */
    private StreamOperatorFactory<RowData> createSyncLookupJoin(
            RelOptTable relOptTable,
            ExecNodeConfig execNodeConfig,
            ClassLoader classLoader,
            Map<Integer, LookupJoinUtil.LookupKey> map,
            TableFunction<?> tableFunction,
            RelBuilder relBuilder,
            RowType rowType,
            RowType rowType2,
            RowType rowType3,
            boolean z,
            boolean z2) {
        ProcessFunction<RowData, RowData> joinFunction =
                createSyncLookupJoinFunction(
                        relOptTable,
                        execNodeConfig,
                        classLoader,
                        map,
                        tableFunction,
                        relBuilder,
                        rowType,
                        rowType2,
                        rowType3,
                        z,
                        z2);
        return SimpleOperatorFactory.of(new ProcessOperator<>(joinFunction));
    }

    /**
     * Returns the struct type produced by the projection on the temporal table.
     *
     * @param relBuilder builder from which the type factory is unwrapped
     * @return the projection's struct type, or {@code null} if this node has no projection
     */
    public RelDataType getProjectionOutputRelDataType(RelBuilder relBuilder) {
        if (this.projectionOnTemporalTable == null) {
            return null;
        }
        return RexUtil.createStructType(
                ShortcutUtils.unwrapTypeFactory(relBuilder), this.projectionOnTemporalTable);
    }

    /**
     * Returns the row type of the right (temporal table) side after the optional projection.
     *
     * @param relDataType projection output type, or {@code null} if there is no projection
     * @param rowType row type to fall back to when {@code relDataType} is {@code null}
     * @return the converted projection type if present, otherwise {@code rowType}
     */
    public RowType getRightOutputRowType(RelDataType relDataType, RowType rowType) {
        if (relDataType == null) {
            return rowType;
        }
        // toLogicalType is declared to return LogicalType; a struct type converts to a RowType.
        return (RowType) FlinkTypeFactory.toLogicalType(relDataType);
    }

    /**
     * Generates the synchronous lookup function and collector and wraps them in a join runner.
     *
     * @param relOptTable temporal table; its qualified name is passed to code generation
     * @param execNodeConfig node config used for code generation
     * @param classLoader class loader used for code generation
     * @param map lookup keys
     * @param tableFunction synchronous lookup function
     * @param relBuilder builder used to obtain the data type factory and type factory
     * @param rowType passed to code generation as the input row type argument
     * @param rowType2 row type of the temporal table before projection
     * @param rowType3 passed to code generation as the result row type argument
     * @param z passed to the runner (callers in this class pass "is LEFT join")
     * @param z2 passed to lookup-function code generation (callers in this class pass the
     *     object-reuse setting)
     * @return a calc-aware runner if a projection on the temporal table exists, otherwise the
     *     plain runner
     */
    public ProcessFunction<RowData, RowData> createSyncLookupJoinFunction(
            RelOptTable relOptTable,
            ExecNodeConfig execNodeConfig,
            ClassLoader classLoader,
            Map<Integer, LookupJoinUtil.LookupKey> map,
            TableFunction<?> tableFunction,
            RelBuilder relBuilder,
            RowType rowType,
            RowType rowType2,
            RowType rowType3,
            boolean z,
            boolean z2) {
        GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher =
                LookupJoinCodeGenerator.generateSyncLookupFunction(
                        execNodeConfig,
                        classLoader,
                        ShortcutUtils.unwrapContext(relBuilder)
                                .getCatalogManager()
                                .getDataTypeFactory(),
                        rowType,
                        rowType2,
                        rowType3,
                        map,
                        LookupJoinUtil.getOrderedLookupKeys(map.keySet()),
                        tableFunction,
                        StringUtils.join(relOptTable.getQualifiedName(), "."),
                        z2);

        RelDataType projectionOutputRelDataType = getProjectionOutputRelDataType(relBuilder);
        RowType rightRowType = getRightOutputRowType(projectionOutputRelDataType, rowType2);

        GeneratedCollector<ListenableCollector<RowData>> generatedCollector =
                LookupJoinCodeGenerator.generateCollector(
                        new CodeGeneratorContext(execNodeConfig, classLoader),
                        rowType,
                        rightRowType,
                        rowType3,
                        JavaScalaConversionUtil.toScala(Optional.ofNullable(this.joinCondition)),
                        JavaScalaConversionUtil.toScala(Optional.empty()),
                        true);

        if (this.projectionOnTemporalTable == null) {
            return new LookupJoinRunner(
                    generatedFetcher, generatedCollector, z, rightRowType.getFieldCount());
        }
        return new LookupJoinWithCalcRunner(
                generatedFetcher,
                LookupJoinCodeGenerator.generateCalcMapFunction(
                        execNodeConfig,
                        classLoader,
                        JavaScalaConversionUtil.toScala(this.projectionOnTemporalTable),
                        this.filterOnTemporalTable,
                        projectionOutputRelDataType,
                        rowType2),
                generatedCollector,
                z,
                rightRowType.getFieldCount());
    }

    /**
     * Rejects unsupported nodes: the table source must support lookups, at least one lookup key
     * must exist, and the join type must be INNER or LEFT.
     *
     * @throws TableException if any of these conditions is violated
     */
    private void validate(RelOptTable relOptTable) {
        validateTableSource(relOptTable);
        if (this.lookupKeys.isEmpty()) {
            throw new TableException(String.format("Temporal table join requires an equality condition on fields of %s.", getTableSourceDescription(relOptTable)));
        }
        final boolean supportedJoinType =
                this.joinType == FlinkJoinType.LEFT || this.joinType == FlinkJoinType.INNER;
        if (!supportedJoinType) {
            throw new TableException(String.format("Temporal table join currently only support INNER JOIN and LEFT JOIN, but was %s JOIN.", this.joinType.toString()));
        }
    }

    /**
     * Returns {@code "table [<identifier>]"} for the two known table kinds, or an empty string
     * for any other {@link RelOptTable}.
     */
    private String getTableSourceDescription(RelOptTable relOptTable) {
        if (relOptTable instanceof TableSourceTable) {
            return String.format(
                    "table [%s]",
                    ((TableSourceTable) relOptTable)
                            .contextResolvedTable()
                            .getIdentifier()
                            .asSummaryString());
        }
        if (relOptTable instanceof LegacyTableSourceTable) {
            return String.format(
                    "table [%s]",
                    ((LegacyTableSourceTable<?>) relOptTable).tableIdentifier().asSummaryString());
        }
        return "";
    }

    /**
     * Checks that the table's source can serve lookups.
     *
     * <p>A {@link TableSourceTable} must wrap a {@link LookupTableSource}. A {@link
     * LegacyTableSourceTable} must wrap a {@link LookupableTableSource} whose produced type is
     * either a RowData-compatible {@link InternalTypeInfo} or a {@link RowTypeInfo}.
     *
     * @throws TableException if the table is of another kind or fails one of these checks
     */
    private void validateTableSource(RelOptTable relOptTable) {
        if (relOptTable instanceof TableSourceTable) {
            if (!(((TableSourceTable) relOptTable).tableSource() instanceof LookupTableSource)) {
                throw new TableException(String.format("%s must implement LookupTableSource interface if it is used in temporal table join.", getTableSourceDescription(relOptTable)));
            }
            return;
        }

        if (!(relOptTable instanceof LegacyTableSourceTable)) {
            throw new TableException(String.format("table [%s] is neither TableSourceTable not LegacyTableSourceTable.", StringUtils.join(relOptTable.getQualifiedName(), ".")));
        }

        TableSource<?> tableSource = ((LegacyTableSourceTable<?>) relOptTable).tableSource();
        if (!(tableSource instanceof LookupableTableSource)) {
            throw new TableException(String.format("%s must implement LookupableTableSource interface if it is used in temporal table join.", getTableSourceDescription(relOptTable)));
        }

        TypeInformation<?> producedType =
                TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(tableSource.getProducedDataType());
        final boolean producesRowData =
                producedType instanceof InternalTypeInfo
                        && producedType.getTypeClass().isAssignableFrom(RowData.class);
        final boolean producesRow = producedType instanceof RowTypeInfo;
        if (!producesRowData && !producesRow) {
            throw new TableException(String.format("Temporal table join only support Row or RowData type as return type of temporal table. But was %s.", producedType));
        }
    }
}
