/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source.datagen;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.BlockingSourceContext;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class DataGeneratorSourceTest {
    @Test
    void testRandomGenerator() throws Exception {
        long min = 10L;
        long max = 20L;
        final DataGeneratorSource source = new DataGeneratorSource((DataGenerator)RandomGenerator.longGenerator((long)min, (long)max));
        StreamSource src = new StreamSource((SourceFunction)source);
        AbstractStreamOperatorTestHarness testHarness = new AbstractStreamOperatorTestHarness((StreamOperator)src, 1, 1, 0);
        testHarness.open();
        final int totalNumber = 1000;
        final ArrayList results = new ArrayList();
        source.run((SourceFunction.SourceContext)new SourceFunction.SourceContext<Long>(){
            private Object lock = new Object();
            private int emitNumber = 0;

            public void collect(Long element) {
                if (++this.emitNumber > totalNumber) {
                    source.isRunning = false;
                }
                results.add(element);
            }

            public void collectWithTimestamp(Long element, long timestamp) {
            }

            public void emitWatermark(Watermark mark) {
            }

            public void markAsTemporarilyIdle() {
            }

            public Object getCheckpointLock() {
                return this.lock;
            }

            public void close() {
            }
        });
        for (Long l : results) {
            Assertions.assertThat((Long)l).isBetween(Long.valueOf(min), Long.valueOf(max));
        }
    }

    @Test
    void testSequenceCheckpointRestore() throws Exception {
        boolean initElement = false;
        int maxElement = 100;
        HashSet<Long> expectedOutput = new HashSet<Long>();
        for (long i = 0L; i <= 100L; ++i) {
            expectedOutput.add(i);
        }
        DataGeneratorSourceTest.innerTestDataGenCheckpointRestore(() -> new DataGeneratorSource((DataGenerator)SequenceGenerator.longGenerator((long)0L, (long)100L)), expectedOutput);
    }

    public static <T> void innerTestDataGenCheckpointRestore(Supplier<DataGeneratorSource<T>> supplier, Set<T> expectedOutput) throws Exception {
        int maxParallelsim = 2;
        ConcurrentHashMap outputCollector = new ConcurrentHashMap();
        OneShotLatch latchToTrigger1 = new OneShotLatch();
        OneShotLatch latchToWait1 = new OneShotLatch();
        OneShotLatch latchToTrigger2 = new OneShotLatch();
        OneShotLatch latchToWait2 = new OneShotLatch();
        DataGeneratorSource source1 = supplier.get();
        StreamSource src1 = new StreamSource(source1);
        AbstractStreamOperatorTestHarness testHarness1 = new AbstractStreamOperatorTestHarness((StreamOperator)src1, 2, 2, 0);
        testHarness1.open();
        DataGeneratorSource source2 = supplier.get();
        StreamSource src2 = new StreamSource(source2);
        AbstractStreamOperatorTestHarness testHarness2 = new AbstractStreamOperatorTestHarness((StreamOperator)src2, 2, 2, 1);
        testHarness2.open();
        Thread runner1 = new Thread(() -> {
            try {
                source1.run((SourceFunction.SourceContext)new BlockingSourceContext("1", latchToTrigger1, latchToWait1, outputCollector, 21));
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
        });
        Thread runner2 = new Thread(() -> {
            try {
                source2.run((SourceFunction.SourceContext)new BlockingSourceContext("2", latchToTrigger2, latchToWait2, outputCollector, 32));
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
        });
        runner1.start();
        runner2.start();
        if (!latchToTrigger1.isTriggered()) {
            latchToTrigger1.await();
        }
        if (!latchToTrigger2.isTriggered()) {
            latchToTrigger2.await();
        }
        OperatorSubtaskState snapshot = AbstractStreamOperatorTestHarness.repackageState((OperatorSubtaskState[])new OperatorSubtaskState[]{testHarness1.snapshot(0L, 0L), testHarness2.snapshot(0L, 0L)});
        DataGeneratorSource source3 = supplier.get();
        StreamSource src3 = new StreamSource(source3);
        OperatorSubtaskState initState = AbstractStreamOperatorTestHarness.repartitionOperatorState((OperatorSubtaskState)snapshot, (int)2, (int)2, (int)1, (int)0);
        AbstractStreamOperatorTestHarness testHarness3 = new AbstractStreamOperatorTestHarness((StreamOperator)src3, 2, 1, 0);
        testHarness3.setup();
        testHarness3.initializeState(initState);
        testHarness3.open();
        OneShotLatch latchToTrigger3 = new OneShotLatch();
        OneShotLatch latchToWait3 = new OneShotLatch();
        latchToWait3.trigger();
        Thread runner3 = new Thread(() -> {
            try {
                source3.run((SourceFunction.SourceContext)new BlockingSourceContext("3", latchToTrigger3, latchToWait3, outputCollector, 3));
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
        });
        runner3.start();
        runner3.join();
        Assertions.assertThat(outputCollector).hasSize(3);
        HashSet dedupRes = new HashSet(expectedOutput.size());
        for (Map.Entry elementsPerTask : outputCollector.entrySet()) {
            String key = (String)elementsPerTask.getKey();
            List elements = (List)outputCollector.get(key);
            Assertions.assertThat((List)elements).isNotEmpty();
            for (Object elem : elements) {
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)dedupRes.add(elem)).as("Duplicate entry: " + elem, new Object[0])).isTrue();
                ((AbstractCollectionAssert)Assertions.assertThat(expectedOutput).as("Unexpected element: " + elem, new Object[0])).contains(new Object[]{elem});
            }
        }
        Assertions.assertThat(dedupRes).hasSameSizeAs(expectedOutput);
        latchToWait1.trigger();
        latchToWait2.trigger();
        runner1.join();
        runner2.join();
    }
}

