package org.apache.flink.streaming.api.graph;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraphGeneratorTest;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.HamcrestCondition;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphGeneratorExecutionModeDetectionTest.class */
/**
 * Tests how the configured {@link RuntimeExecutionMode} and the boundedness of the sources
 * influence the properties of the generated {@link StreamGraph}.
 *
 * <p>Each test checks four graph properties through {@link #hasProperties}: the global exchange
 * mode, the job type, whether checkpointing is enabled and whether all vertices share one slot
 * sharing group by default.
 */
class StreamGraphGeneratorExecutionModeDetectionTest {

    /**
     * No runtime mode is configured and the source is bounded: the graph is expected to be a
     * pipelined STREAMING graph that keeps the checkpointing enabled on the environment.
     */
    @Test
    void testExecutionModePropagationFromEnvWithDefaultAndBoundedSource() {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.enableCheckpointing(100L);

        environment
                .fromSource(
                        new MockSource(Boundedness.BOUNDED, 100),
                        WatermarkStrategy.noWatermarks(),
                        "bounded-source")
                .print();

        Assertions.assertThat(environment.getStreamGraph())
                .is(
                        HamcrestCondition.matching(
                                hasProperties(
                                        GlobalStreamExchangeMode.ALL_EDGES_PIPELINED,
                                        JobType.STREAMING,
                                        true,
                                        true)));
    }

    /**
     * No runtime mode is configured and the source is unbounded: the graph is expected to be a
     * pipelined STREAMING graph; checkpointing was never enabled here.
     */
    @Test
    void testExecutionModePropagationFromEnvWithDefaultAndUnboundedSource() {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        environment
                .fromSource(
                        new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 100),
                        WatermarkStrategy.noWatermarks(),
                        "unbounded-source")
                .print();

