/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover.flip1;

import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.runtime.topology.Vertex;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link ExecutionFailureHandler}.
 *
 * <p>Each test runs against a handler wired with a single-vertex {@link TestingSchedulingTopology},
 * a {@link TestFailoverStrategy} whose answer is set by the test, and a
 * {@link TestRestartBackoffTimeStrategy} that initially allows restarts with a delay of
 * {@link #RESTART_DELAY_MS} milliseconds.
 */
public class ExecutionFailureHandlerTest extends TestLogger {
    /** Restart delay, in milliseconds, configured on the backoff strategy in {@link #setUp()}. */
    private static final long RESTART_DELAY_MS = 1234L;

    private SchedulingTopology schedulingTopology;
    private TestFailoverStrategy failoverStrategy;
    private TestRestartBackoffTimeStrategy backoffTimeStrategy;
    private ExecutionFailureHandler executionFailureHandler;

    /** Builds a fresh topology, strategies and handler before every test. */
    @Before
    public void setUp() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        topology.newExecutionVertex();
        schedulingTopology = topology;

        failoverStrategy = new TestFailoverStrategy();
        backoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, RESTART_DELAY_MS);
        executionFailureHandler =
                new ExecutionFailureHandler(schedulingTopology, failoverStrategy, backoffTimeStrategy);
    }

    /**
     * A plain task failure with restarts allowed must yield a restartable result that carries the
     * configured delay and the tasks chosen by the failover strategy, and must count one restart.
     */
    @Test
    public void testNormalFailureHandling() {
        Set<ExecutionVertexID> tasksToRestart =
                Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
        failoverStrategy.setTasksToRestart(tasksToRestart);

        FailureHandlingResult result =
                executionFailureHandler.getFailureHandlingResult(
                        new ExecutionVertexID(new JobVertexID(), 0), new Exception("test failure"));

        Assert.assertTrue(result.canRestart());
        Assert.assertEquals(RESTART_DELAY_MS, result.getRestartDelayMS());
        Assert.assertEquals(tasksToRestart, result.getVerticesToRestart());
        assertThrowsIllegalState(
                "Cannot get error when the restarting is accepted", result::getError);
        Assert.assertEquals(1L, executionFailureHandler.getNumberOfRestarts());
    }

    /**
     * When the backoff strategy refuses to restart, the result must not be restartable, must expose
     * an error that is not classified as unrecoverable, and must not count a restart.
     */
    @Test
    public void testRestartingSuppressedFailureHandlingResult() {
        backoffTimeStrategy.setCanRestart(false);

        FailureHandlingResult result =
                executionFailureHandler.getFailureHandlingResult(
                        new ExecutionVertexID(new JobVertexID(), 0), new Exception("test failure"));

        Assert.assertFalse(result.canRestart());
        Assert.assertNotNull(result.getError());
        Assert.assertFalse(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
        assertRestartDetailsUnavailable(result);
        Assert.assertEquals(0L, executionFailureHandler.getNumberOfRestarts());
    }

    /**
     * A failure whose cause chain contains a {@link SuppressRestartsException} must produce a
     * non-restartable result whose error is classified as unrecoverable, without counting a restart.
     */
    @Test
    public void testNonRecoverableFailureHandlingResult() {
        Exception failure =
                new Exception(new SuppressRestartsException(new Exception("test failure")));

        FailureHandlingResult result =
                executionFailureHandler.getFailureHandlingResult(
                        new ExecutionVertexID(new JobVertexID(), 0), failure);

        Assert.assertFalse(result.canRestart());
        Assert.assertNotNull(result.getError());
        Assert.assertTrue(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
        assertRestartDetailsUnavailable(result);
        Assert.assertEquals(0L, executionFailureHandler.getNumberOfRestarts());
    }

    /**
     * Checks {@link ExecutionFailureHandler#isUnrecoverableError(Throwable)} for a plain exception,
     * a direct {@link SuppressRestartsException}, and one nested as the cause of another exception.
     */
    @Test
    public void testUnrecoverableErrorCheck() {
        Assert.assertFalse(ExecutionFailureHandler.isUnrecoverableError(new Exception()));
        Assert.assertTrue(
                ExecutionFailureHandler.isUnrecoverableError(
                        new SuppressRestartsException(new Exception())));
        Assert.assertTrue(
                ExecutionFailureHandler.isUnrecoverableError(
                        new Exception(new SuppressRestartsException(new Exception()))));
    }

    /** A global failure must ask for every vertex of the scheduling topology to be restarted. */
    @Test
    public void testGlobalFailureHandling() {
        FailureHandlingResult result =
                executionFailureHandler.getGlobalFailureHandlingResult(new Exception("test failure"));

        // No raw Iterable cast here: keeping the element type lets Vertex::getId resolve properly.
        Set<ExecutionVertexID> allVertexIds =
                IterableUtils.toStream(schedulingTopology.getVertices())
                        .map(Vertex::getId)
                        .collect(Collectors.toSet());

        Assert.assertEquals(allVertexIds, result.getVerticesToRestart());
    }

    /**
     * Asserts that neither the vertices to restart nor the restart delay can be read from a result
     * for which restarting was suppressed.
     *
     * @param result the non-restartable result to probe
     */
    private static void assertRestartDetailsUnavailable(FailureHandlingResult result) {
        assertThrowsIllegalState(
                "get tasks to restart is not allowed when restarting is suppressed",
                result::getVerticesToRestart);
        assertThrowsIllegalState(
                "get restart delay is not allowed when restarting is suppressed",
                result::getRestartDelayMS);
    }

    /**
     * Runs {@code action} and fails the test unless it throws an {@link IllegalStateException}.
     * Any other exception propagates unchanged.
     *
     * @param failureMessage message reported when no exception is thrown
     * @param action the call expected to be rejected
     */
    private static void assertThrowsIllegalState(String failureMessage, Runnable action) {
        try {
            action.run();
        } catch (IllegalStateException expected) {
            // This is the outcome the test is looking for.
            return;
        }
        Assert.fail(failureMessage);
    }

    /** A {@link FailoverStrategy} stub that returns whatever set of tasks the test configured. */
    private static class TestFailoverStrategy implements FailoverStrategy {
        /** Tasks handed back on every query; stays {@code null} until a test sets it. */
        private Set<ExecutionVertexID> tasksToRestart;

        /**
         * Sets the tasks that subsequent calls to {@link #getTasksNeedingRestart} will return.
         *
         * @param tasksToRestart the set to return, stored by reference
         */
        public void setTasksToRestart(Set<ExecutionVertexID> tasksToRestart) {
            this.tasksToRestart = tasksToRestart;
        }

        /**
         * Returns the configured set, ignoring both arguments.
         *
         * @param executionVertexId the failed vertex (unused)
         * @param cause the failure cause (unused)
         * @return the set last passed to {@link #setTasksToRestart}, or {@code null} if none was set
         */
        @Override
        public Set<ExecutionVertexID> getTasksNeedingRestart(
                ExecutionVertexID executionVertexId, Throwable cause) {
            return tasksToRestart;
        }
    }
}

