/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.collect;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.operators.collect.AbstractCollectResultBuffer;
import org.apache.flink.streaming.api.operators.collect.CheckpointedCollectResultBuffer;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.UncheckpointedCollectResultBuffer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link UncheckpointedCollectResultBuffer} and {@link CheckpointedCollectResultBuffer}.
 *
 * <p>Every test feeds {@link CollectCoordinationResponse}s into a buffer via
 * {@code dealWithResponse} and then checks which values {@code next()} hands out, where a
 * {@code null} return is asserted whenever no further value is expected.
 */
class CollectResultBufferTest {
    private static final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;

    /**
     * Uncheckpointed buffer, single version: results are readable right after the response is
     * processed, and a response that overlaps already-consumed results only contributes its new
     * values.
     */
    @Test
    void testUncheckpointedValidResponse() throws Exception {
        String version = "version";
        UncheckpointedCollectResultBuffer<Integer> buffer =
                new UncheckpointedCollectResultBuffer<>(serializer, false);

        // Initial empty response.
        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 0L, Collections.emptyList()), 0L);

        List<Integer> firstBatch = Arrays.asList(1, 2, 3);
        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 0L, createSerializedResults(firstBatch)),
                0L);
        assertNextResults(buffer, firstBatch);

        // The response carries 3, 4, 5 with response offset 2; 3 was already consumed above, so
        // only 4 and 5 are expected to come out.
        buffer.dealWithResponse(
                new CollectCoordinationResponse(
                        version, 0L, createSerializedResults(Arrays.asList(3, 4, 5))),
                2L);
        assertNextResults(buffer, Arrays.asList(4, 5));

        Assertions.assertThat(buffer.next()).isNull();
    }

    /**
     * Uncheckpointed buffer constructed with {@code true} as its second argument: after the
     * version string changes, the same results are delivered a second time.
     */
    @Test
    void testUncheckpointedFaultTolerance() throws Exception {
        String version = "version";
        UncheckpointedCollectResultBuffer<Integer> buffer =
                new UncheckpointedCollectResultBuffer<>(serializer, true);

        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 0L, Collections.emptyList()), 0L);

        List<Integer> expected = Arrays.asList(1, 2, 3);
        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 0L, createSerializedResults(expected)),
                0L);
        assertNextResults(buffer, expected);

        // Responses now arrive under a different version.
        version = "another";
        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 0L, Collections.emptyList()), 0L);
        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 0L, createSerializedResults(expected)),
                0L);
        assertNextResults(buffer, expected);
    }

    /**
     * Uncheckpointed buffer constructed with {@code false} as its second argument: a response
     * carrying a different version must make {@code dealWithResponse} throw a
     * {@link RuntimeException}.
     */
    @Test
    void testUncheckpointedNotFaultTolerance() throws Exception {
        String version = "version";
        UncheckpointedCollectResultBuffer<Integer> buffer =
                new UncheckpointedCollectResultBuffer<>(serializer, false);

        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 0L, Collections.emptyList()), 0L);

        List<Integer> expected = Arrays.asList(1, 2, 3);
        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 0L, createSerializedResults(expected)),
                0L);
        assertNextResults(buffer, expected);

        CollectCoordinationResponse anotherResponse =
                new CollectCoordinationResponse("another", 0L, Collections.emptyList());
        Assertions.assertThatThrownBy(() -> buffer.dealWithResponse(anotherResponse, 0L))
                .isInstanceOf(RuntimeException.class);
    }

    /**
     * Checkpointed buffer, single version: received results stay invisible until a later response
     * raises the second constructor argument of {@link CollectCoordinationResponse} (presumably
     * the last checkpointed offset -- confirm against that class), or until the buffer is
     * completed.
     */
    @Test
    void testCheckpointedValidResponse() throws Exception {
        String version = "version";
        CheckpointedCollectResultBuffer<Integer> buffer =
                new CheckpointedCollectResultBuffer<>(serializer);

        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 0L, Collections.emptyList()), 0L);

        // 1, 2, 3 arrive, but nothing may be read yet.
        buffer.dealWithResponse(
                new CollectCoordinationResponse(
                        version, 0L, createSerializedResults(Arrays.asList(1, 2, 3))),
                0L);
        Assertions.assertThat(buffer.next()).isNull();

        // The response reports 3 and carries 4, 5, 6: 1, 2, 3 become readable.
        buffer.dealWithResponse(
                new CollectCoordinationResponse(
                        version, 3L, createSerializedResults(Arrays.asList(4, 5, 6))),
                3L);
        assertNextResults(buffer, Arrays.asList(1, 2, 3));

        // The response reports 6 and carries 6, 7 at response offset 5: 4, 5, 6 become readable.
        buffer.dealWithResponse(
                new CollectCoordinationResponse(
                        version, 6L, createSerializedResults(Arrays.asList(6, 7))),
                5L);
        assertNextResults(buffer, Arrays.asList(4, 5, 6));

        // 8, 9, 10 arrive at response offset 7 while the reported value is still 6.
        buffer.dealWithResponse(
                new CollectCoordinationResponse(
                        version, 6L, createSerializedResults(Arrays.asList(8, 9, 10))),
                7L);

        // The response reports 7 and re-sends 8, 9: exactly one more value (7) is read here.
        buffer.dealWithResponse(
                new CollectCoordinationResponse(
                        version, 7L, createSerializedResults(Arrays.asList(8, 9))),
                7L);
        assertNextResults(buffer, Arrays.asList(7));

        // After complete() the remaining values are readable.
        buffer.complete();
        assertNextResults(buffer, Arrays.asList(8, 9, 10));

        Assertions.assertThat(buffer.next()).isNull();
    }

    /**
     * Checkpointed buffer across a version change: values received under the old version that
     * were never made readable are not delivered; only the values received under the new version
     * are.
     */
    @Test
    void testCheckpointedRestart() throws Exception {
        String version = "version";
        CheckpointedCollectResultBuffer<Integer> buffer =
                new CheckpointedCollectResultBuffer<>(serializer);

        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 0L, Collections.emptyList()), 0L);
        buffer.dealWithResponse(
                new CollectCoordinationResponse(
                        version, 0L, createSerializedResults(Arrays.asList(1, 2, 3))),
                0L);
        Assertions.assertThat(buffer.next()).isNull();

        // Responses now arrive under a different version.
        version = "another";
        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 0L, Collections.emptyList()), 0L);

        List<Integer> expected = Arrays.asList(4, 5, 6);
        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 0L, createSerializedResults(expected)),
                0L);
        Assertions.assertThat(buffer.next()).isNull();

        // A response reporting 3 makes 4, 5, 6 readable; 1, 2, 3 never show up.
        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 3L, Collections.emptyList()), 0L);
        assertNextResults(buffer, expected);

        Assertions.assertThat(buffer.next()).isNull();
    }

    /**
     * Uncheckpointed buffer that receives its results in the very first response (no preceding
     * empty response) and is then completed.
     */
    @Test
    void testImmediateAccumulatorResult() throws Exception {
        String version = "version";
        UncheckpointedCollectResultBuffer<Integer> buffer =
                new UncheckpointedCollectResultBuffer<>(serializer, false);

        List<Integer> expected = Arrays.asList(1, 2, 3);
        buffer.dealWithResponse(
                new CollectCoordinationResponse(version, 0L, createSerializedResults(expected)),
                0L);
        buffer.complete();

        assertNextResults(buffer, expected);
        Assertions.assertThat(buffer.next()).isNull();
    }

    /**
     * Asserts that successive {@code next()} calls on {@code buffer} return exactly the given
     * values, in order.
     *
     * @param buffer the buffer to read from
     * @param expected the values expected from the following {@code expected.size()} calls
     */
    private static void assertNextResults(
            AbstractCollectResultBuffer<Integer> buffer, List<Integer> expected) throws Exception {
        for (Integer expectedValue : expected) {
            Assertions.assertThat(buffer.next()).isEqualTo(expectedValue);
        }
    }

    /**
     * Serializes every value with the class-level {@code serializer} into its own byte array.
     *
     * @param values the integers to serialize
     * @return one serialized byte array per input value, in input order
     */
    private static List<byte[]> createSerializedResults(List<Integer> values) throws Exception {
        List<byte[]> serializedResults = new ArrayList<>(values.size());
        for (Integer value : values) {
            // A fresh stream per value keeps each record in a separate byte array.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
            serializer.serialize(value, wrapper);
            serializedResults.add(baos.toByteArray());
        }
        return serializedResults;
    }
}

