/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class CoStreamITCase
extends AbstractTestBase {
    @Test
    public void test() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        TestListResultSink resultSink = new TestListResultSink();
        DataStreamSource src = env.fromElements((Object[])new Integer[]{1, 3, 5});
        KeyedStream filter1 = src.filter((FilterFunction)new FilterFunction<Integer>(){

            public boolean filter(Integer value) throws Exception {
                return true;
            }
        }).keyBy((KeySelector)new KeySelector<Integer, Integer>(){

            public Integer getKey(Integer value) throws Exception {
                return value;
            }
        });
        KeyedStream filter2 = src.map((MapFunction)new MapFunction<Integer, Tuple2<Integer, Integer>>(){

            public Tuple2<Integer, Integer> map(Integer value) throws Exception {
                return new Tuple2((Object)value, (Object)(value + 1));
            }
        }).rebalance().filter((FilterFunction)new FilterFunction<Tuple2<Integer, Integer>>(){

            public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
                return true;
            }
        }).disableChaining().keyBy((KeySelector)new KeySelector<Tuple2<Integer, Integer>, Integer>(){

            public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
                return (Integer)value.f0;
            }
        });
        SingleOutputStreamOperator connected = filter1.connect((DataStream)filter2).flatMap((CoFlatMapFunction)new CoFlatMapFunction<Integer, Tuple2<Integer, Integer>, String>(){

            public void flatMap1(Integer value, Collector<String> out) throws Exception {
                out.collect((Object)value.toString());
            }

            public void flatMap2(Tuple2<Integer, Integer> value, Collector<String> out) throws Exception {
                out.collect((Object)value.toString());
            }
        });
        connected.addSink((SinkFunction)resultSink);
        env.execute();
        List<String> expected = Arrays.asList("(1,2)", "(3,4)", "(5,6)", "1", "3", "5");
        List result = resultSink.getResult();
        Collections.sort(result);
        Assert.assertEquals(expected, (Object)result);
    }
}

