package org.apache.flink.table.runtime.strategy;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
import org.apache.flink.streaming.api.graph.StreamGraphContext;
import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/strategy/PostProcessAdaptiveJoinStrategy.class */
public class PostProcessAdaptiveJoinStrategy extends BaseAdaptiveJoinOperatorOptimizationStrategy {
    public boolean onOperatorsFinished(OperatorsFinished operatorsFinished, StreamGraphContext streamGraphContext) {
        visitDownstreamAdaptiveJoinNode(operatorsFinished, streamGraphContext);
        return true;
    }

    @Override // org.apache.flink.table.runtime.strategy.BaseAdaptiveJoinOperatorOptimizationStrategy
    protected void tryOptimizeAdaptiveJoin(OperatorsFinished operatorsFinished, StreamGraphContext streamGraphContext, ImmutableStreamNode immutableStreamNode, List<ImmutableStreamEdge> list, AdaptiveJoin adaptiveJoin) {
        if (streamGraphContext.areAllUpstreamNodesFinished(immutableStreamNode)) {
            if (adaptiveJoin.shouldReorderInputs() && (!streamGraphContext.modifyStreamEdge(generateStreamEdgeUpdateRequestInfosForInputsReordered(immutableStreamNode)) || !streamGraphContext.modifyStreamNode(generateStreamNodeUpdateRequestInfosForInputsReordered(immutableStreamNode)))) {
                throw new RuntimeException("Unexpected error occurs while reordering the inputs of the adaptive join node, potentially leading to data inaccuracies. Exceptions will be thrown.");
            }
            adaptiveJoin.genOperatorFactory(streamGraphContext.getStreamGraph().getUserClassLoader(), streamGraphContext.getStreamGraph().getConfiguration());
        }
    }

    private static List<StreamEdgeUpdateRequestInfo> generateStreamEdgeUpdateRequestInfosForInputsReordered(ImmutableStreamNode immutableStreamNode) {
        ArrayList arrayList = new ArrayList();
        for (ImmutableStreamEdge immutableStreamEdge : immutableStreamNode.getInEdges()) {
            StreamEdgeUpdateRequestInfo streamEdgeUpdateRequestInfo = new StreamEdgeUpdateRequestInfo(immutableStreamEdge.getEdgeId(), Integer.valueOf(immutableStreamEdge.getSourceId()), Integer.valueOf(immutableStreamEdge.getTargetId()));
            streamEdgeUpdateRequestInfo.withTypeNumber(immutableStreamEdge.getTypeNumber() == 1 ? 2 : 1);
            arrayList.add(streamEdgeUpdateRequestInfo);
        }
        return arrayList;
    }

    private List<StreamNodeUpdateRequestInfo> generateStreamNodeUpdateRequestInfosForInputsReordered(ImmutableStreamNode immutableStreamNode) {
        ArrayList arrayList = new ArrayList();
        TypeSerializer[] typeSerializersIn = immutableStreamNode.getTypeSerializersIn();
        Preconditions.checkState(typeSerializersIn.length == 2, String.format("Adaptive join currently only supports two inputs, but the join node [%s] has received %s inputs.", Integer.valueOf(immutableStreamNode.getId()), Integer.valueOf(typeSerializersIn.length)));
        arrayList.add(new StreamNodeUpdateRequestInfo(Integer.valueOf(immutableStreamNode.getId())).withTypeSerializersIn(new TypeSerializer[]{typeSerializersIn[1], typeSerializersIn[0]}));
        return arrayList;
    }
}
