/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.taskexecutor.BackPressureSampleService;
import org.apache.flink.runtime.taskexecutor.BackPressureSampleableTask;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests for {@link BackPressureSampleService}.
 *
 * <p>All tests share one service instance, which is created once before the first test on top
 * of a single-threaded scheduled executor and whose executor is shut down after the last test.
 */
public class BackPressureSampleServiceTest
extends TestLogger {
    private static ScheduledExecutorService scheduledExecutorService;
    private static BackPressureSampleService backPressureSampleService;

    /**
     * Creates the shared executor and the service under test.
     *
     * @throws Exception if the fixture cannot be created
     */
    @BeforeClass
    public static void setUp() throws Exception {
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        // Typed as ScheduledExecutor so the service constructor sees the same static type as before.
        ScheduledExecutor adapter = new ScheduledExecutorServiceAdapter(scheduledExecutorService);
        // NOTE(review): 10 and the 10 ms Time value are presumably the number of samples and the
        // delay between samples -- confirm against the BackPressureSampleService constructor.
        backPressureSampleService = new BackPressureSampleService(10, Time.milliseconds(10L), adapter);
    }

    /**
     * Shuts down the shared executor, if {@link #setUp()} got far enough to create it.
     *
     * @throws Exception declared for symmetry with {@link #setUp()}
     */
    @AfterClass
    public static void tearDown() throws Exception {
        if (scheduledExecutorService == null) {
            return;
        }
        scheduledExecutorService.shutdown();
    }

    /**
     * A task that always runs and reports back pressure on every other check must yield a
     * ratio of exactly 0.5.
     */
    @Test(timeout=10000L)
    public void testSampleTaskBackPressure() throws Exception {
        double ratio = BackPressureSampleServiceTest.sampleRatioOf(new TestTask());
        Assert.assertEquals(0.5, ratio, 0.0);
    }

    /**
     * A task that stops running once its first back pressure check has happened must yield a
     * ratio of exactly 1.0, because that first check reports back pressure.
     */
    @Test(timeout=10000L)
    public void testTaskStopsWithPartialSampling() throws Exception {
        double ratio = BackPressureSampleServiceTest.sampleRatioOf(new NotRunningAfterFirstSamplingTask());
        Assert.assertEquals(1.0, ratio, 0.0);
    }

    /**
     * Requesting a sample for a task that never reports itself as running must throw an
     * {@link IllegalStateException} directly from the sampling call.
     */
    @Test(expected=IllegalStateException.class)
    public void testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling() {
        BackPressureSampleableTask neverRunning = new NeverRunningTask();
        backPressureSampleService.sampleTaskBackPressure(neverRunning);
        // Only reached when the call above returned normally, which is the failure case.
        Assert.fail("Exception expected.");
    }

    /**
     * Requests a sample for the given task from the shared service and blocks until the
     * resulting ratio is available.
     *
     * @param task the task to sample
     * @return the ratio the service completed its result with
     * @throws Exception if sampling or waiting for the result fails
     */
    private static double sampleRatioOf(BackPressureSampleableTask task) throws Exception {
        return (Double)backPressureSampleService.sampleTaskBackPressure(task).get();
    }

    /**
     * Base test task: always running, and back pressured on every other check, starting with
     * the very first one.
     */
    private static class TestTask
    implements BackPressureSampleableTask {
        /** Number of back pressure checks answered so far. */
        protected volatile long counter = 0L;

        public boolean isRunning() {
            return true;
        }

        public boolean isBackPressured() {
            // Post-increment: the parity of the value *before* this check decides the answer.
            long checksBefore = counter++;
            return checksBefore % 2L == 0L;
        }
    }

    /** Task that reports itself as running only until its first back pressure check. */
    private static class NotRunningAfterFirstSamplingTask
    extends TestTask {
        @Override
        public boolean isRunning() {
            return counter == 0L;
        }
    }

    /** Task that never reports itself as running. */
    private static class NeverRunningTask
    extends TestTask {
        @Override
        public boolean isRunning() {
            return false;
        }
    }
}

