package org.apache.flink.streaming.api.operators.collect;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkCompletableFutureAssert;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.utils.CollectTestUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.junit.jupiter.api.Test;

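/**
 * Tests for {@link CollectSinkOperatorCoordinator}.
 *
 * <p>Decompiler note (JADX): this source was reconstructed from
 * {@code org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.class};
 * the class access modifier was changed from package-private.
 */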
public class CollectSinkOperatorCoordinatorTest {
    /** Serializer for the two-field (Integer, String) rows exchanged in these tests. */
    private static final TypeSerializer<Row> serializer =
            new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
                    .createSerializer(new ExecutionConfig());

    /**
     * Test stand-in for the socket server side that the coordinator connects to.
     *
     * <p>It opens a {@link ServerSocket} on an automatically allocated port, optionally accepts a
     * single connection asynchronously, and offers helpers to read a request from that connection
     * and (optionally) write a {@link CollectCoordinationResponse} back.
     */
    private static class TestingSinkFunction implements AutoCloseable {
        /** Version string written into the response by {@link #handleRequest(List)}. */
        static final String DEFAULT_SINK_FUNCTION_RESPONSE_VERSION = "version";

        /**
         * Value passed as the second {@link CollectCoordinationResponse} constructor argument by
         * {@link #handleRequest(List)}.
         */
        static final int DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET = 2;

        /** Listening socket; its local port is what gets announced to the coordinator. */
        private final ServerSocket serverSocket;

        /**
         * Future of the accepted connection. {@code null} if no accept was started or after
         * {@link #closeAcceptingSocket()} has run.
         */
        @Nullable
        private CompletableFuture<SocketConnection> connectionFuture;

        /**
         * Creates an instance that accepts a connection and announces its address to the given
         * coordinator.
         *
         * @param collectSinkOperatorCoordinator coordinator that receives the address event
         * @return the registered sink function
         * @throws Exception if the socket cannot be opened or the coordinator rejects the event
         */
        public static TestingSinkFunction createSinkFunctionAndInitializeCoordinator(CollectSinkOperatorCoordinator collectSinkOperatorCoordinator) throws Exception {
            final TestingSinkFunction sinkFunction = new TestingSinkFunction();
            sinkFunction.registerSinkFunctionWith(collectSinkOperatorCoordinator);
            return sinkFunction;
        }

        /**
         * Creates an instance whose server socket is open but never accepts a connection (the
         * connection future is {@code null}).
         *
         * @return the sink function without an accepting task
         * @throws IOException if the server socket cannot be opened
         */
        public static TestingSinkFunction createTestingSinkFunctionWithoutConnection() throws IOException {
            return new TestingSinkFunction(ignoredServerSocket -> null);
        }

        /**
         * Creates an instance that starts accepting a connection asynchronously.
         *
         * @throws IOException if the server socket cannot be opened
         */
        public TestingSinkFunction() throws IOException {
            this(TestingSinkFunction::acceptSocketAsync);
        }

        /**
         * @param function produces the connection future from the freshly opened server socket;
         *     may return {@code null}
         * @throws IOException if the server socket cannot be opened
         */
        private TestingSinkFunction(Function<ServerSocket, CompletableFuture<SocketConnection>> function) throws IOException {
            // Port 0 lets the system pick a free port.
            this.serverSocket = new ServerSocket(0);
            this.connectionFuture = function.apply(this.serverSocket);
        }

        /**
         * Accepts one connection on the given server socket in an asynchronous task.
         *
         * @param serverSocket the listening socket
         * @return future holding the accepted connection; an {@link IOException} during accept is
         *     surfaced wrapped in a {@link CompletionException}
         */
        private static CompletableFuture<SocketConnection> acceptSocketAsync(ServerSocket serverSocket) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return new SocketConnection(NetUtils.acceptWithoutTimeout(serverSocket));
                } catch (IOException e) {
                    throw new CompletionException(e);
                }
            });
        }

        /**
         * @return the connection future
         * @throws IllegalStateException if the connection future is {@code null}
         */
        private CompletableFuture<SocketConnection> getConnectionFuture() {
            Preconditions.checkState(this.connectionFuture != null, "The accepting Socket is already closed. The calling operation is not possible anymore.");
            return this.connectionFuture;
        }

        /** @return loopback address combined with the server socket's local port */
        private InetSocketAddress getSocketAddress() {
            return new InetSocketAddress(InetAddress.getLoopbackAddress(), this.serverSocket.getLocalPort());
        }

        /**
         * Sends a {@link CollectSinkAddressEvent} carrying this instance's socket address to the
         * given coordinator.
         *
         * @param collectSinkOperatorCoordinator coordinator that receives the address event
         * @throws Exception if the coordinator fails to handle the event
         */
        public void registerSinkFunctionWith(CollectSinkOperatorCoordinator collectSinkOperatorCoordinator) throws Exception {
            // NOTE(review): the two zeros are presumably subtask index and attempt number - confirm.
            collectSinkOperatorCoordinator.handleEventFromOperator(0, 0, new CollectSinkAddressEvent(getSocketAddress()));
        }

        /**
         * Closes the accepted connection (if any) and then the server socket.
         *
         * @throws Exception if closing the connection or the server socket fails
         */
        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            try {
                closeAcceptingSocket();
            } finally {
                // Release the listening port even if closing the accepted connection failed.
                this.serverSocket.close();
            }
        }

        /**
         * Waits for the accepted connection, closes it and clears the connection future. Does
         * nothing if the connection future is already {@code null}.
         *
         * @throws Exception if waiting for or closing the connection fails
         */
        public void closeAcceptingSocket() throws Exception {
            if (this.connectionFuture == null) {
                return;
            }
            this.connectionFuture.get().close();
            this.connectionFuture = null;
        }

        /**
         * Blocks until the connection future has completed.
         *
         * @throws ExecutionException if accepting the connection failed
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public void waitForConnectionToBeEstablished() throws ExecutionException, InterruptedException {
            getConnectionFuture().get();
        }

        /**
         * Reads one request and answers it with the default version and offset.
         *
         * @param list rows to send back
         */
        public void handleRequest(List<Row> list) {
            handleRequest(DEFAULT_SINK_FUNCTION_RESPONSE_VERSION, DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET, list);
        }

        /**
         * Reads one request and answers it, blocking until the response has been written.
         *
         * @param str version to put into the response
         * @param i second constructor argument of the response
         * @param list rows to send back
         */
        public void handleRequest(String str, int i, List<Row> list) {
            handleRequestAsync(str, i, CompletableFuture.completedFuture(list)).join();
        }

        /**
         * Reads one request from the connection without writing any response.
         *
         * @return future that completes once the request has been read
         */
        public CompletableFuture<Void> handleRequestWithoutResponse() {
            return internalConnectWithRequestHandlingAsync().thenApply(ignoredConnection -> null);
        }

        /**
         * Once the connection is available, reads a {@link CollectCoordinationRequest} from its
         * input view and discards it.
         *
         * @return future holding the connection after the request has been consumed
         */
        private CompletableFuture<SocketConnection> internalConnectWithRequestHandlingAsync() {
            return getConnectionFuture().thenApply(connection -> {
                try {
                    // The request content is irrelevant here; constructing it consumes the bytes.
                    new CollectCoordinationRequest(connection.getDataInputView());
                    return connection;
                } catch (IOException e) {
                    throw new CompletionException(e);
                }
            });
        }

        /**
         * Reads one request and, once {@code completableFuture} provides the rows, writes a
         * {@link CollectCoordinationResponse} to the connection.
         *
         * @param str version to put into the response
         * @param i second constructor argument of the response
         * @param completableFuture future providing the rows to send back
         * @return future that completes once the response has been serialized; I/O failures are
         *     surfaced wrapped in a {@link CompletionException}
         */
        public CompletableFuture<Void> handleRequestAsync(String str, int i, CompletableFuture<List<Row>> completableFuture) {
            return internalConnectWithRequestHandlingAsync().thenCombineAsync(completableFuture, (connection, rows) -> {
                if (connection == null) {
                    throw new CompletionException(new IllegalStateException("No SocketConnection established."));
                }
                try {
                    new CollectCoordinationResponse(str, i, CollectTestUtils.toBytesList(rows, CollectSinkOperatorCoordinatorTest.serializer)).serialize(connection.getDataOutputView());
                    return null;
                } catch (IOException e) {
                    throw new CompletionException(e);
                }
            });
        }
    }

    /** Package-private no-arg constructor; there is nothing to initialise. */
    CollectSinkOperatorCoordinatorTest() {}

    /**
     * A request sent to a started coordinator that never received a sink address is answered with
     * an empty response carrying the request's version.
     */
    @Test
    void testNoAddress() throws Exception {
        // try-with-resources keeps the primary failure and records close() failures as suppressed.
        try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator()) {
            coordinator.start();

            final String version = "version";
            final CompletableFuture<CoordinationResponse> responseFuture =
                    coordinator.handleCoordinationRequest(createRequestForCoordinatorGeneratedResponse(version));

            assertEmptyResponseGeneratedFromCoordinator(responseFuture, version);
        }
    }

    /**
     * Closing the coordinator while the sink function has read the request but not answered it
     * completes the pending request with an empty response.
     */
    @Test
    void testClosingTheCoordinatorAfterRequestWasReceivedBySinkFunction() throws Exception {
        // try-with-resources keeps the primary failure and records close() failures as suppressed.
        try (TestingSinkFunction sinkFunction = new TestingSinkFunction()) {
            // The coordinator is closed explicitly below because closing it is the action under test.
            final CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator();
            coordinator.start();
            sinkFunction.registerSinkFunctionWith(coordinator);

            final CompletableFuture<Void> sinkFunctionResult = sinkFunction.handleRequestWithoutResponse();
            Assertions.assertThat(sinkFunctionResult)
                    .as("The SocketServer waits for the request to be sent.")
                    .isNotDone();

            final String version = "version";
            final CompletableFuture<CoordinationResponse> responseFuture =
                    coordinator.handleCoordinationRequest(createRequestForCoordinatorGeneratedResponse(version));
            Assertions.assertThat(responseFuture)
                    .as("The response shouldn't complete because the SinkFunction doesn't send any response.")
                    .isNotDone();

            FlinkAssertions.assertThatFuture(sinkFunctionResult)
                    .as("The SocketServer should eventually have handled the request without sending a response back.")
                    .eventuallySucceeds();

            coordinator.close();

            assertEmptyResponseGeneratedFromCoordinator(responseFuture, version);
        }
    }

    /**
     * A request is forwarded to the sink function, and the rows it answers with are returned
     * together with the sink function's default version and offset.
     */
    @Test
    void testSuccessfulResponse() throws Exception {
        // Resources are closed in reverse order: first the sink function, then the coordinator.
        try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator();
                TestingSinkFunction sinkFunction =
                        TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator)) {
            coordinator.start();

            final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), Row.of(2, "bbb"));

            final CompletableFuture<CoordinationResponse> responseFuture =
                    coordinator.handleCoordinationRequest(createRequestForSinkFunctionGeneratedResponse());
            Assertions.assertThat(responseFuture).isNotDone();

            sinkFunction.handleRequest(expectedData);

            assertResponseWithDefaultMetadataFromSinkFunction(responseFuture, expectedData);
        }
    }

    /**
     * Closing the sink function completely (connection and server socket) after it has read the
     * request completes the pending request with an empty response.
     */
    @Test
    void testClosingTheServerSocketOfTheSinkFunction() throws Exception {
        // try-with-resources keeps the primary failure and records close() failures as suppressed.
        try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator()) {
            coordinator.start();

            // Closed explicitly below because closing the sink function is the action under test.
            final TestingSinkFunction sinkFunction =
                    TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator);

            final CompletableFuture<CoordinationResponse> responseFuture =
                    coordinator.handleCoordinationRequest(createRequestForCoordinatorGeneratedResponse("version"));
            Assertions.assertThat(responseFuture).isNotDone();

            FlinkAssertions.assertThatFuture(sinkFunction.handleRequestWithoutResponse()).eventuallySucceeds();

            sinkFunction.close();

            assertEmptyResponseGeneratedFromSinkFunction(responseFuture);
        }
    }

    /**
     * Closing only the accepted connection of the sink function (the server socket stays open)
     * completes the pending request with an empty response.
     */
    @Test
    void testClosingTheListeningSocketInTheSinkFunction() throws Exception {
        // try-with-resources keeps the primary failure and records close() failures as suppressed.
        try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator()) {
            coordinator.start();

            try (TestingSinkFunction sinkFunction =
                    TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator)) {
                final String version = "version";
                final CompletableFuture<CoordinationResponse> responseFuture =
                        coordinator.handleCoordinationRequest(createRequestForCoordinatorGeneratedResponse(version));
                Assertions.assertThat(responseFuture).isNotDone();

                // The returned future is intentionally not awaited; only the connection is.
                sinkFunction.handleRequestWithoutResponse();
                sinkFunction.waitForConnectionToBeEstablished();

                sinkFunction.closeAcceptingSocket();

                assertEmptyResponseGeneratedFromCoordinator(responseFuture, version);
            }
        }
    }

    /**
     * With a sink function that never accepts the connection, the request stays pending until the
     * coordinator is closed and is then completed with an empty response.
     */
    @Test
    void testCoordinatorNotConnectingToTheSinkFunctionSocket() throws Exception {
        // try-with-resources keeps the primary failure and records close() failures as suppressed.
        try (TestingSinkFunction sinkFunction = TestingSinkFunction.createTestingSinkFunctionWithoutConnection()) {
            // The coordinator is closed explicitly below because closing it is the action under test.
            final CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator();
            coordinator.start();
            sinkFunction.registerSinkFunctionWith(coordinator);

            final String version = "version";
            final CompletableFuture<CoordinationResponse> responseFuture =
                    coordinator.handleCoordinationRequest(createRequestForCoordinatorGeneratedResponse(version));
            Assertions.assertThat(responseFuture).isNotDone();

            coordinator.close();

            assertEmptyResponseGeneratedFromCoordinator(responseFuture, version);
        }
    }

    /**
     * After an execution attempt failure and a dropped connection, requests are answered with
     * empty responses until a new sink function registers; afterwards requests are served by the
     * new sink function.
     */
    @Test
    void testReconnectAfterSinkFunctionSocketDisconnect() throws Exception {
        // try-with-resources keeps the primary failure and records close() failures as suppressed.
        try (CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator()) {
            coordinator.start();

            final TestingSinkFunction firstSinkFunction =
                    TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator);

            final String version = "version";
            final CompletableFuture<CoordinationResponse> firstResponseFuture =
                    coordinator.handleCoordinationRequest(createRequestForCoordinatorGeneratedResponse(version));
            firstSinkFunction.waitForConnectionToBeEstablished();

            coordinator.executionAttemptFailed(0, 0, (Throwable) null);
            // NOTE(review): only the accepted connection is closed here; the first sink
            // function's server socket is never closed in this test.
            firstSinkFunction.closeAcceptingSocket();

            assertEmptyResponseGeneratedFromCoordinator(firstResponseFuture, version);

            // No sink function is registered at this point, so the coordinator answers itself.
            final String anotherVersion = "another-version";
            assertEmptyResponseGeneratedFromCoordinator(
                    coordinator.handleCoordinationRequest(createRequestForCoordinatorGeneratedResponse(anotherVersion)),
                    anotherVersion);

            final TestingSinkFunction secondSinkFunction =
                    TestingSinkFunction.createSinkFunctionAndInitializeCoordinator(coordinator);
            final CompletableFuture<CoordinationResponse> secondResponseFuture =
                    coordinator.handleCoordinationRequest(createRequestForSinkFunctionGeneratedResponse());

            final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), Row.of(2, "bbb"));
            secondSinkFunction.handleRequest(expectedData);

            assertResponseWithDefaultMetadataFromSinkFunction(secondResponseFuture, expectedData);

            secondSinkFunction.close();
        }
    }

    /**
     * Builds a request whose version ({@code "random-version"}) differs from the version the
     * testing sink function puts into its responses.
     *
     * @return the request
     */
    private static CoordinationRequest createRequestForSinkFunctionGeneratedResponse() {
        final String requestVersion = "random-version";
        return new CollectCoordinationRequest(requestVersion, 123L);
    }

    /**
     * Builds a {@link CollectCoordinationRequest} with the given version.
     *
     * @param str version to put into the request
     * @return the request
     */
    private static CoordinationRequest createRequestForCoordinatorGeneratedResponse(String str) {
        // NOTE(review): 123L is presumably the requested offset - confirm against the constructor.
        final CollectCoordinationRequest request = new CollectCoordinationRequest(str, 123L);
        return request;
    }

    /**
     * Asserts that the future yields a response with version {@code "version"}, last checkpointed
     * offset {@code -1} and no rows.
     *
     * @param completableFuture future of the response to check
     * @throws Exception if the future fails or the results cannot be read
     */
    private static void assertEmptyResponseGeneratedFromSinkFunction(CompletableFuture<CoordinationResponse> completableFuture) throws Exception {
        final List<Row> noRows = Collections.emptyList();
        assertResponse(completableFuture, "version", -1L, noRows);
    }

    /**
     * Asserts that the future yields a response with the given version, last checkpointed offset
     * {@code -1} and no rows.
     *
     * @param completableFuture future of the response to check
     * @param str expected version
     * @throws Exception if the future fails or the results cannot be read
     */
    private static void assertEmptyResponseGeneratedFromCoordinator(CompletableFuture<CoordinationResponse> completableFuture, String str) throws Exception {
        final long expectedOffset = -1L;
        final List<Row> noRows = Collections.emptyList();
        assertResponse(completableFuture, str, expectedOffset, noRows);
    }

    /**
     * Asserts that the future yields a response carrying the given rows together with the version
     * and offset that {@link TestingSinkFunction#handleRequest(List)} writes by default.
     *
     * @param completableFuture future of the response to check
     * @param list expected rows, in order
     * @throws Exception if the future fails or the results cannot be read
     */
    private static void assertResponseWithDefaultMetadataFromSinkFunction(CompletableFuture<CoordinationResponse> completableFuture, List<Row> list) throws Exception {
        // Use the same constants the sink function writes, so the two cannot drift apart.
        assertResponse(
                completableFuture,
                TestingSinkFunction.DEFAULT_SINK_FUNCTION_RESPONSE_VERSION,
                TestingSinkFunction.DEFAULT_SINK_FUNCTION_RESPONSE_OFFSET,
                list);
    }

    /**
     * Waits for the response and checks its version, last checkpointed offset and rows.
     *
     * @param completableFuture future of the response to check; must yield a
     *     {@link CollectCoordinationResponse}
     * @param str expected version
     * @param j expected last checkpointed offset
     * @param list expected rows, compared field by field and in order
     * @throws Exception if the future fails or the results cannot be deserialized
     */
    private static void assertResponse(CompletableFuture<CoordinationResponse> completableFuture, String str, long j, List<Row> list) throws Exception {
        // The future is typed to the interface, so the concrete response needs an explicit cast.
        final CollectCoordinationResponse response = (CollectCoordinationResponse) completableFuture.get();
        Assertions.assertThat(response.getVersion()).isEqualTo(str);
        Assertions.assertThat(response.getLastCheckpointedOffset()).isEqualTo(j);

        final List<Row> results = response.getResults(serializer);
        Assertions.assertThat(results).hasSize(list.size());
        for (int i = 0; i < results.size(); i++) {
            final Row expected = list.get(i);
            final Row actual = results.get(i);
            Assertions.assertThat(actual.getArity()).isEqualTo(expected.getArity());
            for (int field = 0; field < actual.getArity(); field++) {
                Assertions.assertThat(actual.getField(field)).isEqualTo(expected.getField(field));
            }
        }
    }
}
