/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link StreamGroupedReduceOperator}.
 *
 * <p>Two aspects are covered:
 *
 * <ul>
 *   <li>the rolling, per-key reduction of incoming records and the forwarding of watermarks;
 *   <li>the life cycle of a {@link RichReduceFunction} wrapped by the operator, i.e. that
 *       {@code open} and {@code close} are invoked, and in that order.
 * </ul>
 */
class StreamGroupedReduceOperatorTest {

    /** Type information of the key, which in these tests is the integer record itself. */
    private static final TypeInformation<Integer> KEY_TYPE_INFO = BasicTypeInfo.INT_TYPE_INFO;

    /**
     * Feeds records for three different keys through the operator and verifies the emitted
     * records and the forwarded watermark.
     *
     * <p>As encoded in the expected output below: the first record of a key is emitted unchanged,
     * each later record of the same key is emitted as the sum of everything seen so far for that
     * key, and every emitted record carries the timestamp of the input that triggered it.
     */
    @Test
    void testGroupedReduce() throws Exception {
        StreamGroupedReduceOperator<Integer> operator =
                new StreamGroupedReduceOperator<>(new MyReducer(), IntSerializer.INSTANCE);

        // try-with-resources so the harness is released even when an assertion fails.
        try (KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new IntegerKeySelector(), KEY_TYPE_INFO)) {
            long initialTime = 0L;
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

            testHarness.open();

            testHarness.processElement(new StreamRecord<>(1, initialTime + 1L));
            testHarness.processElement(new StreamRecord<>(1, initialTime + 2L));
            testHarness.processWatermark(new Watermark(initialTime + 2L));
            testHarness.processElement(new StreamRecord<>(2, initialTime + 3L));
            testHarness.processElement(new StreamRecord<>(2, initialTime + 4L));
            testHarness.processElement(new StreamRecord<>(3, initialTime + 5L));

            // key 1: 1, then 1 + 1
            expectedOutput.add(new StreamRecord<>(1, initialTime + 1L));
            expectedOutput.add(new StreamRecord<>(2, initialTime + 2L));
            expectedOutput.add(new Watermark(initialTime + 2L));
            // key 2: 2, then 2 + 2
            expectedOutput.add(new StreamRecord<>(2, initialTime + 3L));
            expectedOutput.add(new StreamRecord<>(4, initialTime + 4L));
            // key 3: first and only record
            expectedOutput.add(new StreamRecord<>(3, initialTime + 5L));

            TestHarnessUtil.assertOutputEquals(
                    "Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Verifies that the operator drives the life cycle of a rich reduce function: {@code open}
     * before any record is processed and {@code close} when the operator is closed.
     */
    @Test
    void testOpenClose() throws Exception {
        // The flags are static; reset them so a repeated run in the same JVM starts clean and
        // does not trip the "Close called before open." check inside open().
        TestOpenCloseReduceFunction.openCalled = false;
        TestOpenCloseReduceFunction.closeCalled = false;

        StreamGroupedReduceOperator<Integer> operator =
                new StreamGroupedReduceOperator<>(
                        new TestOpenCloseReduceFunction(), IntSerializer.INSTANCE);
        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        operator, new IntegerKeySelector(), KEY_TYPE_INFO);

        long initialTime = 0L;

        testHarness.open();

        testHarness.processElement(new StreamRecord<>(1, initialTime));
        testHarness.processElement(new StreamRecord<>(2, initialTime));

        testHarness.close();

        Assertions.assertThat(TestOpenCloseReduceFunction.openCalled)
                .as("RichFunction methods where not called.")
                .isTrue();
        // Previously only open() was verified, so a missing close() call went unnoticed.
        Assertions.assertThat(TestOpenCloseReduceFunction.closeCalled)
                .as("RichFunction close() was not called.")
                .isTrue();
        Assertions.assertThat(testHarness.getOutput())
                .as("Output contains no elements.")
                .isNotEmpty();
    }

    /** Key selector that uses the integer record itself as its key. */
    private static class IntegerKeySelector implements KeySelector<Integer, Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer getKey(Integer value) throws Exception {
            return value;
        }
    }

    /** Reduce function that sums its two inputs. */
    private static class MyReducer implements ReduceFunction<Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer reduce(Integer value1, Integer value2) throws Exception {
            return value1 + value2;
        }
    }

    /**
     * Summing reduce function that records its life cycle calls in static flags and asserts, from
     * inside each callback, that the calls arrive in the order open, reduce, close.
     */
    private static class TestOpenCloseReduceFunction extends RichReduceFunction<Integer> {
        private static final long serialVersionUID = 1L;

        /** Set to {@code true} once {@link #open(OpenContext)} has run. */
        public static boolean openCalled = false;

        /** Set to {@code true} once {@link #close()} has run. */
        public static boolean closeCalled = false;

        @Override
        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            Assertions.assertThat(closeCalled).as("Close called before open.").isFalse();
            openCalled = true;
        }

        @Override
        public void close() throws Exception {
            super.close();
            Assertions.assertThat(openCalled).as("Open was not called before close.").isTrue();
            closeCalled = true;
        }

        @Override
        public Integer reduce(Integer in1, Integer in2) throws Exception {
            Assertions.assertThat(openCalled).as("Open was not called before run.").isTrue();
            return in1 + in2;
        }
    }
}

