/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.metrics;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.events.Event;
import org.apache.flink.events.EventBuilder;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.metrics.AllSubTasksRunningOrFinishedStateTimeMetrics;
import org.apache.flink.runtime.scheduler.metrics.ExecutionStatusMetricsRegistrar;
import org.apache.flink.runtime.scheduler.metrics.StateTimeMetricTest;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class AllSubTasksRunningOrFinishedStateTimeMetricsTest {
    private static final Executor DIRECT_EXECUTOR = Runnable::run;
    private static final Duration TIMEOUT = Duration.ofSeconds(10L);
    private static final MetricOptions.JobStatusMetricsSettings settings = StateTimeMetricTest.enable(MetricOptions.JobStatusMetrics.STATE, MetricOptions.JobStatusMetrics.CURRENT_TIME, MetricOptions.JobStatusMetrics.TOTAL_TIME);

    AllSubTasksRunningOrFinishedStateTimeMetricsTest() {
    }

    @Test
    void testInitialValues() {
        ManualClock clock = new ManualClock(Duration.ofMillis(5L).toNanos());
        AllSubTasksRunningOrFinishedStateTimeMetrics runningMetrics = new AllSubTasksRunningOrFinishedStateTimeMetrics(JobType.STREAMING, settings, (Clock)clock);
        Assertions.assertThat((long)runningMetrics.getCurrentTime()).isEqualTo(0L);
        Assertions.assertThat((long)runningMetrics.getTotalTime()).isEqualTo(0L);
        Assertions.assertThat((long)runningMetrics.getBinary()).isEqualTo(0L);
    }

    @Test
    void testScheduledToFinishedWithSingleExecutionNoFailure() {
        AllSubTasksRunningOrFinishedStateTimeMetrics metrics = new AllSubTasksRunningOrFinishedStateTimeMetrics(JobType.STREAMING, settings);
        Execution exec1 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newExecution();
        metrics.onStateUpdate(exec1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(1L);
        metrics.onStateUpdate(exec1, ExecutionState.RUNNING, ExecutionState.FINISHED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(1L);
    }

    @Test
    void testScheduledToFinishedWithSingleExecutionWithFailure() {
        AllSubTasksRunningOrFinishedStateTimeMetrics metrics = new AllSubTasksRunningOrFinishedStateTimeMetrics(JobType.STREAMING, settings);
        Execution exec1 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newExecution();
        metrics.onStateUpdate(exec1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(1L);
        metrics.onStateUpdate(exec1, ExecutionState.RUNNING, ExecutionState.CANCELING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.CANCELING, ExecutionState.CANCELED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        Execution exec2 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newAttempt(exec1);
        metrics.onStateUpdate(exec2, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(1L);
        metrics.onStateUpdate(exec2, ExecutionState.RUNNING, ExecutionState.FINISHED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(1L);
    }

    @Test
    void testScheduledToFinishedWithMultipleExecutionsNoFailure() {
        AllSubTasksRunningOrFinishedStateTimeMetrics metrics = new AllSubTasksRunningOrFinishedStateTimeMetrics(JobType.STREAMING, settings);
        Execution exec1 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newExecution();
        Execution exec2 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newExecution();
        metrics.onStateUpdate(exec1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(1L);
        metrics.onStateUpdate(exec1, ExecutionState.RUNNING, ExecutionState.FINISHED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(1L);
        metrics.onStateUpdate(exec2, ExecutionState.RUNNING, ExecutionState.FINISHED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(1L);
    }

    @Test
    void testScheduledToFinishedWithMultipleExecutionsWithFailure() {
        AllSubTasksRunningOrFinishedStateTimeMetrics metrics = new AllSubTasksRunningOrFinishedStateTimeMetrics(JobType.STREAMING, settings);
        Execution exec1 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newExecution();
        Execution exec2 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newExecution();
        metrics.onStateUpdate(exec1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(1L);
        metrics.onStateUpdate(exec1, ExecutionState.RUNNING, ExecutionState.CANCELING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.CANCELING, ExecutionState.CANCELED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        Execution id1V2 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newAttempt(exec1);
        metrics.onStateUpdate(id1V2, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(id1V2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(id1V2, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(0L);
        metrics.onStateUpdate(id1V2, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(1L);
        metrics.onStateUpdate(id1V2, ExecutionState.RUNNING, ExecutionState.FINISHED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(1L);
        metrics.onStateUpdate(exec2, ExecutionState.RUNNING, ExecutionState.FINISHED);
        Assertions.assertThat((long)metrics.getBinary()).isEqualTo(1L);
    }

    @Test
    void testGetCurrentTime() {
        ManualClock clock = new ManualClock(Duration.ofMillis(5L).toNanos());
        AllSubTasksRunningOrFinishedStateTimeMetrics metrics = new AllSubTasksRunningOrFinishedStateTimeMetrics(JobType.STREAMING, settings, (Clock)clock);
        Execution exec1 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newExecution();
        metrics.onStateUpdate(exec1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getCurrentTime()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getCurrentTime()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getCurrentTime()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getCurrentTime()).isEqualTo(5L);
        clock.advanceTime(Duration.ofMillis(10L));
        Assertions.assertThat((long)metrics.getCurrentTime()).isEqualTo(15L);
        metrics.onStateUpdate(exec1, ExecutionState.RUNNING, ExecutionState.CANCELING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getCurrentTime()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.CANCELING, ExecutionState.CANCELED);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getCurrentTime()).isEqualTo(0L);
        Execution exec2 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newAttempt(exec1);
        metrics.onStateUpdate(exec2, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getCurrentTime()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getCurrentTime()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getCurrentTime()).isEqualTo(0L);
        metrics.onStateUpdate(exec2, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getCurrentTime()).isEqualTo(5L);
        metrics.onStateUpdate(exec2, ExecutionState.RUNNING, ExecutionState.FINISHED);
        clock.advanceTime(Duration.ofMillis(10L));
        Assertions.assertThat((long)metrics.getCurrentTime()).isEqualTo(15L);
    }

    @Test
    void testGetTotalTime() {
        ManualClock clock = new ManualClock(Duration.ofMillis(5L).toNanos());
        AllSubTasksRunningOrFinishedStateTimeMetrics metrics = new AllSubTasksRunningOrFinishedStateTimeMetrics(JobType.STREAMING, settings, (Clock)clock);
        Execution exec1 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newExecution();
        metrics.onStateUpdate(exec1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getTotalTime()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getTotalTime()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getTotalTime()).isEqualTo(0L);
        metrics.onStateUpdate(exec1, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getTotalTime()).isEqualTo(5L);
        clock.advanceTime(Duration.ofMillis(10L));
        Assertions.assertThat((long)metrics.getTotalTime()).isEqualTo(15L);
        metrics.onStateUpdate(exec1, ExecutionState.RUNNING, ExecutionState.CANCELING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getTotalTime()).isEqualTo(15L);
        metrics.onStateUpdate(exec1, ExecutionState.CANCELING, ExecutionState.CANCELED);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getTotalTime()).isEqualTo(15L);
        Execution exec2 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newAttempt(exec1);
        metrics.onStateUpdate(exec2, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getTotalTime()).isEqualTo(15L);
        metrics.onStateUpdate(exec2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getTotalTime()).isEqualTo(15L);
        metrics.onStateUpdate(exec2, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getTotalTime()).isEqualTo(15L);
        metrics.onStateUpdate(exec2, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        clock.advanceTime(Duration.ofMillis(5L));
        Assertions.assertThat((long)metrics.getTotalTime()).isEqualTo(20L);
        metrics.onStateUpdate(exec2, ExecutionState.RUNNING, ExecutionState.FINISHED);
        clock.advanceTime(Duration.ofMillis(10L));
        Assertions.assertThat((long)metrics.getTotalTime()).isEqualTo(30L);
    }

    @Test
    void testStatusEvents() {
        ManualClock clock = new ManualClock(Duration.ofMillis(5L).toNanos());
        AllSubTasksRunningOrFinishedStateTimeMetrics metrics = new AllSubTasksRunningOrFinishedStateTimeMetrics(JobType.STREAMING, settings, (Clock)clock);
        final ArrayList events = new ArrayList();
        metrics.registerMetrics((MetricGroup)new UnregisteredMetricsGroup(){

            public void addEvent(EventBuilder eventBuilder) {
                events.add(eventBuilder.build(this.getAllVariables()));
            }
        });
        Execution exec1 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newExecution();
        metrics.onStateUpdate(exec1, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        Assertions.assertThat(events).isEmpty();
        metrics.onStateUpdate(exec1, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        Assertions.assertThat(events).isEmpty();
        metrics.onStateUpdate(exec1, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        Assertions.assertThat(events).isEmpty();
        metrics.onStateUpdate(exec1, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        Assertions.assertThat(events).hasSize(1);
        Assertions.assertThat((Map)((Event)events.remove(0)).getAttributes()).containsEntry((Object)"status", (Object)AllSubTasksRunningOrFinishedStateTimeMetrics.Status.ALL_RUNNING_OR_FINISHED.toString());
        metrics.onStateUpdate(exec1, ExecutionState.RUNNING, ExecutionState.CANCELING);
        Assertions.assertThat(events).hasSize(1);
        Assertions.assertThat((Map)((Event)events.remove(0)).getAttributes()).containsEntry((Object)"status", (Object)AllSubTasksRunningOrFinishedStateTimeMetrics.Status.NOT_ALL_RUNNING_OR_FINISHED.toString());
        metrics.onStateUpdate(exec1, ExecutionState.CANCELING, ExecutionState.CANCELED);
        Assertions.assertThat(events).isEmpty();
        Execution exec2 = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newAttempt(exec1);
        metrics.onStateUpdate(exec2, ExecutionState.CREATED, ExecutionState.SCHEDULED);
        Assertions.assertThat(events).isEmpty();
        metrics.onStateUpdate(exec2, ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
        Assertions.assertThat(events).isEmpty();
        metrics.onStateUpdate(exec2, ExecutionState.DEPLOYING, ExecutionState.INITIALIZING);
        Assertions.assertThat(events).isEmpty();
        metrics.onStateUpdate(exec2, ExecutionState.INITIALIZING, ExecutionState.RUNNING);
        Assertions.assertThat(events).hasSize(1);
        Assertions.assertThat((Map)((Event)events.remove(0)).getAttributes()).containsEntry((Object)"status", (Object)AllSubTasksRunningOrFinishedStateTimeMetrics.Status.ALL_RUNNING_OR_FINISHED.toString());
        metrics.onStateUpdate(exec2, ExecutionState.RUNNING, ExecutionState.FINISHED);
        Assertions.assertThat(events).isEmpty();
    }

    @Test
    void testScaleDown() {
        AllSubTasksRunningOrFinishedStateTimeMetrics metrics = new AllSubTasksRunningOrFinishedStateTimeMetrics(JobType.STREAMING, settings);
        final ArrayList events = new ArrayList();
        metrics.registerMetrics((MetricGroup)new UnregisteredMetricsGroup(){

            public void addEvent(EventBuilder eventBuilder) {
                events.add(eventBuilder.build(this.getAllVariables()));
            }
        });
        JobVertex jobVertex = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newJobVertex();
        List<Execution> originalExecutions = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newExecutions(2, jobVertex);
        AllSubTasksRunningOrFinishedStateTimeMetricsTest.goThroughExecutionStatesToRunningJob((ExecutionStatusMetricsRegistrar)metrics, originalExecutions);
        Assertions.assertThat((Map)((Event)events.remove(0)).getAttributes()).containsEntry((Object)"status", (Object)AllSubTasksRunningOrFinishedStateTimeMetrics.Status.ALL_RUNNING_OR_FINISHED.toString());
        AllSubTasksRunningOrFinishedStateTimeMetricsTest.goThroughExecutionStatesFromRunningJobToCanceled((ExecutionStatusMetricsRegistrar)metrics, originalExecutions);
        Assertions.assertThat((Map)((Event)events.remove(0)).getAttributes()).containsEntry((Object)"status", (Object)AllSubTasksRunningOrFinishedStateTimeMetrics.Status.NOT_ALL_RUNNING_OR_FINISHED.toString());
        List<Execution> postScaleDownExecutions = AllSubTasksRunningOrFinishedStateTimeMetricsTest.newExecutions(1, jobVertex);
        AllSubTasksRunningOrFinishedStateTimeMetricsTest.goThroughExecutionStatesToRunningJob((ExecutionStatusMetricsRegistrar)metrics, postScaleDownExecutions);
        Assertions.assertThat((Map)((Event)events.remove(0)).getAttributes()).containsEntry((Object)"status", (Object)AllSubTasksRunningOrFinishedStateTimeMetrics.Status.ALL_RUNNING_OR_FINISHED.toString());
    }

    private static void goThroughExecutionStatesToRunningJob(ExecutionStatusMetricsRegistrar metrics, Collection<Execution> executions) {
        for (Execution execution : executions) {
            metrics.onStateUpdate(execution.getAttemptId(), ExecutionState.CREATED, ExecutionState.SCHEDULED);
        }
        for (Execution execution : executions) {
            metrics.onStateUpdate(execution.getAttemptId(), ExecutionState.SCHEDULED, ExecutionState.DEPLOYING);
            metrics.onStateUpdate(execution.getAttemptId(), ExecutionState.DEPLOYING, ExecutionState.RUNNING);
        }
    }

    private static void goThroughExecutionStatesFromRunningJobToCanceled(ExecutionStatusMetricsRegistrar metrics, Collection<Execution> executions) {
        for (Execution execution : executions) {
            metrics.onStateUpdate(execution.getAttemptId(), ExecutionState.RUNNING, ExecutionState.CANCELING);
            metrics.onStateUpdate(execution.getAttemptId(), ExecutionState.CANCELING, ExecutionState.CANCELED);
        }
    }

    private static Execution newExecution() {
        return AllSubTasksRunningOrFinishedStateTimeMetricsTest.newExecutions(1, AllSubTasksRunningOrFinishedStateTimeMetricsTest.newJobVertex()).get(0);
    }

    private static List<Execution> newExecutions(int parallelism, JobVertex jv) {
        try {
            DefaultExecutionGraph eg = TestingDefaultExecutionGraphBuilder.newBuilder().buildDynamicGraph(new DirectScheduledExecutorService());
            ExecutionJobVertex ejv = new ExecutionJobVertex((InternalExecutionGraphAccessor)eg, jv, (VertexParallelismInformation)new DefaultVertexParallelismInfo(parallelism, parallelism, ignored -> Optional.empty()), (CoordinatorStore)new CoordinatorStoreImpl(), UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
            return IntStream.range(0, parallelism).mapToObj(subtaskIndex -> AllSubTasksRunningOrFinishedStateTimeMetricsTest.newExecutionVertex(ejv, subtaskIndex)).map(ev -> new Execution(DIRECT_EXECUTOR, ev, 0, 0L, TIMEOUT)).collect(Collectors.toList());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static JobVertex newJobVertex() {
        return ExecutionGraphTestUtils.createJobVertex("task1", 1, NoOpInvokable.class);
    }

    private static ExecutionVertex newExecutionVertex(ExecutionJobVertex ejv, int subtaskIndex) {
        return new ExecutionVertex(ejv, subtaskIndex, new IntermediateResult[0], TIMEOUT, 0L, 1, 0);
    }

    private static Execution newAttempt(Execution execution) {
        return new Execution(DIRECT_EXECUTOR, execution.getVertex(), execution.getAttemptNumber() + 1, System.currentTimeMillis(), TIMEOUT);
    }
}

