package org.apache.flink.streaming.api.lineage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest.class */
class LineageGraphUtilsTest {
    private static final String SOURCE_DATASET_NAME = "LineageSource";
    private static final String SOURCE_DATASET_NAMESPACE = "source://LineageSource";
    private static final String SINK_DATASET_NAME = "LineageSink";
    private static final String SINK_DATASET_NAMESPACE = "sink://LineageSink";
    private static final String LEGACY_SOURCE_DATASET_NAME = "LineageSourceFunction";
    private static final String LEGACY_SOURCE_DATASET_NAMESPACE = "source://LineageSourceFunction";
    private static final String LEGACY_SINK_DATASET_NAME = "LineageSinkFunction";
    private static final String LEGACY_SINK_DATASET_NAMESPACE = "sink://LineageSinkFunction";

    /* loaded from: input_file:org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest$LineageSink.class */
    /**
     * A {@link DiscardingSink} for {@code Long} records that also acts as a
     * {@link LineageVertexProvider}, so the sink shows up in the extracted lineage graph.
     */
    private static class LineageSink extends DiscardingSink<Long> implements LineageVertexProvider {
        /**
         * Builds the lineage vertex reported for this sink.
         *
         * @return a new {@link DefaultLineageVertex} holding exactly one dataset named
         *     {@code SINK_DATASET_NAME} in namespace {@code SINK_DATASET_NAMESPACE}
         */
        public LineageVertex getLineageVertex() {
            // Diamond instead of a raw HashMap: the empty map is type-checked against the
            // constructor parameter rather than passed through an unchecked conversion.
            DefaultLineageDataset sinkDataset =
                    new DefaultLineageDataset(
                            LineageGraphUtilsTest.SINK_DATASET_NAME,
                            LineageGraphUtilsTest.SINK_DATASET_NAMESPACE,
                            new HashMap<>());
            DefaultLineageVertex sinkVertex = new DefaultLineageVertex();
            sinkVertex.addLineageDataset(sinkDataset);
            return sinkVertex;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest$LineageSinkFunction.class */
    /**
     * A legacy {@link SinkFunction} for {@code Long} records that also acts as a
     * {@link LineageVertexProvider}. It overrides nothing from {@code SinkFunction}; only the
     * lineage information matters to the tests.
     */
    private static class LineageSinkFunction implements SinkFunction<Long>, LineageVertexProvider {
        private LineageSinkFunction() {
        }

        /**
         * Builds the lineage vertex reported for this legacy sink.
         *
         * @return a new {@link DefaultLineageVertex} holding exactly one dataset named
         *     {@code LEGACY_SINK_DATASET_NAME} in namespace {@code LEGACY_SINK_DATASET_NAMESPACE}
         */
        public LineageVertex getLineageVertex() {
            // Diamond instead of a raw HashMap so the argument is type-checked.
            DefaultLineageDataset sinkDataset =
                    new DefaultLineageDataset(
                            LineageGraphUtilsTest.LEGACY_SINK_DATASET_NAME,
                            LineageGraphUtilsTest.LEGACY_SINK_DATASET_NAMESPACE,
                            new HashMap<>());
            DefaultLineageVertex sinkVertex = new DefaultLineageVertex();
            sinkVertex.addLineageDataset(sinkDataset);
            return sinkVertex;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest$LineageSource.class */
    /**
     * A {@link NumberSequenceSource} that also acts as a {@link LineageVertexProvider},
     * reporting itself as a bounded source with a single dataset.
     */
    private static class LineageSource extends NumberSequenceSource implements LineageVertexProvider {
        /**
         * Creates the source; both arguments are forwarded unchanged to
         * {@link NumberSequenceSource}.
         *
         * @param from first constructor argument of the underlying sequence source
         * @param to second constructor argument of the underlying sequence source
         */
        public LineageSource(long from, long to) {
            super(from, to);
        }

        /**
         * Builds the lineage vertex reported for this source.
         *
         * @return a new {@link DefaultSourceLineageVertex} created with
         *     {@link Boundedness#BOUNDED} and holding exactly one dataset named
         *     {@code SOURCE_DATASET_NAME} in namespace {@code SOURCE_DATASET_NAMESPACE}
         */
        public LineageVertex getLineageVertex() {
            // Diamond instead of a raw HashMap so the argument is type-checked.
            DefaultLineageDataset sourceDataset =
                    new DefaultLineageDataset(
                            LineageGraphUtilsTest.SOURCE_DATASET_NAME,
                            LineageGraphUtilsTest.SOURCE_DATASET_NAMESPACE,
                            new HashMap<>());
            DefaultSourceLineageVertex sourceVertex =
                    new DefaultSourceLineageVertex(Boundedness.BOUNDED);
            sourceVertex.addDataset(sourceDataset);
            return sourceVertex;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/lineage/LineageGraphUtilsTest$LineageSourceFunction.class */
    /**
     * A legacy {@link SourceFunction} that emits an increasing sequence of {@code Long} values
     * and also acts as a {@link LineageVertexProvider}, reporting itself as a continuous
     * unbounded source with a single dataset.
     */
    private static class LineageSourceFunction implements SourceFunction<Long>, LineageVertexProvider {
        /** Loop flag: read by {@link #run} on every iteration and cleared by {@link #cancel()}. */
        private volatile boolean running = true;

        private LineageSourceFunction() {
        }

        /**
         * Emits 0, 1, 2, ... until {@link #cancel()} clears the running flag.
         *
         * @param sourceContext context used to obtain the checkpoint lock and to collect records
         * @throws Exception declared by the {@code SourceFunction} contract; nothing here throws
         *     it explicitly
         */
        public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
            long next = 0;
            while (this.running) {
                // Each record is emitted while holding the checkpoint lock, which is fetched
                // again on every iteration.
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(next++);
                }
            }
        }

        /** Requests termination of the emit loop in {@link #run}. */
        public void cancel() {
            this.running = false;
        }

        /**
         * Builds the lineage vertex reported for this legacy source.
         *
         * @return a new {@link DefaultSourceLineageVertex} created with
         *     {@link Boundedness#CONTINUOUS_UNBOUNDED} and holding exactly one dataset named
         *     {@code LEGACY_SOURCE_DATASET_NAME} in namespace
         *     {@code LEGACY_SOURCE_DATASET_NAMESPACE}
         */
        public LineageVertex getLineageVertex() {
            // Diamond instead of a raw HashMap so the argument is type-checked.
            DefaultLineageDataset sourceDataset =
                    new DefaultLineageDataset(
                            LineageGraphUtilsTest.LEGACY_SOURCE_DATASET_NAME,
                            LineageGraphUtilsTest.LEGACY_SOURCE_DATASET_NAMESPACE,
                            new HashMap<>());
            DefaultSourceLineageVertex sourceVertex =
                    new DefaultSourceLineageVertex(Boundedness.CONTINUOUS_UNBOUNDED);
            sourceVertex.addDataset(sourceDataset);
            return sourceVertex;
        }
    }

    /** Package-private no-argument constructor; performs no initialization. */
    LineageGraphUtilsTest() {}

    /**
     * Wires a {@code LineageSourceFunction} to a {@code LineageSinkFunction}, converts the sink
     * transformation into a lineage graph, and checks that the graph has one unbounded source,
     * one sink (each with the expected single dataset) and one relation.
     */
    @Test
    void testExtractLineageGraphFromLegacyTransformations() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        LineageGraph graph =
                LineageGraphUtils.convertToLineageGraph(
                        Arrays.asList(
                                env.addSource(new LineageSourceFunction())
                                        .addSink(new LineageSinkFunction())
                                        .getTransformation()));

        // Source side: the size is asserted before element 0 is touched.
        Assertions.assertThat(graph.sources().size()).isEqualTo(1);
        SourceLineageVertex source = (SourceLineageVertex) graph.sources().get(0);
        Assertions.assertThat(source.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);
        Assertions.assertThat(source.datasets().size()).isEqualTo(1);
        LineageDataset sourceDataset = (LineageDataset) source.datasets().get(0);
        Assertions.assertThat(sourceDataset.name()).isEqualTo(LEGACY_SOURCE_DATASET_NAME);
        Assertions.assertThat(sourceDataset.namespace()).isEqualTo(LEGACY_SOURCE_DATASET_NAMESPACE);

        // Sink side.
        Assertions.assertThat(graph.sinks().size()).isEqualTo(1);
        LineageVertex sink = (LineageVertex) graph.sinks().get(0);
        Assertions.assertThat(sink.datasets().size()).isEqualTo(1);
        LineageDataset sinkDataset = (LineageDataset) sink.datasets().get(0);
        Assertions.assertThat(sinkDataset.name()).isEqualTo(LEGACY_SINK_DATASET_NAME);
        Assertions.assertThat(sinkDataset.namespace()).isEqualTo(LEGACY_SINK_DATASET_NAMESPACE);

        Assertions.assertThat(graph.relations().size()).isEqualTo(1);
    }

    /**
     * Wires a {@code LineageSource} to a {@code LineageSink}, converts the sink transformation
     * into a lineage graph, and checks that the graph has one bounded source, one sink (each
     * with the expected single dataset) and one relation.
     */
    @Test
    void testExtractLineageGraphFromTransformations() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        LineageGraph graph =
                LineageGraphUtils.convertToLineageGraph(
                        Arrays.asList(
                                env.fromSource(
                                                new LineageSource(1L, 5L),
                                                WatermarkStrategy.noWatermarks(),
                                                "")
                                        .sinkTo(new LineageSink())
                                        .getTransformation()));

        // Source side: the size is asserted before element 0 is touched.
        Assertions.assertThat(graph.sources().size()).isEqualTo(1);
        SourceLineageVertex source = (SourceLineageVertex) graph.sources().get(0);
        Assertions.assertThat(source.boundedness()).isEqualTo(Boundedness.BOUNDED);
        Assertions.assertThat(source.datasets().size()).isEqualTo(1);
        LineageDataset sourceDataset = (LineageDataset) source.datasets().get(0);
        Assertions.assertThat(sourceDataset.name()).isEqualTo(SOURCE_DATASET_NAME);
        Assertions.assertThat(sourceDataset.namespace()).isEqualTo(SOURCE_DATASET_NAMESPACE);

        // Sink side.
        Assertions.assertThat(graph.sinks().size()).isEqualTo(1);
        LineageVertex sink = (LineageVertex) graph.sinks().get(0);
        Assertions.assertThat(sink.datasets().size()).isEqualTo(1);
        LineageDataset sinkDataset = (LineageDataset) sink.datasets().get(0);
        Assertions.assertThat(sinkDataset.name()).isEqualTo(SINK_DATASET_NAME);
        Assertions.assertThat(sinkDataset.namespace()).isEqualTo(SINK_DATASET_NAMESPACE);

        Assertions.assertThat(graph.relations().size()).isEqualTo(1);
    }

    /**
     * Wires a {@code LineageSource} to a plain {@link DiscardingSink} (which is not a
     * {@code LineageSink}) and checks that the graph built from the environment's
     * transformations has the one bounded source, no sinks and no relations.
     */
    @Test
    void testExtractPartialLineageGraphWithSourceOnly() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Diamond instead of a raw DiscardingSink: avoids an unchecked conversion at sinkTo.
        env.fromSource(new LineageSource(1L, 5L), WatermarkStrategy.noWatermarks(), "")
                .sinkTo(new DiscardingSink<>());
        LineageGraph graph = LineageGraphUtils.convertToLineageGraph(env.getTransformations());

        // The size is asserted before element 0 is touched.
        Assertions.assertThat(graph.sources().size()).isEqualTo(1);
        SourceLineageVertex source = (SourceLineageVertex) graph.sources().get(0);
        Assertions.assertThat(source.boundedness()).isEqualTo(Boundedness.BOUNDED);
        Assertions.assertThat(source.datasets().size()).isEqualTo(1);
        LineageDataset sourceDataset = (LineageDataset) source.datasets().get(0);
        Assertions.assertThat(sourceDataset.name()).isEqualTo(SOURCE_DATASET_NAME);
        Assertions.assertThat(sourceDataset.namespace()).isEqualTo(SOURCE_DATASET_NAMESPACE);

        Assertions.assertThat(graph.sinks()).isEmpty();
        Assertions.assertThat(graph.relations()).isEmpty();
    }

