package org.apache.flink.runtime.webmonitor.threadinfo;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
import org.apache.flink.runtime.messages.ThreadInfoSample;
import org.apache.flink.runtime.taskexecutor.IdleTestTask;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.class */
class ThreadInfoRequestCoordinatorTest {
    private static final String REQUEST_TIMEOUT_MESSAGE = "Request timeout.";
    private static final int DEFAULT_NUMBER_OF_SAMPLES = 1;
    private static final int DEFAULT_MAX_STACK_TRACE_DEPTH = 100;
    private static ScheduledExecutorService executorService;
    private ThreadInfoRequestCoordinator coordinator;
    private static final Duration REQUEST_TIMEOUT = Duration.ofMillis(100);
    private static final Duration DEFAULT_DELAY_BETWEEN_SAMPLES = Duration.ofMillis(50);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest$CompletionType.class */
    public enum CompletionType {
        SUCCESSFULLY,
        EXCEPTIONALLY,
        TIMEOUT,
        NEVER_COMPLETE
    }

    ThreadInfoRequestCoordinatorTest() {
    }

    @BeforeAll
    static void setUp() throws Exception {
        executorService = new ScheduledThreadPoolExecutor(1);
    }

    @AfterAll
    static void tearDown() throws Exception {
        if (executorService != null) {
            executorService.shutdown();
        }
    }

    @BeforeEach
    void initCoordinator() {
        this.coordinator = new ThreadInfoRequestCoordinator(executorService, REQUEST_TIMEOUT);
    }

    @AfterEach
    void shutdownCoordinator() {
        if (this.coordinator != null) {
            Assertions.assertThat(this.coordinator.getNumberOfPendingRequests()).isZero();
            this.coordinator.shutDown();
        }
    }

    @Test
    void testSuccessfulThreadInfoRequest() throws Exception {
        VertexThreadInfoStats vertexThreadInfoStats = (VertexThreadInfoStats) this.coordinator.triggerThreadInfoRequest(createMockSubtaskWithGateways(CompletionType.SUCCESSFULLY, CompletionType.SUCCESSFULLY), 1, DEFAULT_DELAY_BETWEEN_SAMPLES, DEFAULT_MAX_STACK_TRACE_DEPTH).get();
        Assertions.assertThat(vertexThreadInfoStats.getRequestId()).isEqualTo(0);
        Iterator it = vertexThreadInfoStats.getSamplesBySubtask().values().iterator();
        while (it.hasNext()) {
            Assertions.assertThat(((ThreadInfoSample) ((Collection) it.next()).iterator().next()).getStackTrace()).isNotEmpty();
        }
    }

    @Test
    void testThreadInfoRequestWithException() throws Exception {
        CompletableFuture triggerThreadInfoRequest = this.coordinator.triggerThreadInfoRequest(createMockSubtaskWithGateways(CompletionType.SUCCESSFULLY, CompletionType.EXCEPTIONALLY), 1, DEFAULT_DELAY_BETWEEN_SAMPLES, DEFAULT_MAX_STACK_TRACE_DEPTH);
        triggerThreadInfoRequest.getClass();
        Assertions.assertThatThrownBy(triggerThreadInfoRequest::get, "The request must be failed.", new Object[0]).isInstanceOf(ExecutionException.class).hasCauseInstanceOf(RuntimeException.class);
    }

    @Test
    void testThreadInfoRequestTimeout() throws Exception {
        CompletableFuture triggerThreadInfoRequest = this.coordinator.triggerThreadInfoRequest(createMockSubtaskWithGateways(CompletionType.SUCCESSFULLY, CompletionType.TIMEOUT), 1, DEFAULT_DELAY_BETWEEN_SAMPLES, DEFAULT_MAX_STACK_TRACE_DEPTH);
        try {
            triggerThreadInfoRequest.getClass();
            Assertions.assertThatThrownBy(triggerThreadInfoRequest::get, "The request must be failed.", new Object[0]).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(REQUEST_TIMEOUT_MESSAGE)});
        } finally {
            this.coordinator.shutDown();
        }
    }

    @Test
    void testShutDown() throws Exception {
        Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>> createMockSubtaskWithGateways = createMockSubtaskWithGateways(CompletionType.SUCCESSFULLY, CompletionType.NEVER_COMPLETE);
        ArrayList arrayList = new ArrayList();
        CompletableFuture triggerThreadInfoRequest = this.coordinator.triggerThreadInfoRequest(createMockSubtaskWithGateways, 1, DEFAULT_DELAY_BETWEEN_SAMPLES, DEFAULT_MAX_STACK_TRACE_DEPTH);
        CompletableFuture triggerThreadInfoRequest2 = this.coordinator.triggerThreadInfoRequest(createMockSubtaskWithGateways, 1, DEFAULT_DELAY_BETWEEN_SAMPLES, DEFAULT_MAX_STACK_TRACE_DEPTH);
        arrayList.add(triggerThreadInfoRequest);
        arrayList.add(triggerThreadInfoRequest2);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assertions.assertThat((CompletableFuture) it.next()).isNotDone();
        }
        this.coordinator.shutDown();
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            Assertions.assertThat((CompletableFuture) it2.next()).isCompletedExceptionally();
        }
        Assertions.assertThat(this.coordinator.triggerThreadInfoRequest(createMockSubtaskWithGateways, 1, DEFAULT_DELAY_BETWEEN_SAMPLES, DEFAULT_MAX_STACK_TRACE_DEPTH)).isCompletedExceptionally();
    }

    private static CompletableFuture<TaskExecutorThreadInfoGateway> createMockTaskManagerGateway(CompletionType completionType) throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        switch (completionType) {
            case SUCCESSFULLY:
                HashSet hashSet = new HashSet();
                IdleTestTask.executeWithTerminationGuarantee(() -> {
                    hashSet.add(new IdleTestTask());
                    hashSet.add(new IdleTestTask());
                    Map map = (Map) hashSet.stream().collect(Collectors.toMap(idleTestTask -> {
                        return Long.valueOf(idleTestTask.getExecutingThread().getId());
                    }, (v0) -> {
                        return v0.getExecutionId();
                    }));
                    completableFuture.complete(new TaskThreadInfoResponse((Map) JvmUtils.createThreadInfoSample(map.keySet(), DEFAULT_MAX_STACK_TRACE_DEPTH).entrySet().stream().collect(Collectors.toMap(entry -> {
                        return (ExecutionAttemptID) map.get(entry.getKey());
                    }, entry2 -> {
                        return Collections.singletonList(entry2.getValue());
                    }))));
                }, hashSet);
                break;
            case EXCEPTIONALLY:
                completableFuture.completeExceptionally(new RuntimeException("Request failed."));
                break;
            case TIMEOUT:
                executorService.schedule(() -> {
                    return Boolean.valueOf(completableFuture.completeExceptionally(new TimeoutException(REQUEST_TIMEOUT_MESSAGE)));
                }, REQUEST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                break;
            case NEVER_COMPLETE:
                break;
            default:
                throw new RuntimeException("Unknown completion type.");
        }
        return CompletableFuture.completedFuture((collection, threadInfoSamplesRequest, time) -> {
            return completableFuture;
        });
    }

    private static Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>> createMockSubtaskWithGateways(CompletionType... completionTypeArr) throws Exception {
        HashMap hashMap = new HashMap();
        for (CompletionType completionType : completionTypeArr) {
            hashMap.put(ImmutableSet.of(ExecutionGraphTestUtils.createExecutionAttemptId(), ExecutionGraphTestUtils.createExecutionAttemptId()), createMockTaskManagerGateway(completionType));
        }
        return hashMap;
    }
}
