package org.apache.flink.streaming.api.functions.source;

import java.io.EOFException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.commons.io.IOUtils;
import org.apache.flink.streaming.api.functions.source.legacy.SocketTextStreamFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.NetUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.class */
class SocketTextStreamFunctionTest {
    private static final String LOCALHOST = "127.0.0.1";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest$SocketSourceThread.class */
    public static class SocketSourceThread extends Thread {
        private final Object sync = new Object();
        private final SocketTextStreamFunction socketSource;
        private final String[] expectedData;
        private volatile Throwable error;
        private volatile int numElementsReceived;
        private volatile boolean canceled;
        private volatile boolean done;

        public SocketSourceThread(SocketTextStreamFunction socketTextStreamFunction, String... strArr) {
            this.socketSource = socketTextStreamFunction;
            this.expectedData = strArr;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                try {
                    this.socketSource.run(new SourceFunction.SourceContext<String>() { // from class: org.apache.flink.streaming.api.functions.source.SocketTextStreamFunctionTest.SocketSourceThread.1
                        private final Object lock = new Object();

                        public void collect(String str) {
                            int i = SocketSourceThread.this.numElementsReceived;
                            synchronized (SocketSourceThread.this.sync) {
                                SocketSourceThread.this.numElementsReceived++;
                                SocketSourceThread.this.sync.notifyAll();
                            }
                            if (SocketSourceThread.this.expectedData == null || SocketSourceThread.this.expectedData.length <= i) {
                                return;
                            }
                            Assertions.assertThat(str).isEqualTo(SocketSourceThread.this.expectedData[i]);
                        }

                        public void collectWithTimestamp(String str, long j) {
                            collect(str);
                        }

                        public void emitWatermark(Watermark watermark) {
                            throw new UnsupportedOperationException();
                        }

                        public void markAsTemporarilyIdle() {
                            throw new UnsupportedOperationException();
                        }

                        public Object getCheckpointLock() {
                            return this.lock;
                        }

                        public void close() {
                        }
                    });
                    synchronized (this.sync) {
                        this.done = true;
                        this.sync.notifyAll();
                    }
                } catch (Throwable th) {
                    synchronized (this.sync) {
                        if (!this.canceled) {
                            this.error = th;
                        }
                        this.sync.notifyAll();
                        synchronized (this.sync) {
                            this.done = true;
                            this.sync.notifyAll();
                        }
                    }
                }
            } catch (Throwable th2) {
                synchronized (this.sync) {
                    this.done = true;
                    this.sync.notifyAll();
                    throw th2;
                }
            }
        }

        public void cancel() {
            synchronized (this.sync) {
                this.canceled = true;
                this.socketSource.cancel();
                interrupt();
            }
        }

        public void waitForNumElements(int i) throws InterruptedException {
            synchronized (this.sync) {
                while (this.error == null && !this.canceled && !this.done && this.numElementsReceived < i) {
                    this.sync.wait();
                }
                if (this.error != null) {
                    throw new RuntimeException("Error in source thread", this.error);
                }
                if (this.canceled) {
                    throw new RuntimeException("canceled");
                }
                if (this.done) {
                    throw new RuntimeException("Exited cleanly before expected number of elements");
                }
            }
        }