    /**
     * Wires a plain {@link NumberSequenceSource} (which is not a {@code LineageSource}) to a
     * {@code LineageSink} and checks that the graph built from the environment's
     * transformations has the one sink, no sources and no relations.
     */
    @Test
    void testExtractPartialLineageGraphWithSinkOnly() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(new NumberSequenceSource(1L, 5L), WatermarkStrategy.noWatermarks(), "")
                .sinkTo(new LineageSink());
        LineageGraph graph = LineageGraphUtils.convertToLineageGraph(env.getTransformations());

        // The size is asserted before element 0 is touched.
        Assertions.assertThat(graph.sinks().size()).isEqualTo(1);
        LineageVertex sink = (LineageVertex) graph.sinks().get(0);
        Assertions.assertThat(sink.datasets().size()).isEqualTo(1);
        LineageDataset sinkDataset = (LineageDataset) sink.datasets().get(0);
        Assertions.assertThat(sinkDataset.name()).isEqualTo(SINK_DATASET_NAME);
        Assertions.assertThat(sinkDataset.namespace()).isEqualTo(SINK_DATASET_NAMESPACE);

        Assertions.assertThat(graph.sources()).isEmpty();
        Assertions.assertThat(graph.relations()).isEmpty();
    }

    /**
     * Passes the environment's transformations to the converter twice in one list and checks
     * that the resulting graph still reports a single source, no sinks and no relations.
     */
    @Test
    void testSourceDeduplication() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Diamond instead of a raw DiscardingSink: avoids an unchecked conversion at sinkTo.
        env.fromSource(new LineageSource(1L, 5L), WatermarkStrategy.noWatermarks(), "")
                .sinkTo(new DiscardingSink<>());

