package org.apache.flink.iteration;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.iteration.IterationConfig;
import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
import org.apache.flink.iteration.operator.HeadOperator;
import org.apache.flink.iteration.operator.HeadOperatorFactory;
import org.apache.flink.iteration.operator.InputOperator;
import org.apache.flink.iteration.operator.OperatorWrapper;
import org.apache.flink.iteration.operator.OutputOperator;
import org.apache.flink.iteration.operator.ReplayOperator;
import org.apache.flink.iteration.operator.TailOperator;
import org.apache.flink.iteration.operator.allround.AllRoundOperatorWrapper;
import org.apache.flink.iteration.operator.perround.PerRoundOperatorWrapper;
import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

@Experimental
/* loaded from: input_file:org/apache/flink/iteration/Iterations.class */
public class Iterations {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/iteration/Iterations$CriteriaMergeProcessor.class */
    public static class CriteriaMergeProcessor extends CoProcessFunction<Object, Object, Object> {
        private CriteriaMergeProcessor() {
        }

        public void processElement1(Object obj, CoProcessFunction<Object, Object, Object>.Context context, Collector<Object> collector) throws Exception {
        }

        public void processElement2(Object obj, CoProcessFunction<Object, Object, Object>.Context context, Collector<Object> collector) throws Exception {
            collector.collect(obj);
        }
    }

    /**
     * Builds an iteration over unbounded streams.
     *
     * <p>Delegates to {@code createIteration} with no replayed inputs, an
     * {@link AllRoundOperatorWrapper}, and termination criteria disallowed.
     *
     * @param dataStreamList the initial variable streams; {@code createIteration} requires at
     *     least one
     * @param dataStreamList2 the additional data streams handed to the iteration body
     * @param iterationBody the body that builds the iteration subgraph
     * @return the output streams of the iteration
     */
    public static DataStreamList iterateUnboundedStreams(DataStreamList dataStreamList, DataStreamList dataStreamList2, IterationBody iterationBody) {
        final Set<Integer> noReplayedInputs = Collections.emptySet();
        final OperatorWrapper allRoundWrapper = new AllRoundOperatorWrapper();
        final boolean criteriaAllowed = false;
        return createIteration(dataStreamList, dataStreamList2, noReplayedInputs, iterationBody, allRoundWrapper, criteriaAllowed);
    }

    /**
     * Builds an iteration over bounded streams in which the body may return termination criteria.
     *
     * <p>The operator wrapper follows {@code iterationConfig.getOperatorLifeCycle()}:
     * {@code ALL_ROUND} selects {@link AllRoundOperatorWrapper}, anything else selects
     * {@link PerRoundOperatorWrapper}. The replayed streams are placed before the non-replayed
     * ones, so the replayed inputs occupy indices {@code [0, replayedCount)}.
     *
     * @param dataStreamList the initial variable streams; {@code createIteration} requires at
     *     least one
     * @param replayableDataStreamList the data streams, split into replayed and non-replayed
     * @param iterationConfig supplies the operator life cycle
     * @param iterationBody the body that builds the iteration subgraph
     * @return the output streams of the iteration
     */
    public static DataStreamList iterateBoundedStreamsUntilTermination(DataStreamList dataStreamList, ReplayableDataStreamList replayableDataStreamList, IterationConfig iterationConfig, IterationBody iterationBody) {
        final OperatorWrapper lifeCycleWrapper;
        if (iterationConfig.getOperatorLifeCycle() == IterationConfig.OperatorLifeCycle.ALL_ROUND) {
            lifeCycleWrapper = new AllRoundOperatorWrapper();
        } else {
            lifeCycleWrapper = new PerRoundOperatorWrapper();
        }

        // Replayed streams first: their positions become the replayed indices below.
        List<DataStream<?>> orderedInputs = new ArrayList<>();
        orderedInputs.addAll(replayableDataStreamList.getReplayedDataStreams());
        orderedInputs.addAll(replayableDataStreamList.getNonReplayedStreams());

        int replayedCount = replayableDataStreamList.getReplayedDataStreams().size();
        Set<Integer> replayedIndices = IntStream.range(0, replayedCount).boxed().collect(Collectors.toSet());

        return createIteration(dataStreamList, new DataStreamList(orderedInputs), replayedIndices, iterationBody, lifeCycleWrapper, true);
    }

