/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.operators;

import java.io.IOException;
import java.io.NotSerializableException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.TwoInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class CoGroupITCase
extends MultipleProgramsTestBase {
    public CoGroupITCase(MultipleProgramsTestBase.TestExecutionMode mode) {
        super(mode);
    }

    @Test
    public void testCoGroupTuplesWithKeyFieldSelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        CoGroupOperator coGroupDs = ds.coGroup(ds2).where(new int[]{0}).equalTo(new int[]{0}).with((CoGroupFunction)new Tuple5CoGroup());
        List result = coGroupDs.collect();
        String expected = "1,0\n2,6\n3,24\n4,60\n5,120\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
        DataSet<CollectionDataSets.CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
        CoGroupOperator coGroupDs = ds.coGroup(ds2).where((KeySelector)new KeySelector4()).equalTo((KeySelector)new KeySelector5()).with((CoGroupFunction)new CustomTypeCoGroup());
        List result = coGroupDs.collect();
        String expected = "1,0,test\n2,6,test\n3,24,test\n4,60,test\n5,120,test\n6,210,test\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfCoGroupIfUDFReturnsLeftInputObjects() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
        CoGroupOperator coGroupDs = ds.coGroup(ds2).where(new int[]{0}).equalTo(new int[]{0}).with((CoGroupFunction)new Tuple3ReturnLeft());
        List result = coGroupDs.collect();
        String expected = "1,1,Hi\n2,2,Hello\n3,2,Hello world\n4,3,Hello world, how are you?\n5,3,I am fine.\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfCoGroupIfUDFReturnsRightInputObjects() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        CoGroupOperator coGroupDs = ds.coGroup(ds2).where(new int[]{0}).equalTo(new int[]{0}).with((CoGroupFunction)new Tuple5ReturnRight());
        List result = coGroupDs.collect();
        String expected = "1,1,0,Hallo,1\n2,2,1,Hallo Welt,2\n2,3,2,Hallo Welt wie,1\n3,4,3,Hallo Welt wie gehts?,2\n3,5,4,ABC,2\n3,6,5,BCD,3\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCoGroupWithBroadcastSet() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
        TwoInputUdfOperator coGroupDs = ds.coGroup(ds2).where(new int[]{0}).equalTo(new int[]{0}).with((CoGroupFunction)new Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints");
        List result = coGroupDs.collect();
        String expected = "1,0,55\n2,6,55\n3,24,55\n4,60,55\n5,120,55\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCoGroupOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
        DataSet<CollectionDataSets.CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
        CoGroupOperator coGroupDs = ds.coGroup(ds2).where(new int[]{2}).equalTo((KeySelector)new KeySelector2()).with((CoGroupFunction)new MixedCoGroup());
        List result = coGroupDs.collect();
        String expected = "0,1,test\n1,2,test\n2,5,test\n3,15,test\n4,33,test\n5,63,test\n6,109,test\n7,4,test\n8,4,test\n9,4,test\n10,5,test\n11,5,test\n12,5,test\n13,5,test\n14,5,test\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCoGroupOnACustomTypeWithKeyExtractorAndATupleInputWithKeyFieldSelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
        DataSet<CollectionDataSets.CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
        CoGroupOperator coGroupDs = ds2.coGroup(ds).where((KeySelector)new KeySelector3()).equalTo(new int[]{2}).with((CoGroupFunction)new MixedCoGroup2());
        List result = coGroupDs.collect();
        String expected = "0,1,test\n1,2,test\n2,5,test\n3,15,test\n4,33,test\n5,63,test\n6,109,test\n7,4,test\n8,4,test\n9,4,test\n10,5,test\n11,5,test\n12,5,test\n13,5,test\n14,5,test\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
        CoGroupOperator coGrouped = ds1.coGroup(ds2).where(new int[]{0, 4}).equalTo(new int[]{0, 1}).with((CoGroupFunction)new Tuple5Tuple3CoGroup());
        List result = coGrouped.collect();
        String expected = "1,1,Hallo\n2,2,Hallo Welt\n3,2,Hallo Welt wie gehts?\n3,2,ABC\n5,3,HIJ\n5,3,IJK\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCoGroupWithMultipleKeyFieldsWithStaticClassKeyExtractor() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
        CoGroupOperator coGrouped = ds1.coGroup(ds2).where((KeySelector)new KeySelector7()).equalTo((KeySelector)new KeySelector8()).with((CoGroupFunction)new Tuple5Tuple3CoGroup());
        List result = coGrouped.collect();
        String expected = "1,1,Hallo\n2,2,Hallo Welt\n3,2,Hallo Welt wie gehts?\n3,2,ABC\n5,3,HIJ\n5,3,IJK\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithClosureCleaner() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
        CoGroupOperator coGrouped = ds1.coGroup(ds2).where((KeySelector)new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>(){

            public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
                return new Tuple2(t.f0, t.f4);
            }
        }).equalTo((KeySelector)new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>(){

            public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
                return new Tuple2(t.f0, t.f1);
            }
        }).with((CoGroupFunction)new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>(){

            public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, Iterable<Tuple3<Integer, Long, String>> second, Collector<Tuple3<Integer, Long, String>> out) {
                ArrayList<Object> strs = new ArrayList<Object>();
                for (Tuple5<Integer, Long, Integer, String, Long> tuple5 : first) {
                    strs.add(tuple5.f3);
                }
                for (Tuple3 tuple3 : second) {
                    for (String string : strs) {
                        out.collect((Object)new Tuple3(tuple3.f0, tuple3.f1, (Object)string));
                    }
                }
            }
        });
        List result = coGrouped.collect();
        String expected = "1,1,Hallo\n2,2,Hallo Welt\n3,2,Hallo Welt wie gehts?\n3,2,ABC\n5,3,HIJ\n5,3,IJK\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithoutClosureCleaner() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableClosureCleaner();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
        boolean correctExceptionTriggered = false;
        try {
            CoGroupOperator coGroupOperator = ds1.coGroup(ds2).where((KeySelector)new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>(){

                public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
                    return new Tuple2(t.f0, t.f4);
                }
            }).equalTo((KeySelector)new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>(){

                public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
                    return new Tuple2(t.f0, t.f1);
                }
            }).with((CoGroupFunction)new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>(){

                public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, Iterable<Tuple3<Integer, Long, String>> second, Collector<Tuple3<Integer, Long, String>> out) {
                    ArrayList<Object> strs = new ArrayList<Object>();
                    for (Tuple5<Integer, Long, Integer, String, Long> tuple5 : first) {
                        strs.add(tuple5.f3);
                    }
                    for (Tuple3 tuple3 : second) {
                        for (String string : strs) {
                            out.collect((Object)new Tuple3(tuple3.f0, tuple3.f1, (Object)string));
                        }
                    }
                }
            });
        }
        catch (InvalidProgramException ex) {
            correctExceptionTriggered = ex.getCause() instanceof NotSerializableException;
        }
        Assert.assertTrue((boolean)correctExceptionTriggered);
    }

    @Test
    public void testCoGroupTwoCustomTypeInputsWithExpressionKeys() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
        DataSet<CollectionDataSets.CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
        CoGroupOperator coGroupDs = ds.coGroup(ds2).where(new String[]{"myInt"}).equalTo(new String[]{"myInt"}).with((CoGroupFunction)new CustomTypeCoGroup());
        List result = coGroupDs.collect();
        String expected = "1,0,test\n2,6,test\n3,24,test\n4,60,test\n5,120,test\n6,210,test\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testCoGroupOnTwoCustomTypeInputsWithExpressionKeyAndFieldSelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
        DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
        CoGroupOperator coGroupDs = ds.coGroup(ds2).where(new String[]{"nestedPojo.longNumber"}).equalTo(new int[]{6}).with((CoGroupFunction)new CoGroup1());
        List result = coGroupDs.collect();
        String expected = "-1,20000,Flink\n-1,10000,Flink\n-1,30000,Flink\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testCoGroupFieldSelectorAndComplicatedKeySelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
        DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
        CoGroupOperator coGroupDs = ds.coGroup(ds2).where((KeySelector)new KeySelector6()).equalTo(new int[]{6}).with((CoGroupFunction)new CoGroup3());
        List result = coGroupDs.collect();
        String expected = "-1,20000,Flink\n-1,10000,Flink\n-1,30000,Flink\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testCoGroupFieldSelectorAndKeySelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
        DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
        CoGroupOperator coGroupDs = ds.coGroup(ds2).where((KeySelector)new KeySelector1()).equalTo(new int[]{6}).with((CoGroupFunction)new CoGroup2());
        List result = coGroupDs.collect();
        String expected = "-1,20000,Flink\n-1,10000,Flink\n-1,30000,Flink\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testCoGroupWithAtomicType1() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSource ds2 = env.fromElements((Object[])new Integer[]{0, 1, 2});
        CoGroupOperator coGroupDs = ds1.coGroup((DataSet)ds2).where(new int[]{0}).equalTo(new String[]{"*"}).with((CoGroupFunction)new CoGroupAtomic1());
        List result = coGroupDs.collect();
        String expected = "(1,1,Hi)\n(2,2,Hello)";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testCoGroupWithAtomicType2() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource ds1 = env.fromElements((Object[])new Integer[]{0, 1, 2});
        DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
        CoGroupOperator coGroupDs = ds1.coGroup(ds2).where(new String[]{"*"}).equalTo(new int[]{0}).with((CoGroupFunction)new CoGroupAtomic2());
        List result = coGroupDs.collect();
        String expected = "(1,1,Hi)\n(2,2,Hello)";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testCoGroupWithRangePartitioning() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
        env.setParallelism(4);
        TestDistribution testDis = new TestDistribution();
        CoGroupOperator coGrouped = DataSetUtils.partitionByRange(ds1, (DataDistribution)testDis, (int[])new int[]{0, 4}).coGroup((DataSet)DataSetUtils.partitionByRange(ds2, (DataDistribution)testDis, (int[])new int[]{0, 1})).where(new int[]{0, 4}).equalTo(new int[]{0, 1}).with((CoGroupFunction)new Tuple5Tuple3CoGroup());
        List result = coGrouped.collect();
        String expected = "1,1,Hallo\n2,2,Hallo Welt\n3,2,Hallo Welt wie gehts?\n3,2,ABC\n5,3,HIJ\n5,3,IJK\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCoGroupLambda() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)1, (Object)"hello"), new Tuple2((Object)2, (Object)"what's"), new Tuple2((Object)2, (Object)"up")});
        DataSource right = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)1, (Object)"not"), new Tuple2((Object)1, (Object)"much"), new Tuple2((Object)2, (Object)"really")});
        TwoInputUdfOperator joined = left.coGroup((DataSet)right).where(new int[]{0}).equalTo(new int[]{0}).with((CoGroupFunction & Serializable)(values1, values2, out) -> {
            int sum = 0;
            for (Tuple2 next : values1) {
                sum += ((Integer)next.f0).intValue();
            }
            for (Tuple2 next : values2) {
                sum += ((Integer)next.f0).intValue();
            }
            out.collect((Object)sum);
        }).returns(Integer.class);
        List result = joined.collect();
        String expected = "6\n3\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    public static class TestDistribution
    implements DataDistribution {
        public Object[][] boundaries = new Object[][]{{2, 2L}, {5, 4L}, {10, 12L}, {21, 6L}};

        public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
            return this.boundaries[bucketNum];
        }

        public int getNumberOfFields() {
            return 2;
        }

        public TypeInformation[] getKeyTypes() {
            return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO};
        }

        public void write(DataOutputView out) throws IOException {
        }

        public void read(DataInputView in) throws IOException {
        }

        public boolean equals(Object obj) {
            return obj instanceof TestDistribution;
        }
    }

    private static class CoGroupAtomic2
    implements CoGroupFunction<Integer, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        private CoGroupAtomic2() {
        }

        public void coGroup(Iterable<Integer> first, Iterable<Tuple3<Integer, Long, String>> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
            ArrayList<Integer> ints = new ArrayList<Integer>();
            for (Integer n : first) {
                ints.add(n);
            }
            for (Tuple3 tuple3 : second) {
                for (Integer i : ints) {
                    if (!((Integer)tuple3.f0).equals(i)) continue;
                    out.collect((Object)tuple3);
                }
            }
        }
    }

    private static class CoGroupAtomic1
    implements CoGroupFunction<Tuple3<Integer, Long, String>, Integer, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        private CoGroupAtomic1() {
        }

        public void coGroup(Iterable<Tuple3<Integer, Long, String>> first, Iterable<Integer> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
            ArrayList<Integer> ints = new ArrayList<Integer>();
            for (Integer n : second) {
                ints.add(n);
            }
            for (Tuple3 tuple3 : first) {
                for (Integer i : ints) {
                    if (!((Integer)tuple3.f0).equals(i)) continue;
                    out.collect((Object)tuple3);
                }
            }
        }
    }

    private static class Tuple5Tuple3CoGroup
    implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        private Tuple5Tuple3CoGroup() {
        }

        public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, Iterable<Tuple3<Integer, Long, String>> second, Collector<Tuple3<Integer, Long, String>> out) {
            ArrayList<Object> strs = new ArrayList<Object>();
            for (Tuple5<Integer, Long, Integer, String, Long> tuple5 : first) {
                strs.add(tuple5.f3);
            }
            for (Tuple3 tuple3 : second) {
                for (String string : strs) {
                    out.collect((Object)new Tuple3(tuple3.f0, tuple3.f1, (Object)string));
                }
            }
        }
    }

    private static class Tuple5CoGroupBC
    extends RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        private int broadcast = 42;

        private Tuple5CoGroupBC() {
        }

        public void open(Configuration config) {
            List ints = this.getRuntimeContext().getBroadcastVariable("ints");
            int sum = 0;
            for (Integer i : ints) {
                sum += i.intValue();
            }
            this.broadcast = sum;
        }

        public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, Iterable<Tuple5<Integer, Long, Integer, String, Long>> second, Collector<Tuple3<Integer, Integer, Integer>> out) {
            int sum = 0;
            int id = 0;
            for (Tuple5<Integer, Long, Integer, String, Long> element : first) {
                sum += ((Integer)element.f2).intValue();
                id = (Integer)element.f0;
            }
            for (Tuple5<Integer, Long, Integer, String, Long> element : second) {
                sum += ((Integer)element.f2).intValue();
                id = (Integer)element.f0;
            }
            out.collect((Object)new Tuple3((Object)id, (Object)sum, (Object)this.broadcast));
        }
    }

    private static class Tuple5ReturnRight
    implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
        private static final long serialVersionUID = 1L;

        private Tuple5ReturnRight() {
        }

        public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, Iterable<Tuple5<Integer, Long, Integer, String, Long>> second, Collector<Tuple5<Integer, Long, Integer, String, Long>> out) {
            for (Tuple5<Integer, Long, Integer, String, Long> element : second) {
                if ((Integer)element.f0 >= 4) continue;
                out.collect(element);
            }
        }
    }

    private static class Tuple3ReturnLeft
    implements CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        private Tuple3ReturnLeft() {
        }

        public void coGroup(Iterable<Tuple3<Integer, Long, String>> first, Iterable<Tuple3<Integer, Long, String>> second, Collector<Tuple3<Integer, Long, String>> out) {
            for (Tuple3<Integer, Long, String> element : first) {
                if ((Integer)element.f0 >= 6) continue;
                out.collect(element);
            }
        }
    }

    private static class MixedCoGroup2
    implements CoGroupFunction<CollectionDataSets.CustomType, Tuple5<Integer, Long, Integer, String, Long>, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        private MixedCoGroup2() {
        }

        public void coGroup(Iterable<CollectionDataSets.CustomType> first, Iterable<Tuple5<Integer, Long, Integer, String, Long>> second, Collector<CollectionDataSets.CustomType> out) {
            CollectionDataSets.CustomType o = new CollectionDataSets.CustomType(0, 0L, "test");
            for (CollectionDataSets.CustomType customType : first) {
                o.myInt = customType.myInt;
                o.myLong += customType.myLong;
            }
            for (Tuple5 tuple5 : second) {
                o.myInt = (Integer)tuple5.f2;
                o.myLong += (long)((Integer)tuple5.f0).intValue();
            }
            out.collect((Object)o);
        }
    }

    private static class MixedCoGroup
    implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CollectionDataSets.CustomType, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        private MixedCoGroup() {
        }

        public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, Iterable<CollectionDataSets.CustomType> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
            long sum = 0L;
            int id = 0;
            for (Tuple5<Integer, Long, Integer, String, Long> tuple5 : first) {
                sum += (long)((Integer)tuple5.f0).intValue();
                id = (Integer)tuple5.f2;
            }
            for (CollectionDataSets.CustomType customType : second) {
                id = customType.myInt;
                sum += customType.myLong;
            }
            out.collect((Object)new Tuple3((Object)id, (Object)sum, (Object)"test"));
        }
    }

    private static class CustomTypeCoGroup
    implements CoGroupFunction<CollectionDataSets.CustomType, CollectionDataSets.CustomType, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        private CustomTypeCoGroup() {
        }

        public void coGroup(Iterable<CollectionDataSets.CustomType> first, Iterable<CollectionDataSets.CustomType> second, Collector<CollectionDataSets.CustomType> out) {
            CollectionDataSets.CustomType o = new CollectionDataSets.CustomType(0, 0L, "test");
            for (CollectionDataSets.CustomType element : first) {
                o.myInt = element.myInt;
                o.myLong += element.myLong;
            }
            for (CollectionDataSets.CustomType element : second) {
                o.myInt = element.myInt;
                o.myLong += element.myLong;
            }
            out.collect((Object)o);
        }
    }

    private static class Tuple5CoGroup
    implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        private Tuple5CoGroup() {
        }

        public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first, Iterable<Tuple5<Integer, Long, Integer, String, Long>> second, Collector<Tuple2<Integer, Integer>> out) {
            int sum = 0;
            int id = 0;
            for (Tuple5<Integer, Long, Integer, String, Long> element : first) {
                sum += ((Integer)element.f2).intValue();
                id = (Integer)element.f0;
            }
            for (Tuple5<Integer, Long, Integer, String, Long> element : second) {
                sum += ((Integer)element.f2).intValue();
                id = (Integer)element.f0;
            }
            out.collect((Object)new Tuple2((Object)id, (Object)sum));
        }
    }

    private static class CoGroup2
    implements CoGroupFunction<CollectionDataSets.POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        private CoGroup2() {
        }

        public void coGroup(Iterable<CollectionDataSets.POJO> first, Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, Collector<CollectionDataSets.CustomType> out) throws Exception {
            for (CollectionDataSets.POJO p : first) {
                for (Tuple7<Integer, String, Integer, Integer, Long, String, Long> t : second) {
                    Assert.assertTrue((p.nestedPojo.longNumber == (Long)t.f6 ? 1 : 0) != 0);
                    out.collect((Object)new CollectionDataSets.CustomType(-1, p.nestedPojo.longNumber, "Flink"));
                }
            }
        }
    }

    private static class KeySelector1
    implements KeySelector<CollectionDataSets.POJO, Long> {
        private static final long serialVersionUID = 1L;

        private KeySelector1() {
        }

        public Long getKey(CollectionDataSets.POJO value) throws Exception {
            return value.nestedPojo.longNumber;
        }
    }

    private static class CoGroup3
    implements CoGroupFunction<CollectionDataSets.POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        private CoGroup3() {
        }

        public void coGroup(Iterable<CollectionDataSets.POJO> first, Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, Collector<CollectionDataSets.CustomType> out) throws Exception {
            for (CollectionDataSets.POJO p : first) {
                for (Tuple7<Integer, String, Integer, Integer, Long, String, Long> t : second) {
                    Assert.assertTrue((p.nestedPojo.longNumber == (Long)t.f6 ? 1 : 0) != 0);
                    out.collect((Object)new CollectionDataSets.CustomType(-1, p.nestedPojo.longNumber, "Flink"));
                }
            }
        }
    }

    private static class KeySelector6
    implements KeySelector<CollectionDataSets.POJO, Tuple1<Long>> {
        private static final long serialVersionUID = 1L;

        private KeySelector6() {
        }

        public Tuple1<Long> getKey(CollectionDataSets.POJO value) throws Exception {
            return new Tuple1((Object)value.nestedPojo.longNumber);
        }
    }

    private static class CoGroup1
    implements CoGroupFunction<CollectionDataSets.POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        private CoGroup1() {
        }

        public void coGroup(Iterable<CollectionDataSets.POJO> first, Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second, Collector<CollectionDataSets.CustomType> out) throws Exception {
            for (CollectionDataSets.POJO p : first) {
                for (Tuple7<Integer, String, Integer, Integer, Long, String, Long> t : second) {
                    Assert.assertTrue((p.nestedPojo.longNumber == (Long)t.f6 ? 1 : 0) != 0);
                    out.collect((Object)new CollectionDataSets.CustomType(-1, p.nestedPojo.longNumber, "Flink"));
                }
            }
        }
    }

    private static class KeySelector8
    implements KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1L;

        private KeySelector8() {
        }

        public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
            return new Tuple2(t.f0, t.f1);
        }
    }

    private static class KeySelector7
    implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1L;

        private KeySelector7() {
        }

        public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
            return new Tuple2(t.f0, t.f4);
        }
    }

    private static class KeySelector3
    implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private static final long serialVersionUID = 1L;

        private KeySelector3() {
        }

        public Integer getKey(CollectionDataSets.CustomType in) {
            return in.myInt;
        }
    }

    private static class KeySelector2
    implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private static final long serialVersionUID = 1L;

        private KeySelector2() {
        }

        public Integer getKey(CollectionDataSets.CustomType in) {
            return in.myInt;
        }
    }

    private static class KeySelector5
    implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private static final long serialVersionUID = 1L;

        private KeySelector5() {
        }

        public Integer getKey(CollectionDataSets.CustomType in) {
            return in.myInt;
        }
    }

    private static class KeySelector4
    implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private static final long serialVersionUID = 1L;

        private KeySelector4() {
        }

        public Integer getKey(CollectionDataSets.CustomType in) {
            return in.myInt;
        }
    }
}

