package org.apache.flink.runtime.rest.handler.taskmanager;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerFileMessageParameters;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FileUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.class */
/**
 * Tests that exercise a {@code TestingTaskManagerFileHandler} against a real {@link BlobServer}.
 *
 * <p>Before every test two files with random (UUID) content are written to the shared temporary
 * folder and uploaded to the blob server as transient blobs. The tests then hand the resulting
 * blob keys to the handler through a queue of already-completed futures and inspect the file the
 * {@code TestingChannelHandlerContext} was created with.
 */
class AbstractTaskManagerFileHandlerTest {
    private static final ResourceID EXPECTED_TASK_MANAGER_ID = ResourceID.generate();
    private static final DefaultFullHttpRequest HTTP_REQUEST = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/foobar");

    @TempDir
    private static File temporaryFolder;
    private static BlobServer blobServer;
    private static HandlerRequest<EmptyRequestBody> handlerRequest;
    private String fileContent1;
    private TransientBlobKey transientBlobKey1;
    private String fileContent2;
    private TransientBlobKey transientBlobKey2;

    /**
     * Creates the shared blob server and the handler request used by every test. The request
     * carries {@link #EXPECTED_TASK_MANAGER_ID} as its {@code taskmanagerid} path parameter.
     *
     * @throws IOException declared by the {@link BlobServer} constructor
     * @throws HandlerRequestException if the request parameters cannot be resolved
     */
    @BeforeAll
    static void setup() throws IOException, HandlerRequestException {
        blobServer = new BlobServer(new Configuration(), temporaryFolder, new VoidBlobStore());
        handlerRequest = HandlerRequest.resolveParametersAndCreate(
                EmptyRequestBody.getInstance(),
                new TaskManagerFileMessageParameters(),
                Collections.singletonMap("taskmanagerid", EXPECTED_TASK_MANAGER_ID.getResourceIdString()),
                Collections.emptyMap(),
                Collections.emptyList());
    }

    /**
     * Generates two fresh random file contents and stores each as a transient blob, remembering
     * both the content and the returned blob key.
     *
     * @throws IOException if writing a file or uploading it to the blob server fails
     */
    @BeforeEach
    void setupTest() throws IOException {
        this.fileContent1 = UUID.randomUUID().toString();
        this.transientBlobKey1 = storeFileInBlobServer(createFileWithContent(this.fileContent1));
        this.fileContent2 = UUID.randomUUID().toString();
        this.transientBlobKey2 = storeFileInBlobServer(createFileWithContent(this.fileContent2));
    }

    /**
     * Closes the shared blob server, if it was created, and clears the reference.
     *
     * @throws IOException if closing the blob server fails
     */
    @AfterAll
    static void teardown() throws IOException {
        if (blobServer != null) {
            blobServer.close();
            blobServer = null;
        }
    }

    /** A single request must produce a non-empty output file holding the first file's content. */
    @Test
    void testFileServing() throws Exception {
        final Duration cacheEntryDuration = Duration.ofMillis(1000L);
        final Queue<CompletableFuture<TransientBlobKey>> blobKeyFutures = new ArrayDeque<>(1);
        blobKeyFutures.add(CompletableFuture.completedFuture(this.transientBlobKey1));

        final TestingTaskManagerFileHandler handler =
                createTestTaskManagerFileHandler(cacheEntryDuration, blobKeyFutures, EXPECTED_TASK_MANAGER_ID);
        final File outputFile = TempDirUtils.newFile(temporaryFolder.toPath());

        handler.respondToRequest(new TestingChannelHandlerContext(outputFile), HTTP_REQUEST, handlerRequest, null);

        Assertions.assertThat(outputFile).isNotEmpty();
        Assertions.assertThat(FileUtils.readFileUtf8(outputFile)).isEqualTo(this.fileContent1);
    }

    /**
     * Two back-to-back requests with a 5000 ms duration passed to the handler: the output is
     * expected to still hold the first file's content.
     */
    @Test
    void testFileCaching() throws Exception {
        final File outputFile = runFileCachingTest(Duration.ofMillis(5000L), Duration.ZERO);

        Assertions.assertThat(outputFile).isNotEmpty();
        Assertions.assertThat(FileUtils.readFileUtf8(outputFile)).isEqualTo(this.fileContent1);
    }

