/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.functions.source.legacy.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests which watermarks end up in the output of a {@link SourceStreamTask} running a
 * {@link StreamSource} operator, for sources that finish on their own and for sources that are
 * cancelled.
 */
class StreamSourceOperatorWatermarksTest {

    /**
     * A source that returns from {@code run} without emitting anything must leave exactly one
     * element in the task output: {@link Watermark#MAX_WATERMARK}.
     */
    @Test
    void testEmitMaxWatermarkForFiniteSource() throws Exception {
        StreamSource<String, ?> sourceOperator = new StreamSource<>(new FiniteSource<>());
        StreamTaskTestHarness<String> testHarness =
                setupSourceStreamTask(sourceOperator, BasicTypeInfo.STRING_TYPE_INFO);

        testHarness.invoke();
        testHarness.waitForTaskCompletion();

        Assertions.assertThat(testHarness.getOutput()).hasSize(1);
        Assertions.assertThat(testHarness.getOutput().peek()).isEqualTo(Watermark.MAX_WATERMARK);
    }

    /**
     * With the second {@link StreamSource} constructor argument set to {@code false}, the
     * intermediate watermarks (1000, 2000) emitted by the source must not show up in the output;
     * only two {@link Watermark#MAX_WATERMARK} elements are expected.
     */
    @Test
    void testDisabledProgressiveWatermarksForFiniteSource() throws Exception {
        StreamSource<String, ?> sourceOperator =
                new StreamSource<>(new FiniteSourceWithWatermarks<>(), false);
        StreamTaskTestHarness<String> testHarness =
                setupSourceStreamTask(sourceOperator, BasicTypeInfo.STRING_TYPE_INFO);

        testHarness.invoke();
        testHarness.waitForTaskCompletion();

        // NOTE(review): presumably the first MAX_WATERMARK is the one the source function emits
        // itself and the second is added by the framework on completion - confirm against
        // StreamSource.
        Assertions.assertThat(testHarness.getOutput().poll()).isEqualTo(Watermark.MAX_WATERMARK);
        Assertions.assertThat(testHarness.getOutput().poll()).isEqualTo(Watermark.MAX_WATERMARK);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
    }

    /**
     * A task that is cancelled right after construction (before it is invoked) must fail with a
     * {@link CancelTaskException} as cause and must not produce any output.
     */
    @Test
    void testNoMaxWatermarkOnImmediateCancel() throws Exception {
        StreamSource<String, ?> sourceOperator = new StreamSource<>(new InfiniteSource<>());
        StreamTaskTestHarness<String> testHarness =
                setupSourceStreamTask(sourceOperator, BasicTypeInfo.STRING_TYPE_INFO, true);

        testHarness.invoke();

        Assertions.assertThatThrownBy(testHarness::waitForTaskCompletion)
                .hasCauseInstanceOf(CancelTaskException.class);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
    }

    /**
     * A task that is cancelled while its source is running must not produce any output. A
     * {@link CancelTaskException} anywhere in the failure's cause chain is tolerated; any other
     * failure is rethrown.
     */
    @Test
    void testNoMaxWatermarkOnAsyncCancel() throws Exception {
        StreamSource<String, ?> sourceOperator = new StreamSource<>(new InfiniteSource<>());
        StreamTaskTestHarness<String> testHarness =
                setupSourceStreamTask(sourceOperator, BasicTypeInfo.STRING_TYPE_INFO);

        testHarness.invoke();
        testHarness.waitForTaskRunning();
        // Let the source loop run for a while before cancelling.
        Thread.sleep(200L);
        testHarness.getTask().cancel();

        try {
            testHarness.waitForTaskCompletion();
        } catch (Throwable t) {
            // Cancellation may or may not surface as an exception; only unrelated failures
            // should fail the test.
            if (!ExceptionUtils.findThrowable(t, CancelTaskException.class).isPresent()) {
                throw t;
            }
        }

        Assertions.assertThat(testHarness.getOutput()).isEmpty();
    }

    /**
     * Creates a test harness for the given source operator whose task is not cancelled on
     * creation.
     *
     * @param sourceOperator the operator to run inside the task
     * @param outputType type information of the emitted records
     * @return the configured, not yet invoked harness
     */
    private static <T> StreamTaskTestHarness<T> setupSourceStreamTask(
            StreamSource<T, ?> sourceOperator, TypeInformation<T> outputType) {
        return setupSourceStreamTask(sourceOperator, outputType, false);
    }

    /**
     * Creates a test harness whose task factory builds a {@link SourceStreamTask} and configures
     * it to run the given source operator as a singleton operator chain.
     *
     * @param sourceOperator the operator to run inside the task
     * @param outputType type information of the emitted records
     * @param cancelImmediatelyAfterCreation if {@code true}, the task factory calls
     *     {@code cancel()} on the task before handing it to the harness
     * @return the configured, not yet invoked harness
     */
    private static <T> StreamTaskTestHarness<T> setupSourceStreamTask(
            StreamSource<T, ?> sourceOperator,
            TypeInformation<T> outputType,
            boolean cancelImmediatelyAfterCreation) {
        StreamTaskTestHarness<T> testHarness =
                new StreamTaskTestHarness<T>(
                        env -> {
                            SourceStreamTask<T, ?, ?> sourceTask = new SourceStreamTask<>(env);
                            if (cancelImmediatelyAfterCreation) {
                                try {
                                    sourceTask.cancel();
                                } catch (Exception e) {
                                    throw new RuntimeException(e);
                                }
                            }
                            return sourceTask;
                        },
                        outputType);
        testHarness.setupOutputForSingletonOperatorChain();

        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setStreamOperator(sourceOperator);
        streamConfig.setOperatorID(new OperatorID());

        return testHarness;
    }

    /** Source that emits nothing and keeps sleeping in 20 ms steps until it is cancelled. */
    private static final class InfiniteSource<T> implements SourceFunction<T> {
        // volatile: written by cancel(), read by the loop in run().
        private volatile boolean running = true;

        @Override
        public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
            while (running) {
                Thread.sleep(20L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /**
     * Source that emits the watermarks 1000, 2000 and {@link Watermark#MAX_WATERMARK} while
     * holding the checkpoint lock, then finishes.
     */
    private static final class FiniteSourceWithWatermarks<T> extends RichSourceFunction<T> {

        @Override
        public void run(SourceFunction.SourceContext<T> ctx) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.emitWatermark(new Watermark(1000L));
                ctx.emitWatermark(new Watermark(2000L));
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        }

        @Override
        public void cancel() {
            // Nothing to do: run() returns on its own.
        }
    }

    /** Source that finishes immediately without emitting anything. */
    private static final class FiniteSource<T> extends RichSourceFunction<T> {

        @Override
        public void run(SourceFunction.SourceContext<T> ctx) {
            // Intentionally empty.
        }

        @Override
        public void cancel() {
            // Nothing to do: run() returns on its own.
        }
    }
}

