/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.job;

import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class JobVertexBackPressureHandlerTest {
    private static final JobID TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE = new JobID();
    private static final JobID TEST_JOB_ID_BACK_PRESSURE_STATS_ABSENT = new JobID();
    private TestingRestfulGateway restfulGateway;
    private JobVertexBackPressureHandler jobVertexBackPressureHandler;

    @Before
    public void setUp() {
        this.restfulGateway = ((TestingRestfulGateway.Builder)new TestingRestfulGateway.Builder().setRequestOperatorBackPressureStatsFunction((jobId, jobVertexId) -> {
            if (jobId.equals((Object)TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE)) {
                return CompletableFuture.completedFuture(OperatorBackPressureStatsResponse.of((OperatorBackPressureStats)new OperatorBackPressureStats(4711, Integer.MAX_VALUE, new double[]{1.0, 0.5, 0.1})));
            }
            if (jobId.equals((Object)TEST_JOB_ID_BACK_PRESSURE_STATS_ABSENT)) {
                return CompletableFuture.completedFuture(OperatorBackPressureStatsResponse.of(null));
            }
            throw new AssertionError();
        })).build();
        this.jobVertexBackPressureHandler = new JobVertexBackPressureHandler(() -> CompletableFuture.completedFuture(this.restfulGateway), Time.seconds((long)10L), Collections.emptyMap(), (MessageHeaders)JobVertexBackPressureHeaders.getInstance());
    }

    @Test
    public void testGetBackPressure() throws Exception {
        HashMap<String, String> pathParameters = new HashMap<String, String>();
        pathParameters.put("jobid", TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString());
        pathParameters.put("vertexid", new JobVertexID().toString());
        HandlerRequest request = new HandlerRequest((RequestBody)EmptyRequestBody.getInstance(), (MessageParameters)new JobVertexMessageParameters(), pathParameters, Collections.emptyMap());
        CompletableFuture jobVertexBackPressureInfoCompletableFuture = this.jobVertexBackPressureHandler.handleRequest(request, (RestfulGateway)this.restfulGateway);
        JobVertexBackPressureInfo jobVertexBackPressureInfo = (JobVertexBackPressureInfo)jobVertexBackPressureInfoCompletableFuture.get();
        Assert.assertThat((Object)jobVertexBackPressureInfo.getStatus(), (Matcher)Matchers.equalTo((Object)JobVertexBackPressureInfo.VertexBackPressureStatus.OK));
        Assert.assertThat((Object)jobVertexBackPressureInfo.getBackpressureLevel(), (Matcher)Matchers.equalTo((Object)JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH));
        Assert.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getRatio).collect(Collectors.toList()), (Matcher)Matchers.contains((Object[])new Double[]{1.0, 0.5, 0.1}));
        Assert.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getBackpressureLevel).collect(Collectors.toList()), (Matcher)Matchers.contains((Object[])new JobVertexBackPressureInfo.VertexBackPressureLevel[]{JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH, JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, JobVertexBackPressureInfo.VertexBackPressureLevel.OK}));
        Assert.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getSubtask).collect(Collectors.toList()), (Matcher)Matchers.contains((Object[])new Integer[]{0, 1, 2}));
    }

    @Test
    public void testAbsentBackPressure() throws Exception {
        HashMap<String, String> pathParameters = new HashMap<String, String>();
        pathParameters.put("jobid", TEST_JOB_ID_BACK_PRESSURE_STATS_ABSENT.toString());
        pathParameters.put("vertexid", new JobVertexID().toString());
        HandlerRequest request = new HandlerRequest((RequestBody)EmptyRequestBody.getInstance(), (MessageParameters)new JobVertexMessageParameters(), pathParameters, Collections.emptyMap());
        CompletableFuture jobVertexBackPressureInfoCompletableFuture = this.jobVertexBackPressureHandler.handleRequest(request, (RestfulGateway)this.restfulGateway);
        JobVertexBackPressureInfo jobVertexBackPressureInfo = (JobVertexBackPressureInfo)jobVertexBackPressureInfoCompletableFuture.get();
        Assert.assertThat((Object)jobVertexBackPressureInfo.getStatus(), (Matcher)Matchers.equalTo((Object)JobVertexBackPressureInfo.VertexBackPressureStatus.DEPRECATED));
    }
}

