package org.apache.flink.runtime.operators;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/FlatMapTaskTest.class */
public class FlatMapTaskTest extends DriverTestBase<FlatMapFunction<Record, Record>> {
    private static final Logger LOG = LoggerFactory.getLogger(FlatMapTaskTest.class);
    private final DriverTestBase.CountingOutputCollector output;

    /* loaded from: input_file:org/apache/flink/runtime/operators/FlatMapTaskTest$MockFailingMapStub.class */
    public static class MockFailingMapStub extends RichFlatMapFunction<Record, Record> {
        private static final long serialVersionUID = 1;
        private int cnt = 0;

        public void flatMap(Record record, Collector<Record> collector) throws Exception {
            int i = this.cnt + 1;
            this.cnt = i;
            if (i >= 10) {
                throw new ExpectedTestException();
            }
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Record) obj, (Collector<Record>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/FlatMapTaskTest$MockMapStub.class */
    public static class MockMapStub extends RichFlatMapFunction<Record, Record> {
        private static final long serialVersionUID = 1;

        public void flatMap(Record record, Collector<Record> collector) throws Exception {
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Record) obj, (Collector<Record>) collector);
        }
    }

    public FlatMapTaskTest(ExecutionConfig executionConfig) {
        super(executionConfig, 0L, 0);
        this.output = new DriverTestBase.CountingOutputCollector();
    }

    @TestTemplate
    void testMapTask() {
        addInput(new UniformRecordGenerator(100, 20, false));
        setOutput(this.output);
        try {
            testDriver((Driver) new FlatMapDriver(), MockMapStub.class);
        } catch (Exception e) {
            LOG.debug("Exception while running the test driver.", e);
            Assertions.fail("Invoke method caused exception.");
        }
        Assertions.assertThat(this.output.getNumberOfRecords()).withFailMessage("Wrong result set size.", new Object[0]).isEqualTo(2000);
    }

    @TestTemplate
    void testFailingMapTask() {
        addInput(new UniformRecordGenerator(100, 20, false));
        setOutput(new DiscardingOutputCollector());
        FlatMapDriver flatMapDriver = new FlatMapDriver();
        Assertions.assertThatThrownBy(() -> {
            testDriver((Driver) flatMapDriver, MockFailingMapStub.class);
        }).isInstanceOf(ExpectedTestException.class);
    }

    @TestTemplate
    void testCancelMapTask() {
        addInput(new InfiniteInputIterator());
        setOutput(new DiscardingOutputCollector());
        final FlatMapDriver flatMapDriver = new FlatMapDriver();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Thread thread = new Thread() { // from class: org.apache.flink.runtime.operators.FlatMapTaskTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    FlatMapTaskTest.this.testDriver((Driver) flatMapDriver, MockMapStub.class);
                    atomicBoolean.set(true);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        thread.start();
        TaskCancelThread taskCancelThread = new TaskCancelThread(1, thread, this);
        taskCancelThread.start();
        try {
            taskCancelThread.join();
            thread.join();
        } catch (InterruptedException e) {
            Assertions.fail("Joining threads failed");
        }
        Assertions.assertThat(atomicBoolean).withFailMessage("Test threw an exception even though it was properly canceled.", new Object[0]).isTrue();
    }
}
