/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.hadoopcompatibility.mapred;

import java.io.IOException;
import java.util.Locale;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.hadoopcompatibility.mapred.HadoopTestData;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

/**
 * Integration tests that run Hadoop {@code mapred} {@link Mapper} implementations inside a Flink
 * streaming job by wrapping them in a {@link HadoopMapFunction}.
 *
 * <p>Each test feeds the key/value stream supplied by {@link HadoopTestData} through one mapper,
 * writes the mapper output as text rows into a temporary directory, and compares the written
 * lines with an expected string.
 */
@ExtendWith(ParameterizedTestExtension.class)
class HadoopMapFunctionITCase extends MultipleProgramsTestBase {

    /** Configuration key under which {@link ConfigurableMapper} looks up its value prefix. */
    private static final String FILTER_PREFIX_KEY = "my.filterPrefix";

    /**
     * Runs {@link NonPassingMapper} over the test data.
     *
     * <p>The expected string is a lone newline, i.e. the test presumably expects that no record
     * is written (none of the expected outputs in this class contains "bananas").
     *
     * @param tempFolder temporary directory that receives the sink output
     * @throws Exception if job execution or the result comparison fails
     */
    @TestTemplate
    void testNonPassingMapper(@TempDir java.nio.file.Path tempFolder) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataStream(env);

        SingleOutputStreamOperator<Tuple2<IntWritable, Text>> nonPassingFlatMapDs =
                ds.flatMap(
                        new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(
                                new NonPassingMapper()));

        String resultPath = tempFolder.toUri().toString();
        nonPassingFlatMapDs.sinkTo(rowFormatSink(resultPath));
        env.execute();

        TestBaseUtils.compareResultsByLinesInMemory("\n", resultPath);
    }

    /**
     * Runs {@link DuplicatingMapper} over the test data and expects every input pair twice: once
     * unchanged and once with an upper-cased value.
     *
     * @param tempFolder temporary directory that receives the sink output
     * @throws Exception if job execution or the result comparison fails
     */
    @TestTemplate
    void testDataDuplicatingMapper(@TempDir java.nio.file.Path tempFolder) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataStream(env);

        SingleOutputStreamOperator<Tuple2<IntWritable, Text>> duplicatingFlatMapDs =
                ds.flatMap(
                        new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(
                                new DuplicatingMapper()));

        String resultPath = tempFolder.toUri().toString();
        duplicatingFlatMapDs.sinkTo(rowFormatSink(resultPath));
        env.execute();

        // One source line per input record: the original pair followed by its upper-cased twin.
        String expected =
                "(1,Hi)\n(1,HI)\n"
                        + "(2,Hello)\n(2,HELLO)\n"
                        + "(3,Hello world)\n(3,HELLO WORLD)\n"
                        + "(4,Hello world, how are you?)\n(4,HELLO WORLD, HOW ARE YOU?)\n"
                        + "(5,I am fine.)\n(5,I AM FINE.)\n"
                        + "(6,Luke Skywalker)\n(6,LUKE SKYWALKER)\n"
                        + "(7,Comment#1)\n(7,COMMENT#1)\n"
                        + "(8,Comment#2)\n(8,COMMENT#2)\n"
                        + "(9,Comment#3)\n(9,COMMENT#3)\n"
                        + "(10,Comment#4)\n(10,COMMENT#4)\n"
                        + "(11,Comment#5)\n(11,COMMENT#5)\n"
                        + "(12,Comment#6)\n(12,COMMENT#6)\n"
                        + "(13,Comment#7)\n(13,COMMENT#7)\n"
                        + "(14,Comment#8)\n(14,COMMENT#8)\n"
                        + "(15,Comment#9)\n(15,COMMENT#9)\n"
                        + "(16,Comment#10)\n(16,COMMENT#10)\n"
                        + "(17,Comment#11)\n(17,COMMENT#11)\n"
                        + "(18,Comment#12)\n(18,COMMENT#12)\n"
                        + "(19,Comment#13)\n(19,COMMENT#13)\n"
                        + "(20,Comment#14)\n(20,COMMENT#14)\n"
                        + "(21,Comment#15)\n(21,COMMENT#15)\n";
        TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);
    }

    /**
     * Runs {@link ConfigurableMapper} with the prefix "Hello" passed through a {@link JobConf}
     * and expects only the pairs whose value starts with that prefix.
     *
     * @param tempFolder temporary directory that receives the sink output
     * @throws Exception if job execution or the result comparison fails
     */
    @TestTemplate
    void testConfigurableMapper(@TempDir java.nio.file.Path tempFolder) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        JobConf conf = new JobConf();
        conf.set(FILTER_PREFIX_KEY, "Hello");

        DataStream<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataStream(env);
        SingleOutputStreamOperator<Tuple2<IntWritable, Text>> hellos =
                ds.flatMap(
                        new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(
                                new ConfigurableMapper(), conf));

        String resultPath = tempFolder.toUri().toString();
        hellos.sinkTo(rowFormatSink(resultPath));
        env.execute();

        String expected = "(2,Hello)\n(3,Hello world)\n(4,Hello world, how are you?)\n";
        TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);
    }

    /**
     * Builds the row-format file sink shared by all tests.
     *
     * @param resultPath URI string of the directory the sink writes into
     * @return a sink that encodes each tuple with a {@link SimpleStringEncoder}
     */
    private static FileSink<Tuple2<IntWritable, Text>> rowFormatSink(String resultPath) {
        return FileSink.forRowFormat(
                        new Path(resultPath), new SimpleStringEncoder<Tuple2<IntWritable, Text>>())
                .build();
    }

    /**
     * Hadoop mapper that forwards only pairs whose value starts with the prefix read from the job
     * configuration in {@link #configure(JobConf)}.
     */
    public static class ConfigurableMapper
            implements Mapper<IntWritable, Text, IntWritable, Text> {
        /** Prefix a value must start with to be emitted; assigned in {@link #configure}. */
        private String filterPrefix;

        @Override
        public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
                throws IOException {
            if (v.toString().startsWith(filterPrefix)) {
                out.collect(k, v);
            }
        }

        @Override
        public void configure(JobConf c) {
            filterPrefix = c.get(FILTER_PREFIX_KEY);
        }

        @Override
        public void close() throws IOException {
            // Nothing to release.
        }
    }

    /**
     * Hadoop mapper that emits every input pair twice: unchanged, and with the value converted to
     * upper case.
     */
    public static class DuplicatingMapper
            implements Mapper<IntWritable, Text, IntWritable, Text> {
        @Override
        public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
                throws IOException {
            out.collect(k, v);
            // Locale.ROOT keeps the casing independent of the JVM default locale, so the fixed
            // expected output (e.g. "HI") holds on every machine.
            out.collect(k, new Text(v.toString().toUpperCase(Locale.ROOT)));
        }

        @Override
        public void configure(JobConf arg0) {
            // No configuration needed.
        }

        @Override
        public void close() throws IOException {
            // Nothing to release.
        }
    }

    /** Hadoop mapper that forwards only pairs whose value contains the text "bananas". */
    public static class NonPassingMapper
            implements Mapper<IntWritable, Text, IntWritable, Text> {
        @Override
        public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
                throws IOException {
            if (v.toString().contains("bananas")) {
                out.collect(k, v);
            }
        }

        @Override
        public void configure(JobConf arg0) {
            // No configuration needed.
        }

        @Override
        public void close() throws IOException {
            // Nothing to release.
        }
    }
}

