package org.apache.flink.runtime.webmonitor.threadinfo;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
import org.apache.flink.runtime.messages.ThreadInfoSample;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.taskmanager.TaskTest;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest.class */
public class ThreadInfoRequestCoordinatorTest extends TestLogger {
    /**
     * Message of the {@link TimeoutException} produced by the {@code TIMEOUT} mock gateway; the
     * timeout test searches the failure chain for it.
     */
    private static final String REQUEST_TIMEOUT_MESSAGE = "Request timeout.";

    /**
     * Timeout handed to every coordinator under test, and the delay after which a {@code TIMEOUT}
     * mock gateway fails its response.
     */
    private static final Duration REQUEST_TIMEOUT = Duration.ofMillis(100);

    /** Delay between samples passed along with every triggered request. */
    private static final Duration DEFAULT_DELAY_BETWEEN_SAMPLES = Duration.ofMillis(50);

    /** Sample count for a request (presumably the per-request number of samples; verify). */
    private static final int DEFAULT_NUMBER_OF_SAMPLES = 1;

    /** Stack trace depth used for requests and for the sample created by the mock gateway. */
    private static final int DEFAULT_MAX_STACK_TRACE_DEPTH = 100;

    /** Scheduler shared by all tests of this class; created in setUp, shut down in tearDown. */
    private static ScheduledExecutorService executorService;

    /** Aborts any single test case that runs longer than ten seconds. */
    @Rule
    public Timeout caseTimeout = new Timeout(10, TimeUnit.SECONDS);

    /** Coordinator under test; a fresh instance is created before each test case. */
    private ThreadInfoRequestCoordinator coordinator;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoRequestCoordinatorTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest$1.class */
    /**
     * Lookup table that translates a {@link CompletionType} ordinal into a dense {@code switch}
     * case label: SUCCESSFULLY = 1, EXCEPTIONALLY = 2, TIMEOUT = 3, NEVER_COMPLETE = 4.
     *
     * <p>NOTE(review): the "synthetic" markers and the {@code $SwitchMap$} field name suggest this
     * is compiler-generated support code for a {@code switch} over the enum that the decompiler
     * surfaced as source, not hand-written code. Confirm before editing it by hand.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Sized by the number of enum constants; every entry starts at 0, which matches no case label.
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$webmonitor$threadinfo$ThreadInfoRequestCoordinatorTest$CompletionType = new int[CompletionType.values().length];

        static {
            // Each constant is registered in its own try block so that one missing constant
            // (NoSuchFieldError) does not prevent the remaining entries from being filled in.
            try {
                $SwitchMap$org$apache$flink$runtime$webmonitor$threadinfo$ThreadInfoRequestCoordinatorTest$CompletionType[CompletionType.SUCCESSFULLY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the entry keeps its initial value 0.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$webmonitor$threadinfo$ThreadInfoRequestCoordinatorTest$CompletionType[CompletionType.EXCEPTIONALLY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the entry keeps its initial value 0.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$webmonitor$threadinfo$ThreadInfoRequestCoordinatorTest$CompletionType[CompletionType.TIMEOUT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the entry keeps its initial value 0.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$webmonitor$threadinfo$ThreadInfoRequestCoordinatorTest$CompletionType[CompletionType.NEVER_COMPLETE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Deliberately ignored: the entry keeps its initial value 0.
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/ThreadInfoRequestCoordinatorTest$CompletionType.class */
    /**
     * Selects how the response future of a mocked task executor gateway behaves; interpreted by
     * {@code createMockTaskManagerGateway}.
     *
     * <p>NOTE(review): the decompiler reports that the original access modifier was
     * {@code private}; it was widened to {@code public} during decompilation.
     */
    public enum CompletionType {
        /** The response future is completed right away with a single thread info sample. */
        SUCCESSFULLY,
        /** The response future is completed exceptionally right away with a RuntimeException. */
        EXCEPTIONALLY,
        /** The response future is failed with a TimeoutException once REQUEST_TIMEOUT has elapsed. */
        TIMEOUT,
        /** The response future is left untouched and therefore never completes. */
        NEVER_COMPLETE
    }

    /** Creates the single-threaded scheduler that all test cases of this class share. */
    @BeforeClass
    public static void setUp() throws Exception {
        final int corePoolSize = 1;
        executorService = new ScheduledThreadPoolExecutor(corePoolSize);
    }

    /** Shuts the shared scheduler down once all test cases have run; a no-op if it was never created. */
    @AfterClass
    public static void tearDown() throws Exception {
        if (executorService == null) {
            return;
        }
        executorService.shutdown();
    }

