package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/ProcessOperatorTest.class */
class ProcessOperatorTest {

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/ProcessOperatorTest$NullOutputTagEmittingProcessFunction.class */
    private static class NullOutputTagEmittingProcessFunction extends ProcessFunction<Integer, String> {
        private NullOutputTagEmittingProcessFunction() {
        }

        public void processElement(Integer num, ProcessFunction<Integer, String>.Context context, Collector<String> collector) throws Exception {
            context.output((OutputTag) null, num);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Integer) obj, (ProcessFunction<Integer, String>.Context) context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/ProcessOperatorTest$QueryingProcessFunction.class */
    private static class QueryingProcessFunction extends ProcessFunction<Integer, String> {
        private static final long serialVersionUID = 1;
        private final TimeDomain timeDomain;

        public QueryingProcessFunction(TimeDomain timeDomain) {
            this.timeDomain = timeDomain;
        }

        public void processElement(Integer num, ProcessFunction<Integer, String>.Context context, Collector<String> collector) throws Exception {
            if (this.timeDomain.equals(TimeDomain.EVENT_TIME)) {
                long currentWatermark = context.timerService().currentWatermark();
                context.timestamp();
                collector.collect(num + "TIME:" + currentWatermark + " TS:" + collector);
            } else {
                long currentProcessingTime = context.timerService().currentProcessingTime();
                context.timestamp();
                collector.collect(num + "TIME:" + currentProcessingTime + " TS:" + collector);
            }
        }

        public void onTimer(long j, ProcessFunction<Integer, String>.OnTimerContext onTimerContext, Collector<String> collector) throws Exception {
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Integer) obj, (ProcessFunction<Integer, String>.Context) context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/ProcessOperatorTest$SideOutputProcessFunction.class */
    private static class SideOutputProcessFunction extends ProcessFunction<Integer, String> {
        static final OutputTag<Integer> INTEGER_OUTPUT_TAG = new OutputTag<Integer>("int-out") { // from class: org.apache.flink.streaming.api.operators.ProcessOperatorTest.SideOutputProcessFunction.1
        };
        static final OutputTag<Long> LONG_OUTPUT_TAG = new OutputTag<Long>("long-out") { // from class: org.apache.flink.streaming.api.operators.ProcessOperatorTest.SideOutputProcessFunction.2
        };

        private SideOutputProcessFunction() {
        }

        public void processElement(Integer num, ProcessFunction<Integer, String>.Context context, Collector<String> collector) throws Exception {
            collector.collect("IN:" + num);
            context.output(INTEGER_OUTPUT_TAG, num);
            context.output(LONG_OUTPUT_TAG, Long.valueOf(num.longValue()));
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Integer) obj, (ProcessFunction<Integer, String>.Context) context, (Collector<String>) collector);
        }
    }

    ProcessOperatorTest() {
    }

    @Test
    void testTimestampAndWatermarkQuerying() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator) new ProcessOperator(new QueryingProcessFunction(TimeDomain.EVENT_TIME)));
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(17L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(5, 12L));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(42L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(6, 13L));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new Watermark(17L));
        concurrentLinkedQueue.add(new StreamRecord("5TIME:17 TS:12", 12L));
        concurrentLinkedQueue.add(new Watermark(42L));
        concurrentLinkedQueue.add(new StreamRecord("6TIME:42 TS:13", 13L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    void testTimestampAndProcessingTimeQuerying() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator) new ProcessOperator(new QueryingProcessFunction(TimeDomain.PROCESSING_TIME)));
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.setProcessingTime(17L);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(5));
        oneInputStreamOperatorTestHarness.setProcessingTime(42L);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(6));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new StreamRecord("5TIME:17 TS:null"));
        concurrentLinkedQueue.add(new StreamRecord("6TIME:42 TS:null"));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput());
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    void testNullOutputTagRefusal() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator) new ProcessOperator(new NullOutputTagEmittingProcessFunction()));
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.setProcessingTime(17L);
        try {
            Assertions.assertThatThrownBy(() -> {
                oneInputStreamOperatorTestHarness.processElement(new StreamRecord(5));
            }).isInstanceOf(IllegalArgumentException.class);
        } finally {
            oneInputStreamOperatorTestHarness.close();
        }
    }

    @Test
    void testSideOutput() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator) new ProcessOperator(new SideOutputProcessFunction()));
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(42, 17L));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(new StreamRecord("IN:42", 17L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput());
        ConcurrentLinkedQueue concurrentLinkedQueue2 = new ConcurrentLinkedQueue();
        concurrentLinkedQueue2.add(new StreamRecord(42, 17L));
        TestHarnessUtil.assertOutputEquals("Side output was not correct.", concurrentLinkedQueue2, oneInputStreamOperatorTestHarness.getSideOutput(SideOutputProcessFunction.INTEGER_OUTPUT_TAG));
        ConcurrentLinkedQueue concurrentLinkedQueue3 = new ConcurrentLinkedQueue();
        concurrentLinkedQueue3.add(new StreamRecord(42L, 17L));
        TestHarnessUtil.assertOutputEquals("Side output was not correct.", concurrentLinkedQueue3, oneInputStreamOperatorTestHarness.getSideOutput(SideOutputProcessFunction.LONG_OUTPUT_TAG));
        oneInputStreamOperatorTestHarness.close();
    }
}
