package org.apache.flink.streaming.api.operators.async.queue;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.class */
class StreamElementQueueTest {
    private final AsyncDataStream.OutputMode outputMode;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$api$datastream$AsyncDataStream$OutputMode = new int[AsyncDataStream.OutputMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$streaming$api$datastream$AsyncDataStream$OutputMode[AsyncDataStream.OutputMode.ORDERED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$datastream$AsyncDataStream$OutputMode[AsyncDataStream.OutputMode.UNORDERED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    @Parameters
    private static Collection<AsyncDataStream.OutputMode> outputModes() {
        return Arrays.asList(AsyncDataStream.OutputMode.ORDERED, AsyncDataStream.OutputMode.UNORDERED);
    }

    StreamElementQueueTest(AsyncDataStream.OutputMode outputMode) {
        this.outputMode = (AsyncDataStream.OutputMode) Preconditions.checkNotNull(outputMode);
    }

    private StreamElementQueue<Integer> createStreamElementQueue(int i) {
        switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$api$datastream$AsyncDataStream$OutputMode[this.outputMode.ordinal()]) {
            case 1:
                return new OrderedStreamElementQueue(i);
            case 2:
                return new UnorderedStreamElementQueue(i);
            default:
                throw new IllegalStateException("Unknown output mode: " + this.outputMode);
        }
    }

    @TestTemplate
    void testPut() {
        StreamElementQueue<Integer> createStreamElementQueue = createStreamElementQueue(2);
        StreamElement watermark = new Watermark(0L);
        StreamElement streamRecord = new StreamRecord(42, 1L);
        Assertions.assertThat(createStreamElementQueue.tryPut(watermark)).isPresent();
        Assertions.assertThat(createStreamElementQueue.tryPut(streamRecord)).isPresent();
        Assertions.assertThat(createStreamElementQueue.size()).isEqualTo(2);
        Assertions.assertThat(createStreamElementQueue.tryPut(new Watermark(2L))).isNotPresent();
        Assertions.assertThat(createStreamElementQueue.values()).containsExactly(new StreamElement[]{watermark, streamRecord});
    }

    @TestTemplate
    void testPop() {
        StreamElementQueue<Integer> createStreamElementQueue = createStreamElementQueue(2);
        QueueUtil.putSuccessfully(createStreamElementQueue, new Watermark(0L));
        ResultFuture<Integer> putSuccessfully = QueueUtil.putSuccessfully(createStreamElementQueue, new StreamRecord(42, 1L));
        Assertions.assertThat(createStreamElementQueue.size()).isEqualTo(2);
        Assertions.assertThat(QueueUtil.popCompleted(createStreamElementQueue)).containsExactly(new StreamElement[]{new Watermark(0L)});
        Assertions.assertThat(createStreamElementQueue.size()).isOne();
        putSuccessfully.complete(Collections.singleton(43));
        Assertions.assertThat(QueueUtil.popCompleted(createStreamElementQueue)).containsExactly(new StreamElement[]{new StreamRecord(43, 1L)});
        Assertions.assertThat(createStreamElementQueue.size()).isZero();
        Assertions.assertThat(createStreamElementQueue.isEmpty()).isTrue();
    }

    @TestTemplate
    void testPutOnFull() {
        StreamElementQueue<Integer> createStreamElementQueue = createStreamElementQueue(1);
        ResultFuture<Integer> putSuccessfully = QueueUtil.putSuccessfully(createStreamElementQueue, new StreamRecord(42, 0L));
        Assertions.assertThat(createStreamElementQueue.size()).isOne();
        QueueUtil.putUnsuccessfully(createStreamElementQueue, new StreamRecord(43, 1L));
        putSuccessfully.complete(Collections.singleton(1764));
        Assertions.assertThat(QueueUtil.popCompleted(createStreamElementQueue)).containsExactly(new StreamElement[]{new StreamRecord(1764, 0L)});
        QueueUtil.putSuccessfully(createStreamElementQueue, new StreamRecord(43, 1L));
    }

    @TestTemplate
    void testWatermarkOnly() {
        StreamElementQueue<Integer> createStreamElementQueue = createStreamElementQueue(2);
        QueueUtil.putSuccessfully(createStreamElementQueue, new Watermark(2L));
        QueueUtil.putSuccessfully(createStreamElementQueue, new Watermark(5L));
        Assertions.assertThat(createStreamElementQueue.size()).isEqualTo(2);
        Assertions.assertThat(createStreamElementQueue.isEmpty()).isFalse();
        Assertions.assertThat(QueueUtil.popCompleted(createStreamElementQueue)).containsExactly(new StreamElement[]{new Watermark(2L), new Watermark(5L)});
        Assertions.assertThat(createStreamElementQueue.size()).isZero();
        Assertions.assertThat(QueueUtil.popCompleted(createStreamElementQueue)).isEmpty();
    }
}
