/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.async.queue;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.QueueUtil;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests the basic contract of {@link StreamElementQueue} implementations.
 *
 * <p>The suite is parameterized over {@link AsyncDataStream.OutputMode}, so every test runs once
 * against an {@link OrderedStreamElementQueue} and once against an
 * {@link UnorderedStreamElementQueue}.
 */
@RunWith(Parameterized.class)
public class StreamElementQueueTest
extends TestLogger {
    /** Output mode of the current parameterized run; selects the queue implementation. */
    private final AsyncDataStream.OutputMode outputMode;

    /**
     * Supplies the output modes the suite is executed with.
     *
     * @return the ordered and the unordered output mode
     */
    @Parameterized.Parameters
    public static Collection<AsyncDataStream.OutputMode> outputModes() {
        return Arrays.asList(AsyncDataStream.OutputMode.ORDERED, AsyncDataStream.OutputMode.UNORDERED);
    }

    /**
     * Creates the test instance for a single output mode.
     *
     * @param outputMode the output mode under test; must not be {@code null}
     */
    public StreamElementQueueTest(AsyncDataStream.OutputMode outputMode) {
        this.outputMode = Preconditions.checkNotNull(outputMode);
    }

    /**
     * Instantiates the queue implementation that matches the current output mode.
     *
     * @param capacity the capacity passed to the queue constructor
     * @return a new, empty queue
     * @throws IllegalStateException if the output mode is neither ordered nor unordered
     */
    private StreamElementQueue<Integer> createStreamElementQueue(int capacity) {
        switch (this.outputMode) {
            case ORDERED:
                return new OrderedStreamElementQueue<>(capacity);
            case UNORDERED:
                return new UnorderedStreamElementQueue<>(capacity);
            default:
                throw new IllegalStateException("Unknown output mode: " + this.outputMode);
        }
    }

    /**
     * Checks that elements are accepted until the capacity is reached, that a further
     * {@code tryPut} is rejected, and that {@code values()} reports the accepted elements.
     */
    @Test
    public void testPut() {
        StreamElementQueue<Integer> queue = this.createStreamElementQueue(2);
        Watermark watermark = new Watermark(0L);
        StreamRecord<Integer> streamRecord = new StreamRecord<>(42, 1L);

        // Two elements fill the queue up to its capacity of 2.
        Assert.assertTrue(queue.tryPut(watermark).isPresent());
        Assert.assertTrue(queue.tryPut(streamRecord).isPresent());
        Assert.assertEquals(2, queue.size());

        // The queue is full, so a third element must be rejected.
        Assert.assertFalse(queue.tryPut(new Watermark(2L)).isPresent());

        // Both accepted elements are reported, in the order they were added.
        Assert.assertEquals(Arrays.asList(watermark, streamRecord), queue.values());
    }

    /**
     * Checks that a watermark can be popped right away, while a record only becomes poppable
     * after its result future has been completed.
     */
    @Test
    public void testPop() {
        StreamElementQueue<Integer> queue = this.createStreamElementQueue(2);
        QueueUtil.putSuccessfully(queue, new Watermark(0L));
        ResultFuture<Integer> recordResult = QueueUtil.putSuccessfully(queue, new StreamRecord<>(42, 1L));
        Assert.assertEquals(2, queue.size());

        // Only the watermark is completed at this point; the record stays queued.
        Assert.assertEquals(Arrays.asList(new Watermark(0L)), QueueUtil.popCompleted(queue));
        Assert.assertEquals(1, queue.size());

        // Completing the future makes the record available, carrying the result value
        // together with the timestamp of the input record.
        recordResult.complete(Collections.singleton(43));
        Assert.assertEquals(Arrays.asList(new StreamRecord<>(43, 1L)), QueueUtil.popCompleted(queue));
        Assert.assertEquals(0, queue.size());
        Assert.assertTrue(queue.isEmpty());
    }

    /**
     * Checks that a full queue accepts a new element again once the completed element has been
     * popped.
     */
    @Test
    public void testPutOnFull() throws Exception {
        StreamElementQueue<Integer> queue = this.createStreamElementQueue(1);
        ResultFuture<Integer> resultFuture = QueueUtil.putSuccessfully(queue, new StreamRecord<>(42, 0L));
        Assert.assertEquals(1, queue.size());

        // Capacity is 1, so the second record is handed to the helper for rejected puts.
        QueueUtil.putUnsuccessfully(queue, new StreamRecord<>(43, 1L));

        // Complete and pop the first record to free its slot.
        resultFuture.complete(Collections.singleton(42 * 42));
        Assert.assertEquals(Arrays.asList(new StreamRecord<>(42 * 42, 0L)), QueueUtil.popCompleted(queue));

        // The previously rejected record now fits.
        QueueUtil.putSuccessfully(queue, new StreamRecord<>(43, 1L));
    }

    /**
     * Checks a queue that holds nothing but watermarks: both are popped together in the order
     * they were added, and popping the then-empty queue yields an empty list.
     */
    @Test
    public void testWatermarkOnly() {
        StreamElementQueue<Integer> queue = this.createStreamElementQueue(2);
        QueueUtil.putSuccessfully(queue, new Watermark(2L));
        QueueUtil.putSuccessfully(queue, new Watermark(5L));
        Assert.assertEquals(2, queue.size());
        Assert.assertFalse(queue.isEmpty());

        Assert.assertEquals(Arrays.asList(new Watermark(2L), new Watermark(5L)), QueueUtil.popCompleted(queue));
        Assert.assertEquals(0, queue.size());
        Assert.assertTrue(queue.isEmpty());

        // Popping again from the empty queue returns no elements.
        Assert.assertEquals(Collections.emptyList(), QueueUtil.popCompleted(queue));
    }
}

