/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.source;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.api.operators.source.OnEventTestWatermarkGenerator;
import org.apache.flink.streaming.api.operators.source.OnPeriodicTestWatermarkGenerator;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class SourceOperatorEventTimeTest {
    SourceOperatorEventTimeTest() {
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testMainOutputPeriodicWatermarks(boolean emitProgressiveWatermarks) throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnPeriodicTestWatermarkGenerator());
        List<Watermark> result = this.testSequenceOfWatermarks(emitProgressiveWatermarks, (WatermarkStrategy<Integer>)watermarkStrategy, output -> output.collect((Object)0, 100L), output -> output.collect((Object)0, 120L), output -> output.collect((Object)0, 110L));
        this.assertWatermarksOrEmpty(emitProgressiveWatermarks, result, new Watermark(100L), new Watermark(120L));
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testMainOutputEventWatermarks(boolean emitProgressiveWatermarks) throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnEventTestWatermarkGenerator());
        List<Watermark> result = this.testSequenceOfWatermarks(emitProgressiveWatermarks, (WatermarkStrategy<Integer>)watermarkStrategy, output -> output.collect((Object)0, 100L), output -> output.collect((Object)0, 120L), output -> output.collect((Object)0, 110L));
        this.assertWatermarksOrEmpty(emitProgressiveWatermarks, result, new Watermark(100L), new Watermark(120L));
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testPerSplitOutputPeriodicWatermarks(boolean emitProgressiveWatermarks) throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnPeriodicTestWatermarkGenerator());
        List<Watermark> result = this.testSequenceOfWatermarks(emitProgressiveWatermarks, (WatermarkStrategy<Integer>)watermarkStrategy, output -> {
            output.createOutputForSplit("A");
            output.createOutputForSplit("B");
        }, output -> output.createOutputForSplit("A").collect((Object)0, 100L), output -> output.createOutputForSplit("B").collect((Object)0, 200L), output -> output.createOutputForSplit("A").collect((Object)0, 150L), output -> output.releaseOutputForSplit("A"), output -> output.createOutputForSplit("B").collect((Object)0, 200L));
        this.assertWatermarksOrEmpty(emitProgressiveWatermarks, result, new Watermark(100L), new Watermark(150L), new Watermark(200L));
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testPerSplitOutputEventWatermarks(boolean emitProgressiveWatermarks) throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnEventTestWatermarkGenerator());
        List<Watermark> result = this.testSequenceOfWatermarks(emitProgressiveWatermarks, (WatermarkStrategy<Integer>)watermarkStrategy, output -> {
            output.createOutputForSplit("one");
            output.createOutputForSplit("two");
        }, output -> output.createOutputForSplit("one").collect((Object)0, 100L), output -> output.createOutputForSplit("two").collect((Object)0, 200L), output -> output.createOutputForSplit("one").collect((Object)0, 150L), output -> output.releaseOutputForSplit("one"), output -> output.createOutputForSplit("two").collect((Object)0, 200L));
        this.assertWatermarksOrEmpty(emitProgressiveWatermarks, result, new Watermark(100L), new Watermark(150L), new Watermark(200L));
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testCreatingPerSplitOutputOnSplitAddition(boolean emitProgressiveWatermarks) throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnEventTestWatermarkGenerator());
        InterpretingSourceReader reader = new InterpretingSourceReader(output -> output.createOutputForSplit("1").collect((Object)0, 100L), output -> output.createOutputForSplit("1").collect((Object)0, 200L), output -> output.createOutputForSplit("1").collect((Object)0, 300L), output -> output.createOutputForSplit("2").collect((Object)0, 150L), output -> output.createOutputForSplit("2").collect((Object)0, 400L));
        SourceOperator<Integer, MockSourceSplit> sourceOperator = TestingSourceOperator.createTestOperator(reader, watermarkStrategy, emitProgressiveWatermarks);
        sourceOperator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Arrays.asList(new MockSourceSplit(1), new MockSourceSplit(2)), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        List<Watermark> result = this.testSequenceOfWatermarks(sourceOperator);
        this.assertWatermarksOrEmpty(emitProgressiveWatermarks, result, new Watermark(150L), new Watermark(300L));
    }

    @Test
    void testMainAndPerSplitWatermarkIdleness() throws Exception {
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new OnEventTestWatermarkGenerator());
        InterpretingSourceReader reader = new InterpretingSourceReader(output -> output.collect((Object)0, 100L), ReaderOutput::markIdle, output -> output.createOutputForSplit("1").collect((Object)0, 150L), output -> output.createOutputForSplit("2").collect((Object)0, 200L), output -> output.createOutputForSplit("1").markIdle(), output -> output.createOutputForSplit("2").markIdle(), output -> output.collect((Object)0, 250L), output -> output.createOutputForSplit("1").markIdle(), output -> output.createOutputForSplit("2").markIdle(), ReaderOutput::markIdle, output -> output.createOutputForSplit("1").collect((Object)0, 300L));
        SourceOperator<Integer, MockSourceSplit> sourceOperator = TestingSourceOperator.createTestOperator(reader, watermarkStrategy, true);
        List<Object> events = this.testSequenceOfEvents(sourceOperator);
        Assertions.assertThat(events).containsExactly(new Object[]{new StreamRecord((Object)0, 100L), new Watermark(100L), new WatermarkStatus(-1), new StreamRecord((Object)0, 150L), new WatermarkStatus(0), new Watermark(150L), new StreamRecord((Object)0, 200L), new Watermark(200L), new WatermarkStatus(-1), new StreamRecord((Object)0, 250L), new WatermarkStatus(0), new Watermark(250L), new WatermarkStatus(-1), new StreamRecord((Object)0, 300L), new WatermarkStatus(0), new Watermark(300L)});
    }

    private void assertWatermarksOrEmpty(boolean emitProgressiveWatermarks, List<Watermark> actualWatermarks, Watermark ... expectedWatermarks) {
        if (emitProgressiveWatermarks) {
            Assertions.assertThat(actualWatermarks).contains((Object[])expectedWatermarks);
        } else {
            Assertions.assertThat(actualWatermarks).isEmpty();
        }
    }

    @SafeVarargs
    private final List<Watermark> testSequenceOfWatermarks(boolean emitProgressiveWatermarks, WatermarkStrategy<Integer> watermarkStrategy, Consumer<ReaderOutput<Integer>> ... actions) throws Exception {
        InterpretingSourceReader reader = new InterpretingSourceReader(actions);
        SourceOperator<Integer, MockSourceSplit> sourceOperator = TestingSourceOperator.createTestOperator(reader, watermarkStrategy, emitProgressiveWatermarks);
        return this.testSequenceOfWatermarks(sourceOperator);
    }

    private final List<Watermark> testSequenceOfWatermarks(SourceOperator<Integer, MockSourceSplit> sourceOperator) throws Exception {
        List<Object> allEvents = this.testSequenceOfEvents(sourceOperator);
        return allEvents.stream().filter(evt -> evt instanceof Watermark).map(evt -> (Watermark)evt).collect(Collectors.toList());
    }

    private final List<Object> testSequenceOfEvents(SourceOperator<Integer, MockSourceSplit> sourceOperator) throws Exception {
        CollectingDataOutput out = new CollectingDataOutput();
        TestProcessingTimeService timeService = (TestProcessingTimeService)sourceOperator.getProcessingTimeService();
        while (sourceOperator.emitNext(out) != DataInputStatus.END_OF_INPUT) {
            timeService.setCurrentTime(timeService.getCurrentProcessingTime() + 100L);
        }
        return out.events;
    }

    private static final class InterpretingSourceReader
    implements SourceReader<Integer, MockSourceSplit> {
        private final Iterator<Consumer<ReaderOutput<Integer>>> actions;

        @SafeVarargs
        private InterpretingSourceReader(Consumer<ReaderOutput<Integer>> ... actions) {
            this.actions = Arrays.asList(actions).iterator();
        }

        public void start() {
        }

        public InputStatus pollNext(ReaderOutput<Integer> output) {
            if (this.actions.hasNext()) {
                this.actions.next().accept(output);
                return InputStatus.MORE_AVAILABLE;
            }
            return InputStatus.END_OF_INPUT;
        }

        public List<MockSourceSplit> snapshotState(long checkpointId) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<Void> isAvailable() {
            return CompletableFuture.completedFuture(null);
        }

        public void addSplits(List<MockSourceSplit> splits) {
        }

        public void notifyNoMoreSplits() {
        }

        public void close() {
        }
    }
}