        // The same transformations are added twice on purpose to create duplicates.
        // NOTE(review): the list stays raw because its element type is not imported in this file.
        ArrayList duplicated = new ArrayList();
        duplicated.addAll(env.getTransformations());
        duplicated.addAll(env.getTransformations());

        LineageGraph graph = LineageGraphUtils.convertToLineageGraph(duplicated);
        Assertions.assertThat(graph.sources().size()).isEqualTo(1);
        // isEmpty() matches how sibling tests assert on empty sinks/relations.
        Assertions.assertThat(graph.sinks()).isEmpty();
        Assertions.assertThat(graph.relations()).isEmpty();
    }

    /**
     * Passes the environment's transformations to the converter twice in one list and checks
     * that the resulting graph still reports a single sink, no sources and no relations.
     *
     * <p>NOTE(review): the method name spells "Deduplication" as "Duduplication"; it is kept
     * unchanged here.
     */
    @Test
    void testSinkDuduplication() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(new NumberSequenceSource(1L, 5L), WatermarkStrategy.noWatermarks(), "")
                .sinkTo(new LineageSink());

        // The same transformations are added twice on purpose to create duplicates.
        ArrayList duplicated = new ArrayList();
        duplicated.addAll(env.getTransformations());
        duplicated.addAll(env.getTransformations());

        LineageGraph graph = LineageGraphUtils.convertToLineageGraph(duplicated);
        Assertions.assertThat(graph.sources().size()).isEqualTo(0);
        Assertions.assertThat(graph.sinks().size()).isEqualTo(1);
        Assertions.assertThat(graph.relations()).isEmpty();
    }
}
