/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class ProcessOperatorTest {
    ProcessOperatorTest() {
    }

    @Test
    void testTimestampAndWatermarkQuerying() throws Exception {
        ProcessOperator operator = new ProcessOperator((ProcessFunction)new QueryingProcessFunction(TimeDomain.EVENT_TIME));
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        testHarness.setup();
        testHarness.open();
        testHarness.processWatermark(new Watermark(17L));
        testHarness.processElement(new StreamRecord((Object)5, 12L));
        testHarness.processWatermark(new Watermark(42L));
        testHarness.processElement(new StreamRecord((Object)6, 13L));
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new Watermark(17L));
        expectedOutput.add(new StreamRecord((Object)"5TIME:17 TS:12", 12L));
        expectedOutput.add(new Watermark(42L));
        expectedOutput.add(new StreamRecord((Object)"6TIME:42 TS:13", 13L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testTimestampAndProcessingTimeQuerying() throws Exception {
        ProcessOperator operator = new ProcessOperator((ProcessFunction)new QueryingProcessFunction(TimeDomain.PROCESSING_TIME));
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        testHarness.setup();
        testHarness.open();
        testHarness.setProcessingTime(17L);
        testHarness.processElement(new StreamRecord((Object)5));
        testHarness.setProcessingTime(42L);
        testHarness.processElement(new StreamRecord((Object)6));
        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        expectedOutput.add(new StreamRecord((Object)"5TIME:17 TS:null"));
        expectedOutput.add(new StreamRecord((Object)"6TIME:42 TS:null"));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testNullOutputTagRefusal() throws Exception {
        ProcessOperator operator = new ProcessOperator((ProcessFunction)new NullOutputTagEmittingProcessFunction());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        testHarness.setup();
        testHarness.open();
        testHarness.setProcessingTime(17L);
        try {
            Assertions.assertThatThrownBy(() -> testHarness.processElement(new StreamRecord((Object)5))).isInstanceOf(IllegalArgumentException.class);
        }
        finally {
            testHarness.close();
        }
    }

    @Test
    void testSideOutput() throws Exception {
        ProcessOperator operator = new ProcessOperator((ProcessFunction)new SideOutputProcessFunction());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(operator);
        testHarness.setup();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)42, 17L));
        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        expectedOutput.add(new StreamRecord((Object)"IN:42", 17L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        ConcurrentLinkedQueue<StreamRecord> expectedIntSideOutput = new ConcurrentLinkedQueue<StreamRecord>();
        expectedIntSideOutput.add(new StreamRecord((Object)42, 17L));
        ConcurrentLinkedQueue<StreamRecord<Integer>> intSideOutput = testHarness.getSideOutput(SideOutputProcessFunction.INTEGER_OUTPUT_TAG);
        TestHarnessUtil.assertOutputEquals("Side output was not correct.", expectedIntSideOutput, intSideOutput);
        ConcurrentLinkedQueue<StreamRecord> expectedLongSideOutput = new ConcurrentLinkedQueue<StreamRecord>();
        expectedLongSideOutput.add(new StreamRecord((Object)42L, 17L));
        ConcurrentLinkedQueue<StreamRecord<Long>> longSideOutput = testHarness.getSideOutput(SideOutputProcessFunction.LONG_OUTPUT_TAG);
        TestHarnessUtil.assertOutputEquals("Side output was not correct.", expectedLongSideOutput, longSideOutput);
        testHarness.close();
    }

    private static class QueryingProcessFunction
    extends ProcessFunction<Integer, String> {
        private static final long serialVersionUID = 1L;
        private final TimeDomain timeDomain;

        public QueryingProcessFunction(TimeDomain timeDomain) {
            this.timeDomain = timeDomain;
        }

        public void processElement(Integer value, ProcessFunction.Context ctx, Collector<String> out) throws Exception {
            if (this.timeDomain.equals((Object)TimeDomain.EVENT_TIME)) {
                out.collect((Object)(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()));
            } else {
                out.collect((Object)(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()));
            }
        }

        public void onTimer(long timestamp, ProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
        }
    }

    private static class SideOutputProcessFunction
    extends ProcessFunction<Integer, String> {
        static final OutputTag<Integer> INTEGER_OUTPUT_TAG = new OutputTag<Integer>("int-out"){};
        static final OutputTag<Long> LONG_OUTPUT_TAG = new OutputTag<Long>("long-out"){};

        private SideOutputProcessFunction() {
        }

        public void processElement(Integer value, ProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)("IN:" + value));
            ctx.output(INTEGER_OUTPUT_TAG, (Object)value);
            ctx.output(LONG_OUTPUT_TAG, (Object)value.longValue());
        }
    }

    private static class NullOutputTagEmittingProcessFunction
    extends ProcessFunction<Integer, String> {
        private NullOutputTagEmittingProcessFunction() {
        }

        public void processElement(Integer value, ProcessFunction.Context ctx, Collector<String> out) throws Exception {
            ctx.output(null, (Object)value);
        }
    }
}