    /**
     * Wires up a complete iteration: inputs and heads for the variable streams, optional
     * replayers for the data streams, the user body (built in a draft environment and then copied
     * to the actual one), tails co-located with their heads, an optional criteria stream, and
     * finally the outputs.
     *
     * @param dataStreamList the initial variable streams; must not be empty
     * @param dataStreamList2 the data streams handed to the iteration body
     * @param set indices into {@code dataStreamList2} of the inputs that get a replayer
     * @param iterationBody the body that builds the iteration subgraph
     * @param operatorWrapper the wrapper given to the draft environment of the body
     * @param z whether the body is allowed to return termination criteria
     * @return the output streams of the iteration
     * @throws IllegalStateException if there is no variable stream, if the number or parallelism
     *     of the feedback streams does not match the heads, or if termination criteria are
     *     returned while {@code z} is {@code false}
     */
    private static DataStreamList createIteration(DataStreamList dataStreamList, DataStreamList dataStreamList2, Set<Integer> set, IterationBody iterationBody, OperatorWrapper<?, IterationRecord<?>> operatorWrapper, boolean z) {
        Preconditions.checkState(dataStreamList.size() > 0, "There should be at least one variable stream");

        final IterationID iterationId = new IterationID();
        final List<TypeInformation<?>> variableTypes = getTypeInfos(dataStreamList);
        final List<TypeInformation<?>> dataTypes = getTypeInfos(dataStreamList2);

        // Total parallelism over all variable streams; a non-positive value falls back to the
        // parallelism configured on the stream's environment.
        int totalVariableParallelism = 0;
        for (int idx = 0; idx < dataStreamList.size(); idx++) {
            DataStream<?> variable = dataStreamList.get(idx);
            totalVariableParallelism += variable.getParallelism() > 0
                    ? variable.getParallelism()
                    : variable.getExecutionEnvironment().getConfig().getParallelism();
        }

        // Inputs and heads for the variable streams.
        DataStreamList variableInputs = addInputs(dataStreamList);
        DataStreamList heads = addHeads(dataStreamList, variableInputs, iterationId, totalVariableParallelism, false, 0);

        // Inputs (and replayers, when requested) for the data streams.
        DataStreamList dataInputs = addInputs(dataStreamList2);
        if (!set.isEmpty()) {
            dataInputs = addReplayer(heads.get(0), dataStreamList2, dataInputs, set);
        }

        // Build the body on draft sources, then copy the draft subgraph to the actual environment.
        StreamExecutionEnvironment env = dataStreamList.get(0).getExecutionEnvironment();
        DraftExecutionEnvironment draftEnv = new DraftExecutionEnvironment(env, operatorWrapper);
        DataStreamList draftVariables = addDraftSources(heads, draftEnv, variableTypes);
        DataStreamList draftData = addDraftSources(dataInputs, draftEnv, dataTypes);
        IterationBodyResult bodyResult = iterationBody.process(draftVariables, draftData);
        ensuresTransformationAdded(bodyResult.getFeedbackVariableStreams(), draftEnv);
        ensuresTransformationAdded(bodyResult.getOutputStreams(), draftEnv);
        draftEnv.copyToActualEnvironment();

        // Validate the feedback streams against the heads they close the loop with.
        DataStreamList feedbacks = getActualDataStreams(bodyResult.getFeedbackVariableStreams(), draftEnv);
        Preconditions.checkState(feedbacks.size() == dataStreamList.size(), "The number of feedback streams " + feedbacks.size() + " does not match the initialized one " + dataStreamList.size());
        for (int idx = 0; idx < feedbacks.size(); idx++) {
            int feedbackParallelism = feedbacks.get(idx).getParallelism();
            int headParallelism = heads.get(idx).getParallelism();
            Preconditions.checkState(
                    feedbackParallelism == headParallelism,
                    String.format("The feedback stream %d have different parallelism %d with the initial stream, which is %d", idx, feedbackParallelism, headParallelism));
        }

        // Add the tails and put each one in the same co-location group as its head.
        DataStreamList tails = addTails(feedbacks, iterationId, 0);
        for (int idx = 0; idx < heads.size(); idx++) {
            String coLocationKey = "co-" + iterationId.toHexString() + "-" + idx;
            heads.get(idx).getTransformation().setCoLocationGroupKey(coLocationKey);
            tails.get(idx).getTransformation().setCoLocationGroupKey(coLocationKey);
        }

        List<DataStream<?>> tailsAndCriteriaTails = new ArrayList<>(tails.getDataStreams());
        DataStream<?> terminationCriteria = bodyResult.getTerminationCriteria();
        Preconditions.checkState(z || terminationCriteria == null, "The current iteration type does not support the termination criteria.");
        if (terminationCriteria != null) {
            DataStreamList criteriaTails = addCriteriaStream(terminationCriteria, iterationId, env, draftEnv, dataStreamList, heads, totalVariableParallelism);
            tailsAndCriteriaTails.addAll(criteriaTails.getDataStreams());
        }

        DataStreamList bodyOutputs = getActualDataStreams(bodyResult.getOutputStreams(), draftEnv);
        DataStream<Object> tailsUnion = unionAllTails(env, new DataStreamList(tailsAndCriteriaTails));
        return addOutputs(bodyOutputs, tailsUnion);
    }