        public void waitUntilDone() throws InterruptedException {
            join();
            if (this.error != null) {
                throw new RuntimeException("Error in source thread", this.error);
            }
        }
    }

    SocketTextStreamFunctionTest() {
    }

    @Test
    void testSocketSourceSimpleOutput() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        Socket socket = null;
        try {
            SocketSourceThread socketSourceThread = new SocketSourceThread(new SocketTextStreamFunction(LOCALHOST, serverSocket.getLocalPort(), "\n", 0L), "test1", "check");
            socketSourceThread.start();
            socket = NetUtils.acceptWithoutTimeout(serverSocket);
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(socket.getOutputStream());
            outputStreamWriter.write("test1\n");
            outputStreamWriter.write("check\n");
            outputStreamWriter.flush();
            socketSourceThread.waitForNumElements(2);
            socketSourceThread.cancel();
            socketSourceThread.interrupt();
            socketSourceThread.waitUntilDone();
            socket.close();
            if (socket != null) {
                IOUtils.closeQuietly(socket);
            }
            IOUtils.closeQuietly(serverSocket);
        } catch (Throwable th) {
            if (socket != null) {
                IOUtils.closeQuietly(socket);
            }
            IOUtils.closeQuietly(serverSocket);
            throw th;
        }
    }

    @Test
    void testExitNoRetries() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        Socket socket = null;
        try {
            SocketSourceThread socketSourceThread = new SocketSourceThread(new SocketTextStreamFunction(LOCALHOST, serverSocket.getLocalPort(), "\n", 0L), new String[0]);
            socketSourceThread.start();
            socket = NetUtils.acceptWithoutTimeout(serverSocket);
            socket.close();
            try {
                socketSourceThread.waitUntilDone();
            } catch (Exception e) {
                Assertions.assertThat(e).hasCauseInstanceOf(EOFException.class);
            }
            if (socket != null) {
                IOUtils.closeQuietly(socket);
            }
            IOUtils.closeQuietly(serverSocket);
        } catch (Throwable th) {
            if (socket != null) {
                IOUtils.closeQuietly(socket);
            }
            IOUtils.closeQuietly(serverSocket);
            throw th;
        }
    }

    @Test
    void testSocketSourceOutputWithRetries() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        Socket socket = null;
        try {
            SocketSourceThread socketSourceThread = new SocketSourceThread(new SocketTextStreamFunction(LOCALHOST, serverSocket.getLocalPort(), "\n", 10L, 100L), "test1", "check");
            socketSourceThread.start();
            NetUtils.acceptWithoutTimeout(serverSocket).close();
            Socket acceptWithoutTimeout = NetUtils.acceptWithoutTimeout(serverSocket);
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(acceptWithoutTimeout.getOutputStream());
            outputStreamWriter.write("test1\n");
            outputStreamWriter.close();
            acceptWithoutTimeout.close();
            NetUtils.acceptWithoutTimeout(serverSocket).close();
            socket = NetUtils.acceptWithoutTimeout(serverSocket);
            OutputStreamWriter outputStreamWriter2 = new OutputStreamWriter(socket.getOutputStream());
            outputStreamWriter2.write("check\n");
            outputStreamWriter2.flush();
            socketSourceThread.waitForNumElements(2);
            socketSourceThread.cancel();
            socketSourceThread.waitUntilDone();
            if (socket != null) {
                IOUtils.closeQuietly(socket);
            }
            IOUtils.closeQuietly(serverSocket);
        } catch (Throwable th) {
            if (socket != null) {
                IOUtils.closeQuietly(socket);
            }
            IOUtils.closeQuietly(serverSocket);
            throw th;
        }
    }

    @Test
    void testSocketSourceOutputInfiniteRetries() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        Socket socket = null;
        try {
            SocketSourceThread socketSourceThread = new SocketSourceThread(new SocketTextStreamFunction(LOCALHOST, serverSocket.getLocalPort(), "\n", -1L, 100L), "test1", "check");
            socketSourceThread.start();
            NetUtils.acceptWithoutTimeout(serverSocket).close();
            Socket acceptWithoutTimeout = NetUtils.acceptWithoutTimeout(serverSocket);
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(acceptWithoutTimeout.getOutputStream());
            outputStreamWriter.write("test1\n");
            outputStreamWriter.close();
            acceptWithoutTimeout.close();
            NetUtils.acceptWithoutTimeout(serverSocket).close();
            socket = NetUtils.acceptWithoutTimeout(serverSocket);
            OutputStreamWriter outputStreamWriter2 = new OutputStreamWriter(socket.getOutputStream());
            outputStreamWriter2.write("check\n");
            outputStreamWriter2.flush();
            socketSourceThread.waitForNumElements(2);
            socketSourceThread.cancel();
            socketSourceThread.waitUntilDone();
            if (socket != null) {
                IOUtils.closeQuietly(socket);
            }
            IOUtils.closeQuietly(serverSocket);
        } catch (Throwable th) {
            if (socket != null) {
                IOUtils.closeQuietly(socket);
            }
            IOUtils.closeQuietly(serverSocket);
            throw th;
        }
    }

    @Test
    void testSocketSourceOutputAcrossRetries() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        Socket socket = null;
        try {
            SocketSourceThread socketSourceThread = new SocketSourceThread(new SocketTextStreamFunction(LOCALHOST, serverSocket.getLocalPort(), "\n", 10L, 100L), "test1", "check1", "check2");
            socketSourceThread.start();
            NetUtils.acceptWithoutTimeout(serverSocket).close();
            Socket acceptWithoutTimeout = NetUtils.acceptWithoutTimeout(serverSocket);
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(acceptWithoutTimeout.getOutputStream());
            outputStreamWriter.write("te");
            outputStreamWriter.close();
            acceptWithoutTimeout.close();
            NetUtils.acceptWithoutTimeout(serverSocket).close();
            socket = NetUtils.acceptWithoutTimeout(serverSocket);
            OutputStreamWriter outputStreamWriter2 = new OutputStreamWriter(socket.getOutputStream());
            outputStreamWriter2.write("st1\n");
            outputStreamWriter2.write("check1\n");
            outputStreamWriter2.write("check2\n");
            outputStreamWriter2.flush();
            socketSourceThread.waitForNumElements(2);
            socketSourceThread.cancel();
            socketSourceThread.waitUntilDone();
            if (socket != null) {
                IOUtils.closeQuietly(socket);
            }
            IOUtils.closeQuietly(serverSocket);
        } catch (Throwable th) {
            if (socket != null) {
                IOUtils.closeQuietly(socket);
            }
            IOUtils.closeQuietly(serverSocket);
            throw th;
        }
    }
}
