/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobServerPutTest;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TestingBlobUtils;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class PermanentBlobCacheSizeLimitTest {
    private static final Random RANDOM = new Random();
    private static final BlobKey.BlobType BLOB_TYPE = BlobKey.BlobType.PERMANENT_BLOB;
    private static final int BLOB_SIZE = 10000;
    private static final int MAX_NUM_OF_ACCEPTED_BLOBS = 2;
    private static final int TOTAL_NUM_OF_BLOBS = 3;
    @TempDir
    Path tempDir;

    PermanentBlobCacheSizeLimitTest() {
    }

    @Test
    void testTrackSizeLimitAndDeleteExcessSequentially() throws Exception {
        Configuration config = new Configuration();
        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir, config);
             BlobCacheService cache = this.initBlobCacheServiceWithSizeLimit(config, new InetSocketAddress("localhost", server.getPort()));){
            server.start();
            BlobInfo[] blobs = PermanentBlobCacheSizeLimitTest.putBlobsIntoBlobServer(server);
            for (int i = 0; i < 3; ++i) {
                PermanentBlobCacheSizeLimitTest.readFileAndVerifyContent((BlobService)cache, blobs[i].jobId, blobs[i].blobKey, blobs[i].data);
                blobs[i].blobFile = PermanentBlobCacheSizeLimitTest.getFile(cache, blobs[i].jobId, blobs[i].blobKey);
                Assertions.assertThat((File)blobs[i].blobFile).exists();
            }
            Assertions.assertThat((File)blobs[0].blobFile).doesNotExist();
            Assertions.assertThat((File)blobs[1].blobFile).exists();
            PermanentBlobCacheSizeLimitTest.readFileAndVerifyContent((BlobService)cache, blobs[1].jobId, blobs[1].blobKey, blobs[1].data);
            blobs[0].blobKey = BlobServerPutTest.put((BlobService)server, blobs[0].jobId, blobs[0].data, BLOB_TYPE);
            PermanentBlobCacheSizeLimitTest.readFileAndVerifyContent((BlobService)cache, blobs[0].jobId, blobs[0].blobKey, blobs[0].data);
            blobs[0].blobFile = PermanentBlobCacheSizeLimitTest.getFile(cache, blobs[0].jobId, blobs[0].blobKey);
            Assertions.assertThat((File)blobs[0].blobFile).exists();
            Assertions.assertThat((File)blobs[1].blobFile).exists();
            Assertions.assertThat((File)blobs[2].blobFile).doesNotExist();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testTrackSizeLimitAndDeleteExcessConcurrently() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        Configuration config = new Configuration();
        try (BlobServer server = TestingBlobUtils.createServer(this.tempDir, config);
             BlobCacheService cache = this.initBlobCacheServiceWithSizeLimit(config, new InetSocketAddress("localhost", server.getPort()));){
            server.start();
            BlobInfo[] blobs = PermanentBlobCacheSizeLimitTest.putBlobsIntoBlobServer(server);
            ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>(3);
            int i = 0;
            while (i < 3) {
                int idx = i++;
                CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                    try {
                        PermanentBlobCacheSizeLimitTest.readFileAndVerifyContent((BlobService)cache, blobs[idx].jobId, blobs[idx].blobKey, blobs[idx].data);
                        blobs[idx].blobFile = PermanentBlobCacheSizeLimitTest.getFile(cache, blobs[idx].jobId, blobs[idx].blobKey);
                        return null;
                    }
                    catch (IOException e) {
                        throw new CompletionException(e);
                    }
                }, executor);
                futures.add(future);
            }
            FutureUtils.ConjunctFuture conjunctFuture = FutureUtils.waitForAll(futures);
            conjunctFuture.get();
            int exists = 0;
            int nonExists = 0;
            for (int i2 = 0; i2 < 3; ++i2) {
                if (blobs[i2].blobFile.exists()) {
                    ++exists;
                    continue;
                }
                ++nonExists;
            }
            Assertions.assertThat((int)exists).isEqualTo(2);
            Assertions.assertThat((int)nonExists).isEqualTo(1);
        }
        finally {
            executor.shutdownNow();
        }
    }

    private BlobCacheService initBlobCacheServiceWithSizeLimit(Configuration config, @Nullable InetSocketAddress serverAddress) throws IOException {
        PermanentBlobCache permanentBlobCache = new PermanentBlobCache(config, this.tempDir.resolve("permanent_cache").toFile(), (BlobView)new VoidBlobStore(), serverAddress, new BlobCacheSizeTracker(20000L));
        TransientBlobCache transientBlobCache = new TransientBlobCache(config, this.tempDir.resolve("transient_cache").toFile(), serverAddress);
        return new BlobCacheService(permanentBlobCache, transientBlobCache);
    }

    private static BlobInfo[] putBlobsIntoBlobServer(BlobServer server) throws IOException {
        BlobInfo[] blobs = new BlobInfo[3];
        for (int i = 0; i < 3; ++i) {
            blobs[i] = new BlobInfo();
            blobs[i].blobKey = BlobServerPutTest.put((BlobService)server, blobs[i].jobId, blobs[i].data, BLOB_TYPE);
            Assertions.assertThat((Comparable)blobs[i].blobKey).isNotNull();
        }
        return blobs;
    }

    private static void readFileAndVerifyContent(BlobService blobService, JobID jobId, BlobKey blobKey, byte[] expected) throws IOException {
        Assertions.assertThat((Comparable)jobId).isNotNull();
        ((AbstractComparableAssert)Assertions.assertThat((Comparable)blobKey).isNotNull()).isInstanceOf(PermanentBlobKey.class);
        byte[] target = blobService.getPermanentBlobService().readFile(jobId, (PermanentBlobKey)blobKey);
        Assertions.assertThat((byte[])target).isEqualTo((Object)expected);
    }

    private static File getFile(BlobCacheService blobCacheService, JobID jobId, BlobKey blobKey) throws IOException {
        return blobCacheService.getPermanentBlobService().getStorageLocation(jobId, blobKey);
    }

    private static class BlobInfo {
        private final JobID jobId = new JobID();
        private final byte[] data = new byte[10000];
        private BlobKey blobKey;
        private File blobFile;

        private BlobInfo() {
            RANDOM.nextBytes(this.data);
        }
    }
}