        Assertions.assertThat(environment.getStreamGraph())
                .is(
                        HamcrestCondition.matching(
                                hasProperties(
                                        GlobalStreamExchangeMode.ALL_EDGES_PIPELINED,
                                        JobType.STREAMING,
                                        false,
                                        true)));
    }

    /**
     * AUTOMATIC mode with a bounded source: the environment keeps its own settings (chaining,
     * checkpoint interval), while the generated graph is expected to be a blocking BATCH graph
     * with checkpointing disabled.
     */
    @Test
    void testExecutionModePropagationFromEnvWithAutomaticAndBoundedSource() {
        final Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.AUTOMATIC);

        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.enableCheckpointing(100L);
        environment.configure(config, getClass().getClassLoader());

        environment
                .fromSource(
                        new MockSource(Boundedness.BOUNDED, 100),
                        WatermarkStrategy.noWatermarks(),
                        "bounded-source")
                .print();

        // The environment-level settings must be untouched by the mode detection.
        Assertions.assertThat(environment.isChainingEnabled()).isTrue();
        Assertions.assertThat(environment.getCheckpointInterval()).isEqualTo(100L);

        Assertions.assertThat(environment.getStreamGraph())
                .is(
                        HamcrestCondition.matching(
                                hasProperties(
                                        GlobalStreamExchangeMode.ALL_EDGES_BLOCKING,
                                        JobType.BATCH,
                                        false,
                                        false)));
    }

    /**
     * BATCH mode combined with an unbounded source is expected to be rejected with an {@link
     * IllegalStateException} when the stream graph is requested.
     */
    @Test
    void testExecutionModePropagationFromEnvWithBatchAndUnboundedSource() {
        final Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);

        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.configure(config, getClass().getClassLoader());

        environment
                .fromSource(
                        new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 100),
                        WatermarkStrategy.noWatermarks(),
                        "unbounded-source")
                .print();

        Assertions.assertThatThrownBy(environment::getStreamGraph)
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("combination is not allowed");
    }

    /**
     * Only the two-input transformation is handed to the generator; its bounded and unbounded
     * sources are reachable solely as predecessors. AUTOMATIC mode is still expected to produce a
     * STREAMING graph.
     */
    @Test
    void testDetectionThroughTransitivePredecessors() {
        final SourceTransformation<Integer, ?, ?> bounded =
                getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
        Assertions.assertThat(bounded.getBoundedness()).isEqualTo(Boundedness.BOUNDED);

        final SourceTransformation<Integer, ?, ?> unbounded =
                getSourceTransformation("Unbounded Source", Boundedness.CONTINUOUS_UNBOUNDED);
        Assertions.assertThat(unbounded.getBoundedness())
                .isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);

        final TwoInputTransformation<Integer, Integer, Integer> resultTransform =
                new TwoInputTransformation<>(
                        bounded,
                        unbounded,
                        "Test Two Input Transformation",
                        SimpleOperatorFactory.of(
                                new StreamGraphGeneratorTest
                                        .OutputTypeConfigurableOperationWithTwoInputs()),
                        BasicTypeInfo.INT_TYPE_INFO,
                        1);

        final StreamGraph graph =
                generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, resultTransform);

        Assertions.assertThat(graph)
                .is(
                        HamcrestCondition.matching(
                                hasProperties(
                                        GlobalStreamExchangeMode.ALL_EDGES_PIPELINED,
                                        JobType.STREAMING,
                                        false,
                                        true)));
    }

    /** AUTOMATIC mode with a single bounded source is expected to yield a BATCH graph. */
    @Test
    void testBoundedDetection() {
        final SourceTransformation<Integer, ?, ?> bounded =
                getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
        Assertions.assertThat(bounded.getBoundedness()).isEqualTo(Boundedness.BOUNDED);

        final StreamGraph graph = generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, bounded);

        Assertions.assertThat(graph)
                .is(
                        HamcrestCondition.matching(
                                hasProperties(
                                        GlobalStreamExchangeMode.ALL_EDGES_BLOCKING,
                                        JobType.BATCH,
                                        false,
                                        false)));
    }

    /** AUTOMATIC mode with a single unbounded source is expected to yield a STREAMING graph. */
    @Test
    void testUnboundedDetection() {
        final SourceTransformation<Integer, ?, ?> unbounded =
                getSourceTransformation("Unbounded Source", Boundedness.CONTINUOUS_UNBOUNDED);
        Assertions.assertThat(unbounded.getBoundedness())
                .isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);

        final StreamGraph graph = generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, unbounded);

        Assertions.assertThat(graph)
                .is(
                        HamcrestCondition.matching(
                                hasProperties(
                                        GlobalStreamExchangeMode.ALL_EDGES_PIPELINED,
                                        JobType.STREAMING,
                                        false,
                                        true)));
    }

    /**
     * AUTOMATIC mode with one unbounded and one bounded source is expected to yield a STREAMING
     * graph.
     */
    @Test
    void testMixedDetection() {
        final SourceTransformation<Integer, ?, ?> unbounded =
                getSourceTransformation("Unbounded Source", Boundedness.CONTINUOUS_UNBOUNDED);
        Assertions.assertThat(unbounded.getBoundedness())
                .isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED);

        final SourceTransformation<Integer, ?, ?> bounded =
                getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
        Assertions.assertThat(bounded.getBoundedness()).isEqualTo(Boundedness.BOUNDED);

        // Both sources must be part of the translated graph; handing over only the unbounded
        // one would make this test identical to testUnboundedDetection().
        final StreamGraph graph =
                generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, unbounded, bounded);

        Assertions.assertThat(graph)
                .is(
                        HamcrestCondition.matching(
                                hasProperties(
                                        GlobalStreamExchangeMode.ALL_EDGES_PIPELINED,
                                        JobType.STREAMING,
                                        false,
                                        true)));
    }

    /**
     * The same bounded source is translated twice: AUTOMATIC is expected to produce a BATCH
     * graph, while an explicitly configured STREAMING mode is expected to produce a STREAMING
     * graph.
     */
    @Test
    void testExplicitOverridesDetectedMode() {
        final SourceTransformation<Integer, ?, ?> bounded =
                getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
        Assertions.assertThat(bounded.getBoundedness()).isEqualTo(Boundedness.BOUNDED);

        final StreamGraph detectedGraph =
                generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, bounded);
        Assertions.assertThat(detectedGraph)
                .is(
                        HamcrestCondition.matching(
                                hasProperties(
                                        GlobalStreamExchangeMode.ALL_EDGES_BLOCKING,
                                        JobType.BATCH,
                                        false,
                                        false)));

        final StreamGraph explicitGraph =
                generateStreamGraph(RuntimeExecutionMode.STREAMING, bounded);
        Assertions.assertThat(explicitGraph)
                .is(
                        HamcrestCondition.matching(
                                hasProperties(
                                        GlobalStreamExchangeMode.ALL_EDGES_PIPELINED,
                                        JobType.STREAMING,
                                        false,
                                        true)));
    }

    /**
     * Runs a {@link StreamGraphGenerator} over the given transformations with fresh execution and
     * checkpoint configs and the given runtime mode set in the configuration.
     *
     * @param runtimeExecutionMode value stored under {@link ExecutionOptions#RUNTIME_MODE}
     * @param transformationArr transformations handed to the generator, in the given order
     * @return the generated stream graph
     */
    private StreamGraph generateStreamGraph(
            RuntimeExecutionMode runtimeExecutionMode, Transformation<?>... transformationArr) {
        final ArrayList<Transformation<?>> transformations = new ArrayList<>();
        Collections.addAll(transformations, transformationArr);

        final Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);

        return new StreamGraphGenerator(
                        transformations, new ExecutionConfig(), new CheckpointConfig(), config)
                .generate();
    }

    /**
     * Creates a source transformation with parallelism 1, backed by a {@link MockSource} of the
     * given boundedness and without watermarks.
     *
     * @param str name of the transformation
     * @param boundedness boundedness reported by the mock source
     * @return the new source transformation
     */
    private SourceTransformation<Integer, ?, ?> getSourceTransformation(
            String str, Boundedness boundedness) {
        return new SourceTransformation<>(
                str,
                new MockSource(boundedness, 100),
                WatermarkStrategy.noWatermarks(),
                IntegerTypeInfo.of(Integer.class),
                1);
    }

    /**
     * Creates a matcher that accepts a {@link StreamGraph} only if all four properties equal the
     * expected values.
     *
     * @param exchangeMode expected global stream exchange mode
     * @param jobType expected job type
     * @param isCheckpointingEnabled expected checkpointing flag of the graph's checkpoint config
     * @param isAllVerticesInSameSlotSharingGroupByDefault expected slot sharing default
     * @return matcher describing the expected values, and the actual ones on mismatch
     */
    private static TypeSafeMatcher<StreamGraph> hasProperties(
            final GlobalStreamExchangeMode exchangeMode,
            final JobType jobType,
            final boolean isCheckpointingEnabled,
            final boolean isAllVerticesInSameSlotSharingGroupByDefault) {
        return new TypeSafeMatcher<StreamGraph>() {
            @Override
            protected boolean matchesSafely(StreamGraph actualStreamGraph) {
                return exchangeMode == actualStreamGraph.getGlobalStreamExchangeMode()
                        && jobType == actualStreamGraph.getJobType()
                        && actualStreamGraph.getCheckpointConfig().isCheckpointingEnabled()
                                == isCheckpointingEnabled
                        && actualStreamGraph.isAllVerticesInSameSlotSharingGroupByDefault()
                                == isAllVerticesInSameSlotSharingGroupByDefault;
            }

            @Override
            public void describeTo(Description description) {
                appendProperties(
                        description,
                        exchangeMode,
                        jobType,
                        isCheckpointingEnabled,
                        isAllVerticesInSameSlotSharingGroupByDefault);
            }

            @Override
            protected void describeMismatchSafely(
                    StreamGraph actualStreamGraph, Description mismatchDescription) {
                // Report what the inspected graph actually contains; echoing the expected
                // values here would make every failure message look like a match.
                appendProperties(
                        mismatchDescription.appendText("was "),
                        actualStreamGraph.getGlobalStreamExchangeMode(),
                        actualStreamGraph.getJobType(),
                        actualStreamGraph.getCheckpointConfig().isCheckpointingEnabled(),
                        actualStreamGraph.isAllVerticesInSameSlotSharingGroupByDefault());
            }
        };
    }

    /** Appends the textual form of the four graph properties to the given description. */
    private static void appendProperties(
            Description description,
            GlobalStreamExchangeMode exchangeMode,
            JobType jobType,
            boolean isCheckpointingEnabled,
            boolean isAllVerticesInSameSlotSharingGroupByDefault) {
        description
                .appendText("a StreamGraph with exchangeMode=")
                .appendValue(exchangeMode)
                .appendText(", jobType=")
                .appendValue(jobType)
                .appendText(", isCheckpointingEnabled=")
                .appendValue(isCheckpointingEnabled)
                .appendText(", isAllVerticesInSameSlotSharingGroupByDefault=")
                .appendValue(isAllVerticesInSameSlotSharingGroupByDefault);
    }
}