    /**
     * Puts a {@link ReplayOperator} behind every data-stream input whose index is in {@code set};
     * all other inputs are passed through untouched.
     *
     * <p>Each replayer is connected to the broadcast {@code ALIGN_NOTIFY_OUTPUT_TAG} side output
     * of the given head and keeps the parallelism of the input it wraps.
     *
     * @param dataStream the head stream whose side output feeds the replayers; must be a
     *     {@link SingleOutputStreamOperator}
     * @param dataStreamList the original data streams, used only to name the replayers
     * @param dataStreamList2 the input-wrapped data streams to replay
     * @param set indices into {@code dataStreamList2} of the inputs to replay
     * @return the inputs, in their original order, with the selected ones replaced by replayers
     */
    private static DataStreamList addReplayer(DataStream<?> dataStream, DataStreamList dataStreamList, DataStreamList dataStreamList2, Set<Integer> set) {
        List<DataStream<?>> result = new ArrayList<>(dataStreamList2.size());
        for (int idx = 0; idx < dataStreamList2.size(); idx++) {
            DataStream<?> input = dataStreamList2.get(idx);
            if (!set.contains(idx)) {
                result.add(input);
                continue;
            }
            // The side output is requested once per replayed input, inside the loop.
            DataStream alignNotifications = ((SingleOutputStreamOperator) dataStream).getSideOutput(HeadOperator.ALIGN_NOTIFY_OUTPUT_TAG).broadcast();
            String replayerName = "Replayer-" + dataStreamList.get(idx).getTransformation().getName();
            result.add(
                    input.connect(alignNotifications)
                            .transform(replayerName, input.getType(), new ReplayOperator())
                            .setParallelism(input.getParallelism()));
        }
        return new DataStreamList(result);
    }

