package org.apache.flink.test.iterative;

import java.io.BufferedReader;
import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.test.iterative.ConnectedComponentsITCase;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.class */
public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTestBase {
    private static final long SEED = 3287269182979823L;
    private static final int NUM_VERTICES = 1000;
    private static final int NUM_EDGES = 10000;
    private final boolean extraMapper;
    protected String verticesPath;
    protected String edgesPath;
    protected String resultPath;

    /* loaded from: input_file:org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase$UpdateComponentIdMatchNonPreserving.class */
    private static final class UpdateComponentIdMatchNonPreserving implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1;

        private UpdateComponentIdMatchNonPreserving() {
        }

        public void join(Tuple2<Long, Long> tuple2, Tuple2<Long, Long> tuple22, Collector<Tuple2<Long, Long>> collector) throws Exception {
            if (((Long) tuple2.f1).longValue() < ((Long) tuple22.f1).longValue()) {
                collector.collect(tuple2);
            }
        }

        public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
            join((Tuple2<Long, Long>) obj, (Tuple2<Long, Long>) obj2, (Collector<Tuple2<Long, Long>>) collector);
        }
    }

    public ConnectedComponentsWithDeferredUpdateITCase(boolean z) {
        this.extraMapper = z;
    }

    protected void preSubmit() throws Exception {
        this.verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
        this.edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
        this.resultPath = getTempFilePath("results");
    }

    protected void testProgram() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource types = executionEnvironment.readCsvFile(this.verticesPath).types(Long.class);
        FlatMapOperator flatMap = executionEnvironment.readCsvFile(this.edgesPath).fieldDelimiter(" ").types(Long.class, Long.class).flatMap(new ConnectedComponents.UndirectEdge());
        MapOperator map = types.map(new ConnectedComponentsITCase.DuplicateValue());
        DeltaIteration iterateDelta = map.iterateDelta(map, 100, new int[]{0});
        MapOperator with = iterateDelta.getWorkset().join(flatMap).where(new int[]{0}).equalTo(new int[]{0}).with(new ConnectedComponents.NeighborWithComponentIDJoin()).groupBy(new int[]{0}).aggregate(Aggregations.MIN, 1).join(iterateDelta.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).with(new UpdateComponentIdMatchNonPreserving());
        iterateDelta.closeWith(this.extraMapper ? with.map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() { // from class: org.apache.flink.test.iterative.ConnectedComponentsWithDeferredUpdateITCase.1
            private static final long serialVersionUID = -3929364091829757322L;

            public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception {
                return tuple2;
            }
        }) : with, with).writeAsCsv(this.resultPath, "\n", " ");
        executionEnvironment.execute("Connected Components Example");
    }

    protected void postSubmit() throws Exception {
        for (BufferedReader bufferedReader : getResultReader(this.resultPath)) {
            ConnectedComponentsData.checkOddEvenResult(bufferedReader);
        }
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        return Arrays.asList(new Object[]{false}, new Object[]{true});
    }
}
