package org.apache.flink.table.planner.plan.nodes.exec.batch;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.IntStream;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.LongHashJoinGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.fusion.OpFusionCodegenSpecGenerator;
import org.apache.flink.table.planner.plan.fusion.generator.TwoInputOpFusionCodegenSpecGenerator;
import org.apache.flink.table.planner.plan.fusion.spec.HashJoinFusionCodegenSpec;
import org.apache.flink.table.planner.plan.nodes.exec.AdaptiveJoinExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.HashJoinOperatorUtil;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.OperatorType;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.HashJoinType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@ExecNodeMetadata(name = "batch-exec-join", version = 1, producedTransformations = {"join"}, consumedOptions = {"table.exec.resource.hash-join.memory", "table.exec.resource.external-buffer-memory", "table.exec.resource.sort.memory", "table.exec.spill-compression.enabled", "table.exec.spill-compression.block-size"}, minPlanVersion = FlinkVersion.v2_0, minStateVersion = FlinkVersion.v2_0)
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.class */
public class BatchExecHashJoin extends ExecNodeBase<RowData> implements BatchExecNode<RowData>, SingleTransformationTranslator<RowData>, AdaptiveJoinExecNode {
    public static final String JOIN_TRANSFORMATION = "join";
    public static final String FIELD_NAME_JOIN_SPEC = "joinSpec";
    public static final String FIELD_NAME_IS_BROADCAST = "isBroadcast";
    public static final String FIELD_NAME_LEFT_IS_BUILD = "leftIsBuild";
    public static final String FIELD_NAME_ESTIMATED_LEFT_AVG_ROW_SIZE = "estimatedLeftAvgRowSize";
    public static final String FIELD_NAME_ESTIMATED_RIGHT_AVG_ROW_SIZE = "estimatedRightAvgRowSize";
    public static final String FIELD_NAME_ESTIMATED_LEFT_ROW_COUNT = "estimatedLeftRowCount";
    public static final String FIELD_NAME_ESTIMATED_RIGHT_ROW_COUNT = "estimatedRightRowCount";
    public static final String FIELD_NAME_TRY_DISTINCT_BUILD_ROW = "tryDistinctBuildRow";
    public static final String FIELD_NAME_WITH_JOIN_STRATEGY_HINT = "withJobStrategyHint";

    @JsonProperty("joinSpec")
    private final JoinSpec joinSpec;

    @JsonProperty(FIELD_NAME_IS_BROADCAST)
    private final boolean isBroadcast;

    @JsonProperty("leftIsBuild")
    private final boolean leftIsBuild;

    @JsonProperty(FIELD_NAME_ESTIMATED_LEFT_AVG_ROW_SIZE)
    private final int estimatedLeftAvgRowSize;

    @JsonProperty(FIELD_NAME_ESTIMATED_RIGHT_AVG_ROW_SIZE)
    private final int estimatedRightAvgRowSize;

    @JsonProperty(FIELD_NAME_ESTIMATED_LEFT_ROW_COUNT)
    private final long estimatedLeftRowCount;

    @JsonProperty(FIELD_NAME_ESTIMATED_RIGHT_ROW_COUNT)
    private final long estimatedRightRowCount;

    @JsonProperty(FIELD_NAME_WITH_JOIN_STRATEGY_HINT)
    private final boolean withJobStrategyHint;

    @JsonProperty(FIELD_NAME_TRY_DISTINCT_BUILD_ROW)
    private final boolean tryDistinctBuildRow;

    public BatchExecHashJoin(ReadableConfig readableConfig, JoinSpec joinSpec, int i, int i2, long j, long j2, boolean z, boolean z2, boolean z3, InputProperty inputProperty, InputProperty inputProperty2, RowType rowType, boolean z4, String str) {
        super(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(BatchExecHashJoin.class), ExecNodeContext.newPersistedConfig(BatchExecHashJoin.class, readableConfig), Arrays.asList(inputProperty, inputProperty2), rowType, str);
        this.joinSpec = (JoinSpec) Preconditions.checkNotNull(joinSpec);
        this.isBroadcast = z;
        this.leftIsBuild = z2;
        this.estimatedLeftAvgRowSize = i;
        this.estimatedRightAvgRowSize = i2;
        this.estimatedLeftRowCount = j;
        this.estimatedRightRowCount = j2;
        this.tryDistinctBuildRow = z3;
        this.withJobStrategyHint = z4;
    }

