package org.apache.flink.streaming.runtime.tasks;

import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.class */
/** Tests for {@link SystemProcessingTimeService}. */
class SystemProcessingTimeServiceTest {

    /**
     * Schedules a fixed-rate callback and waits until it has fired three times, then asserts
     * that the error handler was never invoked.
     */
    @Test
    @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
    void testScheduleAtFixedRate() throws Exception {
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);
        final CountDownLatch invocations = new CountDownLatch(3);

        try {
            timer.scheduleAtFixedRate(timestamp -> invocations.countDown(), 0L, 10L);

            invocations.await();

            Assertions.assertThat(errorRef.get()).isNull();
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Asserts that awaiting {@code quiesce()} cancels a previously scheduled fixed-rate future,
     * and that a fixed-rate task scheduled afterwards is neither counted as scheduled nor
     * reported to the error handler.
     */
    @Test
    void testQuiesceAndAwaitingCancelsScheduledAtFixRateFuture() throws Exception {
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);

        try {
            // Typed as Future (not ScheduledFuture) so that the AssertJ Future overload is
            // selected unambiguously.
            final Future<?> scheduledFuture = timer.scheduleAtFixedRate(timestamp -> {}, 0L, 10L);
            Assertions.assertThat(scheduledFuture).isNotDone();

            timer.quiesce().get();

            // Observe the cancellation through get() rather than through a status flag.
            Assertions.assertThatThrownBy(scheduledFuture::get)
                    .as("scheduled future is not cancelled")
                    .isInstanceOf(CancellationException.class);

            final Future<?> scheduledAfterQuiesce =
                    timer.scheduleAtFixedRate(
                            timestamp -> {
                                throw new Exception("Test exception.");
                            },
                            0L,
                            100L);
            Assertions.assertThat(scheduledAfterQuiesce).isNotNull();

            Assertions.assertThat(timer.getNumTasksScheduled()).isZero();
            Assertions.assertThat(errorRef.get()).isNull();
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Shuts the service down while a timer callback is sleeping and asserts that the service is
     * terminated, that further registrations fail with {@link IllegalStateException}, and that
     * the error handler receives an {@link InterruptedException}.
     */
    @Test
    void testImmediateShutdown() throws Exception {
        final CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
        final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorFuture);

        try {
            Assertions.assertThat(timer.isTerminated()).isFalse();

            final OneShotLatch callbackStarted = new OneShotLatch();

            // The callback signals that it is running and then sleeps far longer than the test
            // lasts, so it is still in progress when the service is shut down.
            timer.registerTimer(
                    System.currentTimeMillis(),
                    timestamp -> {
                        callbackStarted.trigger();
                        Thread.sleep(100000000L);
                    });

            callbackStarted.await();
            timer.shutdownService();

            Assertions.assertThat(timer.isTerminated()).isTrue();
            Assertions.assertThat(timer.getNumTasksScheduled()).isZero();

            Assertions.assertThatThrownBy(
                            () -> {
                                timer.registerTimer(
                                        System.currentTimeMillis() + 1000,
                                        timestamp -> {
                                            Assertions.fail("should not be called");
                                        });
                            })
                    .isInstanceOf(IllegalStateException.class);

            Assertions.assertThatThrownBy(
                            () -> {
                                timer.scheduleAtFixedRate(
                                        timestamp -> {
                                            Assertions.fail("should not be called");
                                        },
                                        0L,
                                        100L);
                            })
                    .isInstanceOf(IllegalStateException.class);

            Assertions.assertThat(errorFuture.get(30L, TimeUnit.SECONDS))
                    .isInstanceOf(InterruptedException.class);
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Quiesces the service while a timer callback is running under a lock, then asserts that the
     * lock can be taken once {@code quiesce()} has completed and that a timer registered
     * afterwards is neither counted as scheduled nor reported to the error handler.
     */
    @Test
    void testQuiescing() throws Exception {
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);

        try {
            final OneShotLatch callbackStarted = new OneShotLatch();
            final ReentrantLock callbackLock = new ReentrantLock();

            timer.registerTimer(
                    timer.getCurrentProcessingTime() + 20,
                    timestamp -> {
                        callbackLock.lock();
                        try {
                            callbackStarted.trigger();
                            Thread.sleep(5L);
                        } finally {
                            callbackLock.unlock();
                        }
                    });

            callbackStarted.await();
            timer.quiesce().get();

            // The callback held the lock while it ran, so acquiring it here shows the callback
            // is no longer in progress.
            Assertions.assertThat(callbackLock.tryLock()).isTrue();

            final Future<?> registeredAfterQuiesce =
                    timer.registerTimer(
                            timer.getCurrentProcessingTime() - 5,
                            timestamp -> {
                                throw new Exception("test");
                            });
            Assertions.assertThat(registeredAfterQuiesce).isNotNull();

            Assertions.assertThat(timer.getNumTasksScheduled()).isZero();
            Assertions.assertThat(errorRef.get()).isNull();
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Asserts that cancelling a one-shot timer or a fixed-rate task brings the number of
     * scheduled tasks back to zero.
     */
    @Test
    void testFutureCancellation() {
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        final SystemProcessingTimeService timer = createSystemProcessingTimeService(errorRef);

        try {
            Assertions.assertThat(timer.getNumTasksScheduled()).isZero();

            // Far-future timer: it must not fire while the test is running.
            final ScheduledFuture<?> oneShot =
                    timer.registerTimer(System.currentTimeMillis() + 100000000, timestamp -> {});
            Assertions.assertThat(timer.getNumTasksScheduled()).isOne();

            oneShot.cancel(false);
            Assertions.assertThat(timer.getNumTasksScheduled()).isZero();

            // Fixed-rate task with a very large initial delay, for the same reason.
            final ScheduledFuture<?> periodic =
                    timer.scheduleAtFixedRate(timestamp -> {}, 10000000000L, 50L);
            Assertions.assertThat(timer.getNumTasksScheduled()).isOne();

            periodic.cancel(false);
            Assertions.assertThat(timer.getNumTasksScheduled()).isZero();

            Assertions.assertThat(errorRef.get()).isNull();
        } finally {
            timer.shutdownService();
        }
    }

    /**
     * Asserts that {@code shutdownAndAwaitPending} returns {@code false} while the callback is
     * still blocked and {@code true} once the callback has been released and has finished.
     */
    @Test
    void testShutdownAndWaitPending() throws Exception {
        final OneShotLatch blockUntilTriggered = new OneShotLatch();
        final AtomicBoolean timerExecutionFinished = new AtomicBoolean(false);
        final SystemProcessingTimeService timeService =
                createBlockingSystemProcessingTimeService(
                        blockUntilTriggered, timerExecutionFinished);

        Assertions.assertThat(timeService.isTerminated()).isFalse();

        // The callback is still waiting on the latch, so the short wait is expected to time out.
        Assertions.assertThat(timeService.shutdownAndAwaitPending(1L, TimeUnit.SECONDS)).isFalse();

        // Release the callback and wait for it to complete.
        blockUntilTriggered.trigger();
        Assertions.assertThat(timeService.shutdownAndAwaitPending(60L, TimeUnit.SECONDS)).isTrue();

        Assertions.assertThat(timerExecutionFinished).isTrue();
        Assertions.assertThat(timeService.isTerminated()).isTrue();
    }

    /**
     * Calls {@code shutdownServiceUninterruptible} while a helper thread keeps interrupting the
     * test thread, and asserts that the call still lasts at least the requested timeout; it is
     * then called again after the blocked callback has been released.
     */
    @Test
    void testShutdownServiceUninterruptible() {
        final long timeoutMs = 50L;

        final OneShotLatch blockUntilTriggered = new OneShotLatch();
        final AtomicBoolean timerExecutionFinished = new AtomicBoolean(false);
        final SystemProcessingTimeService timeService =
                createBlockingSystemProcessingTimeService(
                        blockUntilTriggered, timerExecutionFinished);

        Assertions.assertThat(timeService.isTerminated()).isFalse();

        final Thread testThread = Thread.currentThread();
        final AtomicBoolean keepInterrupting = new AtomicBoolean(true);
        final Thread interrupter =
                new Thread(
                        () -> {
                            while (keepInterrupting.get()) {
                                testThread.interrupt();
                                try {
                                    Thread.sleep(1L);
                                } catch (InterruptedException ignored) {
                                    // Nothing interrupts this helper on purpose; keep looping.
                                }
                            }
                        });
        interrupter.start();

        try {
            final long startNanos = System.nanoTime();
            Assertions.assertThat(timeService.isTerminated()).isFalse();

            // The callback is still blocked, so the wait is expected to time out.
            Assertions.assertThat(timeService.shutdownServiceUninterruptible(timeoutMs)).isFalse();
            Assertions.assertThat(timeService.isTerminated()).isTrue();
            Assertions.assertThat(timerExecutionFinished).isFalse();
            Assertions.assertThat(System.nanoTime() - startNanos)
                    .isGreaterThanOrEqualTo(TimeUnit.MILLISECONDS.toNanos(timeoutMs));
        } finally {
            // Always stop the interrupter, even when an assertion above fails; otherwise it
            // would keep interrupting this thread after the test has ended.
            keepInterrupting.set(false);
            do {
                try {
                    interrupter.join();
                } catch (InterruptedException ignored) {
                    // Caused by the interrupter itself; retry until it has terminated.
                }
            } while (interrupter.isAlive());

            // Clear the interrupt flag left behind by the interrupter.
            Thread.interrupted();
        }

        blockUntilTriggered.trigger();
        Assertions.assertThat(timeService.shutdownServiceUninterruptible(timeoutMs)).isTrue();
        Assertions.assertThat(timerExecutionFinished).isTrue();
    }

    /**
     * Creates a service whose error handler completes the given future with the reported
     * throwable.
     *
     * @param errorFuture future to complete; must not be done yet
     * @return the new service
     */
    private static SystemProcessingTimeService createSystemProcessingTimeService(
            CompletableFuture<Throwable> errorFuture) {
        Preconditions.checkArgument(!errorFuture.isDone());

        return new SystemProcessingTimeService(errorFuture::complete);
    }

    /**
     * Creates a service whose error handler stores the first reported throwable in the given
     * reference.
     *
     * @param errorRef reference that receives the first error; must currently hold {@code null}
     * @return the new service
     */
    private static SystemProcessingTimeService createSystemProcessingTimeService(
            AtomicReference<Throwable> errorRef) {
        Preconditions.checkArgument(errorRef.get() == null);

        return new SystemProcessingTimeService(error -> errorRef.compareAndSet(null, error));
    }

    /**
     * Creates a service with a fixed-rate callback that blocks until {@code oneShotLatch} is
     * triggered and then sets {@code atomicBoolean} to {@code true}. The method returns only
     * after the callback has started running.
     *
     * @param oneShotLatch latch the callback waits on
     * @param atomicBoolean flag set once the callback has finished; must be {@code false}
     * @return the new service, with its callback already in progress
     */
    private static SystemProcessingTimeService createBlockingSystemProcessingTimeService(
            OneShotLatch oneShotLatch, AtomicBoolean atomicBoolean) {
        final OneShotLatch callbackStarted = new OneShotLatch();

        Preconditions.checkState(!atomicBoolean.get());

        // Errors are deliberately ignored by this service's handler.
        final SystemProcessingTimeService timeService =
                new SystemProcessingTimeService(error -> {});

        timeService.scheduleAtFixedRate(
                timestamp -> {
                    callbackStarted.trigger();

                    boolean released = false;
                    while (!released) {
                        try {
                            oneShotLatch.await();
                            released = true;
                        } catch (InterruptedException ignored) {
                            // Keep waiting: only the latch may release this callback.
                        }
                    }

                    atomicBoolean.set(true);
                },
                0L,
                10L);

        try {
            callbackStarted.await();
        } catch (InterruptedException e) {
            Assertions.fail("Problem while starting up service.");
        }

        return timeService;
    }
}
