/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.time.Duration;
import java.util.Comparator;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;

/**
 * A {@link TimerService} for tests whose notion of "now" only changes when the test calls
 * {@link #advance(long)} or {@link #setCurrentTime(long)}.
 *
 * <p>Registered callbacks are kept in a priority queue ordered by their target timestamp and
 * are invoked synchronously, on the calling thread, from within {@code advance} /
 * {@code setCurrentTime}. The clock starts at {@link Long#MIN_VALUE}.
 *
 * <p>The timer queue itself is a plain {@link PriorityQueue} and is not synchronized; only the
 * {@code isTerminated} / {@code isQuiesced} flags are {@code volatile}.
 */
public class TestProcessingTimeService implements TimerService {
    /** Set by {@link #shutdownService()}; once set, new registrations throw. */
    private volatile boolean isTerminated;

    /** Set by {@link #quiesce()}; once set, registrations are ignored and nothing fires. */
    private volatile boolean isQuiesced;

    /** Pending timers as (timestamp, task) pairs, smallest timestamp first. */
    private final PriorityQueue<Tuple2<Long, CallbackTask>> priorityQueue;

    private final ManualMSClock clock = new ManualMSClock(Long.MIN_VALUE);

    public TestProcessingTimeService() {
        this.priorityQueue =
                new PriorityQueue<Tuple2<Long, CallbackTask>>(
                        16, (left, right) -> Long.compare(left.f0, right.f0));
    }

    /**
     * Moves the clock forward by {@code delta} milliseconds and fires every timer that is due.
     *
     * @param delta number of milliseconds to add to the current time
     * @throws Exception anything thrown by a fired callback
     */
    public void advance(long delta) throws Exception {
        this.clock.advanceTime(Duration.ofMillis(delta));
        this.maybeFireTimers();
    }

    /**
     * Sets the clock to {@code timestamp} (milliseconds) and fires every timer that is due.
     *
     * @param timestamp the new current time in milliseconds
     * @throws Exception anything thrown by a fired callback
     */
    public void setCurrentTime(long timestamp) throws Exception {
        this.clock.setCurrentTime(timestamp, TimeUnit.MILLISECONDS);
        this.maybeFireTimers();
    }

    /**
     * Fires, in timestamp order, every queued timer whose timestamp is not after the current
     * processing time. Does nothing once the service is quiesced.
     *
     * <p>Tasks that are already done (fired or cancelled) are dropped. Periodic tasks are put
     * back into the queue at {@code firedTimestamp + period} after each invocation, so a single
     * large time jump fires them once per elapsed period.
     */
    private void maybeFireTimers() throws Exception {
        if (this.isQuiesced) {
            return;
        }
        // getCurrentProcessingTime() is not declared in this class; it is inherited from the
        // implemented interface hierarchy.
        while (!this.priorityQueue.isEmpty()
                && this.getCurrentProcessingTime() >= this.priorityQueue.peek().f0) {
            Tuple2<Long, CallbackTask> entry = this.priorityQueue.poll();
            CallbackTask callbackTask = entry.f1;
            if (callbackTask.isDone()) {
                // Cancelled (or already fired) entries are discarded lazily here.
                continue;
            }
            callbackTask.onProcessingTime(entry.f0);
            if (callbackTask instanceof PeriodicCallbackTask) {
                long nextTimestamp =
                        ((PeriodicCallbackTask) callbackTask).nextTimestamp(entry.f0);
                this.priorityQueue.offer(Tuple2.of(nextTimestamp, callbackTask));
            }
        }
    }

    /** Returns the manually driven clock backing this service. */
    public Clock getClock() {
        return this.clock;
    }

