package org.apache.flink.test.iterative;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/iterative/DanglingPageRankITCase.class */
public class DanglingPageRankITCase extends MultipleProgramsTestBase {
    /**
     * Name under which the PageRank statistics aggregator is registered on the iteration and
     * under which it (and its previous-superstep aggregate) is looked up inside the iteration.
     */
    private static final String AGGREGATOR_NAME = "pagerank.aggregator";

    /* loaded from: input_file:org/apache/flink/test/iterative/DanglingPageRankITCase$DiffL1NormConvergenceCriterion.class */
    /**
     * Convergence criterion for the PageRank iteration: the iteration counts as converged as soon
     * as the aggregated rank difference reported in {@link PageRankStats#diff()} drops strictly
     * below {@link #EPSILON}.
     */
    private static class DiffL1NormConvergenceCriterion implements ConvergenceCriterion<PageRankStats> {
        /** Upper bound (exclusive) on the aggregated diff for the iteration to count as converged. */
        private static final double EPSILON = 0.00005;

        private DiffL1NormConvergenceCriterion() {
        }

        /**
         * Checks the aggregated diff against {@link #EPSILON}.
         *
         * @param iteration first argument of the criterion callback; not used by this criterion
         * @param stats aggregated statistics whose {@code diff()} is inspected
         * @return {@code true} if {@code stats.diff()} is strictly smaller than {@link #EPSILON}
         */
        public boolean isConverged(int iteration, PageRankStats stats) {
            final double aggregatedDiff = stats.diff();
            return aggregatedDiff < EPSILON;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/DanglingPageRankITCase$PageRankStats.class */
    /**
     * Value object bundling the statistics that are aggregated during the PageRank iteration:
     * three {@code double} sums (diff, rank, dangling rank) and three {@code long} counters
     * (dangling vertices, vertices, edges).
     *
     * <p>{@link #write(DataOutputView)} and {@link #read(DataInputView)} use the same field order
     * (diff, rank, danglingRank, numDanglingVertices, numVertices, edges), so an instance written
     * by one can be restored by the other.
     */
    public static class PageRankStats implements Value {
        private double diff;
        private double rank;
        private double danglingRank;
        private long numDanglingVertices;
        private long numVertices;
        private long edges;

        /** Creates an instance with all fields at their zero defaults; populated via {@link #read}. */
        public PageRankStats() {
        }

        /**
         * Creates a fully populated statistics object.
         *
         * @param diff value returned by {@link #diff()}
         * @param rank value returned by {@link #rank()}
         * @param danglingRank value returned by {@link #danglingRank()}
         * @param numDanglingVertices value returned by {@link #numDanglingVertices()}
         * @param numVertices value returned by {@link #numVertices()}
         * @param edges value returned by {@link #edges()}
         */
        public PageRankStats(
                double diff,
                double rank,
                double danglingRank,
                long numDanglingVertices,
                long numVertices,
                long edges) {
            this.diff = diff;
            this.rank = rank;
            this.danglingRank = danglingRank;
            this.numDanglingVertices = numDanglingVertices;
            this.numVertices = numVertices;
            this.edges = edges;
        }

        /** @return the stored diff value */
        public double diff() {
            return diff;
        }

        /** @return the stored rank value */
        public double rank() {
            return rank;
        }

        /** @return the stored dangling-rank value */
        public double danglingRank() {
            return danglingRank;
        }

        /** @return the stored number of dangling vertices */
        public long numDanglingVertices() {
            return numDanglingVertices;
        }

        /** @return the stored number of vertices */
        public long numVertices() {
            return numVertices;
        }

        /** @return the stored number of edges */
        public long edges() {
            return edges;
        }

        /**
         * Serializes all six fields: the three doubles first, then the three longs.
         *
         * @param out target view
         * @throws IOException if writing to {@code out} fails
         */
        public void write(DataOutputView out) throws IOException {
            out.writeDouble(diff);
            out.writeDouble(rank);
            out.writeDouble(danglingRank);
            out.writeLong(numDanglingVertices);
            out.writeLong(numVertices);
            out.writeLong(edges);
        }

        /**
         * Restores all six fields in exactly the order {@link #write(DataOutputView)} emits them.
         *
         * @param in source view
         * @throws IOException if reading from {@code in} fails
         */
        public void read(DataInputView in) throws IOException {
            diff = in.readDouble();
            rank = in.readDouble();
            danglingRank = in.readDouble();
            numDanglingVertices = in.readLong();
            numVertices = in.readLong();
            edges = in.readLong();
        }

        /** @return a human-readable listing of all six fields */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("PageRankStats: diff [");
            text.append(diff);
            text.append("], rank [").append(rank);
            text.append("], danglingRank [").append(danglingRank);
            text.append("], numDanglingVertices [").append(numDanglingVertices);
            text.append("], numVertices [").append(numVertices);
            text.append("], edges [").append(edges);
            text.append("]");
            return text.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/DanglingPageRankITCase$PageRankStatsAggregator.class */
    /**
     * Aggregator that sums up PageRank statistics: every {@code aggregate} call adds the given
     * values to the running totals, {@link #getAggregate()} snapshots the totals as a
     * {@link PageRankStats}, and {@link #reset()} sets all totals back to zero.
     */
    private static class PageRankStatsAggregator implements Aggregator<PageRankStats> {
        private double diff;
        private double rank;
        private double danglingRank;
        private long numDanglingVertices;
        private long numVertices;
        private long edges;

        private PageRankStatsAggregator() {
        }

        /**
         * Returns the current totals as a new {@link PageRankStats}.
         *
         * <p>The method must carry exactly this name to implement the {@code Aggregator}
         * contract; under any other name the class leaves the interface method unimplemented.
         *
         * @return a fresh snapshot of the running totals
         */
        public PageRankStats getAggregate() {
            return new PageRankStats(diff, rank, danglingRank, numDanglingVertices, numVertices, edges);
        }

        /**
         * Adds the given values to the corresponding running totals.
         *
         * @param diffDelta amount added to the diff total
         * @param rankDelta amount added to the rank total
         * @param danglingRankDelta amount added to the dangling-rank total
         * @param danglingVerticesDelta amount added to the dangling-vertex counter
         * @param verticesDelta amount added to the vertex counter
         * @param edgesDelta amount added to the edge counter
         */
        public void aggregate(
                double diffDelta,
                double rankDelta,
                double danglingRankDelta,
                long danglingVerticesDelta,
                long verticesDelta,
                long edgesDelta) {
            diff += diffDelta;
            rank += rankDelta;
            danglingRank += danglingRankDelta;
            numDanglingVertices += danglingVerticesDelta;
            numVertices += verticesDelta;
            edges += edgesDelta;
        }

        /**
         * Adds every field of {@code stats} to the corresponding running total.
         *
         * @param stats partial statistics to merge into this aggregator
         */
        public void aggregate(PageRankStats stats) {
            aggregate(
                    stats.diff(),
                    stats.rank(),
                    stats.danglingRank(),
                    stats.numDanglingVertices(),
                    stats.numVertices(),
                    stats.edges());
        }

        /** Sets all running totals back to zero. */
        public void reset() {
            diff = 0.0d;
            rank = 0.0d;
            danglingRank = 0.0d;
            numDanglingVertices = 0L;
            numVertices = 0L;
            edges = 0L;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/DanglingPageRankITCase$PageWithLinks.class */
    /**
     * Plain data holder for one page and the pages it links to. Fields are public and a public
     * no-argument constructor is provided.
     */
    public static class PageWithLinks {
        /** Id of the page that owns the links. */
        public long pageId;
        /** Ids of the pages this page links to. */
        public long[] targets;

        /** Creates an empty instance ({@code pageId == 0}, {@code targets == null}). */
        public PageWithLinks() {
        }

        /**
         * Creates a populated instance.
         *
         * @param pageId id of the page
         * @param targets ids of the linked pages; the array is stored as given, not copied
         */
        public PageWithLinks(long pageId, long[] targets) {
            this.pageId = pageId;
            this.targets = targets;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/DanglingPageRankITCase$PageWithRank.class */
    /**
     * Plain data holder pairing a page id with a rank value. Fields are public and a public
     * no-argument constructor is provided.
     */
    public static class PageWithRank {
        /** Id of the page the rank belongs to. */
        public long pageId;
        /** Rank value carried for that page. */
        public double rank;

        /** Creates an empty instance ({@code pageId == 0}, {@code rank == 0.0}). */
        public PageWithRank() {
        }

        /**
         * Creates a populated instance.
         *
         * @param pageId id of the page
         * @param rank rank value for the page
         */
        public PageWithRank(long pageId, double rank) {
            this.pageId = pageId;
            this.rank = rank;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/DanglingPageRankITCase$PageWithRankAndDangling.class */
    /**
     * Plain data holder for a page, its current rank and a flag telling whether the page is
     * dangling. Fields are public and a public no-argument constructor is provided.
     */
    public static class PageWithRankAndDangling {
        /** Id of the page. */
        public long pageId;
        /** Current rank of the page. */
        public double rank;
        /** Whether the page is marked as dangling. */
        public boolean dangling;

        /** Creates an empty instance with all fields at their zero defaults. */
        public PageWithRankAndDangling() {
        }

        /**
         * Creates a populated instance.
         *
         * @param pageId id of the page
         * @param rank rank of the page
         * @param dangling dangling flag of the page
         */
        public PageWithRankAndDangling(long pageId, double rank, boolean dangling) {
            this.pageId = pageId;
            this.rank = rank;
            this.dangling = dangling;
        }

        /** @return a human-readable listing of all three fields */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("PageWithRankAndDangling{pageId=");
            text.append(pageId);
            text.append(", rank=").append(rank);
            text.append(", dangling=").append(dangling);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Creates the test case for one execution mode of the parameterized run.
     *
     * @param mode execution mode, passed straight through to {@link MultipleProgramsTestBase}
     */
    public DanglingPageRankITCase(MultipleProgramsTestBase.TestExecutionMode mode) {
        super(mode);
    }

    /**
     * Runs an iterative PageRank with dangling-page compensation on a fixed graph of five pages
     * and checks the outcome.
     *
     * <p>Each superstep joins the current ranks with the link lists to spread every page's rank
     * evenly over its targets, then co-groups those shares with the current ranks to compute the
     * new rank. The statistics aggregator registered under {@link #AGGREGATOR_NAME} carries the
     * summed rank of dangling pages into the next superstep and drives the convergence check.
     *
     * <p>Assertions: every resulting page id is in {@code [1, 5]}, page 3 is still flagged as
     * dangling, and all ranks sum to 1 (tolerance 0.001).
     *
     * @throws Exception if building or executing the program fails; propagated unchanged so the
     *     test report shows the original stack trace
     */
    @Test
    public void testDanglingPageRank() throws Exception {
        final int maxIterations = 25;

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (pageId, dangling flag): page 3 is the only page flagged as dangling.
        DataSource<Tuple2<Long, Boolean>> vertices = env.fromElements(
                new Tuple2<Long, Boolean>(1L, false),
                new Tuple2<Long, Boolean>(2L, false),
                new Tuple2<Long, Boolean>(5L, false),
                new Tuple2<Long, Boolean>(3L, true),
                new Tuple2<Long, Boolean>(4L, false));

        // Outgoing links per page; page 3 has no entry here.
        DataSource<PageWithLinks> edges = env.fromElements(
                new PageWithLinks(2L, new long[] {1}),
                new PageWithLinks(5L, new long[] {2, 4}),
                new PageWithLinks(4L, new long[] {3, 2}),
                new PageWithLinks(1L, new long[] {4, 2, 3}));

        final long numVertices = vertices.count();
        final long numDanglingVertices = vertices.filter(new FilterFunction<Tuple2<Long, Boolean>>() {
            public boolean filter(Tuple2<Long, Boolean> vertex) {
                return vertex.f1;
            }
        }).count();

        // Every page starts with the same rank 1/numVertices.
        IterativeDataSet<PageWithRankAndDangling> iteration = vertices
                .map(new MapFunction<Tuple2<Long, Boolean>, PageWithRankAndDangling>() {
                    public PageWithRankAndDangling map(Tuple2<Long, Boolean> vertex) {
                        return new PageWithRankAndDangling(vertex.f0, 1.0d / numVertices, vertex.f1);
                    }
                })
                .iterate(maxIterations);

        iteration.getAggregators().registerAggregationConvergenceCriterion(
                AGGREGATOR_NAME, new PageRankStatsAggregator(), new DiffL1NormConvergenceCriterion());

        // Splits a page's rank evenly across its link targets.
        FlatJoinFunction<PageWithRankAndDangling, PageWithLinks, PageWithRank> distributeRank =
                new FlatJoinFunction<PageWithRankAndDangling, PageWithLinks, PageWithRank>() {
                    public void join(PageWithRankAndDangling page, PageWithLinks links, Collector<PageWithRank> out) {
                        // One output object is reused for all targets; only its pageId changes.
                        PageWithRank share = new PageWithRank(0L, page.rank / links.targets.length);
                        for (long target : links.targets) {
                            share.pageId = target;
                            out.collect(share);
                        }
                    }
                };

        // Computes a page's new rank from the incoming shares and records the statistics.
        RichCoGroupFunction<PageWithRankAndDangling, PageWithRank, PageWithRankAndDangling> recomputeRank =
                new RichCoGroupFunction<PageWithRankAndDangling, PageWithRank, PageWithRankAndDangling>() {
                    private static final double BETA = 0.85d;
                    private final double randomJump = (1.0d - BETA) / numVertices;
                    private PageRankStatsAggregator aggregator;
                    private double danglingRankFactor;

                    public void open(Configuration parameters) throws Exception {
                        int superstep = getIterationRuntimeContext().getSuperstepNumber();
                        aggregator = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
                        if (superstep == 1) {
                            // No previous aggregate yet: each dangling page still holds the
                            // initial rank 1/numVertices.
                            danglingRankFactor =
                                    (BETA * numDanglingVertices) / (numVertices * numVertices);
                        } else {
                            PageRankStats previous =
                                    getIterationRuntimeContext().getPreviousIterationAggregate(AGGREGATOR_NAME);
                            danglingRankFactor = (BETA * previous.danglingRank()) / numVertices;
                        }
                    }

                    public void coGroup(
                            Iterable<PageWithRankAndDangling> currentPages,
                            Iterable<PageWithRank> partialRanks,
                            Collector<PageWithRankAndDangling> out) {
                        long incomingEdges = 0L;
                        double summedRank = 0.0d;
                        for (PageWithRank partial : partialRanks) {
                            summedRank += partial.rank;
                            incomingEdges++;
                        }
                        double newRank = (BETA * summedRank) + randomJump + danglingRankFactor;

                        // Every link target in this graph is also a vertex, so each key has a page.
                        PageWithRankAndDangling page = currentPages.iterator().next();
                        boolean dangling = page.dangling;
                        aggregator.aggregate(
                                Math.abs(page.rank - newRank),
                                newRank,
                                dangling ? newRank : 0.0d,
                                dangling ? 1L : 0L,
                                1L,
                                incomingEdges);

                        page.rank = newRank;
                        out.collect(page);
                    }
                };

        List<PageWithRankAndDangling> result = iteration
                .closeWith(iteration
                        .coGroup(iteration.join(edges).where("pageId").equalTo("pageId").with(distributeRank))
                        .where("pageId").equalTo("pageId").with(recomputeRank))
                .collect();

        double totalRank = 0.0d;
        for (PageWithRankAndDangling page : result) {
            totalRank += page.rank;
            Assert.assertTrue("unexpected page id in " + page, page.pageId >= 1 && page.pageId <= 5);
            Assert.assertTrue("page 3 must stay dangling: " + page, page.pageId != 3 || page.dangling);
        }
        Assert.assertEquals(1.0d, totalRank, 0.001d);
    }
}