    /**
     * Adds the head/tail pair for the termination criteria stream and returns the criteria tail.
     *
     * <p>An empty placeholder source with the criteria's inner type, name and parallelism is
     * created, given an input and a criteria head, merged with the actual criteria stream, and
     * closed with a tail. The criteria head and tail share the co-location group
     * {@code "co-<iterationId>-cri"}. Finally the criteria parallelism is set on both the
     * variable heads and the criteria head.
     *
     * @param dataStream the draft termination criteria stream returned by the body
     * @param iterationID the id of the iteration being built
     * @param streamExecutionEnvironment the actual execution environment
     * @param draftExecutionEnvironment the draft environment the body was built in
     * @param dataStreamList the initial variable streams; only their count is used, as the index
     *     of the criteria head and tail
     * @param dataStreamList2 the heads of the variable streams
     * @param i the total parallelism passed on to the criteria head
     * @return a list holding the single criteria tail
     * @throws IllegalStateException if the actual criteria stream's type is not exactly
     *     {@link IterationRecordTypeInfo}
     */
    private static DataStreamList addCriteriaStream(DataStream<?> dataStream, IterationID iterationID, StreamExecutionEnvironment streamExecutionEnvironment, DraftExecutionEnvironment draftExecutionEnvironment, DataStreamList dataStreamList, DataStreamList dataStreamList2, int i) {
        DataStream criteria = draftExecutionEnvironment.getActualStream(dataStream.getId());
        boolean yieldsIterationRecords = criteria.getType().getClass().equals(IterationRecordTypeInfo.class);
        Preconditions.checkState(yieldsIterationRecords, "The termination criteria should always return IterationRecord.");
        TypeInformation criteriaElementType = ((IterationRecordTypeInfo) criteria.getType()).getInnerTypeInfo();

        // Placeholder source that mirrors the criteria stream's type, name and parallelism.
        SingleOutputStreamOperator placeholder = streamExecutionEnvironment
                .addSource(new DraftExecutionEnvironment.EmptySource())
                .returns(criteriaElementType)
                .name(criteria.getTransformation().getName())
                .setParallelism(criteria.getParallelism());
        DataStreamList criteriaSource = DataStreamList.of(placeholder);

        // The criteria head/tail take the index right after the variable streams.
        DataStreamList criteriaHeads = addHeads(criteriaSource, addInputs(criteriaSource), iterationID, i, true, dataStreamList.size());
        DataStream<?> merged = mergeCriteriaHeadAndCriteriaStream(streamExecutionEnvironment, criteriaHeads.get(0), criteria, criteriaElementType);
        DataStreamList criteriaTails = addTails(DataStreamList.of(merged), iterationID, dataStreamList.size());

        String coLocationKey = "co-" + iterationID.toHexString() + "-cri";
        criteriaHeads.get(0).getTransformation().setCoLocationGroupKey(coLocationKey);
        criteriaTails.get(0).getTransformation().setCoLocationGroupKey(coLocationKey);

        setCriteriaParallelism(dataStreamList2, criteria.getParallelism());
        setCriteriaParallelism(criteriaHeads, criteria.getParallelism());
        return criteriaTails;
    }

    /**
     * Connects the criteria head with the criteria stream through a
     * {@link CriteriaMergeProcessor}, built in its own draft environment with an
     * {@link AllRoundOperatorWrapper}, and returns the merged stream in the actual environment.
     *
     * @param streamExecutionEnvironment the actual execution environment
     * @param dataStream the criteria head (first input of the merge)
     * @param dataStream2 the actual criteria stream (second input); its parallelism is used for
     *     the merge operator, falling back to the environment's configured parallelism when it
     *     is not positive
     * @param typeInformation the type used for both draft sources and for the merged stream
     * @return the actual stream of the {@code "criteria-merge"} operator
     */
    private static DataStream<?> mergeCriteriaHeadAndCriteriaStream(StreamExecutionEnvironment streamExecutionEnvironment, DataStream<?> dataStream, DataStream<?> dataStream2, TypeInformation<?> typeInformation) {
        DraftExecutionEnvironment mergeDraftEnv = new DraftExecutionEnvironment(streamExecutionEnvironment, new AllRoundOperatorWrapper());

        // Raw locals: the merge function is typed on Object while the draft sources carry the
        // wildcard type of typeInformation.
        DataStream draftHead = mergeDraftEnv.addDraftSource(dataStream, typeInformation);
        DataStream draftCriteria = mergeDraftEnv.addDraftSource(dataStream2, typeInformation);

        SingleOutputStreamOperator merged = draftHead
                .connect(draftCriteria)
                .process(new CriteriaMergeProcessor())
                .returns(typeInformation)
                .setParallelism(dataStream2.getParallelism() > 0 ? dataStream2.getParallelism() : streamExecutionEnvironment.getConfig().getParallelism())
                .name("criteria-merge");

        mergeDraftEnv.copyToActualEnvironment();
        return mergeDraftEnv.getActualStream(merged.getId());
    }

