package org.apache.flink.streaming.api.operators.collect;

import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper;
import org.apache.flink.streaming.api.operators.collect.utils.CollectTestUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.class */
/**
 * Tests for the collect sink function, driven through a {@link CollectSinkFunctionTestWrapper}.
 *
 * <p>Each test opens the function, feeds it integer records, sends coordination requests carrying
 * a version and a token, and checks the returned version, offset and result batch.
 */
class CollectSinkFunctionTest {
    private static final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;
    private CollectSinkFunctionTestWrapper<Integer> functionWrapper;

    /** Creates a fresh wrapper around the function under test before every test. */
    @BeforeEach
    void before() throws Exception {
        // NOTE(review): 12 is presumably the maximum batch size in bytes (three 4-byte ints),
        // which would match the three-element batches asserted below - confirm against the
        // wrapper's constructor.
        this.functionWrapper = new CollectSinkFunctionTestWrapper<>(serializer, 12);
    }

    /** Releases the wrapper's resources after every test. */
    @AfterEach
    void after() throws Exception {
        this.functionWrapper.closeWrapper();
    }

    /** Requests with increasing tokens return the records starting at the requested token. */
    @Test
    void testIncreasingToken() throws Exception {
        this.functionWrapper.openFunction();
        invokeRange(0, 6);
        String version = initializeVersion();
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(version, 0L),
                version,
                0L,
                Arrays.asList(0, 1, 2));
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(version, 4L),
                version,
                0L,
                Arrays.asList(4, 5));
        // A token equal to the number of records sent yields an empty batch.
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(version, 6L),
                version,
                0L,
                Collections.emptyList());
        this.functionWrapper.closeFunctionNormally();
    }

    /** Repeating a request with the same token returns the same batch again. */
    @Test
    void testDuplicatedToken() throws Exception {
        this.functionWrapper.openFunction();
        invokeRange(0, 6);
        String version = initializeVersion();
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(version, 0L),
                version,
                0L,
                Arrays.asList(0, 1, 2));
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(version, 4L),
                version,
                0L,
                Arrays.asList(4, 5));
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(version, 4L),
                version,
                0L,
                Arrays.asList(4, 5));
        this.functionWrapper.closeFunctionNormally();
    }

    /** A token smaller than one already requested yields an empty batch. */
    @Test
    void testInvalidToken() throws Exception {
        this.functionWrapper.openFunction();
        invokeRange(0, 6);
        String version = initializeVersion();
        // Request token 4 first so that the following request for token 3 goes backwards.
        this.functionWrapper.sendRequestAndGetResponse(version, 4L);
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(version, 3L),
                version,
                0L,
                Collections.emptyList());
        this.functionWrapper.closeFunctionNormally();
    }

    /** A request with an unknown version yields the current version and an empty batch. */
    @Test
    void testInvalidVersion() throws Exception {
        this.functionWrapper.openFunction();
        invokeRange(0, 6);
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse("invalid version", 0L),
                initializeVersion(),
                0L,
                Collections.emptyList());
        this.functionWrapper.closeFunctionNormally();
    }

    /**
     * Opening the function must fail with a {@link BindException} when the configured collect port
     * is already occupied, which shows that the configured port is actually used.
     */
    @Test
    void testConfiguredPortIsUsed() throws Exception {
        // Occupy an ephemeral port for the duration of the test; try-with-resources closes the
        // socket on both the success and the failure path.
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            this.functionWrapper
                    .getRuntimeContext()
                    .getTaskManagerRuntimeInfo()
                    .getConfiguration()
                    .set(TaskManagerOptions.COLLECT_PORT, serverSocket.getLocalPort());

            Assertions.assertThatThrownBy(() -> this.functionWrapper.openFunction())
                    .isInstanceOf(BindException.class)
                    .hasMessageContaining("Address already in use");
        }
    }

    /**
     * The offset reported in responses stays at 0 after a checkpoint is taken and changes to 3
     * only once that checkpoint is reported complete.
     */
    @Test
    void testCheckpoint() throws Exception {
        this.functionWrapper.openFunctionWithState();
        invokeRange(0, 2);
        String version = initializeVersion();
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(version, 0L),
                version,
                0L,
                Arrays.asList(0, 1));

        invokeRange(2, 6);
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(version, 3L),
                version,
                0L,
                Arrays.asList(3, 4, 5));

        this.functionWrapper.checkpointFunction(1L);
        // Checkpoint taken but not yet completed: the reported offset is still 0.
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(version, 4L),
                version,
                0L,
                Arrays.asList(4, 5));

        this.functionWrapper.checkpointComplete(1L);
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(version, 4L),
                version,
                3L,
                Arrays.asList(4, 5));
        this.functionWrapper.closeFunctionNormally();
    }

    /**
     * After an abnormal close and a reopen with state, the records invoked after the completed
     * checkpoint (3..5) are no longer served, while checkpointed ones and newly invoked ones are.
     */
    @Test
    void testRestart() throws Exception {
        this.functionWrapper.openFunctionWithState();
        invokeRange(0, 3);
        String version = initializeVersion();
        this.functionWrapper.sendRequestAndGetResponse(version, 1L);
        this.functionWrapper.checkpointFunction(1L);
        this.functionWrapper.checkpointComplete(1L);
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(version, 1L),
                version,
                1L,
                Arrays.asList(1, 2));

        invokeRange(3, 6);
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(version, 2L),
                version,
                1L,
                Arrays.asList(2, 3, 4));

        // Simulate a failure and a restart from the stored state.
        this.functionWrapper.closeFunctionAbnormally();
        this.functionWrapper.openFunctionWithState();
        String versionAfterRestart = initializeVersion();
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(versionAfterRestart, 1L),
                versionAfterRestart,
                1L,
                Arrays.asList(1, 2));

        invokeRange(6, 9);
        assertResponseEquals(
                this.functionWrapper.sendRequestAndGetResponse(versionAfterRestart, 2L),
                versionAfterRestart,
                1L,
                Arrays.asList(2, 6, 7));
        this.functionWrapper.closeFunctionNormally();
    }

    /** Without checkpoints, the accumulator holds all records from the last requested token on. */
    @Test
    void testAccumulatorResultWithoutCheckpoint() throws Exception {
        testAccumulatorResultWithoutCheckpoint(2, Arrays.asList(2, 3, 4, 5));
    }

    /** When the last requested token is past all records, the accumulator result is empty. */
    @Test
    void testEmptyAccumulatorResult() throws Exception {
        testAccumulatorResultWithoutCheckpoint(6, Collections.emptyList());
    }

    /**
     * Invokes the records 0..5, requests the given token, closes the function normally and
     * verifies the accumulator result.
     *
     * @param token token sent in the request and expected in the accumulator result
     * @param expectedResults records expected in the accumulator result
     */
    private void testAccumulatorResultWithoutCheckpoint(int token, List<Integer> expectedResults)
            throws Exception {
        this.functionWrapper.openFunction();
        invokeRange(0, 6);
        String version = initializeVersion();
        this.functionWrapper.sendRequestAndGetResponse(version, token);
        this.functionWrapper.closeFunctionNormally();
        CollectTestUtils.assertAccumulatorResult(
                this.functionWrapper.getAccumulatorResults(),
                token,
                version,
                0L,
                expectedResults,
                serializer);
    }

    /**
     * With a completed checkpoint, the accumulator result carries the last requested token (5),
     * the offset 3 and the records from the last requested token on.
     */
    @Test
    void testAccumulatorResultWithCheckpoint() throws Exception {
        this.functionWrapper.openFunctionWithState();
        invokeRange(0, 6);
        String version = initializeVersion();
        this.functionWrapper.sendRequestAndGetResponse(version, 3L);
        this.functionWrapper.checkpointFunction(1L);
        this.functionWrapper.checkpointComplete(1L);

        invokeRange(6, 9);
        this.functionWrapper.sendRequestAndGetResponse(version, 5L);
        this.functionWrapper.closeFunctionNormally();
        CollectTestUtils.assertAccumulatorResult(
                this.functionWrapper.getAccumulatorResults(),
                5L,
                version,
                3L,
                Arrays.asList(5, 6, 7, 8),
                serializer);
    }

    /**
     * Feeds the consecutive integers {@code fromInclusive .. toExclusive - 1} to the function
     * under test, in ascending order.
     */
    private void invokeRange(int fromInclusive, int toExclusive) throws Exception {
        for (int value = fromInclusive; value < toExclusive; value++) {
            this.functionWrapper.invoke(value);
        }
    }

    /**
     * Sends a request with an empty version and token 0 and returns the version reported in the
     * response.
     */
    private String initializeVersion() throws Exception {
        return this.functionWrapper.sendRequestAndGetResponse("", 0L).getVersion();
    }

    /**
     * Delegates to {@link CollectTestUtils#assertResponseEquals} using this test's serializer.
     *
     * @param response response to verify
     * @param expectedVersion version the response is expected to carry
     * @param expectedOffset offset the response is expected to carry (presumably the last
     *     checkpointed offset - confirm against {@code CollectTestUtils})
     * @param expectedResults records the response is expected to contain
     */
    private void assertResponseEquals(
            CollectCoordinationResponse response,
            String expectedVersion,
            long expectedOffset,
            List<Integer> expectedResults)
            throws IOException {
        CollectTestUtils.assertResponseEquals(
                response, expectedVersion, expectedOffset, expectedResults, serializer);
    }
}
