package org.apache.flink.test.operators;

import java.io.IOException;
import java.io.NotSerializableException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase.class */
public class CoGroupITCase extends MultipleProgramsTestBase {

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$CoGroup1.class */
    private static class CoGroup1 implements CoGroupFunction<CollectionDataSets.POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1;

        private CoGroup1() {
        }

        public void coGroup(Iterable<CollectionDataSets.POJO> iterable, Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> iterable2, Collector<CollectionDataSets.CustomType> collector) throws Exception {
            for (CollectionDataSets.POJO pojo : iterable) {
                Iterator<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> it = iterable2.iterator();
                while (it.hasNext()) {
                    Assert.assertTrue(pojo.nestedPojo.longNumber == ((Long) it.next().f6).longValue());
                    collector.collect(new CollectionDataSets.CustomType(-1, pojo.nestedPojo.longNumber, "Flink"));
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$CoGroup2.class */
    private static class CoGroup2 implements CoGroupFunction<CollectionDataSets.POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1;

        private CoGroup2() {
        }

        public void coGroup(Iterable<CollectionDataSets.POJO> iterable, Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> iterable2, Collector<CollectionDataSets.CustomType> collector) throws Exception {
            for (CollectionDataSets.POJO pojo : iterable) {
                Iterator<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> it = iterable2.iterator();
                while (it.hasNext()) {
                    Assert.assertTrue(pojo.nestedPojo.longNumber == ((Long) it.next().f6).longValue());
                    collector.collect(new CollectionDataSets.CustomType(-1, pojo.nestedPojo.longNumber, "Flink"));
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$CoGroup3.class */
    private static class CoGroup3 implements CoGroupFunction<CollectionDataSets.POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1;

        private CoGroup3() {
        }

        public void coGroup(Iterable<CollectionDataSets.POJO> iterable, Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> iterable2, Collector<CollectionDataSets.CustomType> collector) throws Exception {
            for (CollectionDataSets.POJO pojo : iterable) {
                Iterator<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> it = iterable2.iterator();
                while (it.hasNext()) {
                    Assert.assertTrue(pojo.nestedPojo.longNumber == ((Long) it.next().f6).longValue());
                    collector.collect(new CollectionDataSets.CustomType(-1, pojo.nestedPojo.longNumber, "Flink"));
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$CoGroupAtomic1.class */
    private static class CoGroupAtomic1 implements CoGroupFunction<Tuple3<Integer, Long, String>, Integer, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1;

        private CoGroupAtomic1() {
        }

        public void coGroup(Iterable<Tuple3<Integer, Long, String>> iterable, Iterable<Integer> iterable2, Collector<Tuple3<Integer, Long, String>> collector) throws Exception {
            ArrayList arrayList = new ArrayList();
            Iterator<Integer> it = iterable2.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            for (Tuple3<Integer, Long, String> tuple3 : iterable) {
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    if (((Integer) tuple3.f0).equals((Integer) it2.next())) {
                        collector.collect(tuple3);
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$CoGroupAtomic2.class */
    private static class CoGroupAtomic2 implements CoGroupFunction<Integer, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1;

        private CoGroupAtomic2() {
        }

        public void coGroup(Iterable<Integer> iterable, Iterable<Tuple3<Integer, Long, String>> iterable2, Collector<Tuple3<Integer, Long, String>> collector) throws Exception {
            ArrayList arrayList = new ArrayList();
            Iterator<Integer> it = iterable.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            for (Tuple3<Integer, Long, String> tuple3 : iterable2) {
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    if (((Integer) tuple3.f0).equals((Integer) it2.next())) {
                        collector.collect(tuple3);
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$CustomTypeCoGroup.class */
    private static class CustomTypeCoGroup implements CoGroupFunction<CollectionDataSets.CustomType, CollectionDataSets.CustomType, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1;

        private CustomTypeCoGroup() {
        }

        public void coGroup(Iterable<CollectionDataSets.CustomType> iterable, Iterable<CollectionDataSets.CustomType> iterable2, Collector<CollectionDataSets.CustomType> collector) {
            CollectionDataSets.CustomType customType = new CollectionDataSets.CustomType(0, 0L, "test");
            for (CollectionDataSets.CustomType customType2 : iterable) {
                customType.myInt = customType2.myInt;
                customType.myLong += customType2.myLong;
            }
            for (CollectionDataSets.CustomType customType3 : iterable2) {
                customType.myInt = customType3.myInt;
                customType.myLong += customType3.myLong;
            }
            collector.collect(customType);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$KeySelector1.class */
    private static class KeySelector1 implements KeySelector<CollectionDataSets.POJO, Long> {
        private static final long serialVersionUID = 1;

        private KeySelector1() {
        }

        public Long getKey(CollectionDataSets.POJO pojo) throws Exception {
            return Long.valueOf(pojo.nestedPojo.longNumber);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$KeySelector2.class */
    private static class KeySelector2 implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private static final long serialVersionUID = 1;

        private KeySelector2() {
        }

        public Integer getKey(CollectionDataSets.CustomType customType) {
            return Integer.valueOf(customType.myInt);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$KeySelector3.class */
    private static class KeySelector3 implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private static final long serialVersionUID = 1;

        private KeySelector3() {
        }

        public Integer getKey(CollectionDataSets.CustomType customType) {
            return Integer.valueOf(customType.myInt);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$KeySelector4.class */
    private static class KeySelector4 implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private static final long serialVersionUID = 1;

        private KeySelector4() {
        }

        public Integer getKey(CollectionDataSets.CustomType customType) {
            return Integer.valueOf(customType.myInt);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$KeySelector5.class */
    private static class KeySelector5 implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private static final long serialVersionUID = 1;

        private KeySelector5() {
        }

        public Integer getKey(CollectionDataSets.CustomType customType) {
            return Integer.valueOf(customType.myInt);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$KeySelector6.class */
    private static class KeySelector6 implements KeySelector<CollectionDataSets.POJO, Tuple1<Long>> {
        private static final long serialVersionUID = 1;

        private KeySelector6() {
        }

        public Tuple1<Long> getKey(CollectionDataSets.POJO pojo) throws Exception {
            return new Tuple1<>(Long.valueOf(pojo.nestedPojo.longNumber));
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$KeySelector7.class */
    private static class KeySelector7 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1;

        private KeySelector7() {
        }

        public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> tuple5) {
            return new Tuple2<>(tuple5.f0, tuple5.f4);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$KeySelector8.class */
    private static class KeySelector8 implements KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1;

        private KeySelector8() {
        }

        public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> tuple3) {
            return new Tuple2<>(tuple3.f0, tuple3.f1);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$MixedCoGroup.class */
    private static class MixedCoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CollectionDataSets.CustomType, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1;

        private MixedCoGroup() {
        }

        public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> iterable, Iterable<CollectionDataSets.CustomType> iterable2, Collector<Tuple3<Integer, Long, String>> collector) throws Exception {
            long j = 0;
            int i = 0;
            Iterator<Tuple5<Integer, Long, Integer, String, Long>> it = iterable.iterator();
            while (it.hasNext()) {
                j += ((Integer) r0.f0).intValue();
                i = ((Integer) it.next().f2).intValue();
            }
            for (CollectionDataSets.CustomType customType : iterable2) {
                i = customType.myInt;
                j += customType.myLong;
            }
            collector.collect(new Tuple3(Integer.valueOf(i), Long.valueOf(j), "test"));
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$MixedCoGroup2.class */
    private static class MixedCoGroup2 implements CoGroupFunction<CollectionDataSets.CustomType, Tuple5<Integer, Long, Integer, String, Long>, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1;

        private MixedCoGroup2() {
        }

        public void coGroup(Iterable<CollectionDataSets.CustomType> iterable, Iterable<Tuple5<Integer, Long, Integer, String, Long>> iterable2, Collector<CollectionDataSets.CustomType> collector) {
            CollectionDataSets.CustomType customType = new CollectionDataSets.CustomType(0, 0L, "test");
            for (CollectionDataSets.CustomType customType2 : iterable) {
                customType.myInt = customType2.myInt;
                customType.myLong += customType2.myLong;
            }
            Iterator<Tuple5<Integer, Long, Integer, String, Long>> it = iterable2.iterator();
            while (it.hasNext()) {
                customType.myInt = ((Integer) it.next().f2).intValue();
                customType.myLong += ((Integer) r0.f0).intValue();
            }
            collector.collect(customType);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$TestDistribution.class */
    public static class TestDistribution implements DataDistribution {
        public Object[][] boundaries = {new Object[]{2, 2L}, new Object[]{5, 4L}, new Object[]{10, 12L}, new Object[]{21, 6L}};

        public Object[] getBucketBoundary(int i, int i2) {
            return this.boundaries[i];
        }

        public int getNumberOfFields() {
            return 2;
        }

        public TypeInformation[] getKeyTypes() {
            return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO};
        }

        public void write(DataOutputView dataOutputView) throws IOException {
        }

        public void read(DataInputView dataInputView) throws IOException {
        }

        public boolean equals(Object obj) {
            return obj instanceof TestDistribution;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$Tuple3ReturnLeft.class */
    private static class Tuple3ReturnLeft implements CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1;

        private Tuple3ReturnLeft() {
        }

        public void coGroup(Iterable<Tuple3<Integer, Long, String>> iterable, Iterable<Tuple3<Integer, Long, String>> iterable2, Collector<Tuple3<Integer, Long, String>> collector) {
            for (Tuple3<Integer, Long, String> tuple3 : iterable) {
                if (((Integer) tuple3.f0).intValue() < 6) {
                    collector.collect(tuple3);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$Tuple5CoGroup.class */
    private static class Tuple5CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1;

        private Tuple5CoGroup() {
        }

        public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> iterable, Iterable<Tuple5<Integer, Long, Integer, String, Long>> iterable2, Collector<Tuple2<Integer, Integer>> collector) {
            int i = 0;
            int i2 = 0;
            for (Tuple5<Integer, Long, Integer, String, Long> tuple5 : iterable) {
                i += ((Integer) tuple5.f2).intValue();
                i2 = ((Integer) tuple5.f0).intValue();
            }
            for (Tuple5<Integer, Long, Integer, String, Long> tuple52 : iterable2) {
                i += ((Integer) tuple52.f2).intValue();
                i2 = ((Integer) tuple52.f0).intValue();
            }
            collector.collect(new Tuple2(Integer.valueOf(i2), Integer.valueOf(i)));
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$Tuple5CoGroupBC.class */
    private static class Tuple5CoGroupBC extends RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
        private static final long serialVersionUID = 1;
        private int broadcast;

        private Tuple5CoGroupBC() {
            this.broadcast = 42;
        }

        public void open(Configuration configuration) {
            int i = 0;
            Iterator it = getRuntimeContext().getBroadcastVariable("ints").iterator();
            while (it.hasNext()) {
                i += ((Integer) it.next()).intValue();
            }
            this.broadcast = i;
        }

        public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> iterable, Iterable<Tuple5<Integer, Long, Integer, String, Long>> iterable2, Collector<Tuple3<Integer, Integer, Integer>> collector) {
            int i = 0;
            int i2 = 0;
            for (Tuple5<Integer, Long, Integer, String, Long> tuple5 : iterable) {
                i += ((Integer) tuple5.f2).intValue();
                i2 = ((Integer) tuple5.f0).intValue();
            }
            for (Tuple5<Integer, Long, Integer, String, Long> tuple52 : iterable2) {
                i += ((Integer) tuple52.f2).intValue();
                i2 = ((Integer) tuple52.f0).intValue();
            }
            collector.collect(new Tuple3(Integer.valueOf(i2), Integer.valueOf(i), Integer.valueOf(this.broadcast)));
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$Tuple5ReturnRight.class */
    private static class Tuple5ReturnRight implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
        private static final long serialVersionUID = 1;

        private Tuple5ReturnRight() {
        }

        public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> iterable, Iterable<Tuple5<Integer, Long, Integer, String, Long>> iterable2, Collector<Tuple5<Integer, Long, Integer, String, Long>> collector) {
            for (Tuple5<Integer, Long, Integer, String, Long> tuple5 : iterable2) {
                if (((Integer) tuple5.f0).intValue() < 4) {
                    collector.collect(tuple5);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CoGroupITCase$Tuple5Tuple3CoGroup.class */
    private static class Tuple5Tuple3CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1;

        private Tuple5Tuple3CoGroup() {
        }

        public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> iterable, Iterable<Tuple3<Integer, Long, String>> iterable2, Collector<Tuple3<Integer, Long, String>> collector) {
            ArrayList arrayList = new ArrayList();
            Iterator<Tuple5<Integer, Long, Integer, String, Long>> it = iterable.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().f3);
            }
            for (Tuple3<Integer, Long, String> tuple3 : iterable2) {
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    collector.collect(new Tuple3(tuple3.f0, tuple3.f1, (String) it2.next()));
                }
            }
        }
    }

    public CoGroupITCase(MultipleProgramsTestBase.TestExecutionMode testExecutionMode) {
        super(testExecutionMode);
    }

    @Test
    public void testCoGroupTuplesWithKeyFieldSelector() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsTuples(CollectionDataSets.get5TupleDataSet(executionEnvironment).coGroup(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new int[]{0}).equalTo(new int[]{0}).with(new Tuple5CoGroup()).collect(), "1,0\n2,6\n3,24\n4,60\n5,120\n");
    }

    @Test
    public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsText(CollectionDataSets.getCustomTypeDataSet(executionEnvironment).coGroup(CollectionDataSets.getCustomTypeDataSet(executionEnvironment)).where(new KeySelector4()).equalTo(new KeySelector5()).with(new CustomTypeCoGroup()).collect(), "1,0,test\n2,6,test\n3,24,test\n4,60,test\n5,120,test\n6,210,test\n");
    }

    @Test
    public void testCorrectnessOfCoGroupIfUDFReturnsLeftInputObjects() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsTuples(CollectionDataSets.get3TupleDataSet(executionEnvironment).coGroup(CollectionDataSets.get3TupleDataSet(executionEnvironment)).where(new int[]{0}).equalTo(new int[]{0}).with(new Tuple3ReturnLeft()).collect(), "1,1,Hi\n2,2,Hello\n3,2,Hello world\n4,3,Hello world, how are you?\n5,3,I am fine.\n");
    }

    @Test
    public void testCorrectnessOfCoGroupIfUDFReturnsRightInputObjects() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsTuples(CollectionDataSets.get5TupleDataSet(executionEnvironment).coGroup(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new int[]{0}).equalTo(new int[]{0}).with(new Tuple5ReturnRight()).collect(), "1,1,0,Hallo,1\n2,2,1,Hallo Welt,2\n2,3,2,Hallo Welt wie,1\n3,4,3,Hallo Welt wie gehts?,2\n3,5,4,ABC,2\n3,6,5,BCD,3\n");
    }

    @Test
    public void testCoGroupWithBroadcastSet() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsTuples(CollectionDataSets.get5TupleDataSet(executionEnvironment).coGroup(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new int[]{0}).equalTo(new int[]{0}).with(new Tuple5CoGroupBC()).withBroadcastSet(CollectionDataSets.getIntegerDataSet(executionEnvironment), "ints").collect(), "1,0,55\n2,6,55\n3,24,55\n4,60,55\n5,120,55\n");
    }

    @Test
    public void testCoGroupOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsTuples(CollectionDataSets.get5TupleDataSet(executionEnvironment).coGroup(CollectionDataSets.getCustomTypeDataSet(executionEnvironment)).where(new int[]{2}).equalTo(new KeySelector2()).with(new MixedCoGroup()).collect(), "0,1,test\n1,2,test\n2,5,test\n3,15,test\n4,33,test\n5,63,test\n6,109,test\n7,4,test\n8,4,test\n9,4,test\n10,5,test\n11,5,test\n12,5,test\n13,5,test\n14,5,test\n");
    }

    @Test
    public void testCoGroupOnACustomTypeWithKeyExtractorAndATupleInputWithKeyFieldSelector() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsText(CollectionDataSets.getCustomTypeDataSet(executionEnvironment).coGroup(CollectionDataSets.get5TupleDataSet(executionEnvironment)).where(new KeySelector3()).equalTo(new int[]{2}).with(new MixedCoGroup2()).collect(), "0,1,test\n1,2,test\n2,5,test\n3,15,test\n4,33,test\n5,63,test\n6,109,test\n7,4,test\n8,4,test\n9,4,test\n10,5,test\n11,5,test\n12,5,test\n13,5,test\n14,5,test\n");
    }

    @Test
    public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsTuples(CollectionDataSets.get5TupleDataSet(executionEnvironment).coGroup(CollectionDataSets.get3TupleDataSet(executionEnvironment)).where(new int[]{0, 4}).equalTo(new int[]{0, 1}).with(new Tuple5Tuple3CoGroup()).collect(), "1,1,Hallo\n2,2,Hallo Welt\n3,2,Hallo Welt wie gehts?\n3,2,ABC\n5,3,HIJ\n5,3,IJK\n");
    }

    @Test
    public void testCoGroupWithMultipleKeyFieldsWithStaticClassKeyExtractor() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsTuples(CollectionDataSets.get5TupleDataSet(executionEnvironment).coGroup(CollectionDataSets.get3TupleDataSet(executionEnvironment)).where(new KeySelector7()).equalTo(new KeySelector8()).with(new Tuple5Tuple3CoGroup()).collect(), "1,1,Hallo\n2,2,Hallo Welt\n3,2,Hallo Welt wie gehts?\n3,2,ABC\n5,3,HIJ\n5,3,IJK\n");
    }

    @Test
    public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithClosureCleaner() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsTuples(CollectionDataSets.get5TupleDataSet(executionEnvironment).coGroup(CollectionDataSets.get3TupleDataSet(executionEnvironment)).where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() { // from class: org.apache.flink.test.operators.CoGroupITCase.3
            public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> tuple5) throws Exception {
                return new Tuple2<>(tuple5.f0, tuple5.f4);
            }
        }).equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() { // from class: org.apache.flink.test.operators.CoGroupITCase.2
            public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> tuple3) {
                return new Tuple2<>(tuple3.f0, tuple3.f1);
            }
        }).with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() { // from class: org.apache.flink.test.operators.CoGroupITCase.1
            public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> iterable, Iterable<Tuple3<Integer, Long, String>> iterable2, Collector<Tuple3<Integer, Long, String>> collector) {
                ArrayList arrayList = new ArrayList();
                Iterator<Tuple5<Integer, Long, Integer, String, Long>> it = iterable.iterator();
                while (it.hasNext()) {
                    arrayList.add(it.next().f3);
                }
                for (Tuple3<Integer, Long, String> tuple3 : iterable2) {
                    Iterator it2 = arrayList.iterator();
                    while (it2.hasNext()) {
                        collector.collect(new Tuple3(tuple3.f0, tuple3.f1, (String) it2.next()));
                    }
                }
            }
        }).collect(), "1,1,Hallo\n2,2,Hallo Welt\n3,2,Hallo Welt wie gehts?\n3,2,ABC\n5,3,HIJ\n5,3,IJK\n");
    }

    @Test
    public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithoutClosureCleaner() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().disableClosureCleaner();
        boolean z = false;
        try {
            CollectionDataSets.get5TupleDataSet(executionEnvironment).coGroup(CollectionDataSets.get3TupleDataSet(executionEnvironment)).where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() { // from class: org.apache.flink.test.operators.CoGroupITCase.6
                public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> tuple5) throws Exception {
                    return new Tuple2<>(tuple5.f0, tuple5.f4);
                }
            }).equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() { // from class: org.apache.flink.test.operators.CoGroupITCase.5
                public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> tuple3) {
                    return new Tuple2<>(tuple3.f0, tuple3.f1);
                }
            }).with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() { // from class: org.apache.flink.test.operators.CoGroupITCase.4
                public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> iterable, Iterable<Tuple3<Integer, Long, String>> iterable2, Collector<Tuple3<Integer, Long, String>> collector) {
                    ArrayList arrayList = new ArrayList();
                    Iterator<Tuple5<Integer, Long, Integer, String, Long>> it = iterable.iterator();
                    while (it.hasNext()) {
                        arrayList.add(it.next().f3);
                    }
                    for (Tuple3<Integer, Long, String> tuple3 : iterable2) {
                        Iterator it2 = arrayList.iterator();
                        while (it2.hasNext()) {
                            collector.collect(new Tuple3(tuple3.f0, tuple3.f1, (String) it2.next()));
                        }
                    }
                }
            });
        } catch (InvalidProgramException e) {
            z = e.getCause() instanceof NotSerializableException;
        }
        Assert.assertTrue(z);
    }

    @Test
    public void testCoGroupTwoCustomTypeInputsWithExpressionKeys() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsText(CollectionDataSets.getCustomTypeDataSet(executionEnvironment).coGroup(CollectionDataSets.getCustomTypeDataSet(executionEnvironment)).where(new String[]{"myInt"}).equalTo(new String[]{"myInt"}).with(new CustomTypeCoGroup()).collect(), "1,0,test\n2,6,test\n3,24,test\n4,60,test\n5,120,test\n6,210,test\n");
    }

    @Test
    public void testCoGroupOnTwoCustomTypeInputsWithExpressionKeyAndFieldSelector() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsText(CollectionDataSets.getSmallPojoDataSet(executionEnvironment).coGroup(CollectionDataSets.getSmallTuplebasedDataSet(executionEnvironment)).where(new String[]{"nestedPojo.longNumber"}).equalTo(new int[]{6}).with(new CoGroup1()).collect(), "-1,20000,Flink\n-1,10000,Flink\n-1,30000,Flink\n");
    }

    @Test
    public void testCoGroupFieldSelectorAndComplicatedKeySelector() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsText(CollectionDataSets.getSmallPojoDataSet(executionEnvironment).coGroup(CollectionDataSets.getSmallTuplebasedDataSet(executionEnvironment)).where(new KeySelector6()).equalTo(new int[]{6}).with(new CoGroup3()).collect(), "-1,20000,Flink\n-1,10000,Flink\n-1,30000,Flink\n");
    }

