package org.apache.flink.table.planner.plan.nodes.exec;

import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.WithBoundedness;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.CompiledPlanUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.utils.JsonTestUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

@Execution(ExecutionMode.CONCURRENT)
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.class */
class TransformationsTest {
    TransformationsTest() {
    }

    @Test
    public void testLegacyBatchSource() {
        StreamTableEnvironment create = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment(), EnvironmentSettings.newInstance().inBatchMode().build());
        LegacySourceTransformation<?> legacySourceTransformation = toLegacySourceTransformation(create, create.from(TableDescriptor.forConnector(TestValuesTableFactory.IDENTIFIER).option("bounded", "true").schema(dummySchema()).build()));
        assertBoundedness(Boundedness.BOUNDED, legacySourceTransformation);
        Assertions.assertThat(legacySourceTransformation.getOperator().emitsProgressiveWatermarks()).isFalse();
        Assertions.assertThat(legacySourceTransformation.getLineageVertex()).isNotNull();
        Assertions.assertThat(legacySourceTransformation.getLineageVertex().boundedness()).isEqualTo(Boundedness.BOUNDED);
        List datasets = legacySourceTransformation.getLineageVertex().datasets();
        Assertions.assertThat(datasets.size()).isEqualTo(1);
        Assertions.assertThat(((LineageDataset) datasets.get(0)).name()).contains(new CharSequence[]{"*anonymous_values$"});
        Assertions.assertThat(((LineageDataset) datasets.get(0)).namespace()).isEqualTo("values://FromElementsFunction");
    }

    @Test
    public void testLegacyStreamSource() {
        StreamTableEnvironment create = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment(), EnvironmentSettings.newInstance().inStreamingMode().build());
        LegacySourceTransformation<?> legacySourceTransformation = toLegacySourceTransformation(create, create.from(TableDescriptor.forConnector(TestValuesTableFactory.IDENTIFIER).option("bounded", "false").schema(dummySchema()).build()));
        assertBoundedness(Boundedness.CONTINUOUS_UNBOUNDED, legacySourceTransformation);
        Assertions.assertThat(legacySourceTransformation.getOperator().emitsProgressiveWatermarks()).isTrue();
        Assertions.assertThat(legacySourceTransformation.getLineageVertex()).isNotNull();
        Assertions.assertThat(legacySourceTransformation.getLineageVertex().boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
        List datasets = legacySourceTransformation.getLineageVertex().datasets();
        Assertions.assertThat(datasets.size()).isEqualTo(1);
        Assertions.assertThat(((LineageDataset) datasets.get(0)).name()).contains(new CharSequence[]{"*anonymous_values$"});
        Assertions.assertThat(((LineageDataset) datasets.get(0)).namespace()).isEqualTo("values://FromElementsFunction");
    }

    @Test
    public void testLegacyBatchValues() {
        StreamTableEnvironment create = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment(), EnvironmentSettings.newInstance().inBatchMode().build());
        assertBoundedness(Boundedness.BOUNDED, toLegacySourceTransformation(create, create.fromValues(new Object[]{1, 2, 3})));
    }

    @Test
    public void testUidGeneration() {
        checkUids(tableConfig -> {
            tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION, ExecutionConfigOptions.UidGeneration.PLAN_ONLY);
        }, true, false);
        checkUids(tableConfig2 -> {
            tableConfig2.set(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION, ExecutionConfigOptions.UidGeneration.ALWAYS);
        }, true, true);
        checkUids(tableConfig3 -> {
            tableConfig3.set(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION, ExecutionConfigOptions.UidGeneration.DISABLED);
        }, false, false);
    }

    private static void checkUids(Consumer<TableConfig> consumer, boolean z, boolean z2) {
        StreamTableEnvironment create = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment(), EnvironmentSettings.newInstance().inStreamingMode().build());
        consumer.accept(create.getConfig());
        create.createTemporaryTable("source_table", TableDescriptor.forConnector(TestValuesTableFactory.IDENTIFIER).option("bounded", "true").schema(dummySchema()).build());
        create.createTemporaryTable("sink_table", TableDescriptor.forConnector(TestValuesTableFactory.IDENTIFIER).schema(dummySchema()).build());
        Table select = create.from("source_table").select(new Expression[]{(Expression) Expressions.$("i").abs()});
        List list = (List) CompiledPlanUtils.toTransformations(create, select.insertInto("sink_table").compilePlan()).get(0).getTransitivePredecessors().stream().map((v0) -> {
            return v0.getUid();
        }).collect(Collectors.toList());
        Assertions.assertThat(list).hasSize(3);
        if (z) {
            Assertions.assertThat(list).allSatisfy(str -> {
                Assertions.assertThat(str).isNotNull();
            });
        } else {
            Assertions.assertThat(list).allSatisfy(str2 -> {
                Assertions.assertThat(str2).isNull();
            });
        }
        List list2 = (List) CompiledPlanUtils.toTransformations(create, create.loadPlan(PlanReference.fromJsonString(select.insertInto("sink_table").compilePlan().asJsonString()))).get(0).getTransitivePredecessors().stream().map((v0) -> {
            return v0.getUid();
        }).collect(Collectors.toList());
        Assertions.assertThat(list2).hasSize(3);
        if (z) {
            Assertions.assertThat(list2).allSatisfy(str3 -> {
                Assertions.assertThat(str3).isNotNull();
            });
        } else {
            Assertions.assertThat(list2).allSatisfy(str4 -> {
                Assertions.assertThat(str4).isNull();
            });
        }
        List list3 = (List) create.toChangelogStream(select).getTransformation().getTransitivePredecessors().stream().map((v0) -> {
            return v0.getUid();
        }).collect(Collectors.toList());
        Assertions.assertThat(list3).hasSize(3);
        if (z2) {
            Assertions.assertThat(list3).allSatisfy(str5 -> {
                Assertions.assertThat(str5).isNotNull();
            });
        } else {
            Assertions.assertThat(list3).allSatisfy(str6 -> {
                Assertions.assertThat(str6).isNull();
            });
        }
    }

    @Test
    public void testUidDefaults() throws IOException {
        checkUidModification(tableConfig -> {
        }, jsonNode -> {
        }, tableEnvironment -> {
            return planFromCurrentFlinkVersion(tableEnvironment).asJsonString();
        }, "\\d+_sink", "\\d+_constraint-validator", "\\d+_values");
    }

    @Test
    public void testUidFlink1_15() throws IOException {
        checkUidModification(tableConfig -> {
            tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT, "<id>_<type>_<version>_<transformation>");
        }, jsonNode -> {
        }, tableEnvironment -> {
            return planFromFlink1_15(tableEnvironment).asJsonString();
        }, "\\d+_stream-exec-sink_1_sink", "\\d+_stream-exec-sink_1_constraint-validator", "\\d+_stream-exec-values_1_values");
    }

    @Test
    public void testUidFlink1_18() throws IOException {
        checkUidModification(tableConfig -> {
            tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT, "<id>_<type>_<version>_<transformation>");
        }, jsonNode -> {
        }, tableEnvironment -> {
            return planFromCurrentFlinkVersion(tableEnvironment).asJsonString();
        }, "\\d+_stream-exec-sink_1_sink", "\\d+_stream-exec-sink_1_constraint-validator", "\\d+_stream-exec-values_1_values");
    }

    @Test
    public void testPerNodeCustomUid() throws IOException {
        checkUidModification(tableConfig -> {
        }, jsonNode -> {
            JsonTestUtils.setExecNodeConfig(jsonNode, "stream-exec-sink_1", ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT.key(), "my_custom_<transformation>_<id>");
        }, tableEnvironment -> {
            return planFromCurrentFlinkVersion(tableEnvironment).asJsonString();
        }, "my_custom_sink_\\d+", "my_custom_constraint-validator_\\d+", "\\d+_values");
    }

    private static void checkUidModification(Consumer<TableConfig> consumer, Consumer<JsonNode> consumer2, Function<TableEnvironment, String> function, String... strArr) throws IOException {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        consumer.accept(create.getConfig());
        JsonNode readFromString = JsonTestUtils.readFromString(function.apply(create));
        consumer2.accept(readFromString);
        List list = (List) CompiledPlanUtils.toTransformations(create, create.loadPlan(PlanReference.fromJsonString(readFromString.toString()))).get(0).getTransitivePredecessors().stream().map((v0) -> {
            return v0.getUid();
        }).collect(Collectors.toList());
        Assertions.assertThat(list).hasSize(strArr.length);
        IntStream.range(0, strArr.length).forEach(i -> {
            Assertions.assertThat((String) list.get(i)).matches(strArr[i]);
        });
    }

    private static CompiledPlan planFromCurrentFlinkVersion(TableEnvironment tableEnvironment) {
        return tableEnvironment.fromValues(new Object[]{1, 2, 3}).insertInto(TableDescriptor.forConnector("blackhole").build()).compilePlan();
    }

    private static CompiledPlan planFromFlink1_15(TableEnvironment tableEnvironment) {
        return tableEnvironment.loadPlan(PlanReference.fromResource("/jsonplan/testUidFlink1_15.out"));
    }

    private static LegacySourceTransformation<?> toLegacySourceTransformation(StreamTableEnvironment streamTableEnvironment, Table table) {
        Transformation transformation = streamTableEnvironment.toChangelogStream(table).getTransformation();
        while (true) {
            Transformation transformation2 = transformation;
            if (transformation2.getInputs().size() != 1) {
                Assertions.assertThat(transformation2).isInstanceOf(LegacySourceTransformation.class);
                return (LegacySourceTransformation) transformation2;
            }
            transformation = (Transformation) transformation2.getInputs().get(0);
        }
    }

    private static void assertBoundedness(Boundedness boundedness, Transformation<?> transformation) {
        Assertions.assertThat(transformation).asInstanceOf(InstanceOfAssertFactories.type(WithBoundedness.class)).extracting((v0) -> {
            return v0.getBoundedness();
        }).isEqualTo(boundedness);
    }

    private static Schema dummySchema() {
        return Schema.newBuilder().column("i", DataTypes.INT()).build();
    }
}