    /**
     * Appends a {@code "filter-tail"} filter that rejects every record to each tail and unions
     * the filtered streams into a single stream typed as {@code Object}.
     *
     * <p>Each filter uses the tail's parallelism, or the environment's configured parallelism
     * when the tail's is not positive. All filters are created before any union is built.
     *
     * @param streamExecutionEnvironment supplies the fallback parallelism
     * @param dataStreamList the tails to union
     * @return the union of all filtered tails
     * @throws java.util.NoSuchElementException if {@code dataStreamList} is empty
     */
    private static DataStream<Object> unionAllTails(StreamExecutionEnvironment streamExecutionEnvironment, DataStreamList dataStreamList) {
        List<DataStream<Object>> silencedTails = new ArrayList<>(dataStreamList.size());
        for (int idx = 0; idx < dataStreamList.size(); idx++) {
            DataStream<?> tail = dataStreamList.get(idx);
            SingleOutputStreamOperator silenced = tail
                    .filter(record -> false)
                    .name("filter-tail")
                    .returns(new GenericTypeInfo(Object.class));
            int parallelism = tail.getParallelism() > 0
                    ? tail.getParallelism()
                    : streamExecutionEnvironment.getConfig().getParallelism();
            silencedTails.add(silenced.setParallelism(parallelism));
        }
        return silencedTails.stream().reduce((unioned, next) -> unioned.union(next)).get();
    }

    /**
     * Collects the type information of every stream in the list, in list order.
     *
     * @param dataStreamList the streams to inspect
     * @return one {@link TypeInformation} per stream
     */
    private static List<TypeInformation<?>> getTypeInfos(DataStreamList dataStreamList) {
        return map(dataStreamList, DataStream::getType);
    }

    /**
     * Appends an {@link InputOperator} named {@code "input-<name>"} to every stream.
     *
     * <p>The result type wraps the stream's own type in an {@link IterationRecordTypeInfo}, and
     * the operator keeps the stream's parallelism.
     *
     * @param dataStreamList the streams to wrap
     * @return the input streams, in the same order
     */
    private static DataStreamList addInputs(DataStreamList dataStreamList) {
        return new DataStreamList(map(dataStreamList, source -> {
            String inputName = "input-" + source.getTransformation().getName();
            IterationRecordTypeInfo recordType = new IterationRecordTypeInfo(source.getType());
            return source.transform(inputName, recordType, new InputOperator()).setParallelism(source.getParallelism());
        }));
    }

    /**
     * Appends a head operator, created by a {@link HeadOperatorFactory}, to every input stream.
     *
     * <p>The head for the input at position {@code p} is named after the original stream at the
     * same position ({@code "head-<name>"}), reuses the input's type and parallelism, and gets
     * the index {@code i2 + p}.
     *
     * @param dataStreamList the original streams, used only for naming
     * @param dataStreamList2 the input streams the heads are attached to; each must be a
     *     {@link SingleOutputStreamOperator} typed with {@link IterationRecordTypeInfo}
     * @param iterationID the id of the iteration being built
     * @param i the total parallelism passed to the factory
     * @param z the boolean flag passed to the factory ({@code true} for the criteria head)
     * @param i2 the index of the first head
     * @return the head streams, in the same order
     */
    private static DataStreamList addHeads(DataStreamList dataStreamList, DataStreamList dataStreamList2, IterationID iterationID, int i, boolean z, int i2) {
        return new DataStreamList(map(dataStreamList2, (index, input) -> {
            int position = index;
            String headName = "head-" + dataStreamList.get(position).getTransformation().getName();
            IterationRecordTypeInfo recordType = (IterationRecordTypeInfo) input.getType();
            HeadOperatorFactory headFactory = new HeadOperatorFactory(iterationID, i2 + position, z, i);
            return ((SingleOutputStreamOperator) input).transform(headName, recordType, headFactory).setParallelism(input.getParallelism());
        }));
    }