    /**
     * Two requests separated by a 5 ms sleep with a 5 ms duration passed to the handler: the
     * output is expected to hold the second file's content.
     */
    @Test
    void testFileCacheExpiration() throws Exception {
        final Duration cacheEntryDuration = Duration.ofMillis(5L);
        final File outputFile = runFileCachingTest(cacheEntryDuration, cacheEntryDuration);

        Assertions.assertThat(outputFile).isNotEmpty();
        Assertions.assertThat(FileUtils.readFileUtf8(outputFile)).isEqualTo(this.fileContent2);
    }

    /**
     * Issues two requests against one handler and one channel context, sleeping in between.
     *
     * <p>The handler is given a queue holding the first blob key followed by the second one.
     *
     * @param cacheEntryDuration duration passed to the handler under test; presumably its cache
     *     entry lifetime (the handler implementation is not visible here, confirm there)
     * @param delayBetweenRequests how long to sleep between the first and the second request
     * @return the file backing the channel context that both requests were sent to
     */
    private File runFileCachingTest(Duration cacheEntryDuration, Duration delayBetweenRequests) throws Exception {
        final Queue<CompletableFuture<TransientBlobKey>> blobKeyFutures = new ArrayDeque<>(2);
        blobKeyFutures.add(CompletableFuture.completedFuture(this.transientBlobKey1));
        blobKeyFutures.add(CompletableFuture.completedFuture(this.transientBlobKey2));

        final TestingTaskManagerFileHandler handler =
                createTestTaskManagerFileHandler(cacheEntryDuration, blobKeyFutures, EXPECTED_TASK_MANAGER_ID);
        final File outputFile = TempDirUtils.newFile(temporaryFolder.toPath());
        final TestingChannelHandlerContext channelContext = new TestingChannelHandlerContext(outputFile);

        handler.respondToRequest(channelContext, HTTP_REQUEST, handlerRequest, null);
        Thread.sleep(delayBetweenRequests.toMillis());
        handler.respondToRequest(channelContext, HTTP_REQUEST, handlerRequest, null);
        return outputFile;
    }

    /**
     * Builds the handler under test, wired to the shared blob server and a
     * {@link TestingResourceManagerGateway}.
     *
     * @param duration duration handed to the handler's constructor
     * @param queue futures of blob keys handed to the handler's constructor
     * @param resourceID task manager id handed to the handler's constructor
     * @return a new handler instance
     */
    private TestingTaskManagerFileHandler createTestTaskManagerFileHandler(Duration duration, Queue<CompletableFuture<TransientBlobKey>> queue, ResourceID resourceID) {
        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        return new TestingTaskManagerFileHandler(
                () -> CompletableFuture.completedFuture(null),
                TestingUtils.infiniteTime(),
                Collections.emptyMap(),
                new TestUntypedMessageHeaders(),
                () -> CompletableFuture.completedFuture(resourceManagerGateway),
                blobServer,
                duration,
                queue,
                resourceID);
    }

    /**
     * Creates a new file in the shared temporary folder and writes {@code str} to it as UTF-8.
     *
     * @param str text to write
     * @return the newly created file
     * @throws IOException if the file cannot be created or written
     */
    private static File createFileWithContent(String str) throws IOException {
        final File file = TempDirUtils.newFile(temporaryFolder.toPath());
        try (FileOutputStream out = new FileOutputStream(file)) {
            out.write(str.getBytes(StandardCharsets.UTF_8));
        }
        return file;
    }

    /**
     * Uploads the given file to the shared blob server as a transient blob.
     *
     * @param file file whose bytes are uploaded
     * @return the key under which the blob server stored the content
     * @throws IOException if the file cannot be read or the upload fails
     */
    private static TransientBlobKey storeFileInBlobServer(File file) throws IOException {
        try (FileInputStream in = new FileInputStream(file)) {
            return blobServer.getTransientBlobService().putTransient(in);
        }
    }
}
