package org.apache.flink.test.accumulators;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorITCase.class */
public class AccumulatorITCase extends JavaProgramTestBase {
    private static final String INPUT = "one\ntwo two\nthree three three\n";
    private static final String EXPECTED = "one 1\ntwo 2\nthree 3\n";
    private String dataPath;
    private String resultPath;
    private JobExecutionResult result;

    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorITCase$CountWords.class */
    private static class CountWords extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
        private IntCounter reduceCalls;
        private IntCounter combineCalls;

        private CountWords() {
        }

        public void open(Configuration configuration) {
            this.reduceCalls = getRuntimeContext().getIntCounter("reduce-calls");
            this.combineCalls = getRuntimeContext().getIntCounter("combine-calls");
        }

        public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) {
            this.reduceCalls.add(1);
            reduceInternal(iterable, collector);
        }

        public void combine(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) {
            this.combineCalls.add(1);
            reduceInternal(iterable, collector);
        }

        private void reduceInternal(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) {
            int i = 0;
            String str = null;
            for (Tuple2<String, Integer> tuple2 : iterable) {
                str = (String) tuple2.f0;
                i += ((Integer) tuple2.f1).intValue();
            }
            collector.collect(new Tuple2(str, Integer.valueOf(i)));
        }
    }

    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorITCase$SetAccumulator.class */
    public static class SetAccumulator<T> implements Accumulator<T, HashSet<T>> {
        private static final long serialVersionUID = 1;
        private HashSet<T> set = new HashSet<>();

        public void add(T t) {
            this.set.add(t);
        }

        /* renamed from: getLocalValue, reason: merged with bridge method [inline-methods] */
        public HashSet<T> m730getLocalValue() {
            return this.set;
        }

        public void resetLocal() {
            this.set.clear();
        }

        public void merge(Accumulator<T, HashSet<T>> accumulator) {
            this.set.addAll((Collection) accumulator.getLocalValue());
        }

        /* renamed from: clone, reason: merged with bridge method [inline-methods] */
        public Accumulator<T, HashSet<T>> m729clone() {
            SetAccumulator setAccumulator = new SetAccumulator();
            setAccumulator.set.addAll(this.set);
            return setAccumulator;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/accumulators/AccumulatorITCase$TokenizeLine.class */
    private static class TokenizeLine extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
        private IntCounter cntNumLines;
        private Histogram wordsPerLineDistribution;
        private DoubleCounter openCloseCounter;
        private SetAccumulator<StringValue> distinctWords;

        private TokenizeLine() {
            this.openCloseCounter = new DoubleCounter();
        }

        public void open(Configuration configuration) {
            this.cntNumLines = getRuntimeContext().getIntCounter("num-lines");
            this.wordsPerLineDistribution = getRuntimeContext().getHistogram("words-per-line");
            getRuntimeContext().addAccumulator("open-close-counter", this.openCloseCounter);
            this.distinctWords = new SetAccumulator<>();
            getRuntimeContext().addAccumulator("distinct-words", this.distinctWords);
            IntCounter intCounter = getRuntimeContext().getIntCounter("simple-counter");
            intCounter.add(1);
            Assert.assertEquals(intCounter.getLocalValue().intValue(), 1L);
            Assert.assertEquals(intCounter.getLocalValue(), getRuntimeContext().getIntCounter("simple-counter").getLocalValue());
            try {
                getRuntimeContext().getDoubleCounter("simple-counter");
                Assert.fail("Should not be able to obtain previously created counter with different type");
            } catch (UnsupportedOperationException e) {
            }
            this.openCloseCounter.add(0.5d);
        }

        public void flatMap(String str, Collector<Tuple2<String, Integer>> collector) {
            this.cntNumLines.add(1);
            int i = 0;
            for (String str2 : str.toLowerCase().split("\\W+")) {
                this.distinctWords.add(new StringValue(str2));
                collector.collect(new Tuple2(str2, 1));
                i++;
            }
            this.wordsPerLineDistribution.add(Integer.valueOf(i));
        }

        public void close() throws Exception {
            this.openCloseCounter.add(0.5d);
            Assert.assertEquals(1L, this.openCloseCounter.getLocalValue().intValue());
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((String) obj, (Collector<Tuple2<String, Integer>>) collector);
        }
    }

    protected void preSubmit() throws Exception {
        this.dataPath = createTempFile("datapoints.txt", INPUT);
        this.resultPath = getTempFilePath("result");
    }

    protected void postSubmit() throws Exception {
        TestBaseUtils.compareResultsByLinesInMemory(EXPECTED, this.resultPath);
        System.out.println("Accumulator results:");
        JobExecutionResult jobExecutionResult = this.result;
        System.out.println(AccumulatorHelper.getResultsFormatted(jobExecutionResult.getAllAccumulatorResults()));
        Assert.assertEquals(3, jobExecutionResult.getAccumulatorResult("num-lines"));
        Assert.assertEquals(3, jobExecutionResult.getIntCounterResult("num-lines"));
        Assert.assertEquals(Double.valueOf(getParallelism()), jobExecutionResult.getAccumulatorResult("open-close-counter"));
        HashMap hashMap = new HashMap();
        hashMap.put(1, 1);
        hashMap.put(2, 1);
        hashMap.put(3, 1);
        Assert.assertEquals(hashMap, jobExecutionResult.getAccumulatorResult("words-per-line"));
        HashSet hashSet = new HashSet();
        hashSet.add(new StringValue("one"));
        hashSet.add(new StringValue("two"));
        hashSet.add(new StringValue("three"));
        Assert.assertEquals(hashSet, jobExecutionResult.getAccumulatorResult("distinct-words"));
    }

    protected void testProgram() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readTextFile(this.dataPath).flatMap(new TokenizeLine()).groupBy(new int[]{0}).reduceGroup(new CountWords()).writeAsCsv(this.resultPath, "\n", " ");
        this.result = executionEnvironment.execute();
    }
}