    /**
     * Appends a {@link TailOperator} named {@code "tail-<name>"} to every stream.
     *
     * <p>The tail at position {@code p} gets the index {@code i + p}, keeps the stream's
     * parallelism, and declares an {@link IterationRecordTypeInfo} built from the stream's type.
     *
     * @param dataStreamList the streams to close with a tail
     * @param iterationID the id of the iteration being built
     * @param i the index of the first tail
     * @return the tail streams, in the same order
     */
    private static DataStreamList addTails(DataStreamList dataStreamList, IterationID iterationID, int i) {
        return new DataStreamList(map(dataStreamList, (index, feedback) -> {
            String tailName = "tail-" + feedback.getTransformation().getName();
            IterationRecordTypeInfo recordType = new IterationRecordTypeInfo(feedback.getType());
            TailOperator tailOperator = new TailOperator(iterationID, i + index);
            return feedback.transform(tailName, recordType, tailOperator).setParallelism(feedback.getParallelism());
        }));
    }

    /**
     * Appends an {@link OutputOperator} named {@code "output-<name>"} to every output stream.
     *
     * <p>Before the output operator, each stream is unioned with an identity map over
     * {@code dataStream} (named {@code "tail-map-<name>"}, parallelism 1, typed like the output
     * stream). The output operator keeps the stream's parallelism and declares the inner type of
     * the stream's {@link IterationRecordTypeInfo}.
     *
     * @param dataStreamList the output streams; each must be typed with
     *     {@link IterationRecordTypeInfo}
     * @param dataStream the union of all tails
     * @return the final output streams, in the same order
     */
    private static DataStreamList addOutputs(DataStreamList dataStreamList, DataStream dataStream) {
        return new DataStreamList(map(dataStreamList, output -> {
            IterationRecordTypeInfo recordType = (IterationRecordTypeInfo) output.getType();
            DataStream tailMap = dataStream
                    .map(value -> value)
                    .name("tail-map-" + output.getTransformation().getName())
                    .returns(recordType)
                    .setParallelism(1);
            return output
                    .union(tailMap)
                    .transform("output-" + output.getTransformation().getName(), recordType.getInnerTypeInfo(), new OutputOperator())
                    .setParallelism(output.getParallelism());
        }));
    }

    /**
     * Registers a draft source in the draft environment for every stream, pairing the stream at
     * each position with the type at the same position in {@code list}.
     *
     * @param dataStreamList the actual streams to mirror
     * @param draftExecutionEnvironment the draft environment the sources are added to
     * @param list the draft output types, one per stream
     * @return the draft sources, in the same order
     */
    private static DataStreamList addDraftSources(DataStreamList dataStreamList, DraftExecutionEnvironment draftExecutionEnvironment, List<TypeInformation<?>> list) {
        return new DataStreamList(map(dataStreamList, (index, actual) -> {
            TypeInformation draftType = (TypeInformation) list.get(index);
            return draftExecutionEnvironment.addDraftSource(actual, draftType);
        }));
    }

    /**
     * Makes sure the transformation of every stream is registered in the draft environment.
     *
     * @param dataStreamList the streams whose transformations are registered
     * @param draftExecutionEnvironment the draft environment to register them in
     */
    private static void ensuresTransformationAdded(DataStreamList dataStreamList, DraftExecutionEnvironment draftExecutionEnvironment) {
        for (int idx = 0; idx < dataStreamList.size(); idx++) {
            DataStream<?> stream = dataStreamList.get(idx);
            draftExecutionEnvironment.addOperatorIfNotExists(stream.getTransformation());
        }
    }

