/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.hadoopcompatibility.mapred.example;

import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReducerWrappedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class HadoopMapredCompatWordCount {
    public static JobExecutionResult run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: WordCount <input path> <result path>");
            return null;
        }
        String inputPath = args[0];
        String outputPath = args[1];
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        HadoopInputFormat hadoopInputFormat = new HadoopInputFormat((InputFormat)new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
        TextInputFormat.addInputPath((JobConf)hadoopInputFormat.getJobConf(), (Path)new Path(inputPath));
        DataStreamSource text = env.createInput((org.apache.flink.api.common.io.InputFormat)hadoopInputFormat);
        SingleOutputStreamOperator words = text.flatMap((FlatMapFunction)new HadoopMapFunction((Mapper)new Tokenizer())).keyBy((KeySelector & Serializable)x -> (Text)x.f0).window((WindowAssigner)GlobalWindows.createWithEndOfStreamTrigger()).apply((WindowFunction)new HadoopReducerWrappedFunction((Reducer)new Counter()));
        HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat((OutputFormat)new TextOutputFormat(), new JobConf());
        hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
        TextOutputFormat.setOutputPath((JobConf)hadoopOutputFormat.getJobConf(), (Path)new Path(outputPath));
        words.addSink((SinkFunction)new OutputFormatSinkFunction((org.apache.flink.api.common.io.OutputFormat)hadoopOutputFormat)).setParallelism(1);
        return env.execute("Hadoop Compat WordCount");
    }

    public static final class Counter
    implements Reducer<Text, LongWritable, Text, LongWritable> {
        public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep) throws IOException {
            long cnt = 0L;
            while (vs.hasNext()) {
                cnt += vs.next().get();
            }
            out.collect((Object)k, (Object)new LongWritable(cnt));
        }

        public void configure(JobConf arg0) {
        }

        public void close() throws IOException {
        }
    }

    public static final class Tokenizer
    implements Mapper<LongWritable, Text, Text, LongWritable> {
        public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep) throws IOException {
            String[] tokens;
            String line = v.toString();
            for (String token : tokens = line.toLowerCase().split("\\W+")) {
                if (token.length() <= 0) continue;
                out.collect((Object)new Text(token), (Object)new LongWritable(1L));
            }
        }

        public void configure(JobConf arg0) {
        }

        public void close() throws IOException {
        }
    }
}

