package org.apache.flink.runtime.blob;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/blob/BlobServerPutTest.class */
public class BlobServerPutTest {

    /** Temporary directory supplied through JUnit's {@code @TempDir}; handed to the server factory in each test. */
    @TempDir
    private Path tempDir;
    /** Generator for the random payload bytes that the tests upload. */
    private final Random rnd = new Random();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/blob/BlobServerPutTest$BlockingInputStream.class */
    /**
     * Input stream over a fixed byte array in which every {@link #read()} call first counts the
     * shared latch down and then waits for the latch to reach zero before returning a byte.
     */
    public static final class BlockingInputStream extends InputStream {
        private final CountDownLatch countDownLatch;
        private final byte[] data;
        private int index = 0;

        /**
         * @param countDownLatch latch that is counted down and awaited on every read; must not be null
         * @param bArr bytes to serve; must not be null
         */
        public BlockingInputStream(CountDownLatch countDownLatch, byte[] bArr) {
            this.countDownLatch = Preconditions.checkNotNull(countDownLatch);
            this.data = Preconditions.checkNotNull(bArr);
        }

        /**
         * Counts down the latch, waits for it to open, then returns the next byte.
         *
         * @return the next byte as a value in {@code 0..255}, or {@code -1} once the data is exhausted
         * @throws IOException if the waiting thread is interrupted (the interrupt flag is restored)
         */
        @Override
        public int read() throws IOException {
            this.countDownLatch.countDown();
            try {
                this.countDownLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Blocking operation was interrupted.", e);
            }

            if (this.index >= this.data.length) {
                return -1;
            }
            // Mask to an unsigned value: a raw 0xFF byte would otherwise be returned as -1 (EOF).
            return this.data[this.index++] & 0xFF;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/blob/BlobServerPutTest$ChunkedInputStream.class */
    public static final class ChunkedInputStream extends InputStream {
        private final byte[][] data;
        private int x = 0;
        private int y = 0;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* JADX WARN: Type inference failed for: r1v3, types: [byte[], byte[][]] */
        public ChunkedInputStream(byte[] bArr, int i) {
            this.data = new byte[i];
            int length = bArr.length / i;
            int i2 = 0;
            int i3 = 0;
            while (i3 < i - 1) {
                this.data[i3] = new byte[length];
                System.arraycopy(bArr, i2, this.data[i3], 0, length);
                i3++;
                i2 += length;
            }
            this.data[i - 1] = new byte[bArr.length - i2];
            System.arraycopy(bArr, i2, this.data[i - 1], 0, this.data[i - 1].length);
        }

        @Override // java.io.InputStream
        public int read() {
            if (this.x >= this.data.length) {
                return -1;
            }
            byte[] bArr = this.data[this.x];
            if (this.y < bArr.length) {
                byte b = bArr[this.y];
                this.y++;
                return b;
            }
            this.y = 0;
            this.x++;
            return read();
        }

        @Override // java.io.InputStream
        public int read(byte[] bArr, int i, int i2) {
            if (i2 == 0) {
                return 0;
            }
            if (this.x >= this.data.length) {
                return -1;
            }
            byte[] bArr2 = this.data[this.x];
            if (this.y >= bArr2.length) {
                this.y = 0;
                this.x++;
                return read(bArr, i, i2);
            }
            int min = Math.min(i2, bArr2.length - this.y);
            System.arraycopy(bArr2, this.y, bArr, i, min);
            this.y += min;
            return min;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/blob/BlobServerPutTest$ContentAddressableGetStorageLocation.class */
    public static class ContentAddressableGetStorageLocation extends CheckedThread {
        private final BlobServer server;
        private final JobID jobId;
        private final BlobKey key;

        ContentAddressableGetStorageLocation(BlobServer blobServer, @Nullable JobID jobID, BlobKey blobKey) {
            this.server = blobServer;
            this.jobId = jobID;
            this.key = blobKey;
        }

        public void go() throws Exception {
            this.server.getStorageLocation(this.jobId, this.key);
        }
    }

    /** Package-private no-argument constructor with an empty body. */
    BlobServerPutTest() {
    }

    /** Concurrent storage-location lookups without a job ID. */
    @Test
    void testServerContentAddressableGetStorageLocationConcurrentNoJob() throws Exception {
        testServerContentAddressableGetStorageLocationConcurrent(null);
    }

    /** Concurrent storage-location lookups for a freshly created job ID. */
    @Test
    void testServerContentAddressableGetStorageLocationConcurrentForJob() throws Exception {
        testServerContentAddressableGetStorageLocationConcurrent(new JobID());
    }

    /**
     * Starts a server and runs six {@link ContentAddressableGetStorageLocation} threads against it:
     * three sharing one transient key and three sharing one permanent key. Any exception raised in
     * a thread is surfaced by {@link #checkedThreadSimpleTest(CheckedThread[])}.
     *
     * @param jobID job scope for the lookups, or {@code null}
     */
    private void testServerContentAddressableGetStorageLocationConcurrent(@Nullable JobID jobID) throws Exception {
        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir)) {
            server.start();

            BlobKey transientKey = new TransientBlobKey();
            BlobKey permanentKey = new PermanentBlobKey();

            checkedThreadSimpleTest(
                    new CheckedThread[] {
                        new ContentAddressableGetStorageLocation(server, jobID, transientKey),
                        new ContentAddressableGetStorageLocation(server, jobID, transientKey),
                        new ContentAddressableGetStorageLocation(server, jobID, transientKey),
                        new ContentAddressableGetStorageLocation(server, jobID, permanentKey),
                        new ContentAddressableGetStorageLocation(server, jobID, permanentKey),
                        new ContentAddressableGetStorageLocation(server, jobID, permanentKey)
                    });
        }
    }

    /**
     * Starts every thread in the array, then calls {@code sync()} on each of them in array order.
     *
     * @param checkedThreadArr threads to run
     * @throws Exception whatever {@code sync()} propagates
     */
    private void checkedThreadSimpleTest(CheckedThread[] checkedThreadArr) throws Exception {
        final int threadCount = checkedThreadArr.length;

        // Launch all threads before waiting on any of them.
        for (int idx = 0; idx < threadCount; idx++) {
            checkedThreadArr[idx].start();
        }

        for (int idx = 0; idx < threadCount; idx++) {
            checkedThreadArr[idx].sync();
        }
    }

    /** Byte-array round trip, transient blobs: no job ID for either upload series. */
    @Test
    void testPutBufferSuccessfulGet1() throws IOException {
        testPutBufferSuccessfulGet(null, null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Byte-array round trip, transient blobs: first series without a job ID, second with one. */
    @Test
    void testPutBufferSuccessfulGet2() throws IOException {
        testPutBufferSuccessfulGet(null, new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Byte-array round trip, transient blobs: two distinct job IDs. */
    @Test
    void testPutBufferSuccessfulGet3() throws IOException {
        testPutBufferSuccessfulGet(new JobID(), new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Byte-array round trip, transient blobs: first series with a job ID, second without. */
    @Test
    void testPutBufferSuccessfulGet4() throws IOException {
        testPutBufferSuccessfulGet(new JobID(), null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Byte-array round trip, permanent blobs: two distinct job IDs. */
    @Test
    void testPutBufferSuccessfulGetHa() throws IOException {
        testPutBufferSuccessfulGet(new JobID(), new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Uploads a 2,000,000-byte random buffer and a 44-byte slice of it as byte arrays under two
     * job scopes, then reads every blob back and compares its contents with what was uploaded.
     *
     * @param jobID job scope of the first upload series, or {@code null}
     * @param jobID2 job scope of the second upload series, or {@code null}
     * @param blobType kind of blob to upload
     */
    private void testPutBufferSuccessfulGet(@Nullable JobID jobID, @Nullable JobID jobID2, BlobKey.BlobType blobType) throws IOException {
        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir)) {
            server.start();

            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);
            byte[] slice = Arrays.copyOfRange(data, 10, 54);

            // First scope: the full payload twice, then the slice.
            BlobKey key1a = put(server, jobID, data, blobType);
            Assertions.assertThat(key1a).isNotNull();

            BlobKey key1a2 = put(server, jobID, data, blobType);
            Assertions.assertThat(key1a2).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals(key1a, key1a2);

            BlobKey key1b = put(server, jobID, slice, blobType);
            Assertions.assertThat(key1b).isNotNull();

            verifyContents(server, jobID, key1a, data);
            verifyContents(server, jobID, key1a2, data);
            verifyContents(server, jobID, key1b, slice);

            // Second scope: the same two payloads again.
            BlobKey key2a = put(server, jobID2, data, blobType);
            Assertions.assertThat(key2a).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals(key1a, key2a);

            BlobKey key2b = put(server, jobID2, slice, blobType);
            Assertions.assertThat(key2b).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals(key1b, key2b);

            verifyContents(server, jobID2, key2a, data);
            verifyContents(server, jobID2, key2b, slice);

            // Read everything once more after all uploads have completed.
            verifyContents(server, jobID, key1a, data);
            verifyContents(server, jobID, key1a2, data);
            verifyContents(server, jobID, key1b, slice);
            verifyContents(server, jobID2, key2a, data);
            verifyContents(server, jobID2, key2b, slice);
        }
    }

    /** Stream round trip, transient blobs: no job ID for either upload series. */
    @Test
    void testPutStreamSuccessfulGet1() throws IOException {
        testPutStreamSuccessfulGet(null, null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Stream round trip, transient blobs: first series without a job ID, second with one. */
    @Test
    void testPutStreamSuccessfulGet2() throws IOException {
        testPutStreamSuccessfulGet(null, new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Stream round trip, transient blobs: two distinct job IDs. */
    @Test
    void testPutStreamSuccessfulGet3() throws IOException {
        testPutStreamSuccessfulGet(new JobID(), new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Stream round trip, transient blobs: first series with a job ID, second without. */
    @Test
    void testPutStreamSuccessfulGet4() throws IOException {
        testPutStreamSuccessfulGet(new JobID(), null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Stream round trip, permanent blobs: two distinct job IDs. */
    @Test
    void testPutStreamSuccessfulGetHa() throws IOException {
        testPutStreamSuccessfulGet(new JobID(), new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Same scenario as the byte-array variant, but every upload is fed through a
     * {@link ByteArrayInputStream}: a 2,000,000-byte random buffer and a 44-byte slice of it are
     * uploaded under two job scopes and each blob is read back and compared.
     *
     * @param jobID job scope of the first upload series, or {@code null}
     * @param jobID2 job scope of the second upload series, or {@code null}
     * @param blobType kind of blob to upload
     */
    private void testPutStreamSuccessfulGet(@Nullable JobID jobID, @Nullable JobID jobID2, BlobKey.BlobType blobType) throws IOException {
        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir)) {
            server.start();

            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);
            byte[] slice = Arrays.copyOfRange(data, 10, 54);

            // First scope: the full payload twice, then the slice.
            BlobKey key1a = put(server, jobID, new ByteArrayInputStream(data), blobType);
            Assertions.assertThat(key1a).isNotNull();

            BlobKey key1a2 = put(server, jobID, new ByteArrayInputStream(data), blobType);
            Assertions.assertThat(key1a2).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals(key1a, key1a2);

            BlobKey key1b = put(server, jobID, new ByteArrayInputStream(slice), blobType);
            Assertions.assertThat(key1b).isNotNull();

            verifyContents(server, jobID, key1a, data);
            verifyContents(server, jobID, key1a2, data);
            verifyContents(server, jobID, key1b, slice);

            // Second scope: the same two payloads again.
            BlobKey key2a = put(server, jobID2, new ByteArrayInputStream(data), blobType);
            Assertions.assertThat(key2a).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals(key1a, key2a);

            BlobKey key2b = put(server, jobID2, new ByteArrayInputStream(slice), blobType);
            Assertions.assertThat(key2b).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals(key1b, key2b);

            verifyContents(server, jobID2, key2a, data);
            verifyContents(server, jobID2, key2b, slice);

            // Read everything once more after all uploads have completed.
            verifyContents(server, jobID, key1a, data);
            verifyContents(server, jobID, key1a2, data);
            verifyContents(server, jobID, key1b, slice);
            verifyContents(server, jobID2, key2a, data);
            verifyContents(server, jobID2, key2b, slice);
        }
    }

    /** Chunked-stream round trip, transient blobs: no job ID for either upload series. */
    @Test
    void testPutChunkedStreamSuccessfulGet1() throws IOException {
        testPutChunkedStreamSuccessfulGet(null, null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Chunked-stream round trip, transient blobs: first series without a job ID, second with one. */
    @Test
    void testPutChunkedStreamSuccessfulGet2() throws IOException {
        testPutChunkedStreamSuccessfulGet(null, new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Chunked-stream round trip, transient blobs: two distinct job IDs. */
    @Test
    void testPutChunkedStreamSuccessfulGet3() throws IOException {
        testPutChunkedStreamSuccessfulGet(new JobID(), new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Chunked-stream round trip, transient blobs: first series with a job ID, second without. */
    @Test
    void testPutChunkedStreamSuccessfulGet4() throws IOException {
        testPutChunkedStreamSuccessfulGet(new JobID(), null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Chunked-stream round trip, permanent blobs: two distinct job IDs. */
    @Test
    void testPutChunkedStreamSuccessfulGetHa() throws IOException {
        testPutChunkedStreamSuccessfulGet(new JobID(), new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Same scenario as the plain-stream variant, but every upload is fed through a
     * {@link ChunkedInputStream} split into 19 chunks, so bulk reads return partial buffers.
     *
     * @param jobID job scope of the first upload series, or {@code null}
     * @param jobID2 job scope of the second upload series, or {@code null}
     * @param blobType kind of blob to upload
     */
    private void testPutChunkedStreamSuccessfulGet(@Nullable JobID jobID, @Nullable JobID jobID2, BlobKey.BlobType blobType) throws IOException {
        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir)) {
            server.start();

            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);
            byte[] slice = Arrays.copyOfRange(data, 10, 54);

            // First scope: the full payload twice, then the slice.
            BlobKey key1a = put(server, jobID, new ChunkedInputStream(data, 19), blobType);
            Assertions.assertThat(key1a).isNotNull();

            BlobKey key1a2 = put(server, jobID, new ChunkedInputStream(data, 19), blobType);
            Assertions.assertThat(key1a2).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals(key1a, key1a2);

            BlobKey key1b = put(server, jobID, new ChunkedInputStream(slice, 19), blobType);
            Assertions.assertThat(key1b).isNotNull();

            verifyContents(server, jobID, key1a, data);
            verifyContents(server, jobID, key1a2, data);
            verifyContents(server, jobID, key1b, slice);

            // Second scope: the same two payloads again.
            BlobKey key2a = put(server, jobID2, new ChunkedInputStream(data, 19), blobType);
            Assertions.assertThat(key2a).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals(key1a, key2a);

            BlobKey key2b = put(server, jobID2, new ChunkedInputStream(slice, 19), blobType);
            Assertions.assertThat(key2b).isNotNull();
            BlobKeyTest.verifyKeyDifferentHashEquals(key1b, key2b);

            verifyContents(server, jobID2, key2a, data);
            verifyContents(server, jobID2, key2b, slice);

            // Read everything once more after all uploads have completed.
            verifyContents(server, jobID, key1a, data);
            verifyContents(server, jobID, key1a2, data);
            verifyContents(server, jobID, key1b, slice);
            verifyContents(server, jobID2, key2a, data);
            verifyContents(server, jobID2, key2b, slice);
        }
    }

    /** Upload into a read-only storage directory: transient blob, no job ID. */
    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsNoJob() throws IOException {
        testPutBufferFails(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Upload into a read-only storage directory: transient blob with a job ID. */
    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsForJob() throws IOException {
        testPutBufferFails(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Upload into a read-only storage directory: permanent blob with a job ID. */
    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsForJobHa() throws IOException {
        testPutBufferFails(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Makes the grandparent directory of the server's temporary files read-only and expects a
     * byte-array upload to fail with {@link AccessDeniedException}. Skipped on Windows.
     *
     * @param jobID job scope of the upload, or {@code null}
     * @param blobType kind of blob to upload
     */
    private void testPutBufferFails(@Nullable JobID jobID, BlobKey.BlobType blobType) throws IOException {
        Assumptions.assumeThat(OperatingSystem.isWindows()).as("setWritable doesn't work on Windows").isFalse();

        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir)) {
            server.start();

            // Revoke write permission two levels above the temporary file location.
            File lockedDir = server.createTemporaryFilename().getParentFile().getParentFile();
            Assertions.assertThat(lockedDir.setExecutable(true, false)).isTrue();
            Assertions.assertThat(lockedDir.setReadable(true, false)).isTrue();
            Assertions.assertThat(lockedDir.setWritable(false, false)).isTrue();

            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);

            boolean writableRestored;
            try {
                Assertions.assertThatThrownBy(() -> put(server, jobID, data, blobType))
                        .isInstanceOf(AccessDeniedException.class);
            } finally {
                // Always restore write access, even when the assertion above fails, so the
                // directory does not stay read-only after the test.
                writableRestored = lockedDir.setWritable(true, false);
            }
            Assertions.assertThat(writableRestored).isTrue();
        }
    }

    /** Upload with a read-only temporary-file directory: transient blob, no job ID. */
    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsIncomingNoJob() throws IOException {
        testPutBufferFailsIncoming(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Upload with a read-only temporary-file directory: transient blob with a job ID. */
    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsIncomingForJob() throws IOException {
        testPutBufferFailsIncoming(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Upload with a read-only temporary-file directory: permanent blob with a job ID. */
    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsIncomingForJobHa() throws IOException {
        testPutBufferFailsIncoming(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Makes the directory that holds the server's temporary files read-only and expects a
     * byte-array upload to fail with an {@link IOException} whose message ends in
     * {@code " (Permission denied)"}. Whether or not that assertion passes, the parent of that
     * directory must afterwards list exactly one entry, {@code "incoming"}. Skipped on Windows.
     *
     * @param jobID job scope of the upload, or {@code null}
     * @param blobType kind of blob to upload
     */
    private void testPutBufferFailsIncoming(@Nullable JobID jobID, BlobKey.BlobType blobType) throws IOException {
        Assumptions.assumeThat(OperatingSystem.isWindows()).as("setWritable doesn't work on Windows").isFalse();

        File tempFileDir = null;
        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir)) {
            server.start();

            // Revoke write permission on the directory used for temporary files.
            tempFileDir = server.createTemporaryFilename().getParentFile();
            Assertions.assertThat(tempFileDir.setExecutable(true, false)).isTrue();
            Assertions.assertThat(tempFileDir.setReadable(true, false)).isTrue();
            Assertions.assertThat(tempFileDir.setWritable(false, false)).isTrue();

            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);

            try {
                Assertions.assertThatThrownBy(() -> put(server, jobID, data, blobType))
                        .isInstanceOf(IOException.class)
                        .hasMessageEndingWith(" (Permission denied)");
            } finally {
                // The failed upload must not have created anything next to "incoming".
                Assertions.assertThat(tempFileDir.getParentFile().list()).containsExactly("incoming");
            }
        } finally {
            // Restore write access on every path so the directory does not stay read-only.
            if (tempFileDir != null) {
                tempFileDir.setWritable(true, false);
            }
        }
    }

    /** Upload with a read-only blob store directory: transient blob, no job ID. */
    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsStoreNoJob() throws IOException {
        testPutBufferFailsStore(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Upload with a read-only blob store directory: transient blob with a job ID. */
    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsStoreForJob() throws IOException {
        testPutBufferFailsStore(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Upload with a read-only blob store directory: permanent blob with a job ID. */
    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testPutBufferFailsStoreForJobHa() throws IOException {
        testPutBufferFailsStore(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Makes the directory in which the blob for the given job scope would be stored read-only and
     * expects a byte-array upload to fail with {@link AccessDeniedException}. Whether or not that
     * assertion passes, both the sibling {@code "incoming"} directory and the store directory
     * itself must afterwards be empty. Skipped on Windows.
     *
     * @param jobID job scope of the upload, or {@code null}
     * @param blobType kind of blob to upload
     */
    private void testPutBufferFailsStore(@Nullable JobID jobID, BlobKey.BlobType blobType) throws IOException {
        Assumptions.assumeThat(OperatingSystem.isWindows()).as("setWritable doesn't work on Windows").isFalse();

        File jobStoreDir = null;
        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir)) {
            server.start();

            // Revoke write permission on the directory where the blob would finally be placed.
            jobStoreDir = server.getStorageLocation(jobID, BlobKey.createKey(blobType)).getParentFile();
            Assertions.assertThat(jobStoreDir.setExecutable(true, false)).isTrue();
            Assertions.assertThat(jobStoreDir.setReadable(true, false)).isTrue();
            Assertions.assertThat(jobStoreDir.setWritable(false, false)).isTrue();

            byte[] data = new byte[2000000];
            this.rnd.nextBytes(data);

            try {
                Assertions.assertThatThrownBy(() -> put(server, jobID, data, blobType))
                        .isInstanceOf(AccessDeniedException.class);
            } finally {
                // No leftover temporary files ...
                File incomingDir = new File(jobStoreDir.getParent(), "incoming");
                Assertions.assertThat(incomingDir.list()).isEmpty();
                // ... and nothing in the store directory either.
                Assertions.assertThat(jobStoreDir.list()).isEmpty();
            }
        } finally {
            // Restore write access on every path so the directory does not stay read-only.
            if (jobStoreDir != null) {
                jobStoreDir.setWritable(true, false);
            }
        }
    }

    /** Runs the concurrent-put scenario for a transient blob without a job ID. */
    @Test
    void testConcurrentPutOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
        testConcurrentPutOperations(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Runs the concurrent-put scenario for a transient blob under a new job ID. */
    @Test
    void testConcurrentPutOperationsForJob() throws IOException, ExecutionException, InterruptedException {
        testConcurrentPutOperations(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    /** Runs the concurrent-put scenario for a permanent blob under a new job ID. */
    @Test
    void testConcurrentPutOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
        testConcurrentPutOperations(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    /**
     * Uses a blob store whose put function always throws an {@link IOException} and checks that a
     * permanent upload fails with an {@link IOException} and that the job's storage directory is
     * an empty directory afterwards.
     */
    @Test
    void testFailedBlobStorePutsDeletesLocalBlob() throws IOException {
        BlobKey.BlobType blobType = BlobKey.BlobType.PERMANENT_BLOB;
        JobID jobId = JobID.generate();
        byte[] data = {1, 2, 3};
        File storageDir = TempDirUtils.newFolder(this.tempDir);

        TestingBlobStore failingStore =
                new TestingBlobStoreBuilder()
                        .setPutFunction(
                                (file, ignoredJobId, ignoredKey) -> {
                                    throw new IOException("Could not persist the file.");
                                })
                        .createTestingBlobStore();

        try (BlobServer blobServer = new BlobServer(new Configuration(), storageDir, failingStore)) {
            Assertions.assertThatThrownBy(() -> put(blobServer, jobId, data, blobType))
                    .isInstanceOf(IOException.class);

            File jobStorageDir = new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId));
            Assertions.assertThat(jobStorageDir).isEmptyDirectory();
        }
    }

    /**
     * Uploads the same zero-filled buffer from two threads at once, each through a
     * {@link BlockingInputStream} sharing one latch, and verifies the keys and stored contents.
     * For permanent blobs, the keys passed to the blob store must have the same elements as the
     * returned keys; for other blob types the blob store must not have been called.
     *
     * @param jobID job scope of the uploads, or {@code null}
     * @param blobType kind of blob to upload
     */
    private void testConcurrentPutOperations(@Nullable JobID jobID, BlobKey.BlobType blobType) throws IOException, InterruptedException, ExecutionException {
        final int concurrentPutOperations = 2;
        final Configuration configuration = new Configuration();

        // Collects every key that reaches the blob store's put function.
        final Collection<BlobKey> persistedKeys = ConcurrentHashMap.newKeySet();
        final TestingBlobStore blobStore =
                new TestingBlobStoreBuilder()
                        .setPutFunction(
                                (file, storedJobId, storedKey) -> {
                                    persistedKeys.add(storedKey);
                                    return true;
                                })
                        .createTestingBlobStore();

        final CountDownLatch countDownLatch = new CountDownLatch(concurrentPutOperations);
        final byte[] data = new byte[HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE];

        final List<CompletableFuture<BlobKey>> uploads = new ArrayList<>(concurrentPutOperations);
        final ExecutorService executor = Executors.newFixedThreadPool(concurrentPutOperations);

        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir, configuration, blobStore)) {
            server.start();

            for (int i = 0; i < concurrentPutOperations; i++) {
                uploads.add(
                        CompletableFuture.supplyAsync(
                                () -> {
                                    try {
                                        BlobKey uploadedKey =
                                                put(server, jobID, new BlockingInputStream(countDownLatch, data), blobType);
                                        // Check the stored contents from within the uploading thread.
                                        verifyContents(server, jobID, uploadedKey, data);
                                        return uploadedKey;
                                    } catch (IOException e) {
                                        throw new CompletionException(new FlinkException("Could not upload blob.", e));
                                    }
                                },
                                executor));
            }

            // Wait for all uploads; a failed upload surfaces here as an ExecutionException.
            Collection<BlobKey> uploadedKeys = FutureUtils.combineAll(uploads).get();

            Iterator<BlobKey> keyIterator = uploadedKeys.iterator();
            Assertions.assertThat(keyIterator).hasNext();
            BlobKey firstKey = keyIterator.next();
            while (keyIterator.hasNext()) {
                BlobKeyTest.verifyKeyDifferentHashEquals(firstKey, keyIterator.next());
            }

            verifyContents(server, jobID, firstKey, data);

            if (blobType == BlobKey.BlobType.PERMANENT_BLOB) {
                Assertions.assertThat(persistedKeys).hasSameElementsAs(uploadedKeys);
            } else {
                Assertions.assertThat(persistedKeys).isEmpty();
            }
        } finally {
            executor.shutdownNow();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Uploads the contents of a stream through the given service.
     *
     * <p>Permanent blobs are uploaded via {@code BlobServer#putPermanent} and are only supported
     * when the service is a {@link BlobServer}. Every other blob type goes through the service's
     * transient blob service, with or without a job ID.
     *
     * @param blobService service to upload to
     * @param jobID job scope of the upload, or {@code null}
     * @param inputStream data to upload
     * @param blobType kind of blob to create
     * @return key returned by the upload call
     * @throws UnsupportedOperationException for a permanent blob when the service is not a {@link BlobServer}
     */
    public static BlobKey put(BlobService blobService, @Nullable JobID jobID, InputStream inputStream, BlobKey.BlobType blobType) throws IOException {
        if (blobType == BlobKey.BlobType.PERMANENT_BLOB) {
            if (!(blobService instanceof BlobServer)) {
                throw new UnsupportedOperationException("uploading streams is only possible at the BlobServer");
            }
            return ((BlobServer) blobService).putPermanent(jobID, inputStream);
        }

        if (jobID == null) {
            return blobService.getTransientBlobService().putTransient(inputStream);
        }
        return blobService.getTransientBlobService().putTransient(jobID, inputStream);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Uploads a byte array through the given service.
     *
     * <p>Non-permanent blobs go through the service's transient blob service, with or without a
     * job ID. Permanent blobs are uploaded via {@code BlobServer#putPermanent} when the service is
     * a {@link BlobServer}. Otherwise the bytes are written to a temporary {@code .jar} file that
     * is uploaded with {@code BlobClient.uploadFiles} to {@code localhost} on the service's port;
     * the temporary file is deleted afterwards in all cases.
     *
     * @param blobService service to upload to
     * @param jobID job scope of the upload, or {@code null}
     * @param bArr data to upload
     * @param blobType kind of blob to create
     * @return key returned by the upload call
     */
    public static BlobKey put(BlobService blobService, @Nullable JobID jobID, byte[] bArr, BlobKey.BlobType blobType) throws IOException {
        if (blobType != BlobKey.BlobType.PERMANENT_BLOB) {
            return jobID == null
                    ? blobService.getTransientBlobService().putTransient(bArr)
                    : blobService.getTransientBlobService().putTransient(jobID, bArr);
        }
        if (blobService instanceof BlobServer) {
            return ((BlobServer) blobService).putPermanent(jobID, bArr);
        }

        // Not a BlobServer: go through a file upload with the blob client instead.
        File tmpFile = Files.createTempFile("blob", ".jar").toFile();
        try {
            FileUtils.writeByteArrayToFile(tmpFile, bArr);
            InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobService.getPort());
            List<PermanentBlobKey> keys =
                    BlobClient.uploadFiles(
                            serverAddress,
                            new Configuration(),
                            jobID,
                            Collections.singletonList(new org.apache.flink.core.fs.Path(tmpFile.getAbsolutePath())));
            Assertions.assertThat(keys).hasSize(1);
            return keys.get(0);
        } finally {
            // Best-effort cleanup; the result of delete() is deliberately not checked.
            tmpFile.delete();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Fetches the blob via {@code BlobServerGetTest.get}, opens it as a stream and passes it,
     * together with the expected bytes, to {@code BlobClientTest.validateGetAndClose}.
     *
     * @param blobService service to fetch the blob from
     * @param jobID job scope of the blob, or {@code null}
     * @param blobKey key of the blob
     * @param bArr expected contents
     */
    public static void verifyContents(BlobService blobService, @Nullable JobID jobID, BlobKey blobKey, byte[] bArr) throws IOException {
        final Path blobPath = BlobServerGetTest.get(blobService, jobID, blobKey).toPath();
        final InputStream actualContents = Files.newInputStream(blobPath);
        BlobClientTest.validateGetAndClose(actualContents, bArr);
    }

    /**
     * Stream-based variant: fetches the blob via {@code BlobServerGetTest.get}, opens it as a
     * stream and passes it, together with the expected stream, to
     * {@code BlobClientTest.validateGetAndClose}.
     *
     * @param blobService service to fetch the blob from
     * @param jobID job scope of the blob, or {@code null}
     * @param blobKey key of the blob
     * @param inputStream stream supplying the expected contents
     */
    static void verifyContents(BlobService blobService, @Nullable JobID jobID, BlobKey blobKey, InputStream inputStream) throws IOException {
        final Path blobPath = BlobServerGetTest.get(blobService, jobID, blobKey).toPath();
        final InputStream actualContents = Files.newInputStream(blobPath);
        BlobClientTest.validateGetAndClose(actualContents, inputStream);
    }
}