    @Test
    public void testCoGroupFieldSelectorAndKeySelector() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsText(CollectionDataSets.getSmallPojoDataSet(executionEnvironment).coGroup(CollectionDataSets.getSmallTuplebasedDataSet(executionEnvironment)).where(new KeySelector1()).equalTo(new int[]{6}).with(new CoGroup2()).collect(), "-1,20000,Flink\n-1,10000,Flink\n-1,30000,Flink\n");
    }

    @Test
    public void testCoGroupWithAtomicType1() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsText(CollectionDataSets.getSmall3TupleDataSet(executionEnvironment).coGroup(executionEnvironment.fromElements(new Integer[]{0, 1, 2})).where(new int[]{0}).equalTo(new String[]{"*"}).with(new CoGroupAtomic1()).collect(), "(1,1,Hi)\n(2,2,Hello)");
    }

    @Test
    public void testCoGroupWithAtomicType2() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsText(executionEnvironment.fromElements(new Integer[]{0, 1, 2}).coGroup(CollectionDataSets.getSmall3TupleDataSet(executionEnvironment)).where(new String[]{"*"}).equalTo(new int[]{0}).with(new CoGroupAtomic2()).collect(), "(1,1,Hi)\n(2,2,Hello)");
    }

    @Test
    public void testCoGroupWithRangePartitioning() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> dataSet = CollectionDataSets.get5TupleDataSet(executionEnvironment);
        DataSet<Tuple3<Integer, Long, String>> dataSet2 = CollectionDataSets.get3TupleDataSet(executionEnvironment);
        executionEnvironment.setParallelism(4);
        TestDistribution testDistribution = new TestDistribution();
        TestBaseUtils.compareResultAsTuples(DataSetUtils.partitionByRange(dataSet, testDistribution, new int[]{0, 4}).coGroup(DataSetUtils.partitionByRange(dataSet2, testDistribution, new int[]{0, 1})).where(new int[]{0, 4}).equalTo(new int[]{0, 1}).with(new Tuple5Tuple3CoGroup()).collect(), "1,1,Hallo\n2,2,Hallo Welt\n3,2,Hallo Welt wie gehts?\n3,2,ABC\n5,3,HIJ\n5,3,IJK\n");
    }

    @Test
    public void testCoGroupLambda() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        TestBaseUtils.compareResultAsText(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(1, "hello"), new Tuple2(2, "what's"), new Tuple2(2, "up")}).coGroup(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(1, "not"), new Tuple2(1, "much"), new Tuple2(2, "really")})).where(new int[]{0}).equalTo(new int[]{0}).with((iterable, iterable2, collector) -> {
            int i = 0;
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                i += ((Integer) ((Tuple2) it.next()).f0).intValue();
            }
            Iterator it2 = iterable2.iterator();
            while (it2.hasNext()) {
                i += ((Integer) ((Tuple2) it2.next()).f0).intValue();
            }
            collector.collect(Integer.valueOf(i));
        }).returns(Integer.class).collect(), "6\n3\n");
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 48317781:
                if (implMethodName.equals("lambda$testCoGroupLambda$c199a89c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/CoGroupFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("coGroup") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Iterable;Ljava/lang/Iterable;Lorg/apache/flink/util/Collector;)V") && serializedLambda.getImplClass().equals("org/apache/flink/test/operators/CoGroupITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;Ljava/lang/Iterable;Lorg/apache/flink/util/Collector;)V")) {
                    return (iterable, iterable2, collector) -> {
                        int i = 0;
                        Iterator it = iterable.iterator();
                        while (it.hasNext()) {
                            i += ((Integer) ((Tuple2) it.next()).f0).intValue();
                        }
                        Iterator it2 = iterable2.iterator();
                        while (it2.hasNext()) {
                            i += ((Integer) ((Tuple2) it2.next()).f0).intValue();
                        }
                        collector.collect(Integer.valueOf(i));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
