package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/blob/BlobCacheGetTest.class */
class BlobCacheGetTest {

    @TempDir
    private Path tempDir;
    private final Random rnd = new Random();

    BlobCacheGetTest() {
    }

    @Test
    void testGetTransientFailsDuringLookup1() throws IOException, InterruptedException {
        testGetFailsDuringLookup(null, new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testGetTransientFailsDuringLookup2() throws IOException, InterruptedException {
        testGetFailsDuringLookup(new JobID(), new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testGetTransientFailsDuringLookup3() throws IOException, InterruptedException {
        testGetFailsDuringLookup(new JobID(), null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Test
    void testGetFailsDuringLookupHa() throws IOException, InterruptedException {
        testGetFailsDuringLookup(new JobID(), new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    private void testGetFailsDuringLookup(JobID jobID, JobID jobID2, BlobKey.BlobType blobType) throws IOException, InterruptedException {
        Tuple2<BlobServer, BlobCacheService> createServerAndCache = TestingBlobUtils.createServerAndCache(this.tempDir);
        BlobServer blobServer = (BlobServer) createServerAndCache.f0;
        try {
            BlobCacheService blobCacheService = (BlobCacheService) createServerAndCache.f1;
            try {
                blobServer.start();
                byte[] bArr = new byte[2000000];
                this.rnd.nextBytes(bArr);
                BlobKey put = BlobServerPutTest.put((BlobService) blobServer, jobID, bArr, blobType);
                Assertions.assertThat(put).isNotNull();
                BlobKeyTest.verifyType(blobType, put);
                Assertions.assertThat(blobServer.getStorageLocation(jobID, put).delete()).isTrue();
                BlobServerGetTest.verifyDeleted(blobCacheService, jobID, put);
                BlobKey put2 = BlobServerPutTest.put((BlobService) blobServer, jobID2, bArr, blobType);
                Assertions.assertThat(put2).isNotNull();
                BlobKeyTest.verifyKeyDifferentHashEquals(put, put2);
                BlobServerGetTest.get(blobCacheService, jobID2, put2);
                BlobServerGetTest.verifyDeleted(blobCacheService, jobID, put);
                if (blobType == BlobKey.BlobType.PERMANENT_BLOB) {
                    Assertions.assertThat(blobServer.getStorageLocation(jobID2, put2)).exists();
                    Assertions.assertThat(blobCacheService.getPermanentBlobService().getStorageLocation(jobID2, put2).delete()).isTrue();
                    BlobServerGetTest.get(blobCacheService, jobID2, put2);
                    Assertions.assertThat(blobCacheService.getPermanentBlobService().getStorageLocation(jobID2, put2).delete()).isTrue();
                    Assertions.assertThat(blobServer.getStorageLocation(jobID2, put2).delete()).isTrue();
                    BlobServerGetTest.verifyDeleted(blobCacheService, jobID2, put2);
                } else {
                    BlobCachePutTest.verifyDeletedEventually(blobServer, jobID2, put2);
                    Assertions.assertThat(blobCacheService.getTransientBlobService().getStorageLocation(jobID2, put2).delete()).isTrue();
                    BlobServerGetTest.verifyDeleted(blobCacheService, jobID2, put2);
                }
                if (blobCacheService != null) {
                    blobCacheService.close();
                }
                if (blobServer != null) {
                    blobServer.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (blobServer != null) {
                try {
                    blobServer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetFailsIncomingNoJob() throws IOException {
        testGetFailsIncoming(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetFailsIncomingForJob() throws IOException {
        testGetFailsIncoming(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetFailsIncomingForJobHa() throws IOException {
        testGetFailsIncoming(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    private void testGetFailsIncoming(@Nullable JobID jobID, BlobKey.BlobType blobType) throws IOException {
        ((AbstractBooleanAssert) Assumptions.assumeThat(OperatingSystem.isWindows()).as("setWritable doesn't work on Windows", new Object[0])).isFalse();
        Tuple2<BlobServer, BlobCacheService> createServerAndCache = TestingBlobUtils.createServerAndCache(this.tempDir);
        File file = null;
        try {
            BlobServer blobServer = (BlobServer) createServerAndCache.f0;
            try {
                BlobCacheService blobCacheService = (BlobCacheService) createServerAndCache.f1;
                try {
                    blobServer.start();
                    byte[] bArr = new byte[2000000];
                    this.rnd.nextBytes(bArr);
                    BlobKey put = BlobServerPutTest.put((BlobService) blobServer, jobID, bArr, blobType);
                    BlobKeyTest.verifyType(blobType, put);
                    File parentFile = blobType == BlobKey.BlobType.PERMANENT_BLOB ? blobCacheService.getPermanentBlobService().createTemporaryFilename().getParentFile() : blobCacheService.getTransientBlobService().createTemporaryFilename().getParentFile();
                    Assertions.assertThat(parentFile.setExecutable(true, false)).isTrue();
                    Assertions.assertThat(parentFile.setReadable(true, false)).isTrue();
                    Assertions.assertThat(parentFile.setWritable(false, false)).isTrue();
                    try {
                        Assertions.assertThatThrownBy(() -> {
                            BlobServerGetTest.get(blobCacheService, jobID, put);
                        }).isInstanceOf(IOException.class).hasMessageStartingWith("Failed to fetch BLOB");
                        HashSet hashSet = new HashSet();
                        hashSet.add("incoming");
                        if (jobID != null) {
                            hashSet.add("job_" + jobID);
                            Assertions.assertThat(new File(parentFile.getParentFile(), "job_" + jobID).list()).isEmpty();
                        } else {
                            hashSet.add("no_job");
                            Assertions.assertThat(new File(parentFile.getParentFile(), "no_job").list()).isEmpty();
                        }
                        Assertions.assertThat(parentFile.getParentFile().list()).isNotNull().isNotEmpty().containsExactlyInAnyOrderElementsOf(hashSet);
                        Assertions.assertThat(blobServer.getStorageLocation(jobID, put)).exists();
                        if (blobCacheService != null) {
                            blobCacheService.close();
                        }
                        if (blobServer != null) {
                            blobServer.close();
                        }
                        if (parentFile != null) {
                            parentFile.setWritable(true, false);
                        }
                    } catch (Throwable th) {
                        HashSet hashSet2 = new HashSet();
                        hashSet2.add("incoming");
                        if (jobID != null) {
                            hashSet2.add("job_" + jobID);
                            Assertions.assertThat(new File(parentFile.getParentFile(), "job_" + jobID).list()).isEmpty();
                        } else {
                            hashSet2.add("no_job");
                            Assertions.assertThat(new File(parentFile.getParentFile(), "no_job").list()).isEmpty();
                        }
                        Assertions.assertThat(parentFile.getParentFile().list()).isNotNull().isNotEmpty().containsExactlyInAnyOrderElementsOf(hashSet2);
                        Assertions.assertThat(blobServer.getStorageLocation(jobID, put)).exists();
                        throw th;
                    }
                } catch (Throwable th2) {
                    if (blobCacheService != null) {
                        try {
                            blobCacheService.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    }
                    throw th2;
                }
            } finally {
            }
        } catch (Throwable th4) {
            if (0 != 0) {
                file.setWritable(true, false);
            }
            throw th4;
        }
    }

    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetTransientFailsStoreNoJob() throws IOException, InterruptedException {
        testGetFailsStore(null, BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetTransientFailsStoreForJob() throws IOException, InterruptedException {
        testGetFailsStore(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB);
    }

    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetPermanentFailsStoreForJob() throws IOException, InterruptedException {
        testGetFailsStore(new JobID(), BlobKey.BlobType.PERMANENT_BLOB);
    }

    private void testGetFailsStore(@Nullable JobID jobID, BlobKey.BlobType blobType) throws IOException, InterruptedException {
        ((AbstractBooleanAssert) Assumptions.assumeThat(OperatingSystem.isWindows()).as("setWritable doesn't work on Windows", new Object[0])).isFalse();
        Tuple2<BlobServer, BlobCacheService> createServerAndCache = TestingBlobUtils.createServerAndCache(this.tempDir);
        File file = null;
        try {
            BlobServer blobServer = (BlobServer) createServerAndCache.f0;
            try {
                BlobCacheService blobCacheService = (BlobCacheService) createServerAndCache.f1;
                try {
                    blobServer.start();
                    byte[] bArr = new byte[2000000];
                    this.rnd.nextBytes(bArr);
                    BlobKey put = BlobServerPutTest.put((BlobService) blobServer, jobID, bArr, blobType);
                    BlobKeyTest.verifyType(blobType, put);
                    File parentFile = blobType == BlobKey.BlobType.PERMANENT_BLOB ? blobCacheService.getPermanentBlobService().getStorageLocation(jobID, new PermanentBlobKey()).getParentFile() : blobCacheService.getTransientBlobService().getStorageLocation(jobID, new TransientBlobKey()).getParentFile();
                    Assertions.assertThat(parentFile.setExecutable(true, false)).isTrue();
                    Assertions.assertThat(parentFile.setReadable(true, false)).isTrue();
                    Assertions.assertThat(parentFile.setWritable(false, false)).isTrue();
                    try {
                        Assertions.assertThatThrownBy(() -> {
                            BlobServerGetTest.get(blobCacheService, jobID, put);
                        }).isInstanceOf(AccessDeniedException.class);
                        Assertions.assertThat(new File(parentFile.getParent(), "incoming").list()).isEmpty();
                        Assertions.assertThat(parentFile.list()).isEmpty();
                        if (blobType == BlobKey.BlobType.TRANSIENT_BLOB) {
                            BlobCachePutTest.verifyDeletedEventually(blobServer, jobID, put);
                        } else {
                            Assertions.assertThat(blobServer.getStorageLocation(jobID, put)).exists();
                        }
                        if (blobCacheService != null) {
                            blobCacheService.close();
                        }
                        if (blobServer != null) {
                            blobServer.close();
                        }
                        if (parentFile != null) {
                            parentFile.setWritable(true, false);
                        }
                    } catch (Throwable th) {
                        Assertions.assertThat(new File(parentFile.getParent(), "incoming").list()).isEmpty();
                        Assertions.assertThat(parentFile.list()).isEmpty();
                        if (blobType == BlobKey.BlobType.TRANSIENT_BLOB) {
                            BlobCachePutTest.verifyDeletedEventually(blobServer, jobID, put);
                        } else {
                            Assertions.assertThat(blobServer.getStorageLocation(jobID, put)).exists();
                        }
                        throw th;
                    }
                } catch (Throwable th2) {
                    if (blobCacheService != null) {
                        try {
                            blobCacheService.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    }
                    throw th2;
                }
            } finally {
            }
        } catch (Throwable th4) {
            if (0 != 0) {
                file.setWritable(true, false);
            }
            throw th4;
        }
    }

    @Test
    void testGetFailsHaStoreForJobHa() throws IOException {
        JobID jobID = new JobID();
        Tuple2<BlobServer, BlobCacheService> createServerAndCache = TestingBlobUtils.createServerAndCache(this.tempDir);
        BlobServer blobServer = (BlobServer) createServerAndCache.f0;
        try {
            BlobCacheService blobCacheService = (BlobCacheService) createServerAndCache.f1;
            try {
                blobServer.start();
                byte[] bArr = new byte[2000000];
                this.rnd.nextBytes(bArr);
                PermanentBlobKey put = BlobServerPutTest.put((BlobService) blobServer, jobID, bArr, BlobKey.BlobType.PERMANENT_BLOB);
                Assertions.assertThat(blobServer.getStorageLocation(jobID, put).delete()).isTrue();
                File parentFile = blobServer.createTemporaryFilename().getParentFile();
                try {
                    Assertions.assertThatThrownBy(() -> {
                        BlobServerGetTest.get(blobCacheService, jobID, put);
                    }).isInstanceOf(IOException.class).hasMessageStartingWith("Failed to fetch BLOB");
                    HashSet hashSet = new HashSet();
                    hashSet.add("incoming");
                    hashSet.add("job_" + jobID);
                    Assertions.assertThat(parentFile.getParentFile().list()).isNotNull().isNotEmpty().containsExactlyInAnyOrderElementsOf(hashSet);
                    Assertions.assertThat(new File(parentFile.getParentFile(), "job_" + jobID).list()).isEmpty();
                    if (blobCacheService != null) {
                        blobCacheService.close();
                    }
                    if (blobServer != null) {
                        blobServer.close();
                    }
                } catch (Throwable th) {
                    HashSet hashSet2 = new HashSet();
                    hashSet2.add("incoming");
                    hashSet2.add("job_" + jobID);
                    Assertions.assertThat(parentFile.getParentFile().list()).isNotNull().isNotEmpty().containsExactlyInAnyOrderElementsOf(hashSet2);
                    Assertions.assertThat(new File(parentFile.getParentFile(), "job_" + jobID).list()).isEmpty();
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th2) {
            if (blobServer != null) {
                try {
                    blobServer.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }

    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetTransientRemoteDeleteFailsNoJob() throws IOException {
        testGetTransientRemoteDeleteFails(null);
    }

    @Tag("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser")
    @Test
    void testGetTransientRemoteDeleteFailsForJob() throws IOException {
        testGetTransientRemoteDeleteFails(new JobID());
    }

    private void testGetTransientRemoteDeleteFails(@Nullable JobID jobID) throws IOException {
        ((AbstractBooleanAssert) Assumptions.assumeThat(OperatingSystem.isWindows()).as("setWritable doesn't work on Windows", new Object[0])).isFalse();
        Tuple2<BlobServer, BlobCacheService> createServerAndCache = TestingBlobUtils.createServerAndCache(this.tempDir);
        File file = null;
        File file2 = null;
        BlobServer blobServer = (BlobServer) createServerAndCache.f0;
        try {
            BlobCacheService blobCacheService = (BlobCacheService) createServerAndCache.f1;
            try {
                blobServer.start();
                try {
                    byte[] bArr = new byte[2000000];
                    this.rnd.nextBytes(bArr);
                    TransientBlobKey put = BlobServerPutTest.put((BlobService) blobServer, jobID, bArr, BlobKey.BlobType.TRANSIENT_BLOB);
                    Assertions.assertThat(put).isNotNull();
                    file = blobServer.getStorageLocation(jobID, put);
                    file2 = file.getParentFile();
                    Assertions.assertThat(file.setWritable(false, false)).isTrue();
                    Assertions.assertThat(file2.setWritable(false, false)).isTrue();
                    BlobServerPutTest.verifyContents((BlobService) blobCacheService, jobID, (BlobKey) put, bArr);
                    Assertions.assertThat(BlobServerDeleteTest.delete(blobCacheService, jobID, put)).isTrue();
                    Assertions.assertThat(blobCacheService.getTransientBlobService().getStorageLocation(jobID, put)).doesNotExist();
                    BlobServerPutTest.verifyContents((BlobService) blobServer, jobID, (BlobKey) put, bArr);
                    BlobServerPutTest.verifyContents((BlobService) blobCacheService, jobID, (BlobKey) put, bArr);
                    if (file != null && file2 != null) {
                        file.setWritable(true, false);
                        file2.setWritable(true, false);
                    }
                    if (blobCacheService != null) {
                        blobCacheService.close();
                    }
                    if (blobServer != null) {
                        blobServer.close();
                    }
                } catch (Throwable th) {
                    if (file != null && file2 != null) {
                        file.setWritable(true, false);
                        file2.setWritable(true, false);
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                if (blobCacheService != null) {
                    try {
                        blobCacheService.close();
                    } catch (Throwable th3) {
                        th2.addSuppressed(th3);
                    }
                }
                throw th2;
            }
        } catch (Throwable th4) {
            if (blobServer != null) {
                try {
                    blobServer.close();
                } catch (Throwable th5) {
                    th4.addSuppressed(th5);
                }
            }
            throw th4;
        }
    }

    @Test
    void testConcurrentGetOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
        testConcurrentGetOperations(null, BlobKey.BlobType.TRANSIENT_BLOB, false);
    }

    @Test
    void testConcurrentGetOperationsForJob() throws IOException, ExecutionException, InterruptedException {
        testConcurrentGetOperations(new JobID(), BlobKey.BlobType.TRANSIENT_BLOB, false);
    }

    @Test
    void testConcurrentGetOperationsForJobHa() throws IOException, ExecutionException, InterruptedException {
        testConcurrentGetOperations(new JobID(), BlobKey.BlobType.PERMANENT_BLOB, false);
    }

    @Test
    void testConcurrentGetOperationsForJobHa2() throws IOException, ExecutionException, InterruptedException {
        testConcurrentGetOperations(new JobID(), BlobKey.BlobType.PERMANENT_BLOB, true);
    }

    private void testConcurrentGetOperations(JobID jobID, BlobKey.BlobType blobType, boolean z) throws IOException, InterruptedException, ExecutionException {
        BlobStore voidBlobStore = new VoidBlobStore();
        BlobStore voidBlobStore2 = new VoidBlobStore();
        ArrayList arrayList = new ArrayList(3);
        byte[] bArr = {1, 2, 3, 4, 99, 42};
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
        Tuple2<BlobServer, BlobCacheService> createServerAndCache = TestingBlobUtils.createServerAndCache(this.tempDir, voidBlobStore, z ? voidBlobStore : voidBlobStore2);
        try {
            BlobServer blobServer = (BlobServer) createServerAndCache.f0;
            try {
                BlobCacheService blobCacheService = (BlobCacheService) createServerAndCache.f1;
                try {
                    blobServer.start();
                    BlobKey put = BlobServerPutTest.put((BlobService) blobServer, jobID, bArr, blobType);
                    for (int i = 0; i < 3; i++) {
                        arrayList.add(CompletableFuture.supplyAsync(() -> {
                            try {
                                File file = BlobServerGetTest.get(blobCacheService, jobID, put);
                                BlobClientTest.validateGetAndClose(Files.newInputStream(file.toPath(), new OpenOption[0]), bArr);
                                return file;
                            } catch (IOException e) {
                                throw new CompletionException((Throwable) new FlinkException("Could not read blob for key " + put + ".", e));
                            }
                        }, newFixedThreadPool));
                    }
                    FutureUtils.ConjunctFuture combineAll = FutureUtils.combineAll(arrayList);
                    if (blobType == BlobKey.BlobType.PERMANENT_BLOB) {
                        combineAll.get();
                    } else {
                        int i2 = 0;
                        Iterator it = arrayList.iterator();
                        while (it.hasNext()) {
                            try {
                                ((CompletableFuture) it.next()).get();
                                i2++;
                            } catch (Throwable th) {
                                if (!(ExceptionUtils.getRootCause(th) instanceof FileNotFoundException)) {
                                    org.apache.flink.util.ExceptionUtils.rethrowIOException(th);
                                }
                            }
                        }
                        Assertions.assertThat(i2).isGreaterThanOrEqualTo(1);
                    }
                    if (blobCacheService != null) {
                        blobCacheService.close();
                    }
                    if (blobServer != null) {
                        blobServer.close();
                    }
                } catch (Throwable th2) {
                    if (blobCacheService != null) {
                        try {
                            blobCacheService.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    }
                    throw th2;
                }
            } finally {
            }
        } finally {
            newFixedThreadPool.shutdownNow();
        }
    }
}
