package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamMapTest.class */
public class StreamMapTest {

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamMapTest$Map.class */
    private static class Map implements MapFunction<Integer, String> {
        private static final long serialVersionUID = 1;

        private Map() {
        }

        public String map(Integer num) throws Exception {
            return "+" + (num.intValue() + 1);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamMapTest$TestOpenCloseMapFunction.class */
    private static class TestOpenCloseMapFunction extends RichMapFunction<String, String> {
        private static final long serialVersionUID = 1;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        private TestOpenCloseMapFunction() {
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            if (closeCalled) {
                Assert.fail("Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail("Open was not called before close.");
            }
            closeCalled = true;
        }

        public String map(String str) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            return str;
        }
    }

    @Test
    public void testMap() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator) new StreamMap(new Map()));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(1, 0 + 1));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(2, 0 + 2));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(0 + 2));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(3, 0 + 3));
        concurrentLinkedQueue.add(new StreamRecord("+2", 0 + 1));
        concurrentLinkedQueue.add(new StreamRecord("+3", 0 + 2));
        concurrentLinkedQueue.add(new Watermark(0 + 2));
        concurrentLinkedQueue.add(new StreamRecord("+4", 0 + 3));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput());
    }

    @Test
    public void testOpenClose() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator) new StreamMap(new TestOpenCloseMapFunction()));
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord("Hello", 0L));
        oneInputStreamOperatorTestHarness.close();
        Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
        Assert.assertTrue("Output contains no elements.", oneInputStreamOperatorTestHarness.getOutput().size() > 0);
    }
}
