package org.apache.flink.test.cancelling;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.InfiniteIntegerInputFormat;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/cancelling/MapCancelingITCase.class */
public class MapCancelingITCase extends CancelingTestBase {

    /* loaded from: input_file:org/apache/flink/test/cancelling/MapCancelingITCase$DelayingIdentityMapper.class */
    private static final class DelayingIdentityMapper<IN> implements MapFunction<IN, IN> {
        private static final long serialVersionUID = 1;
        private static final int WAIT_TIME_PER_VALUE = 10000;

        private DelayingIdentityMapper() {
        }

        public IN map(IN in) throws Exception {
            Thread.sleep(10000L);
            return in;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/cancelling/MapCancelingITCase$IdentityMapper.class */
    private static final class IdentityMapper<IN> implements MapFunction<IN, IN> {
        private static final long serialVersionUID = 1;

        private IdentityMapper() {
        }

        public IN map(IN in) throws Exception {
            return in;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/cancelling/MapCancelingITCase$LongCancelTimeIdentityMapper.class */
    private static final class LongCancelTimeIdentityMapper<IN> implements MapFunction<IN, IN> {
        private static final long serialVersionUID = 1;
        private static final int WAIT_TIME_PER_VALUE = 5000;

        private LongCancelTimeIdentityMapper() {
        }

        public IN map(IN in) throws Exception {
            long currentTimeMillis;
            long currentTimeMillis2 = System.currentTimeMillis();
            long j = 5000;
            do {
                try {
                    Thread.sleep(j);
                } catch (InterruptedException e) {
                }
                currentTimeMillis = (5000 - System.currentTimeMillis()) + currentTimeMillis2;
                j = currentTimeMillis;
            } while (currentTimeMillis > 0);
            return in;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/cancelling/MapCancelingITCase$StuckInOpenIdentityMapper.class */
    private static final class StuckInOpenIdentityMapper<IN> extends RichMapFunction<IN, IN> {
        private static final long serialVersionUID = 1;

        private StuckInOpenIdentityMapper() {
        }

        public void open(Configuration configuration) throws Exception {
            synchronized (this) {
                wait();
            }
        }

        public IN map(IN in) throws Exception {
            return in;
        }
    }

    @Test
    public void testMapCancelling() throws Exception {
        executeTask(new IdentityMapper());
    }

    @Test
    public void testSlowMapCancelling() throws Exception {
        executeTask(new DelayingIdentityMapper());
    }

    @Test
    public void testMapWithLongCancellingResponse() throws Exception {
        executeTask(new LongCancelTimeIdentityMapper());
    }

    @Test
    public void testMapPriorToFirstRecordReading() throws Exception {
        executeTask(new StuckInOpenIdentityMapper());
    }

    public void executeTask(MapFunction<Integer, Integer> mapFunction) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.createInput(new InfiniteIntegerInputFormat(false)).map(mapFunction).output(new DiscardingOutputFormat());
        executionEnvironment.setParallelism(4);
        runAndCancelJob(executionEnvironment.createProgramPlan(), 5000, 10000);
    }
}