    @JsonCreator
    public BatchExecHashJoin(@JsonProperty("id") int i, @JsonProperty("type") ExecNodeContext execNodeContext, @JsonProperty("configuration") ReadableConfig readableConfig, @JsonProperty("joinSpec") JoinSpec joinSpec, @JsonProperty("estimatedLeftAvgRowSize") int i2, @JsonProperty("estimatedRightAvgRowSize") int i3, @JsonProperty("estimatedLeftRowCount") long j, @JsonProperty("estimatedRightRowCount") long j2, @JsonProperty("isBroadcast") boolean z, @JsonProperty("leftIsBuild") boolean z2, @JsonProperty("tryDistinctBuildRow") boolean z3, @JsonProperty("inputProperties") List<InputProperty> list, @JsonProperty("outputType") RowType rowType, @JsonProperty("description") String str, @JsonProperty("withJobStrategyHint") boolean z4) {
        super(i, execNodeContext, readableConfig, list, rowType, str);
        Preconditions.checkArgument(list.size() == 2);
        this.joinSpec = (JoinSpec) Preconditions.checkNotNull(joinSpec);
        this.isBroadcast = z;
        this.leftIsBuild = z2;
        this.estimatedLeftAvgRowSize = i2;
        this.estimatedRightAvgRowSize = i3;
        this.estimatedLeftRowCount = j;
        this.estimatedRightRowCount = j2;
        this.tryDistinctBuildRow = z3;
        this.withJobStrategyHint = z4;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        Transformation<?> transformation;
        Transformation<?> transformation2;
        ExecEdge execEdge = getInputEdges().get(0);
        ExecEdge execEdge2 = getInputEdges().get(1);
        Transformation<?> translateToPlan = execEdge.translateToPlan(plannerBase);
        Transformation<?> translateToPlan2 = execEdge2.translateToPlan(plannerBase);
        RowType outputType = execEdge.getOutputType();
        RowType outputType2 = execEdge2.getOutputType();
        long managedMemory = JoinUtil.getManagedMemory(this.joinSpec.getJoinType(), execNodeConfig);
        StreamOperatorFactory<RowData> generateOperatorFactory = HashJoinOperatorUtil.generateOperatorFactory(this.joinSpec.getLeftKeys(), this.joinSpec.getRightKeys(), this.joinSpec.getJoinType(), this.joinSpec.getFilterNulls(), outputType, outputType2, JoinUtil.generateConditionFunction((ReadableConfig) execNodeConfig, plannerBase.getFlinkContext().getClassLoader(), this.joinSpec.getNonEquiCondition().orElse(null), (LogicalType) outputType, (LogicalType) outputType2), this.leftIsBuild, this.estimatedLeftAvgRowSize, this.estimatedRightAvgRowSize, this.estimatedLeftRowCount, this.estimatedRightRowCount, this.tryDistinctBuildRow, managedMemory, execNodeConfig, plannerBase.getFlinkContext().getClassLoader());
        if (this.leftIsBuild) {
            transformation = translateToPlan;
            transformation2 = translateToPlan2;
        } else {
            transformation = translateToPlan2;
            transformation2 = translateToPlan;
        }
        return ExecNodeUtil.createTwoInputTransformation((Transformation) transformation, (Transformation) transformation2, createTransformationMeta("join", execNodeConfig), (StreamOperatorFactory) generateOperatorFactory, (TypeInformation) InternalTypeInfo.of(getOutputType()), transformation2.getParallelism(), managedMemory, false);
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase, org.apache.flink.table.planner.plan.nodes.exec.FusionCodegenExecNode
    public boolean supportFusionCodegen() {
        RowType outputType = getInputEdges().get(0).getOutputType();
        IntStream of = IntStream.of(this.joinSpec.getLeftKeys());
        Objects.requireNonNull(outputType);
        RowType of2 = RowType.of((LogicalType[]) of.mapToObj(outputType::getTypeAt).toArray(i -> {
            return new LogicalType[i];
        }));
        FlinkJoinType joinType = this.joinSpec.getJoinType();
        return LongHashJoinGenerator.support(HashJoinType.of(this.leftIsBuild, joinType.isLeftOuter(), joinType.isRightOuter(), joinType == FlinkJoinType.SEMI, joinType == FlinkJoinType.ANTI), of2, this.joinSpec.getFilterNulls());
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected OpFusionCodegenSpecGenerator translateToFusionCodegenSpecInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig, CodeGeneratorContext codeGeneratorContext) {
        OpFusionCodegenSpecGenerator translateToFusionCodegenSpec = getInputEdges().get(0).translateToFusionCodegenSpec(plannerBase, codeGeneratorContext);
        OpFusionCodegenSpecGenerator translateToFusionCodegenSpec2 = getInputEdges().get(1).translateToFusionCodegenSpec(plannerBase, codeGeneratorContext);
        TwoInputOpFusionCodegenSpecGenerator twoInputOpFusionCodegenSpecGenerator = new TwoInputOpFusionCodegenSpecGenerator(translateToFusionCodegenSpec, translateToFusionCodegenSpec2, JoinUtil.getManagedMemory(this.joinSpec.getJoinType(), execNodeConfig), getOutputType(), new HashJoinFusionCodegenSpec(new CodeGeneratorContext(execNodeConfig, plannerBase.getFlinkContext().getClassLoader(), codeGeneratorContext), this.isBroadcast, this.leftIsBuild, this.joinSpec, this.estimatedLeftAvgRowSize, this.estimatedRightAvgRowSize, this.estimatedLeftRowCount, this.estimatedRightRowCount, ((Boolean) execNodeConfig.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)).booleanValue(), (int) ((MemorySize) execNodeConfig.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)).getBytes()));
        translateToFusionCodegenSpec.addOutput(1, twoInputOpFusionCodegenSpecGenerator);
        translateToFusionCodegenSpec2.addOutput(2, twoInputOpFusionCodegenSpecGenerator);
        return twoInputOpFusionCodegenSpecGenerator;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.AdaptiveJoinExecNode
    public boolean canBeTransformedToAdaptiveJoin() {
        return (this.withJobStrategyHint || this.joinSpec.getJoinType() == FlinkJoinType.FULL) ? false : true;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.AdaptiveJoinExecNode
    public BatchExecAdaptiveJoin toAdaptiveJoinNode() {
        return new BatchExecAdaptiveJoin(getPersistedConfig(), this.joinSpec, this.estimatedLeftAvgRowSize, this.estimatedRightAvgRowSize, this.estimatedLeftRowCount, this.estimatedRightRowCount, this.leftIsBuild, this.tryDistinctBuildRow, getInputProperties(), getOutputType(), getDescription(), OperatorType.ShuffleHashJoin);
    }
}
