package org.apache.flink.table.runtime.strategy;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
import org.apache.flink.streaming.api.graph.StreamGraphContext;
import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.class */
public class AdaptiveBroadcastJoinOptimizationStrategy extends BaseAdaptiveJoinOperatorOptimizationStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(AdaptiveBroadcastJoinOptimizationStrategy.class);
    private Long broadcastThreshold;
    private Map<Integer, Map<Integer, Long>> aggregatedInputBytesByTypeNumberAndNodeId;

    public void initialize(StreamGraphContext streamGraphContext) {
        this.broadcastThreshold = (Long) streamGraphContext.getStreamGraph().getConfiguration().get(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD);
        this.aggregatedInputBytesByTypeNumberAndNodeId = new HashMap();
    }

    public boolean onOperatorsFinished(OperatorsFinished operatorsFinished, StreamGraphContext streamGraphContext) {
        visitDownstreamAdaptiveJoinNode(operatorsFinished, streamGraphContext);
        return true;
    }

    @Override // org.apache.flink.table.runtime.strategy.BaseAdaptiveJoinOperatorOptimizationStrategy
    protected void tryOptimizeAdaptiveJoin(OperatorsFinished operatorsFinished, StreamGraphContext streamGraphContext, ImmutableStreamNode immutableStreamNode, List<ImmutableStreamEdge> list, AdaptiveJoin adaptiveJoin) {
        boolean z;
        boolean z2;
        if (canPerformOptimization(immutableStreamNode, streamGraphContext.getStreamGraph().getConfiguration())) {
            for (ImmutableStreamEdge immutableStreamEdge : list) {
                IntermediateDataSetID consumedIntermediateDataSetId = streamGraphContext.getConsumedIntermediateDataSetId(immutableStreamEdge.getEdgeId());
                aggregatedInputBytesByTypeNumber(immutableStreamNode, immutableStreamEdge.getTypeNumber(), ((List) operatorsFinished.getResultInfoMap().get(Integer.valueOf(immutableStreamEdge.getSourceId()))).stream().filter(blockingResultInfo -> {
                    return consumedIntermediateDataSetId.equals(blockingResultInfo.getResultId());
                }).mapToLong((v0) -> {
                    return v0.getNumBytesProduced();
                }).sum());
            }
            if (streamGraphContext.areAllUpstreamNodesFinished(immutableStreamNode)) {
                Long l = this.aggregatedInputBytesByTypeNumberAndNodeId.get(Integer.valueOf(immutableStreamNode.getId())).get(1);
                Preconditions.checkState(l != null, "Left input bytes of adaptive join [%s] is unknown, which is unexpected.", new Object[]{Integer.valueOf(immutableStreamNode.getId())});
                Long l2 = this.aggregatedInputBytesByTypeNumberAndNodeId.get(Integer.valueOf(immutableStreamNode.getId())).get(2);
                Preconditions.checkState(l2 != null, "Right input bytes of adaptive join [%s] is unknown, which is unexpected.", new Object[]{Integer.valueOf(immutableStreamNode.getId())});
                boolean z3 = l.longValue() <= this.broadcastThreshold.longValue();
                boolean z4 = l2.longValue() <= this.broadcastThreshold.longValue();
                boolean z5 = l.longValue() < l2.longValue();
                FlinkJoinType joinType = adaptiveJoin.getJoinType();
                switch (joinType) {
                    case RIGHT:
                        z = z3;
                        z2 = true;
                        break;
                    case INNER:
                        z = z3 || z4;
                        z2 = z5;
                        break;
                    case LEFT:
                    case SEMI:
                    case ANTI:
                        z = z4;
                        z2 = false;
                        break;
                    case FULL:
                    default:
                        throw new RuntimeException(String.format("Unexpected join type %s.", joinType));
                }
                boolean z6 = false;
                if (z) {
                    z6 = tryModifyStreamEdgesForBroadcastJoin(immutableStreamNode.getInEdges(), streamGraphContext, z2);
                    if (z6) {
                        Logger logger = LOG;
                        Object[] objArr = new Object[5];
                        objArr[0] = z2 ? "left" : "right";
                        objArr[1] = Integer.valueOf(immutableStreamNode.getId());
                        objArr[2] = this.broadcastThreshold;
                        objArr[3] = l;
                        objArr[4] = l2;
                        logger.info("The {} input data size of the join node [{}] is small enough, adaptively convert it to a broadcast hash join. Broadcast threshold bytes: {}, left input bytes: {}, right input bytes: {}.", objArr);
                    }
                }
                adaptiveJoin.markAsBroadcastJoin(z6, z6 ? z2 : z5);
                this.aggregatedInputBytesByTypeNumberAndNodeId.remove(Integer.valueOf(immutableStreamNode.getId()));
            }
        }
    }

    private boolean canPerformOptimization(ImmutableStreamNode immutableStreamNode, ReadableConfig readableConfig) {
        return (AdaptiveJoinOptimizationUtils.isBroadcastJoinDisabled(readableConfig) || AdaptiveJoinOptimizationUtils.isBroadcastJoin(immutableStreamNode)) ? false : true;
    }

    private void aggregatedInputBytesByTypeNumber(ImmutableStreamNode immutableStreamNode, int i, long j) {
        this.aggregatedInputBytesByTypeNumberAndNodeId.computeIfAbsent(Integer.valueOf(immutableStreamNode.getId()), num -> {
            return new HashMap();
        }).merge(Integer.valueOf(i), Long.valueOf(j), (v0, v1) -> {
            return Long.sum(v0, v1);
        });
    }

    private List<StreamEdgeUpdateRequestInfo> generateStreamEdgeUpdateRequestInfos(List<ImmutableStreamEdge> list, StreamPartitioner<?> streamPartitioner) {
        ArrayList arrayList = new ArrayList();
        for (ImmutableStreamEdge immutableStreamEdge : list) {
            arrayList.add(new StreamEdgeUpdateRequestInfo(immutableStreamEdge.getEdgeId(), Integer.valueOf(immutableStreamEdge.getSourceId()), Integer.valueOf(immutableStreamEdge.getTargetId())).withOutputPartitioner(streamPartitioner));
        }
        return arrayList;
    }

    private boolean tryModifyStreamEdgesForBroadcastJoin(List<ImmutableStreamEdge> list, StreamGraphContext streamGraphContext, boolean z) {
        List<StreamEdgeUpdateRequestInfo> generateStreamEdgeUpdateRequestInfos = generateStreamEdgeUpdateRequestInfos(AdaptiveJoinOptimizationUtils.filterEdges(list, z ? 1 : 2), new BroadcastPartitioner());
        generateStreamEdgeUpdateRequestInfos.addAll(generateStreamEdgeUpdateRequestInfos(AdaptiveJoinOptimizationUtils.filterEdges(list, z ? 2 : 1), new ForwardPartitioner()));
        return streamGraphContext.modifyStreamEdge(generateStreamEdgeUpdateRequestInfos);
    }
}
