/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.hadoopcompatibility.mapred;

import java.io.Closeable;
import java.io.File;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.OperatingSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
@Tag(value="org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler")
public class HadoopIOFormatsITCase
extends JavaProgramTestBase {
    private static final int NUM_PROGRAMS = 2;
    @Parameter
    private int curProgId;
    private String[] resultPath;
    private String[] expectedResult;
    private String sequenceFileInPath;
    private String sequenceFileInPathNull;

    @BeforeEach
    void checkOperatingSystem() {
        ((AbstractBooleanAssert)Assumptions.assumeThat((boolean)OperatingSystem.isWindows()).as("This test can't run successfully on Windows.", new Object[0])).isFalse();
    }

    @TestTemplate
    public void testJobWithObjectReuse() throws Exception {
        super.testJobWithoutObjectReuse();
    }

    @TestTemplate
    public void testJobWithoutObjectReuse() throws Exception {
        super.testJobWithoutObjectReuse();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void preSubmit() throws Exception {
        this.resultPath = new String[]{this.getTempDirPath("result0"), this.getTempDirPath("result1")};
        File sequenceFile = this.createAndRegisterTempFile("seqFile");
        this.sequenceFileInPath = sequenceFile.toURI().toString();
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get((URI)URI.create(sequenceFile.getAbsolutePath()), (Configuration)conf);
        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(sequenceFile.getAbsolutePath());
        int kvCount = 4;
        LongWritable key = new LongWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter((FileSystem)fs, (Configuration)conf, (org.apache.hadoop.fs.Path)path, key.getClass(), value.getClass());
            for (int i = 0; i < kvCount; ++i) {
                if (i == 1) {
                    for (int a = 0; a < 15; ++a) {
                        key.set((long)i);
                        value.set(i + " - somestring");
                        writer.append((Writable)key, (Writable)value);
                    }
                }
                key.set((long)i);
                value.set(i + " - somestring");
                writer.append((Writable)key, (Writable)value);
            }
        }
        catch (Throwable throwable) {
            IOUtils.closeStream(writer);
            throw throwable;
        }
        IOUtils.closeStream((Closeable)writer);
        File sequenceFileNull = this.createAndRegisterTempFile("seqFileNullKey");
        this.sequenceFileInPathNull = sequenceFileNull.toURI().toString();
        path = new org.apache.hadoop.fs.Path(this.sequenceFileInPathNull);
        LongWritable value1 = new LongWritable();
        SequenceFile.Writer writer1 = null;
        try {
            writer1 = SequenceFile.createWriter((FileSystem)fs, (Configuration)conf, (org.apache.hadoop.fs.Path)path, NullWritable.class, value1.getClass());
            for (int i = 0; i < kvCount; ++i) {
                value1.set((long)i);
                writer1.append((Writable)NullWritable.get(), (Writable)value1);
            }
        }
        catch (Throwable throwable) {
            IOUtils.closeStream(writer1);
            throw throwable;
        }
        IOUtils.closeStream((Closeable)writer1);
    }

    protected JobExecutionResult testProgram() throws Exception {
        Tuple2<String[], JobExecutionResult> expectedResultAndJobExecutionResult = HadoopIOFormatPrograms.runProgram(this.curProgId, this.resultPath, this.sequenceFileInPath, this.sequenceFileInPathNull);
        this.expectedResult = (String[])expectedResultAndJobExecutionResult.f0;
        return (JobExecutionResult)expectedResultAndJobExecutionResult.f1;
    }

    protected void postSubmit() throws Exception {
        for (int i = 0; i < this.resultPath.length; ++i) {
            TestBaseUtils.compareResultsByLinesInMemory((String)this.expectedResult[i], (String)this.resultPath[i]);
        }
    }

    @Parameters(name="curProgId = {0}")
    public static Collection<Integer> getConfigurations() {
        ArrayList<Integer> programIds = new ArrayList<Integer>(2);
        for (int i = 1; i <= 2; ++i) {
            programIds.add(i);
        }
        return programIds;
    }

    private static class HadoopIOFormatPrograms {
        private HadoopIOFormatPrograms() {
        }

        public static Tuple2<String[], JobExecutionResult> runProgram(int progId, String[] resultPath, String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
            switch (progId) {
                case 1: {
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    SequenceFileInputFormat sfif = new SequenceFileInputFormat();
                    JobConf hdconf = new JobConf();
                    SequenceFileInputFormat.addInputPath((JobConf)hdconf, (org.apache.hadoop.fs.Path)new org.apache.hadoop.fs.Path(sequenceFileInPath));
                    HadoopInputFormat hif = new HadoopInputFormat((InputFormat)sfif, LongWritable.class, Text.class, hdconf);
                    DataStreamSource ds = env.createInput((org.apache.flink.api.common.io.InputFormat)hif);
                    ds.map((MapFunction)new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>(){

                        public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception {
                            return new Tuple2((Object)((LongWritable)value.f0).get(), (Object)((Text)value.f1));
                        }
                    }).print();
                    SingleOutputStreamOperator sumed = ds.map((MapFunction)new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>(){

                        public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception {
                            return new Tuple2((Object)((LongWritable)value.f0).get(), (Object)((Text)value.f1));
                        }
                    }).windowAll((WindowAssigner)GlobalWindows.createWithEndOfStreamTrigger()).reduce((ReduceFunction)new ReduceFunction<Tuple2<Long, Text>>(){

                        public Tuple2<Long, Text> reduce(Tuple2<Long, Text> value1, Tuple2<Long, Text> value2) {
                            return Tuple2.of((Object)((Long)value1.f0 + (Long)value2.f0), (Object)((Text)value2.f1));
                        }
                    });
                    sumed.sinkTo((Sink)FileSink.forRowFormat((Path)new Path(resultPath[0]), (Encoder)new SimpleStringEncoder()).build());
                    SingleOutputStreamOperator res = ds.keyBy((KeySelector & Serializable)x -> (LongWritable)x.f0).window((WindowAssigner)GlobalWindows.createWithEndOfStreamTrigger()).reduce((ReduceFunction)new ReduceFunction<Tuple2<LongWritable, Text>>(){

                        public Tuple2<LongWritable, Text> reduce(Tuple2<LongWritable, Text> value1, Tuple2<LongWritable, Text> value2) throws Exception {
                            return value1;
                        }
                    }).map((MapFunction)new MapFunction<Tuple2<LongWritable, Text>, String>(){

                        public String map(Tuple2<LongWritable, Text> value) throws Exception {
                            return value.f1 + " - " + ((LongWritable)value.f0).get();
                        }
                    });
                    res.sinkTo((Sink)FileSink.forRowFormat((Path)new Path(resultPath[1]), (Encoder)new SimpleStringEncoder()).build());
                    JobExecutionResult jobExecutionResult = env.execute();
                    return Tuple2.of((Object)new String[]{"(21,3 - somestring)", "0 - somestring - 0\n1 - somestring - 1\n2 - somestring - 2\n3 - somestring - 3\n"}, (Object)jobExecutionResult);
                }
                case 2: {
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    SequenceFileInputFormat sfif = new SequenceFileInputFormat();
                    JobConf hdconf = new JobConf();
                    SequenceFileInputFormat.addInputPath((JobConf)hdconf, (org.apache.hadoop.fs.Path)new org.apache.hadoop.fs.Path(sequenceFileInPathNull));
                    HadoopInputFormat hif = new HadoopInputFormat((InputFormat)sfif, NullWritable.class, LongWritable.class, hdconf);
                    DataStreamSource ds = env.createInput((org.apache.flink.api.common.io.InputFormat)hif);
                    SingleOutputStreamOperator res = ds.map((MapFunction)new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>(){

                        public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> value) throws Exception {
                            return new Tuple2(null, (Object)((LongWritable)value.f1).get());
                        }
                    });
                    SingleOutputStreamOperator res1 = res.keyBy((KeySelector & Serializable)x -> (Long)x.f1).sum(1);
                    res1.sinkTo((Sink)FileSink.forRowFormat((Path)new Path(resultPath[1]), (Encoder)new SimpleStringEncoder()).build());
                    res.sinkTo((Sink)FileSink.forRowFormat((Path)new Path(resultPath[0]), (Encoder)new SimpleStringEncoder()).build());
                    JobExecutionResult jobExecutionResult = env.execute();
                    return Tuple2.of((Object)new String[]{"(null,2)\n(null,0)\n(null,1)\n(null,3)", "(null,0)\n(null,1)\n(null,2)\n(null,3)"}, (Object)jobExecutionResult);
                }
            }
            throw new IllegalArgumentException("Invalid program id");
        }
    }
}

