/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.multipleinput;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.multipleinput.MultipleInputTestBase;
import org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper;
import org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapperGenerator;
import org.apache.flink.table.runtime.operators.multipleinput.UnionStreamOperator;
import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.Assert;
import org.junit.Test;

public class TableOperatorWrapperGeneratorTest
extends MultipleInputTestBase {
    @Test
    public void testSimple() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> source1 = this.createSource(env, "source1");
        Transformation<RowData> source2 = this.createSource(env, "source2");
        OneInputTransformation<RowData, RowData> agg1 = this.createOneInputTransform(source1, "agg1", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1);
        OneInputTransformation<RowData, RowData> agg2 = this.createOneInputTransform(source2, "agg2", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        agg2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 2);
        TwoInputTransformation<RowData, RowData, RowData> join = this.createTwoInputTransform((Transformation<RowData>)agg1, (Transformation<RowData>)agg2, "join", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()})));
        join.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 3);
        TableOperatorWrapperGenerator generator = new TableOperatorWrapperGenerator(Arrays.asList(source1, source2), join, new int[]{1, 0});
        generator.generate();
        TableOperatorWrapper<StreamOperator<RowData>> headWrapper1 = this.createWrapper(agg1, 1, 0.16666666666666666);
        TableOperatorWrapper<StreamOperator<RowData>> headWrapper2 = this.createWrapper(agg2, 2, 0.3333333333333333);
        TableOperatorWrapper<StreamOperator<RowData>> outputWrapper = this.createWrapper(join, 0, 0.5);
        outputWrapper.addInput(headWrapper1, 1);
        outputWrapper.addInput(headWrapper2, 2);
        Assert.assertEquals(Arrays.asList(Pair.of(source1, (Object)new InputSpec(1, 1, headWrapper1, 1)), Pair.of(source2, (Object)new InputSpec(2, 0, headWrapper2, 1))), (Object)generator.getInputTransformAndInputSpecPairs());
        Assert.assertEquals(outputWrapper, (Object)generator.getTailWrapper());
        Assert.assertEquals((long)6L, (long)generator.getManagedMemoryWeight());
        Assert.assertEquals((long)10L, (long)generator.getParallelism());
        Assert.assertEquals((long)-1L, (long)generator.getMaxParallelism());
        Assert.assertEquals((Object)ResourceSpec.UNKNOWN, (Object)generator.getMinResources());
        Assert.assertEquals((Object)ResourceSpec.UNKNOWN, (Object)generator.getPreferredResources());
    }

    @Test
    public void testComplex() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> source1 = this.createSource(env, "source1");
        Transformation<RowData> source2 = this.createSource(env, "source2");
        Transformation<RowData> source3 = this.createSource(env, "source3");
        Transformation<RowData> source4 = this.createSource(env, "source4");
        Transformation<RowData> source5 = this.createSource(env, "source5");
        OneInputTransformation<RowData, RowData> agg1 = this.createOneInputTransform(source1, "agg1", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1);
        OneInputTransformation<RowData, RowData> agg2 = this.createOneInputTransform(source2, "agg2", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        agg2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 2);
        TwoInputTransformation<RowData, RowData, RowData> join1 = this.createTwoInputTransform((Transformation<RowData>)agg1, (Transformation<RowData>)agg2, "join1", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()})));
        join1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 3);
        TwoInputTransformation<RowData, RowData, RowData> join2 = this.createTwoInputTransform((Transformation<RowData>)join1, source3, "join2", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()})));
        join2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 4);
        TwoInputTransformation<RowData, RowData, RowData> join3 = this.createTwoInputTransform(source4, source5, "join3", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()})));
        join3.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 5);
        TwoInputTransformation<RowData, RowData, RowData> join4 = this.createTwoInputTransform((Transformation<RowData>)join2, (Transformation<RowData>)join3, "join4", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()})));
        join4.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 6);
        TableOperatorWrapperGenerator generator = new TableOperatorWrapperGenerator(Arrays.asList(source1, source2, source3, source4, source5), join4, new int[]{2, 3, 4, 0, 1});
        generator.generate();
        TableOperatorWrapper<StreamOperator<RowData>> aggWrapper1 = this.createWrapper(agg1, 3, 0.047619047619047616);
        TableOperatorWrapper<StreamOperator<RowData>> aggWrapper2 = this.createWrapper(agg2, 4, 0.09523809523809523);
        TableOperatorWrapper<StreamOperator<RowData>> joinWrapper1 = this.createWrapper(join1, 2, 0.14285714285714285);
        joinWrapper1.addInput(aggWrapper1, 1);
        joinWrapper1.addInput(aggWrapper2, 2);
        TableOperatorWrapper<StreamOperator<RowData>> joinWrapper2 = this.createWrapper(join2, 1, 0.19047619047619047);
        joinWrapper2.addInput(joinWrapper1, 1);
        TableOperatorWrapper<StreamOperator<RowData>> joinWrapper3 = this.createWrapper(join3, 5, 0.23809523809523808);
        TableOperatorWrapper<StreamOperator<RowData>> outputWrapper = this.createWrapper(join4, 0, 0.2857142857142857);
        outputWrapper.addInput(joinWrapper2, 1);
        outputWrapper.addInput(joinWrapper3, 2);
        Assert.assertEquals(Arrays.asList(Pair.of(source1, (Object)new InputSpec(1, 2, aggWrapper1, 1)), Pair.of(source2, (Object)new InputSpec(2, 3, aggWrapper2, 1)), Pair.of(source3, (Object)new InputSpec(3, 4, joinWrapper2, 2)), Pair.of(source4, (Object)new InputSpec(4, 0, joinWrapper3, 1)), Pair.of(source5, (Object)new InputSpec(5, 1, joinWrapper3, 2))), (Object)generator.getInputTransformAndInputSpecPairs());
        Assert.assertEquals(outputWrapper, (Object)generator.getTailWrapper());
        Assert.assertEquals((long)21L, (long)generator.getManagedMemoryWeight());
        Assert.assertEquals((long)10L, (long)generator.getParallelism());
        Assert.assertEquals((long)-1L, (long)generator.getMaxParallelism());
        Assert.assertEquals((Object)ResourceSpec.UNKNOWN, (Object)generator.getMinResources());
        Assert.assertEquals((Object)ResourceSpec.UNKNOWN, (Object)generator.getPreferredResources());
    }

    @Test
    public void testWithUnion() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> source1 = this.createSource(env, "source1");
        Transformation<RowData> source2 = this.createSource(env, "source2");
        Transformation<RowData> source3 = this.createSource(env, "source3");
        Transformation<RowData> source4 = this.createSource(env, "source4");
        Transformation<RowData> source5 = this.createSource(env, "source5");
        UnionTransformation<RowData> union1 = this.createUnionInputTransform("union1", source1, source2);
        UnionTransformation<RowData> union2 = this.createUnionInputTransform("union2", new Transformation[]{union1, source3});
        OneInputTransformation<RowData, RowData> agg1 = this.createOneInputTransform(source4, "agg1", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        agg1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1);
        TwoInputTransformation<RowData, RowData, RowData> join1 = this.createTwoInputTransform((Transformation<RowData>)agg1, (Transformation<RowData>)union2, "join1", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        join1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 2);
        UnionTransformation<RowData> union3 = this.createUnionInputTransform("union3", new Transformation[]{source5, join1});
        TableOperatorWrapperGenerator generator = new TableOperatorWrapperGenerator(Arrays.asList(source1, source2, source3, source4, source5), union3, new int[]{1, 1, 1, 0, 2});
        generator.generate();
        TableOperatorWrapper<StreamOperator<RowData>> unionWrapper1 = this.createWrapper(union1, 4);
        TableOperatorWrapper<StreamOperator<RowData>> unionWrapper2 = this.createWrapper(union2, 3);
        unionWrapper2.addInput(unionWrapper1, 1);
        TableOperatorWrapper<StreamOperator<RowData>> aggWrapper1 = this.createWrapper(agg1, 2, 0.3333333333333333);
        TableOperatorWrapper<StreamOperator<RowData>> joinWrapper1 = this.createWrapper(join1, 1, 0.6666666666666666);
        joinWrapper1.addInput(aggWrapper1, 1);
        joinWrapper1.addInput(unionWrapper2, 2);
        TableOperatorWrapper<StreamOperator<RowData>> outputWrapper = this.createWrapper(union3, 0);
        outputWrapper.addInput(joinWrapper1, 1);
        Assert.assertEquals(Arrays.asList(Pair.of(source5, (Object)new InputSpec(1, 2, outputWrapper, 1)), Pair.of(source4, (Object)new InputSpec(2, 0, aggWrapper1, 1)), Pair.of(source1, (Object)new InputSpec(3, 1, unionWrapper1, 1)), Pair.of(source2, (Object)new InputSpec(4, 1, unionWrapper1, 1)), Pair.of(source3, (Object)new InputSpec(5, 1, unionWrapper2, 1))), (Object)generator.getInputTransformAndInputSpecPairs());
        Assert.assertEquals(outputWrapper, (Object)generator.getTailWrapper());
        Assert.assertEquals((long)3L, (long)generator.getManagedMemoryWeight());
        Assert.assertEquals((long)10L, (long)generator.getParallelism());
        Assert.assertEquals((long)-1L, (long)generator.getMaxParallelism());
        Assert.assertEquals((Object)ResourceSpec.UNKNOWN, (Object)generator.getMinResources());
        Assert.assertEquals((Object)ResourceSpec.UNKNOWN, (Object)generator.getPreferredResources());
    }

    @Test
    public void testDifferentParallelisms() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> source1 = this.createSource(env, "source1");
        Transformation<RowData> source2 = this.createSource(env, "source2");
        Transformation<RowData> source3 = this.createSource(env, "source3");
        OneInputTransformation<RowData, RowData> calc1 = this.createOneInputTransform(source1, "calc1", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        calc1.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1);
        calc1.setParallelism(100);
        OneInputTransformation<RowData, RowData> calc2 = this.createOneInputTransform(source2, "calc2", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        calc2.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1);
        calc2.setParallelism(50);
        UnionTransformation<RowData> union = this.createUnionInputTransform("union1", new Transformation[]{calc1, calc2});
        TwoInputTransformation<RowData, RowData, RowData> join = this.createTwoInputTransform((Transformation<RowData>)union, source3, "join1", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType()})));
        join.declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, 1);
        join.setParallelism(200);
        TableOperatorWrapperGenerator generator = new TableOperatorWrapperGenerator(Arrays.asList(source1, source2, source3), join, new int[]{1, 1, 0});
        generator.generate();
        TableOperatorWrapper<StreamOperator<RowData>> calcWrapper1 = this.createWrapper(calc1, 2, 0.3333333333333333);
        TableOperatorWrapper<StreamOperator<RowData>> calcWrapper2 = this.createWrapper(calc2, 3, 0.3333333333333333);
        TableOperatorWrapper<StreamOperator<RowData>> unionWrapper = this.createWrapper(union, 1);
        unionWrapper.addInput(calcWrapper1, 1);
        unionWrapper.addInput(calcWrapper2, 2);
        TableOperatorWrapper<StreamOperator<RowData>> outputWrapper = this.createWrapper(join, 0, 0.3333333333333333);
        outputWrapper.addInput(unionWrapper, 2);
        Assert.assertEquals(Arrays.asList(Pair.of(source1, (Object)new InputSpec(1, 1, calcWrapper1, 1)), Pair.of(source2, (Object)new InputSpec(2, 1, calcWrapper2, 1)), Pair.of(source3, (Object)new InputSpec(3, 0, outputWrapper, 2))), (Object)generator.getInputTransformAndInputSpecPairs());
        Assert.assertEquals(Arrays.asList(new TableOperatorWrapper.Edge(calcWrapper1, unionWrapper, 1), new TableOperatorWrapper.Edge(calcWrapper2, unionWrapper, 2)), (Object)unionWrapper.getInputEdges());
        Assert.assertEquals(Collections.singletonList(new TableOperatorWrapper.Edge(unionWrapper, outputWrapper, 2)), (Object)outputWrapper.getInputEdges());
        Assert.assertEquals(outputWrapper, (Object)generator.getTailWrapper());
        Assert.assertEquals((long)3L, (long)generator.getManagedMemoryWeight());
        Assert.assertEquals((long)200L, (long)generator.getParallelism());
        Assert.assertEquals((long)-1L, (long)generator.getMaxParallelism());
        Assert.assertEquals((Object)ResourceSpec.UNKNOWN, (Object)generator.getMinResources());
        Assert.assertEquals((Object)ResourceSpec.UNKNOWN, (Object)generator.getPreferredResources());
    }

    @Test(expected=RuntimeException.class)
    public void testUnsupportedTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> source1 = this.createSource(env, "source1");
        Transformation<RowData> source2 = this.createSource(env, "source2");
        TestingTransformation<RowData> test = new TestingTransformation<RowData>(source1, "test", 10);
        TwoInputTransformation<RowData, RowData, RowData> join = this.createTwoInputTransform(test, source2, "join1", (TypeInformation<RowData>)InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{DataTypes.STRING().getLogicalType(), DataTypes.STRING().getLogicalType()})));
        TableOperatorWrapperGenerator generator = new TableOperatorWrapperGenerator(Arrays.asList(source1, source2), join, new int[]{0, 0});
        generator.generate();
    }

    @SafeVarargs
    private final UnionTransformation<RowData> createUnionInputTransform(String name, Transformation<RowData> ... input) {
        UnionTransformation transform = new UnionTransformation(Arrays.asList(input));
        transform.setName(name);
        return transform;
    }

    private TableOperatorWrapper<StreamOperator<RowData>> createWrapper(OneInputTransformation<RowData, RowData> transform, int index, double managedMemoryFraction) {
        TableOperatorWrapper wrapper = new TableOperatorWrapper(transform.getOperatorFactory(), "SubOp" + index + "_" + transform.getName(), Collections.singletonList(transform.getInputType()), transform.getOutputType());
        wrapper.setManagedMemoryFraction(managedMemoryFraction);
        return wrapper;
    }

    private TableOperatorWrapper<StreamOperator<RowData>> createWrapper(TwoInputTransformation<RowData, RowData, RowData> transform, int index, double managedMemoryFraction) {
        TableOperatorWrapper wrapper = new TableOperatorWrapper(transform.getOperatorFactory(), "SubOp" + index + "_" + transform.getName(), Arrays.asList(transform.getInputType1(), transform.getInputType2()), transform.getOutputType());
        wrapper.setManagedMemoryFraction(managedMemoryFraction);
        return wrapper;
    }

    private TableOperatorWrapper<StreamOperator<RowData>> createWrapper(UnionTransformation<RowData> transform, int index) {
        TableOperatorWrapper wrapper = new TableOperatorWrapper((StreamOperatorFactory)SimpleOperatorFactory.of((StreamOperator)new UnionStreamOperator()), "SubOp" + index + "_" + transform.getName(), transform.getInputs().stream().map(Transformation::getOutputType).collect(Collectors.toList()), transform.getOutputType());
        wrapper.setManagedMemoryFraction(0.0);
        return wrapper;
    }

    private static class TestingTransformation<T>
    extends Transformation<T> {
        private final Transformation<T> input;

        public TestingTransformation(Transformation<T> input, String name, int parallelism) {
            super(name, input.getOutputType(), parallelism);
            this.input = input;
        }

        public List<Transformation<?>> getTransitivePredecessors() {
            return Collections.emptyList();
        }

        public List<Transformation<?>> getInputs() {
            return Collections.singletonList(this.input);
        }
    }
}

