/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.hadoopcompatibility.mapred;

import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReducerWrappedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.test.hadoopcompatibility.mapred.HadoopTestData;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={ParameterizedTestExtension.class})
class HadoopReduceFunctionITCase
extends MultipleProgramsTestBase {
    HadoopReduceFunctionITCase() {
    }

    @TestTemplate
    void testStandardGrouping(@TempDir java.nio.file.Path tempFolder) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        SingleOutputStreamOperator ds = HadoopTestData.getKVPairDataStream(env).map((MapFunction)new Mapper1());
        SingleOutputStreamOperator commentCnts = ds.keyBy((KeySelector & Serializable)x -> (IntWritable)x.f0).window((WindowAssigner)GlobalWindows.createWithEndOfStreamTrigger()).apply((WindowFunction)new HadoopReducerWrappedFunction((Reducer)new CommentCntReducer()));
        String resultPath = tempFolder.toUri().toString();
        commentCnts.sinkTo((Sink)FileSink.forRowFormat((Path)new Path(resultPath), (Encoder)new SimpleStringEncoder()).build());
        env.execute();
        String expected = "(0,0)\n(1,3)\n(2,5)\n(3,5)\n(4,2)\n";
        TestBaseUtils.compareResultsByLinesInMemory((String)expected, (String)resultPath);
    }

    @TestTemplate
    void testUngroupedHadoopReducer(@TempDir java.nio.file.Path tempFolder) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStream<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataStream(env);
        SingleOutputStreamOperator commentCnts = ds.windowAll((WindowAssigner)GlobalWindows.createWithEndOfStreamTrigger()).apply((AllWindowFunction)new HadoopReducerWrappedFunction((Reducer)new AllCommentCntReducer()));
        String resultPath = tempFolder.toUri().toString();
        commentCnts.sinkTo((Sink)FileSink.forRowFormat((Path)new Path(resultPath), (Encoder)new SimpleStringEncoder()).build());
        env.execute();
        String expected = "(42,15)\n";
        TestBaseUtils.compareResultsByLinesInMemory((String)expected, (String)resultPath);
    }

    @TestTemplate
    void testConfigurationViaJobConf(@TempDir java.nio.file.Path tempFolder) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        JobConf conf = new JobConf();
        conf.set("my.cntPrefix", "Hello");
        SingleOutputStreamOperator ds = HadoopTestData.getKVPairDataStream(env).map((MapFunction)new Mapper2());
        SingleOutputStreamOperator helloCnts = ds.keyBy((KeySelector & Serializable)x -> (IntWritable)x.f0).window((WindowAssigner)GlobalWindows.createWithEndOfStreamTrigger()).apply((WindowFunction)new HadoopReducerWrappedFunction((Reducer)new ConfigurableCntReducer(), conf));
        String resultPath = tempFolder.toUri().toString();
        helloCnts.sinkTo((Sink)FileSink.forRowFormat((Path)new Path(resultPath), (Encoder)new SimpleStringEncoder()).build());
        env.execute();
        String expected = "(0,0)\n(1,0)\n(2,1)\n(3,1)\n(4,1)\n";
        TestBaseUtils.compareResultsByLinesInMemory((String)expected, (String)resultPath);
    }

    public static class Mapper2
    implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
        private static final long serialVersionUID = 1L;

        public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v) throws Exception {
            v.f0 = new IntWritable(((IntWritable)v.f0).get() % 5);
            return v;
        }
    }

    public static class Mapper1
    implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
        private static final long serialVersionUID = 1L;

        public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v) throws Exception {
            v.f0 = new IntWritable(((IntWritable)v.f0).get() / 5);
            return v;
        }
    }

    public static class ConfigurableCntReducer
    implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
        private String countPrefix;

        public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) throws IOException {
            int commentCnt = 0;
            while (vs.hasNext()) {
                String v = vs.next().toString();
                if (!v.startsWith(this.countPrefix)) continue;
                ++commentCnt;
            }
            out.collect((Object)k, (Object)new IntWritable(commentCnt));
        }

        public void configure(JobConf c) {
            this.countPrefix = c.get("my.cntPrefix");
        }

        public void close() throws IOException {
        }
    }

    public static class AllCommentCntReducer
    implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
        public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) throws IOException {
            int commentCnt = 0;
            while (vs.hasNext()) {
                String v = vs.next().toString();
                if (!v.startsWith("Comment")) continue;
                ++commentCnt;
            }
            out.collect((Object)new IntWritable(42), (Object)new IntWritable(commentCnt));
        }

        public void configure(JobConf arg0) {
        }

        public void close() throws IOException {
        }
    }

    public static class CommentCntReducer
    implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
        public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) throws IOException {
            int commentCnt = 0;
            while (vs.hasNext()) {
                String v = vs.next().toString();
                if (!v.startsWith("Comment")) continue;
                ++commentCnt;
            }
            out.collect((Object)k, (Object)new IntWritable(commentCnt));
        }

        public void configure(JobConf arg0) {
        }

        public void close() throws IOException {
        }
    }
}

