package org.apache.flink.runtime.rest.handler.job;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.class */
class JobVertexBackPressureHandlerTest {
    private static final JobID TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE = new JobID();
    private static final JobVertexID TEST_JOB_VERTEX_ID = new JobVertexID();
    private static final JobID TEST_JOB_ID_BACK_PRESSURE_STATS_ABSENT = new JobID();
    private TestingRestfulGateway restfulGateway;
    private JobVertexBackPressureHandler jobVertexBackPressureHandler;
    private MetricStore metricStore;

    JobVertexBackPressureHandlerTest() {
    }

    private static Collection<MetricDump> getMetricDumps() {
        ArrayList arrayList = new ArrayList();
        QueryScopeInfo.TaskQueryScopeInfo taskQueryScopeInfo = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 0, 0);
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo, "backPressuredTimeMsPerSecond", "1000"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo, "idleTimeMsPerSecond", "0"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo, "busyTimeMsPerSecond", "0"));
        QueryScopeInfo.TaskQueryScopeInfo taskQueryScopeInfo2 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 1, 0);
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo2, "backPressuredTimeMsPerSecond", "500"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo2, "idleTimeMsPerSecond", "100"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo2, "busyTimeMsPerSecond", "900"));
        QueryScopeInfo.TaskQueryScopeInfo taskQueryScopeInfo3 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 3, 0);
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo3, "backPressuredTimeMsPerSecond", "100"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo3, "idleTimeMsPerSecond", "200"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo3, "busyTimeMsPerSecond", "700"));
        return arrayList;
    }

    @BeforeEach
    void setUp() {
        this.metricStore = new MetricStore();
        Iterator<MetricDump> it = getMetricDumps().iterator();
        while (it.hasNext()) {
            this.metricStore.add(it.next());
        }
        this.jobVertexBackPressureHandler = new JobVertexBackPressureHandler(() -> {
            return CompletableFuture.completedFuture(this.restfulGateway);
        }, Duration.ofSeconds(10L), Collections.emptyMap(), JobVertexBackPressureHeaders.getInstance(), new MetricFetcher() { // from class: org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandlerTest.1
            private long updateCount = 0;

            public MetricStore getMetricStore() {
                return JobVertexBackPressureHandlerTest.this.metricStore;
            }

            public void update() {
                this.updateCount++;
            }

            public long getLastUpdateTime() {
                return this.updateCount;
            }
        });
    }

    private static Collection<MetricDump> getMultipleAttemptsMetricDumps() {
        ArrayList arrayList = new ArrayList();
        QueryScopeInfo.TaskQueryScopeInfo taskQueryScopeInfo = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 0, 0);
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo, "backPressuredTimeMsPerSecond", "1000"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo, "idleTimeMsPerSecond", "0"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo, "busyTimeMsPerSecond", "0"));
        QueryScopeInfo.TaskQueryScopeInfo taskQueryScopeInfo2 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 0, 1);
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo2, "backPressuredTimeMsPerSecond", "200"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo2, "idleTimeMsPerSecond", "100"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo2, "busyTimeMsPerSecond", "800"));
        QueryScopeInfo.TaskQueryScopeInfo taskQueryScopeInfo3 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 1, 0);
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo3, "backPressuredTimeMsPerSecond", "500"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo3, "idleTimeMsPerSecond", "100"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo3, "busyTimeMsPerSecond", "900"));
        QueryScopeInfo.TaskQueryScopeInfo taskQueryScopeInfo4 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 1, 1);
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo4, "backPressuredTimeMsPerSecond", "900"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo4, "idleTimeMsPerSecond", "0"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo4, "busyTimeMsPerSecond", "100"));
        QueryScopeInfo.TaskQueryScopeInfo taskQueryScopeInfo5 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 3, 0);
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo5, "backPressuredTimeMsPerSecond", "100"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo5, "idleTimeMsPerSecond", "200"));
        arrayList.add(new MetricDump.GaugeDump(taskQueryScopeInfo5, "busyTimeMsPerSecond", "700"));
        return arrayList;
    }

    @Test
    void testGetBackPressure() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("jobid", TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString());
        hashMap.put("vertexid", TEST_JOB_VERTEX_ID.toString());
        JobVertexBackPressureInfo jobVertexBackPressureInfo = (JobVertexBackPressureInfo) this.jobVertexBackPressureHandler.handleRequest(HandlerRequest.resolveParametersAndCreate(EmptyRequestBody.getInstance(), new JobVertexMessageParameters(), hashMap, Collections.emptyMap(), Collections.emptyList()), this.restfulGateway).get();
        Assertions.assertThat(jobVertexBackPressureInfo.getStatus()).isEqualTo(JobVertexBackPressureInfo.VertexBackPressureStatus.OK);
        Assertions.assertThat(jobVertexBackPressureInfo.getBackpressureLevel()).isEqualTo(JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH);
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getBackPressuredRatio();
        }).collect(Collectors.toList())).containsExactly(new Double[]{Double.valueOf(1.0d), Double.valueOf(0.5d), Double.valueOf(0.1d)});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getIdleRatio();
        }).collect(Collectors.toList())).containsExactly(new Double[]{Double.valueOf(0.0d), Double.valueOf(0.1d), Double.valueOf(0.2d)});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getBusyRatio();
        }).collect(Collectors.toList())).containsExactly(new Double[]{Double.valueOf(0.0d), Double.valueOf(0.9d), Double.valueOf(0.7d)});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getBackpressureLevel();
        }).collect(Collectors.toList())).containsExactly(new JobVertexBackPressureInfo.VertexBackPressureLevel[]{JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH, JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, JobVertexBackPressureInfo.VertexBackPressureLevel.OK});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getSubtask();
        }).collect(Collectors.toList())).containsExactly(new Integer[]{0, 1, 3});
    }

    @Test
    void testAbsentBackPressure() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("jobid", TEST_JOB_ID_BACK_PRESSURE_STATS_ABSENT.toString());
        hashMap.put("vertexid", new JobVertexID().toString());
        Assertions.assertThat(((JobVertexBackPressureInfo) this.jobVertexBackPressureHandler.handleRequest(HandlerRequest.resolveParametersAndCreate(EmptyRequestBody.getInstance(), new JobVertexMessageParameters(), hashMap, Collections.emptyMap(), Collections.emptyList()), this.restfulGateway).get()).getStatus()).isEqualTo(JobVertexBackPressureInfo.VertexBackPressureStatus.DEPRECATED);
    }

    @Test
    void testGetBackPressureFromMultipleCurrentAttempts() throws Exception {
        final MetricStore metricStore = new MetricStore();
        Iterator<MetricDump> it = getMultipleAttemptsMetricDumps().iterator();
        while (it.hasNext()) {
            metricStore.add(it.next());
        }
        HashMap hashMap = new HashMap();
        hashMap.put(0, 1);
        hashMap.put(1, 0);
        metricStore.getRepresentativeAttempts().put(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), Collections.singletonMap(TEST_JOB_VERTEX_ID.toString(), hashMap));
        JobVertexBackPressureHandler jobVertexBackPressureHandler = new JobVertexBackPressureHandler(() -> {
            return CompletableFuture.completedFuture(this.restfulGateway);
        }, Duration.ofSeconds(10L), Collections.emptyMap(), JobVertexBackPressureHeaders.getInstance(), new MetricFetcher() { // from class: org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandlerTest.2
            private long updateCount = 0;

            public MetricStore getMetricStore() {
                return metricStore;
            }

            public void update() {
                this.updateCount++;
            }

            public long getLastUpdateTime() {
                return this.updateCount;
            }
        });
        HashMap hashMap2 = new HashMap();
        hashMap2.put("jobid", TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString());
        hashMap2.put("vertexid", TEST_JOB_VERTEX_ID.toString());
        JobVertexBackPressureInfo jobVertexBackPressureInfo = (JobVertexBackPressureInfo) jobVertexBackPressureHandler.handleRequest(HandlerRequest.resolveParametersAndCreate(EmptyRequestBody.getInstance(), new JobVertexMessageParameters(), hashMap2, Collections.emptyMap(), Collections.emptyList()), this.restfulGateway).get();
        Assertions.assertThat(jobVertexBackPressureInfo.getStatus()).isEqualTo(JobVertexBackPressureInfo.VertexBackPressureStatus.OK);
        Assertions.assertThat(jobVertexBackPressureInfo.getBackpressureLevel()).isEqualTo(JobVertexBackPressureInfo.VertexBackPressureLevel.LOW);
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getAttemptNumber();
        }).collect(Collectors.toList())).containsExactly(new Integer[]{1, 0, null});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getOtherConcurrentAttempts();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).flatMap((v0) -> {
            return v0.stream();
        }).map((v0) -> {
            return v0.getAttemptNumber();
        }).collect(Collectors.toList())).containsExactly(new Integer[]{0, 1});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getBackPressuredRatio();
        }).collect(Collectors.toList())).containsExactly(new Double[]{Double.valueOf(0.2d), Double.valueOf(0.5d), Double.valueOf(0.1d)});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getOtherConcurrentAttempts();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).flatMap((v0) -> {
            return v0.stream();
        }).map((v0) -> {
            return v0.getBackPressuredRatio();
        }).collect(Collectors.toList())).containsExactly(new Double[]{Double.valueOf(1.0d), Double.valueOf(0.9d)});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getIdleRatio();
        }).collect(Collectors.toList())).containsExactly(new Double[]{Double.valueOf(0.1d), Double.valueOf(0.1d), Double.valueOf(0.2d)});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getOtherConcurrentAttempts();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).flatMap((v0) -> {
            return v0.stream();
        }).map((v0) -> {
            return v0.getIdleRatio();
        }).collect(Collectors.toList())).containsExactly(new Double[]{Double.valueOf(0.0d), Double.valueOf(0.0d)});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getBusyRatio();
        }).collect(Collectors.toList())).containsExactly(new Double[]{Double.valueOf(0.8d), Double.valueOf(0.9d), Double.valueOf(0.7d)});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getOtherConcurrentAttempts();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).flatMap((v0) -> {
            return v0.stream();
        }).map((v0) -> {
            return v0.getBusyRatio();
        }).collect(Collectors.toList())).containsExactly(new Double[]{Double.valueOf(0.0d), Double.valueOf(0.1d)});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getBackpressureLevel();
        }).collect(Collectors.toList())).containsExactly(new JobVertexBackPressureInfo.VertexBackPressureLevel[]{JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, JobVertexBackPressureInfo.VertexBackPressureLevel.OK});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getOtherConcurrentAttempts();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).flatMap((v0) -> {
            return v0.stream();
        }).map((v0) -> {
            return v0.getBackpressureLevel();
        }).collect(Collectors.toList())).containsExactly(new JobVertexBackPressureInfo.VertexBackPressureLevel[]{JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH, JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getSubtask();
        }).collect(Collectors.toList())).containsExactly(new Integer[]{0, 1, 3});
        Assertions.assertThat((List) jobVertexBackPressureInfo.getSubtasks().stream().map((v0) -> {
            return v0.getOtherConcurrentAttempts();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).flatMap((v0) -> {
            return v0.stream();
        }).map((v0) -> {
            return v0.getSubtask();
        }).collect(Collectors.toList())).containsExactly(new Integer[]{0, 1});
    }
}