    /**
     * Sets the criteria stream parallelism on the operator factory of every stream in the list.
     *
     * @param dataStreamList the streams whose operator factories are updated
     * @param i the parallelism of the criteria stream
     */
    private static void setCriteriaParallelism(DataStreamList dataStreamList, int i) {
        for (int idx = 0; idx < dataStreamList.size(); idx++) {
            DataStream<?> head = dataStreamList.get(idx);
            // NOTE(review): this chain presumably needs downcasts (a transformation type that
            // exposes getOperatorFactory(), then HeadOperatorFactory) that are not shown here;
            // confirm against the transformation and factory types.
            head.getTransformation().getOperatorFactory().setCriteriaStreamParallelism(i);
        }
    }

    /**
     * Looks up, for every draft stream, the actual stream registered under the same id.
     *
     * @param dataStreamList the draft streams
     * @param draftExecutionEnvironment the draft environment that maps draft ids to actual streams
     * @return the actual streams, in the same order
     */
    private static DataStreamList getActualDataStreams(DataStreamList dataStreamList, DraftExecutionEnvironment draftExecutionEnvironment) {
        return new DataStreamList(map(dataStreamList, draft -> draftExecutionEnvironment.getActualStream(draft.getId())));
    }

    /**
     * Applies {@code function} to every stream in the list and collects the results in order.
     *
     * <p>Delegates to the index-aware overload, ignoring the index.
     *
     * @param dataStreamList the streams to map
     * @param function the mapping applied to each stream
     * @param <R> the result type
     * @return the mapped values, one per stream
     */
    private static <R> List<R> map(DataStreamList dataStreamList, Function<DataStream<?>, R> function) {
        BiFunction<Integer, DataStream<?>, R> ignoringIndex = (index, stream) -> function.apply(stream);
        return map(dataStreamList, ignoringIndex);
    }

    /**
     * Applies {@code biFunction} to every (index, stream) pair of the list and collects the
     * results in list order.
     *
     * @param dataStreamList the streams to map
     * @param biFunction the mapping applied to each zero-based index and its stream
     * @param <R> the result type
     * @return the mapped values, one per stream
     */
    private static <R> List<R> map(DataStreamList dataStreamList, BiFunction<Integer, DataStream<?>, R> biFunction) {
        List<R> mapped = new ArrayList<>(dataStreamList.size());
        int position = 0;
        while (position < dataStreamList.size()) {
            mapped.add(biFunction.apply(position, dataStreamList.get(position)));
            position++;
        }
        return mapped;
    }

    /**
     * Synthetic hook that rebuilds a serialized lambda of this class from its
     * {@link SerializedLambda} description.
     *
     * <p>Two lambdas are recognized by implementation method name: an identity
     * {@code MapFunction} and a {@code FilterFunction} that always returns {@code false}. For
     * each, the functional interface, its method name and signature, the implementing class and
     * the implementation signature must all match before the lambda is recreated.
     *
     * <p>NOTE(review): this looks like decompiler output of a compiler-generated method. The
     * {@code boolean z = -1} selector and the {@code switch} on a boolean are not valid Java
     * source; confirm whether this method should exist in source form at all.
     *
     * @param serializedLambda the serialized form of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the description matches neither known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // First switch: match the implementation method name (hash first, then equals).
        switch (implMethodName.hashCode()) {
            case 751028190:
                if (implMethodName.equals("lambda$null$79e42a0a$1")) {
                    z = false;
                    break;
                }
                break;
            case 816510597:
                if (implMethodName.equals("lambda$null$f3fdc132$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // Second switch: verify the full description, then recreate the matching lambda.
        switch (z) {
            case false:
                // Identity MapFunction; method kind 6 is MethodHandleInfo.REF_invokeStatic.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/iteration/Iterations") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    return obj -> {
                        return obj;
                    };
                }
                break;
            case true:
                // FilterFunction that rejects every record.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("filter") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/flink/iteration/Iterations") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    return obj2 -> {
                        return false;
                    };
                }
                break;
        }
        // Unknown name or mismatching description.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