    /**
     * Queues {@code target} to be invoked once the clock reaches {@code timestamp}.
     *
     * @param timestamp the time (milliseconds) at which the callback becomes due
     * @param target the callback to invoke
     * @return a future that can be used to cancel the timer; when the service is quiesced, a
     *     placeholder future that is never queued
     * @throws IllegalStateException if the service has been shut down
     */
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeService.ProcessingTimeCallback target) {
        if (this.isTerminated) {
            throw new IllegalStateException("terminated");
        }
        if (this.isQuiesced) {
            // Never enqueued, so the null callback is never invoked.
            return new CallbackTask(null);
        }
        CallbackTask callbackTask = new CallbackTask(target);
        this.priorityQueue.offer(Tuple2.of(timestamp, callbackTask));
        return callbackTask;
    }

    /**
     * Queues {@code callback} to fire first at {@code currentTime + initialDelay} and then every
     * {@code period} milliseconds after the previous firing timestamp.
     *
     * @param callback the callback to invoke
     * @param initialDelay delay in milliseconds, relative to the current time, of the first firing
     * @param period interval in milliseconds between firings; must be greater than 0
     * @return a future that can be used to cancel the periodic timer; when the service is
     *     quiesced, a placeholder future that is never queued
     * @throws IllegalStateException if the service has been shut down
     */
    public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeService.ProcessingTimeCallback callback, long initialDelay, long period) {
        if (this.isTerminated) {
            throw new IllegalStateException("terminated");
        }
        if (this.isQuiesced) {
            // Never enqueued, so the null callback is never invoked.
            return new CallbackTask(null);
        }
        CallbackTask periodicCallbackTask = new PeriodicCallbackTask(callback, period);
        long firstTimestamp = this.getCurrentProcessingTime() + initialDelay;
        this.priorityQueue.offer(Tuple2.of(firstTimestamp, periodicCallbackTask));
        return periodicCallbackTask;
    }

    /**
     * Delegates to {@link #scheduleAtFixedRate}; this implementation does not distinguish
     * fixed-delay from fixed-rate scheduling.
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeService.ProcessingTimeCallback callback, long initialDelay, long period) {
        return this.scheduleAtFixedRate(callback, initialDelay, period);
    }

    /** Returns whether {@link #shutdownService()} has been called. */
    public boolean isTerminated() {
        return this.isTerminated;
    }

    /**
     * Stops accepting and firing timers and discards everything currently queued. Has no effect
     * if the service is already terminated.
     *
     * @return an already-completed future
     */
    public CompletableFuture<Void> quiesce() {
        if (!this.isTerminated) {
            this.isQuiesced = true;
            this.priorityQueue.clear();
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Marks the service as terminated; subsequent registrations throw. */
    public void shutdownService() {
        this.isTerminated = true;
    }

    /**
     * Shuts the service down immediately; the timeout is ignored.
     *
     * @param timeoutMs unused
     * @return always {@code true}
     */
    public boolean shutdownServiceUninterruptible(long timeoutMs) {
        this.shutdownService();
        return true;
    }

    /** Returns the number of queued timers that are neither fired nor cancelled. */
    public int getNumActiveTimers() {
        int count = 0;
        for (Tuple2<Long, CallbackTask> entry : this.priorityQueue) {
            if (!entry.f1.isDone()) {
                ++count;
            }
        }
        return count;
    }

    /** Returns the distinct timestamps of queued timers that are neither fired nor cancelled. */
    public Set<Long> getActiveTimerTimestamps() {
        Set<Long> actualTimestamps = new HashSet<Long>();
        for (Tuple2<Long, CallbackTask> entry : this.priorityQueue) {
            if (!entry.f1.isDone()) {
                actualTimestamps.add(entry.f0);
            }
        }
        return actualTimestamps;
    }

    /** A {@link Clock} with millisecond resolution whose value only changes on request. */
    private static class ManualMSClock extends Clock {
        /** Current time in milliseconds. */
        private final AtomicLong currentTime;

        public ManualMSClock(long startTime) {
            this.currentTime = new AtomicLong(startTime);
        }

        public long absoluteTimeMillis() {
            return this.currentTime.get();
        }

        public long relativeTimeMillis() {
            return this.currentTime.get();
        }

        /** Current time converted from milliseconds to nanoseconds (plain multiplication). */
        public long relativeTimeNanos() {
            return this.currentTime.get() * 1000000L;
        }

        /** Adds {@code duration}, truncated to whole milliseconds, to the current time. */
        public void advanceTime(Duration duration) {
            this.currentTime.addAndGet(duration.toMillis());
        }

        /** Sets the current time to {@code time}, converted from {@code timeUnit} to milliseconds. */
        public void setCurrentTime(long time, TimeUnit timeUnit) {
            this.currentTime.set(timeUnit.toMillis(time));
        }
    }

    /**
     * A task that stays in the {@code CREATED} state after firing so that it can be re-queued
     * and fired again until it is cancelled.
     */
    private static class PeriodicCallbackTask extends CallbackTask {
        private final long period;

        private PeriodicCallbackTask(ProcessingTimeService.ProcessingTimeCallback processingTimeCallback, long period) {
            super(processingTimeCallback);
            Preconditions.checkArgument(period > 0L, "The period must be greater than 0.");
            this.period = period;
        }

        /** Invokes the callback without marking the task as done. */
        @Override
        public void onProcessingTime(long timestamp) throws Exception {
            this.processingTimeCallback.onProcessingTime(timestamp);
        }

        /** Returns the timestamp of the firing that follows {@code currentTimestamp}. */
        public long nextTimestamp(long currentTimestamp) {
            return currentTimestamp + this.period;
        }
    }

    /**
     * One-shot timer task. Only cancellation and completion state are supported; the
     * {@link Delayed} and result-retrieval parts of {@link ScheduledFuture} throw
     * {@link UnsupportedOperationException}.
     */
    private static class CallbackTask implements ScheduledFuture<Object> {
        protected final ProcessingTimeService.ProcessingTimeCallback processingTimeCallback;
        private final AtomicReference<CallbackTaskState> state =
                new AtomicReference<CallbackTaskState>(CallbackTaskState.CREATED);

        private CallbackTask(ProcessingTimeService.ProcessingTimeCallback processingTimeCallback) {
            this.processingTimeCallback = processingTimeCallback;
        }

        /**
         * Invokes the callback and then moves the task from {@code CREATED} to {@code DONE}.
         * If the callback throws, the state is left unchanged.
         */
        public void onProcessingTime(long timestamp) throws Exception {
            this.processingTimeCallback.onProcessingTime(timestamp);
            this.state.compareAndSet(CallbackTaskState.CREATED, CallbackTaskState.DONE);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            throw new UnsupportedOperationException();
        }

        @Override
        public int compareTo(Delayed o) {
            throw new UnsupportedOperationException();
        }

        /**
         * Cancels the task if it has not fired or been cancelled yet.
         *
         * @param mayInterruptIfRunning ignored
         * @return {@code true} if this call moved the task from {@code CREATED} to
         *     {@code CANCELLED}
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return this.state.compareAndSet(CallbackTaskState.CREATED, CallbackTaskState.CANCELLED);
        }

        @Override
        public boolean isCancelled() {
            return this.state.get() == CallbackTaskState.CANCELLED;
        }

        /** Returns {@code true} once the task has either fired or been cancelled. */
        @Override
        public boolean isDone() {
            return this.state.get() != CallbackTaskState.CREATED;
        }

        @Override
        public Object get() throws InterruptedException, ExecutionException {
            throw new UnsupportedOperationException();
        }

        @Override
        public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            throw new UnsupportedOperationException();
        }

        /** Lifecycle of a {@link CallbackTask}. */
        static enum CallbackTaskState {
            CREATED,
            CANCELLED,
            DONE;
        }
    }
}

