/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.security.MessageDigest;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobCachePutTest;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobKeyTest;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobServerConnection;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.TestingBlobUtils;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class BlobClientTest {
    /** Nominal length, in bytes, of the test data buffer. */
    private static final int TEST_BUFFER_SIZE = 17000;
    /** Server shared by all tests; assigned in {@code startServer()} and closed in {@code stopServer()}. */
    static BlobServer blobServer;
    /** Client configuration shared by all tests; assigned in {@code startServer()}. */
    static Configuration clientConfig;
    /** JUnit-managed temporary directory; passed to server creation and used for test files. */
    @TempDir
    static java.nio.file.Path tempDir;

    /** Package-private no-argument constructor; performs no initialization. */
    BlobClientTest() {
    }

    /**
     * Creates and starts the shared {@link BlobServer} and assigns a fresh, empty client
     * {@link Configuration}. Runs once before all tests.
     *
     * @throws IOException propagated from server creation or start-up
     */
    @BeforeAll
    static void startServer() throws IOException {
        // Assign the field before start() so that stopServer() can still close the
        // server if start-up fails.
        blobServer = TestingBlobUtils.createServer(tempDir);
        blobServer.start();
        clientConfig = new Configuration();
    }

    /**
     * Closes the shared {@link BlobServer} after all tests, if one was created.
     *
     * @throws IOException propagated from closing the server
     */
    @AfterAll
    static void stopServer() throws IOException {
        if (blobServer == null) {
            // Nothing to clean up: server creation never got as far as assigning the field.
            return;
        }
        blobServer.close();
    }

    /**
     * Builds the in-memory test payload.
     *
     * @return a new array of {@code TEST_BUFFER_SIZE} bytes in which element {@code i} holds
     *     {@code i % 128}
     */
    private static byte[] createTestBuffer() {
        // Use the named constant instead of repeating the literal size.
        byte[] buf = new byte[TEST_BUFFER_SIZE];
        for (int i = 0; i < buf.length; ++i) {
            buf[i] = (byte) (i % 128);
        }
        return buf;
    }

    /**
     * Writes 20 repetitions of the test buffer to {@code file} and computes the message digest
     * over exactly the bytes written.
     *
     * @param file destination file; created or overwritten
     * @return the digest of the written content
     * @throws IOException if the file cannot be opened or written
     */
    private static byte[] prepareTestFile(File file) throws IOException {
        MessageDigest md = BlobUtils.createMessageDigest();
        byte[] buf = createTestBuffer();
        // The resource must be initialized in the try header: resource variables are
        // implicitly final and cannot be assigned inside the body.
        try (FileOutputStream fos = new FileOutputStream(file)) {
            for (int i = 0; i < 20; ++i) {
                fos.write(buf);
                md.update(buf);
            }
        }
        return md.digest();
    }

    /**
     * Reads exactly {@code expectedBuf.length} bytes from the stream, asserts that the stream is
     * then exhausted and that the bytes equal {@code expectedBuf}, and finally closes the stream.
     *
     * @param actualInputStream stream to read and close
     * @param expectedBuf expected stream content
     * @throws EOFException if the stream ends before the expected number of bytes was read
     * @throws IOException if reading or closing the stream fails
     */
    static void validateGetAndClose(InputStream actualInputStream, byte[] expectedBuf) throws IOException {
        try {
            final int expectedLength = expectedBuf.length;
            final byte[] received = new byte[expectedLength];
            int offset = 0;
            // At least one read is always issued, even for an empty expected buffer.
            do {
                final int count = actualInputStream.read(received, offset, expectedLength - offset);
                if (count < 0) {
                    throw new EOFException();
                }
                offset += count;
            } while (offset != expectedLength);

            // The stream must not deliver anything beyond the expected content.
            Assertions.assertThat(actualInputStream.read()).isEqualTo(-1);
            Assertions.assertThat(received).isEqualTo(expectedBuf);
        } finally {
            actualInputStream.close();
        }
    }

    /**
     * Compares two streams byte by byte until both report end-of-stream, then closes both.
     *
     * <p>Both streams are managed by try-with-resources, so each one is closed even if closing
     * the other fails, and a failure while closing does not replace an assertion failure raised
     * by the comparison (it is attached as a suppressed exception instead).
     *
     * @param actualInputStream stream under test; closed on return
     * @param expectedInputStream stream delivering the expected content; closed on return
     * @throws IOException if reading or closing either stream fails
     */
    static void validateGetAndClose(InputStream actualInputStream, InputStream expectedInputStream) throws IOException {
        try (InputStream actual = actualInputStream;
                InputStream expected = expectedInputStream) {
            int r1;
            do {
                r1 = actual.read();
                int r2 = expected.read();
                // Also covers the length check: at end-of-stream both sides must return -1.
                Assertions.assertThat(r1).isEqualTo(r2);
            } while (r1 >= 0);
        }
    }

    /**
     * Compares the stream against the content of {@code expectedFile} by delegating to the
     * stream-to-stream variant of this method.
     *
     * @param actualInputStream stream under test
     * @param expectedFile file holding the expected content
     * @throws IOException if the file cannot be opened or the comparison fails with an I/O error
     */
    static void validateGetAndClose(InputStream actualInputStream, File expectedFile) throws IOException {
        InputStream expectedInputStream = Files.newInputStream(expectedFile.toPath());
        validateGetAndClose(actualInputStream, expectedInputStream);
    }

    /**
     * Indicates whether this test runs against an SSL-enabled setup.
     *
     * <p>This implementation always returns {@code false}. The result is checked as an
     * assumption by {@code testGetFailsDuringStreaming}, which only proceeds when it is
     * {@code false}.
     *
     * @return {@code false}
     */
    protected boolean isSSLEnabled() {
        return false;
    }

    /** Runs the buffer upload/download scenario for transient BLOBs. */
    @Test
    void testContentAddressableBufferTransientBlob() throws IOException, InterruptedException {
        testContentAddressableBuffer(BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Runs the buffer upload/download scenario for permanent BLOBs. */
    @Test
    void testContentAddressableBufferPermantBlob() throws IOException, InterruptedException {
        testContentAddressableBuffer(BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Uploads the test buffer through a {@link BlobClient}, checks the returned keys against the
     * locally computed digest, downloads the data again and finally checks that requests for
     * unknown keys fail with an {@link IOException}.
     *
     * <p>For transient BLOBs the scenario is additionally run without a job id, and the BLOBs
     * are expected to disappear from the server after they were fetched.
     *
     * <p>Each client is managed by try-with-resources so that it is closed deterministically,
     * including the first client, which the second invalid-key check replaces.
     *
     * @param blobType type of BLOB to upload
     * @throws IOException on unexpected I/O failures
     * @throws InterruptedException declared for the eventual-deletion checks
     */
    private void testContentAddressableBuffer(BlobKey.BlobType blobType) throws IOException, InterruptedException {
        byte[] testBuffer = createTestBuffer();
        MessageDigest md = BlobUtils.createMessageDigest();
        md.update(testBuffer);
        byte[] digest = md.digest();

        InetSocketAddress serverAddress = new InetSocketAddress(getBlobServer().getAddress().getHostName(), getBlobServer().getPort());
        JobID jobId = new JobID();
        boolean isTransient = blobType == BlobKey.BlobType.TRANSIENT_BLOB;

        try (BlobClient client = new BlobClient(serverAddress, getBlobClientConfig())) {
            // Store the data without a job id (transient BLOBs only).
            BlobKey receivedKey1 = null;
            if (isTransient) {
                receivedKey1 = client.putBuffer(null, testBuffer, 0, testBuffer.length, blobType);
                Assertions.assertThat(receivedKey1.getHash()).isEqualTo(digest);
            }

            // Store the same data again, this time bound to a job.
            BlobKey receivedKey2 = client.putBuffer(jobId, testBuffer, 0, testBuffer.length, blobType);
            Assertions.assertThat(receivedKey2.getHash()).isEqualTo(digest);
            if (isTransient) {
                BlobKeyTest.verifyKeyDifferentHashEquals(receivedKey1, receivedKey2);
            }

            // Retrieve the data without a job id and expect the server-side copy to vanish.
            if (isTransient) {
                validateGetAndClose(client.getInternal(null, receivedKey1), testBuffer);
                BlobCachePutTest.verifyDeletedEventually(getBlobServer(), null, receivedKey1);
            }

            // Retrieve the job-bound data.
            validateGetAndClose(client.getInternal(jobId, receivedKey2), testBuffer);
            if (isTransient) {
                BlobCachePutTest.verifyDeletedEventually(getBlobServer(), jobId, receivedKey2);
            }

            // An unknown key without a job id must be rejected.
            Assertions.assertThatThrownBy(() -> client.getInternal(null, BlobKey.createKey(blobType))).isInstanceOf(IOException.class);
        }

        // A fresh client is used for the second invalid-key check, as in the original flow.
        try (BlobClient client = new BlobClient(serverAddress, getBlobClientConfig())) {
            Assertions.assertThatThrownBy(() -> client.getInternal(jobId, BlobKey.createKey(blobType))).isInstanceOf(IOException.class);
        }
    }

    /**
     * Returns the configuration used to create {@link BlobClient} instances in the tests.
     *
     * @return the shared static {@code clientConfig}
     */
    protected Configuration getBlobClientConfig() {
        return clientConfig;
    }

    /**
     * Returns the server the tests connect to.
     *
     * @return the shared static {@code blobServer}
     */
    protected BlobServer getBlobServer() {
        return blobServer;
    }

    /** Runs the stream upload/download scenario for transient BLOBs. */
    @Test
    void testContentAddressableStreamTransientBlob() throws IOException, InterruptedException {
        testContentAddressableStream(BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Runs the stream upload/download scenario for permanent BLOBs. */
    @Test
    void testContentAddressableStreamPermanentBlob() throws IOException, InterruptedException {
        testContentAddressableStream(BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Uploads a prepared test file through {@link BlobClient#putInputStream}, checks the returned
     * keys against the digest of the file, and downloads the content again for comparison.
     *
     * <p>For transient BLOBs the scenario is additionally run without a job id, and the BLOBs
     * are expected to disappear from the server after they were fetched.
     *
     * <p>Every upload stream is opened in its own try-with-resources block so that none of them
     * is left open when a second stream is opened or when an upload fails.
     *
     * @param blobType type of BLOB to upload
     * @throws IOException on unexpected I/O failures
     * @throws InterruptedException declared for the eventual-deletion checks
     */
    private void testContentAddressableStream(BlobKey.BlobType blobType) throws IOException, InterruptedException {
        File testFile = tempDir.resolve("test_file").toFile();
        byte[] digest = prepareTestFile(testFile);
        boolean isTransient = blobType == BlobKey.BlobType.TRANSIENT_BLOB;

        InetSocketAddress serverAddress = new InetSocketAddress(getBlobServer().getAddress().getHostName(), getBlobServer().getPort());
        try (BlobClient client = new BlobClient(serverAddress, getBlobClientConfig())) {
            JobID jobId = new JobID();

            // Store the data without a job id (transient BLOBs only).
            BlobKey receivedKey1 = null;
            if (isTransient) {
                try (InputStream is = Files.newInputStream(testFile.toPath())) {
                    receivedKey1 = client.putInputStream(null, is, blobType);
                }
                Assertions.assertThat(receivedKey1.getHash()).isEqualTo(digest);
            }

            // Store the same data again, this time bound to a job.
            BlobKey receivedKey2;
            try (InputStream is = Files.newInputStream(testFile.toPath())) {
                receivedKey2 = client.putInputStream(jobId, is, blobType);
            }
            // Same check as in the buffer-based scenario: the key hash is the content digest.
            Assertions.assertThat(receivedKey2.getHash()).isEqualTo(digest);

            if (isTransient) {
                BlobKeyTest.verifyKeyDifferentHashEquals(receivedKey1, receivedKey2);
                validateGetAndClose(client.getInternal(null, receivedKey1), testFile);
                BlobCachePutTest.verifyDeletedEventually(getBlobServer(), null, receivedKey1);
            }

            validateGetAndClose(client.getInternal(jobId, receivedKey2), testFile);
            if (isTransient) {
                BlobCachePutTest.verifyDeletedEventually(getBlobServer(), jobId, receivedKey2);
            }
        }
    }

    /** Runs the interrupted-download scenario for a transient BLOB without a job id. */
    @Test
    void testGetFailsDuringStreamingNoJobTransientBlob() throws IOException {
        testGetFailsDuringStreaming(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Runs the interrupted-download scenario for a job-bound transient BLOB. */
    @Test
    void testGetFailsDuringStreamingForJobTransientBlob() throws IOException {
        JobID jobId = new JobID();
        testGetFailsDuringStreaming(jobId, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Runs the interrupted-download scenario for a job-bound permanent BLOB. */
    @Test
    void testGetFailsDuringStreamingForJobPermanentBlob() throws IOException {
        JobID jobId = new JobID();
        testGetFailsDuringStreaming(jobId, BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Uploads a 5,000,000-byte random payload, starts downloading it, closes all active server
     * connections after two chunks of 50,000 bytes each, and then tries to read the remainder.
     *
     * <p>Both outcomes of the final read are accepted: an {@link IOException}, or a successful
     * read, in which case the received bytes must equal the uploaded payload.
     *
     * <p>The test only proceeds when {@link #isSSLEnabled()} returns {@code false}.
     *
     * @param jobId job to bind the BLOB to, or {@code null} for none
     * @param blobType type of BLOB to upload
     * @throws IOException on unexpected I/O failures before the connections are closed
     */
    private void testGetFailsDuringStreaming(@Nullable JobID jobId, BlobKey.BlobType blobType) throws IOException {
        Assumptions.assumeThat(isSSLEnabled()).as("This test can deadlock when using SSL. See FLINK-19369.").isFalse();

        InetSocketAddress serverAddress = new InetSocketAddress(getBlobServer().getAddress().getHostName(), getBlobServer().getPort());
        try (BlobClient client = new BlobClient(serverAddress, getBlobClientConfig())) {
            byte[] payload = new byte[5000000];
            new Random().nextBytes(payload);

            BlobKey key = client.putBuffer(jobId, payload, 0, payload.length, blobType);
            Assertions.assertThat(key).isNotNull();

            // Start the download and consume the first two chunks while the connection is alive.
            InputStream download = client.getInternal(jobId, key);
            byte[] received = new byte[payload.length];
            int chunkLen = 50000;
            BlobUtils.readFully(download, received, 0, chunkLen, null);
            BlobUtils.readFully(download, received, chunkLen, chunkLen, null);

            // Cut every connection the server currently holds.
            for (BlobServerConnection connection : getBlobServer().getCurrentActiveConnections()) {
                connection.close();
            }

            int consumed = 2 * chunkLen;
            try {
                BlobUtils.readFully(download, received, consumed, payload.length - consumed, null);
                // If the read still succeeds, the content must be intact.
                Assertions.assertThat(received).isEqualTo(payload);
            } catch (IOException ignored) {
                // Accepted outcome: the read fails because the connection was closed.
            }
        }
    }

    /** Exercises the static upload helper against this test's server and client configuration. */
    @Test
    void testUploadJarFilesHelper() throws Exception {
        BlobServer server = getBlobServer();
        Configuration config = getBlobClientConfig();
        uploadJarFile(server, config);
    }

    /**
     * Creates a temporary test file and uploads it twice, each time under a new job id, to the
     * given server, validating the stored content after each upload.
     *
     * @param blobServer server to upload to
     * @param blobClientConfig configuration for the clients that are created
     * @throws Exception if file preparation, upload or validation fails
     */
    static void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception {
        File jarFile = File.createTempFile("testfile", ".dat");
        jarFile.deleteOnExit();
        prepareTestFile(jarFile);

        String host = blobServer.getAddress().getHostName();
        InetSocketAddress target = new InetSocketAddress(host, blobServer.getPort());
        for (int attempt = 0; attempt < 2; ++attempt) {
            uploadJarFile(target, blobClientConfig, jarFile);
        }
    }

    /**
     * Uploads {@code testFile} under a new job id via {@link BlobClient#uploadFiles}, expects
     * exactly one key in return and verifies that the content stored under that key equals the
     * file.
     *
     * @param serverAddress address of the server to upload to
     * @param blobClientConfig configuration for the upload and for the validating client
     * @param testFile file to upload
     * @throws IOException if the upload or the validation fails with an I/O error
     */
    private static void uploadJarFile(InetSocketAddress serverAddress, Configuration blobClientConfig, File testFile) throws IOException {
        JobID jobId = new JobID();
        List<Path> files = Collections.singletonList(new Path(testFile.toURI()));
        // Typed as a list of BlobKey subtypes so no raw type or element cast is needed.
        List<? extends BlobKey> blobKeys = BlobClient.uploadFiles(serverAddress, blobClientConfig, jobId, files);
        Assertions.assertThat(blobKeys).hasSize(1);
        try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) {
            validateGetAndClose(blobClient.getInternal(jobId, blobKeys.get(0)), testFile);
        }
    }

    /**
     * Verifies that a GET request against a server that stalls while serving the file fails
     * with a {@link SocketTimeoutException} somewhere in the cause chain.
     *
     * <p>The configuration returned by {@link #getBlobClientConfig()} is modified for the
     * duration of the test ({@code SO_TIMEOUT} is set to 50) and restored in a {@code finally}
     * block, because the same object is used by the other tests.
     *
     * @throws IOException if creating, starting or closing the stalling server fails
     */
    @Test
    void testSocketTimeout() throws IOException {
        // Named differently from the static field to avoid shadowing it.
        Configuration config = getBlobClientConfig();
        int oldSoTimeout = config.get(BlobServerOptions.SO_TIMEOUT);
        config.set(BlobServerOptions.SO_TIMEOUT, 50);
        try (TestBlobServer testBlobServer = new TestBlobServer(config, tempDir.resolve("test_server").toFile(), new VoidBlobStore(), 10000L)) {
            testBlobServer.start();
            InetSocketAddress serverAddress = new InetSocketAddress(getBlobServer().getAddress().getHostName(), testBlobServer.getPort());
            try (BlobClient client = new BlobClient(serverAddress, config)) {
                client.getInternal(new JobID(), BlobKey.createKey(BlobKey.BlobType.TRANSIENT_BLOB));
                Assertions.fail("Should throw an exception.");
            } catch (Throwable t) {
                // The AssertionError raised by fail() also lands here; it carries no
                // SocketTimeoutException, so the test still fails in that case.
                Assertions.assertThat(ExceptionUtils.findThrowable(t, SocketTimeoutException.class)).isPresent();
            }
        } finally {
            config.set(BlobServerOptions.SO_TIMEOUT, oldSoTimeout);
        }
    }

    /** Checks that a client created from an unresolved socket address ends up connected. */
    @Test
    void testUnresolvedInetSocketAddress() throws Exception {
        String host = getBlobServer().getAddress().getHostName();
        int port = getBlobServer().getPort();
        InetSocketAddress unresolved = InetSocketAddress.createUnresolved(host, port);
        try (BlobClient client = new BlobClient(unresolved, getBlobClientConfig())) {
            Assertions.assertThat(client.isConnected()).isTrue();
        }
    }

    /**
     * Checks that a server configured with the bind host {@code 0.0.0.0} does not report that
     * wildcard address from {@link BlobServer#getAddress()}.
     *
     * @throws Exception if the server cannot be created, started or closed
     */
    @Test
    void testWildcardBindingAddress() throws Exception {
        Configuration config = new Configuration();
        // The value is passed as a String so it matches the option's type parameter.
        config.set(JobManagerOptions.BIND_HOST, "0.0.0.0");
        File tempServerDir = tempDir.resolve("wildcard_test").toFile();
        tempServerDir.mkdirs();
        try (BlobServer testServer = new BlobServer(config, tempServerDir, new VoidBlobStore())) {
            testServer.start();
            InetAddress address = testServer.getAddress();
            Assertions.assertThat(address.getHostAddress()).as("Should not return wildcard address").isNotEqualTo("0.0.0.0");
        }
    }

    /**
     * Checks that a server configured with the loopback address as bind host reports exactly
     * that address from {@link BlobServer#getAddress()}.
     *
     * @throws Exception if the server cannot be created, started or closed
     */
    @Test
    void testReturnsConfiguredBindAddress() throws Exception {
        String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress();
        Configuration config = new Configuration();
        // The value is passed as a String so it matches the option's type parameter.
        config.set(JobManagerOptions.BIND_HOST, loopbackAddress);
        File tempServerDir = tempDir.resolve("bind_address_test").toFile();
        tempServerDir.mkdirs();
        try (BlobServer testServer = new BlobServer(config, tempServerDir, new VoidBlobStore())) {
            testServer.start();
            InetAddress address = testServer.getAddress();
            Assertions.assertThat(address.getHostAddress()).as("Should return the bound address").isEqualTo(loopbackAddress);
        }
    }

    static class TestBlobServer
    extends BlobServer {
        private final long blockingMillis;

        TestBlobServer(Configuration config, File storageDirectory, BlobStore blobStore, long blockingMillis) throws IOException {
            super(config, storageDirectory, blobStore);
            this.blockingMillis = blockingMillis;
        }

        File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
            if (this.blockingMillis > 0L) {
                try {
                    Thread.sleep(this.blockingMillis);
                }
                catch (InterruptedException e) {
                    throw new IOException(e);
                }
            }
            return super.getFileInternal(jobId, blobKey);
        }
    }
}