    /** Gives every test case its own coordinator, backed by the shared scheduler and REQUEST_TIMEOUT. */
    @Before
    public void initCoordinator() throws Exception {
        final ThreadInfoRequestCoordinator freshCoordinator =
                new ThreadInfoRequestCoordinator(executorService, REQUEST_TIMEOUT);
        this.coordinator = freshCoordinator;
    }

    /**
     * Verifies that the finished test case left no pending request behind and shuts the
     * coordinator down.
     *
     * <p>The shutdown runs in a {@code finally} block so that a failing pending-request assertion
     * can no longer skip it and leave the coordinator running after the test case.
     *
     * @throws Exception declared for the test framework; the body itself declares no checked
     *     exception
     */
    @After
    public void shutdownCoordinator() throws Exception {
        if (this.coordinator == null) {
            return;
        }
        try {
            Assert.assertEquals(0L, this.coordinator.getNumberOfPendingRequests());
        } finally {
            this.coordinator.shutDown();
        }
    }

    /**
     * Triggers a request against two gateways that both answer successfully and checks that the
     * result carries request id 0 and a non-empty stack trace in the first sample of every subtask.
     */
    @Test
    public void testSuccessfulThreadInfoRequest() throws Exception {
        Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>> gateways =
                createMockSubtaskWithGateways(CompletionType.SUCCESSFULLY, CompletionType.SUCCESSFULLY);

        JobVertexThreadInfoStats stats =
                (JobVertexThreadInfoStats)
                        this.coordinator
                                .triggerThreadInfoRequest(
                                        gateways,
                                        DEFAULT_NUMBER_OF_SAMPLES,
                                        DEFAULT_DELAY_BETWEEN_SAMPLES,
                                        DEFAULT_MAX_STACK_TRACE_DEPTH)
                                .get();

        Assert.assertEquals(0L, stats.getRequestId());

        for (Object samplesOfSubtask : stats.getSamplesBySubtask().values()) {
            ThreadInfoSample firstSample = (ThreadInfoSample) ((List<?>) samplesOfSubtask).get(0);
            MatcherAssert.assertThat(firstSample.getStackTrace(), Matchers.not(Matchers.emptyArray()));
        }
    }

    /**
     * Triggers a request where one gateway fails its response and expects the overall request to
     * fail with an {@link ExecutionException} whose cause is a {@link RuntimeException}.
     */
    @Test
    public void testThreadInfoRequestWithException() throws Exception {
        Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>> gateways =
                createMockSubtaskWithGateways(CompletionType.SUCCESSFULLY, CompletionType.EXCEPTIONALLY);

        try {
            this.coordinator
                    .triggerThreadInfoRequest(
                            gateways,
                            DEFAULT_NUMBER_OF_SAMPLES,
                            DEFAULT_DELAY_BETWEEN_SAMPLES,
                            DEFAULT_MAX_STACK_TRACE_DEPTH)
                    .get();
            Assert.fail("Exception expected.");
        } catch (ExecutionException expected) {
            Throwable cause = expected.getCause();
            Assert.assertTrue(cause instanceof RuntimeException);
        }
    }

    /**
     * Triggers a request where one gateway only fails after {@code REQUEST_TIMEOUT} and expects
     * the request to fail with an {@link ExecutionException} whose chain contains
     * {@code REQUEST_TIMEOUT_MESSAGE}.
     *
     * <p>The coordinator is shut down exactly once in a {@code finally} block, whatever the
     * outcome. This replaces three hand-copied {@code shutDown()} calls, one of which sat after
     * {@code Assert.fail} and could never execute, and which could shut down twice when the
     * first call threw.
     *
     * @throws Exception if waiting for the request is interrupted or shutdown fails
     */
    @Test
    public void testThreadInfoRequestTimeout() throws Exception {
        try {
            Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>> gateways =
                    createMockSubtaskWithGateways(CompletionType.SUCCESSFULLY, CompletionType.TIMEOUT);

            this.coordinator
                    .triggerThreadInfoRequest(
                            gateways,
                            DEFAULT_NUMBER_OF_SAMPLES,
                            DEFAULT_DELAY_BETWEEN_SAMPLES,
                            DEFAULT_MAX_STACK_TRACE_DEPTH)
                    .get();
            Assert.fail("Exception expected.");
        } catch (ExecutionException e) {
            Assert.assertTrue(
                    ExceptionUtils.findThrowableWithMessage(e, REQUEST_TIMEOUT_MESSAGE).isPresent());
        } finally {
            this.coordinator.shutDown();
        }
    }

