package org.apache.flink.test.io;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/test/io/RichInputOutputITCase.class */
public class RichInputOutputITCase extends JavaProgramTestBase {
    private String inputPath;
    private static ConcurrentLinkedQueue<Integer> readCalls;
    private static ConcurrentLinkedQueue<Integer> writeCalls;

    /* loaded from: input_file:org/apache/flink/test/io/RichInputOutputITCase$TestInputFormat.class */
    private static final class TestInputFormat extends TextInputFormat {
        private static final long serialVersionUID = 1;
        private LongCounter counter;

        public TestInputFormat(Path path) {
            super(path);
            this.counter = new LongCounter();
        }

        public void initializeSplit(FileInputSplit fileInputSplit, Long l) throws IOException {
            try {
                getRuntimeContext().addAccumulator("DATA_SOURCE_ACCUMULATOR", this.counter);
            } catch (UnsupportedOperationException e) {
            }
            super.initializeSplit(fileInputSplit, l);
        }

        public String nextRecord(String str) throws IOException {
            RichInputOutputITCase.readCalls.add(1);
            this.counter.add(1L);
            return (String) super.nextRecord(str);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/io/RichInputOutputITCase$TestOutputFormat.class */
    private static final class TestOutputFormat extends RichOutputFormat<String> {
        private LongCounter counter;

        private TestOutputFormat() {
            this.counter = new LongCounter();
        }

        public void configure(Configuration configuration) {
        }

        public void open(int i, int i2) {
            try {
                getRuntimeContext().addAccumulator("DATA_SINK_ACCUMULATOR", this.counter);
            } catch (UnsupportedOperationException e) {
            }
        }

        public void close() throws IOException {
        }

        public void writeRecord(String str) {
            RichInputOutputITCase.writeCalls.add(1);
            this.counter.add(1L);
        }
    }

    protected void preSubmit() throws Exception {
        this.inputPath = createTempFile("input", "ab\ncd\nef\n");
    }

    protected void testProgram() throws Exception {
        readCalls = new ConcurrentLinkedQueue<>();
        writeCalls = new ConcurrentLinkedQueue<>();
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.createInput(new TestInputFormat(new Path(this.inputPath))).output(new TestOutputFormat());
        JobExecutionResult execute = executionEnvironment.execute();
        Object obj = execute.getAllAccumulatorResults().get("DATA_SOURCE_ACCUMULATOR");
        Object obj2 = execute.getAllAccumulatorResults().get("DATA_SINK_ACCUMULATOR");
        long longValue = ((Long) obj).longValue();
        long longValue2 = ((Long) obj2).longValue();
        Assert.assertEquals(longValue, readCalls.size());
        Assert.assertEquals(longValue2, writeCalls.size());
    }
}