    /**
     * Checks the shutdown contract: requests still pending at shutdown are completed
     * exceptionally, and a request triggered afterwards is already completed exceptionally when
     * it is returned.
     */
    @Test
    public void testShutDown() throws Exception {
        Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>> gateways =
                createMockSubtaskWithGateways(CompletionType.SUCCESSFULLY, CompletionType.TIMEOUT);

        List<CompletableFuture<?>> pendingRequests = new ArrayList<>();
        CompletableFuture<?> firstRequest =
                this.coordinator.triggerThreadInfoRequest(
                        gateways,
                        DEFAULT_NUMBER_OF_SAMPLES,
                        DEFAULT_DELAY_BETWEEN_SAMPLES,
                        DEFAULT_MAX_STACK_TRACE_DEPTH);
        CompletableFuture<?> secondRequest =
                this.coordinator.triggerThreadInfoRequest(
                        gateways,
                        DEFAULT_NUMBER_OF_SAMPLES,
                        DEFAULT_DELAY_BETWEEN_SAMPLES,
                        DEFAULT_MAX_STACK_TRACE_DEPTH);
        pendingRequests.add(firstRequest);
        pendingRequests.add(secondRequest);

        // Neither request can be finished yet: one gateway only answers after REQUEST_TIMEOUT.
        for (CompletableFuture<?> request : pendingRequests) {
            Assert.assertFalse(request.isDone());
        }

        this.coordinator.shutDown();

        for (CompletableFuture<?> request : pendingRequests) {
            Assert.assertTrue(request.isCompletedExceptionally());
        }

        CompletableFuture<?> requestAfterShutDown =
                this.coordinator.triggerThreadInfoRequest(
                        gateways,
                        DEFAULT_NUMBER_OF_SAMPLES,
                        DEFAULT_DELAY_BETWEEN_SAMPLES,
                        DEFAULT_MAX_STACK_TRACE_DEPTH);
        Assert.assertTrue(requestAfterShutDown.isCompletedExceptionally());
    }

    /**
     * Builds an already-completed future holding a mock {@link TaskExecutorThreadInfoGateway}
     * whose sample request always returns the same response future.
     *
     * <p>The switch is made directly on the enum. The previous code went through a synthetic
     * ordinal lookup table and used unrelated checkpoint-id constants of another test class as
     * case labels, which only worked because their values happened to be 2 and 3.
     *
     * @param completionType how the response future behaves: completed with one sample of the
     *     calling thread ({@code SUCCESSFULLY}), failed immediately ({@code EXCEPTIONALLY}),
     *     failed with a {@link TimeoutException} after {@code REQUEST_TIMEOUT} ({@code TIMEOUT}),
     *     or never completed ({@code NEVER_COMPLETE})
     * @return a completed future containing the mock gateway
     * @throws RuntimeException if {@code completionType} is not one of the handled constants
     */
    private static CompletableFuture<TaskExecutorThreadInfoGateway> createMockTaskManagerGateway(CompletionType completionType) {
        final CompletableFuture<TaskThreadInfoResponse> responseFuture = new CompletableFuture<>();

        switch (completionType) {
            case SUCCESSFULLY:
                // Sample the thread running this helper so the response holds a real stack trace.
                ThreadInfoSample sample =
                        JvmUtils.createThreadInfoSample(
                                        Thread.currentThread().getId(), DEFAULT_MAX_STACK_TRACE_DEPTH)
                                .get();
                responseFuture.complete(new TaskThreadInfoResponse(Collections.singletonList(sample)));
                break;
            case EXCEPTIONALLY:
                responseFuture.completeExceptionally(new RuntimeException("Request failed."));
                break;
            case TIMEOUT:
                // Fail the response only after the request timeout has elapsed.
                executorService.schedule(
                        () -> responseFuture.completeExceptionally(new TimeoutException(REQUEST_TIMEOUT_MESSAGE)),
                        REQUEST_TIMEOUT.toMillis(),
                        TimeUnit.MILLISECONDS);
                break;
            case NEVER_COMPLETE:
                // Intentionally leave the response future incomplete.
                break;
            default:
                throw new RuntimeException("Unknown completion type.");
        }

        TaskExecutorThreadInfoGateway gateway = (attemptId, request, timeout) -> responseFuture;
        return CompletableFuture.completedFuture(gateway);
    }

    /**
     * Creates one mock gateway per given completion type, each registered under a freshly
     * generated {@link ExecutionAttemptID}.
     *
     * @param completionTypeArr behaviour of each gateway's response; one map entry is created per
     *     element
     * @return a mutable map from execution attempt id to the future of its mock gateway
     */
    private static Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>> createMockSubtaskWithGateways(CompletionType... completionTypeArr) {
        final Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>> gatewaysByAttempt =
                new HashMap<>();
        for (int i = 0; i < completionTypeArr.length; i++) {
            final ExecutionAttemptID attemptId = new ExecutionAttemptID();
            gatewaysByAttempt.put(attemptId, createMockTaskManagerGateway(completionTypeArr[i]));
        }
        return gatewaysByAttempt;
    }
}
