/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CheckpointCoordinator {
    /** Logger used by the coordinator and by the anonymous tasks it creates. */
    static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
    /** Presumably the capacity of {@link #recentPendingCheckpoints} (the literal 16 is used there) - TODO confirm. */
    private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
    /** Monitor the methods of this class synchronize on when touching mutable coordinator state. */
    protected final Object lock = new Object();
    /** ID of the job this coordinator works for; messages carrying another job ID are rejected. */
    private final JobID job;
    /** Vertices that are sent the TriggerCheckpoint message. */
    private final ExecutionVertex[] tasksToTrigger;
    /** Vertices whose current execution attempts are registered as acknowledging tasks of a new checkpoint. */
    private final ExecutionVertex[] tasksToWaitFor;
    /** Vertices that are sent NotifyCheckpointComplete once a checkpoint has been completed. */
    private final ExecutionVertex[] tasksToCommitTo;
    /** Checkpoints that were triggered but not yet completed or discarded, keyed by checkpoint ID. */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;
    /** Store that receives every finalized checkpoint. */
    private final CompletedCheckpointStore completedCheckpointStore;
    /** IDs of checkpoints that recently completed, expired or were declined; used to recognize late messages. */
    private final ArrayDeque<Long> recentPendingCheckpoints;
    /** Source of new checkpoint IDs. */
    protected final CheckpointIDCounter checkpointIdCounter;
    /** Class loader handed to the discard calls of pending and completed checkpoints. */
    private final ClassLoader userClassLoader;
    /** Period passed to the timer for the periodic checkpoint trigger. */
    private final long baseInterval;
    /** Delay passed to the timer after which a pending checkpoint is expired. */
    private final long checkpointTimeout;
    /** Minimum pause after a completed checkpoint before the next trigger, in nanoseconds. */
    private final long minPauseBetweenCheckpointsNanos;
    /** Upper bound on the size of {@link #pendingCheckpoints}; further trigger requests are queued. */
    private final int maxConcurrentCheckpointAttempts;
    /** Daemon timer that runs the periodic triggers and the checkpoint expiry tasks. */
    private final Timer timer;
    /** Actor gateway that is sent a PoisonPill on shutdown; may be null. */
    private ActorGateway jobStatusListener;
    /** Number of consecutive failed trigger attempts; reset to zero after a successful trigger. */
    private int numUnsuccessfulCheckpointsTriggers;
    /** The currently scheduled periodic trigger task, if any. */
    private ScheduledTrigger currentPeriodicTrigger;
    /** {@code System.nanoTime()} value recorded when the last checkpoint completed. */
    private long lastCheckpointCompletionNanos;
    /** Whether queued trigger requests are re-armed as a fixed-rate task rather than as a one-shot task. */
    private boolean periodicScheduling;
    /** Set when a trigger request was postponed because too many checkpoints were pending. */
    private boolean triggerRequestQueued;
    /** Set once the coordinator has been shut down or suspended. */
    private volatile boolean shutdown;
    /** JVM shutdown hook calling {@link #shutdown()}; null unless the recovery mode is STANDALONE. */
    private final Thread shutdownHook;
    /** Tracker that is told about every completed checkpoint. */
    private final CheckpointStatsTracker statsTracker;
    /** Number of key groups that are distributed over the subtasks when state is restored. */
    protected final int numberKeyGroups;
    /** Executor passed to new pending checkpoints and used to discard a completed checkpoint asynchronously. */
    private final Executor executor;

    /**
     * Creates a coordinator with default throttling: no minimum pause between checkpoints
     * ({@code 0L}), at most {@code Integer.MAX_VALUE} concurrent checkpoint attempts, and a
     * {@link DisabledCheckpointStatsTracker}. All other arguments are passed through unchanged
     * to the full constructor.
     */
    public CheckpointCoordinator(JobID job, long baseInterval, long checkpointTimeout, int numberKeyGroups, ExecutionVertex[] tasksToTrigger, ExecutionVertex[] tasksToWaitFor, ExecutionVertex[] tasksToCommitTo, ClassLoader userClassLoader, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, RecoveryMode recoveryMode, Executor executor) {
        this(job, baseInterval, checkpointTimeout, 0L, Integer.MAX_VALUE, numberKeyGroups, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, userClassLoader, checkpointIDCounter, completedCheckpointStore, recoveryMode, new DisabledCheckpointStatsTracker(), executor);
    }

    /**
     * Creates a checkpoint coordinator with explicit throttling and statistics settings.
     *
     * <p>If {@code recoveryMode} is {@link RecoveryMode#STANDALONE}, a JVM shutdown hook that
     * calls {@link #shutdown()} is registered as the very last construction step, so the hook
     * never observes a partially initialized coordinator and is not leaked when argument
     * validation fails.
     *
     * @param job ID of the job to coordinate checkpoints for; must not be null
     * @param baseInterval period of the periodic trigger; must be larger than zero
     * @param checkpointTimeout delay after which a pending checkpoint expires; must be at least 1
     * @param minPauseBetweenCheckpoints minimum pause in milliseconds after a completed checkpoint;
     *        must be non-negative, values above one year are reduced to one year
     * @param maxConcurrentCheckpointAttempts maximum number of simultaneously pending checkpoints;
     *        must be at least 1
     * @param numberKeyGroups number of key groups distributed over subtasks on restore
     * @param tasksToTrigger vertices that are sent the trigger message; must not be null
     * @param tasksToWaitFor vertices that must acknowledge a checkpoint; must not be null
     * @param tasksToCommitTo vertices notified about completed checkpoints; must not be null
     * @param userClassLoader class loader handed to checkpoint discard calls
     * @param checkpointIDCounter source of checkpoint IDs; must not be null
     * @param completedCheckpointStore store for completed checkpoints; must not be null
     * @param recoveryMode recovery mode; decides whether a JVM shutdown hook is installed
     * @param statsTracker tracker notified about completed checkpoints; must not be null
     * @param executor executor for asynchronous work; must not be null
     * @throws IllegalArgumentException if one of the numeric arguments is out of range
     * @throws NullPointerException if one of the required references is null
     */
    public CheckpointCoordinator(JobID job, long baseInterval, long checkpointTimeout, long minPauseBetweenCheckpoints, int maxConcurrentCheckpointAttempts, int numberKeyGroups, ExecutionVertex[] tasksToTrigger, ExecutionVertex[] tasksToWaitFor, ExecutionVertex[] tasksToCommitTo, ClassLoader userClassLoader, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, RecoveryMode recoveryMode, CheckpointStatsTracker statsTracker, Executor executor) {
        Preconditions.checkArgument(baseInterval > 0L, "Checkpoint base interval must be larger than zero");
        Preconditions.checkArgument(checkpointTimeout >= 1L, "Checkpoint timeout must be larger than zero");
        Preconditions.checkArgument(minPauseBetweenCheckpoints >= 0L, "minPauseBetweenCheckpoints must be >= 0");
        Preconditions.checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");

        // Cap the pause at 365 days (in ms); this keeps the conversion to nanoseconds
        // below comfortably inside the range of a long.
        if (minPauseBetweenCheckpoints > 31536000000L) {
            minPauseBetweenCheckpoints = 31536000000L;
            LOG.warn("Reducing minimum pause between checkpoints to " + minPauseBetweenCheckpoints + " ms (1 year)");
        }

        this.job = Preconditions.checkNotNull(job);
        this.baseInterval = baseInterval;
        this.checkpointTimeout = checkpointTimeout;
        this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpoints * 1000000L;
        this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
        this.tasksToTrigger = Preconditions.checkNotNull(tasksToTrigger);
        this.tasksToWaitFor = Preconditions.checkNotNull(tasksToWaitFor);
        this.tasksToCommitTo = Preconditions.checkNotNull(tasksToCommitTo);
        this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>();
        this.completedCheckpointStore = Preconditions.checkNotNull(completedCheckpointStore);
        this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
        this.userClassLoader = userClassLoader;
        this.checkpointIdCounter = Preconditions.checkNotNull(checkpointIDCounter);
        this.timer = new Timer("Checkpoint Timer", true);
        this.statsTracker = Preconditions.checkNotNull(statsTracker);
        this.numberKeyGroups = numberKeyGroups;
        this.executor = Preconditions.checkNotNull(executor);

        // Register the hook only after every other field is set and validated.
        if (recoveryMode == RecoveryMode.STANDALONE) {
            this.shutdownHook = new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        CheckpointCoordinator.this.shutdown();
                    }
                    catch (Throwable t) {
                        LOG.error("Error during shutdown of checkpoint coordinator via JVM shutdown hook: " + t.getMessage(), t);
                    }
                }
            });
            try {
                Runtime.getRuntime().addShutdownHook(this.shutdownHook);
            }
            catch (IllegalStateException ignored) {
                // The JVM is already shutting down, so a hook can no longer be registered;
                // there is nothing left to protect against.
            }
            catch (Throwable t) {
                LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
            }
        } else {
            this.shutdownHook = null;
        }
    }

    /**
     * Extension hook invoked as the last step of a shutdown or suspend, while the coordinator
     * lock is held. The default implementation does nothing.
     */
    protected void onShutdown() {
    }

    /**
     * Extension hook invoked after a pending checkpoint was discarded because it expired, was
     * declined, or was subsumed by a completed checkpoint. The default implementation does nothing.
     *
     * @param canceledCheckpointId ID of the discarded checkpoint
     */
    protected void onCancelCheckpoint(long canceledCheckpointId) {
    }

    /**
     * Extension hook invoked after a fully acknowledged checkpoint was finalized and added to the
     * completed checkpoint store. The default implementation does nothing.
     *
     * @param checkpoint the checkpoint that was just completed
     */
    protected void onFullyAcknowledgedCheckpoint(CompletedCheckpoint checkpoint) {
    }

    /**
     * Stops the coordinator and shuts down the completed checkpoint store and the checkpoint
     * ID counter.
     *
     * @throws Exception propagated from the underlying shutdown steps
     */
    public void shutdown() throws Exception {
        this.shutdown(true);
    }

    /**
     * Stops the coordinator but only suspends (rather than shuts down) the completed checkpoint
     * store and the checkpoint ID counter.
     *
     * @throws Exception propagated from the underlying suspend steps
     */
    public void suspend() throws Exception {
        this.shutdown(false);
    }

    /**
     * Stops the coordinator exactly once: cancels the timer, sends a PoisonPill to the job
     * status listener (if set), discards all pending checkpoints, and then either shuts down
     * or suspends the completed checkpoint store and the ID counter. On every call, including
     * repeated ones, the JVM shutdown hook is unregistered unless the calling thread is the
     * hook itself.
     *
     * @param shutdownStoreAndCounter {@code true} to shut the store and counter down,
     *        {@code false} to merely suspend them
     * @throws Exception propagated from discarding checkpoints or stopping store/counter
     */
    private void shutdown(boolean shutdownStoreAndCounter) throws Exception {
        synchronized (this.lock) {
            try {
                if (this.shutdown) {
                    // Already stopped; only the hook clean-up in the finally block runs.
                    return;
                }
                this.shutdown = true;
                LOG.info("Stopping checkpoint coordinator for job " + this.job);

                this.periodicScheduling = false;
                this.triggerRequestQueued = false;
                this.timer.cancel();

                ActorGateway listener = this.jobStatusListener;
                if (listener != null) {
                    listener.tell(PoisonPill.getInstance());
                    this.jobStatusListener = null;
                }

                for (PendingCheckpoint inFlight : this.pendingCheckpoints.values()) {
                    inFlight.discard(this.userClassLoader);
                }
                this.pendingCheckpoints.clear();

                if (!shutdownStoreAndCounter) {
                    this.completedCheckpointStore.suspend();
                    this.checkpointIdCounter.suspend();
                } else {
                    this.completedCheckpointStore.shutdown();
                    this.checkpointIdCounter.shutdown();
                }
                this.onShutdown();
            }
            finally {
                Thread hook = this.shutdownHook;
                // A hook cannot be removed from within its own thread while the JVM shuts down.
                if (hook != null && hook != Thread.currentThread()) {
                    try {
                        Runtime.getRuntime().removeShutdownHook(hook);
                    }
                    catch (IllegalStateException illegalStateException) {
                        // ignored on purpose, as in the original implementation
                    }
                    catch (Throwable t) {
                        LOG.warn("Error unregistering checkpoint coordinator shutdown hook.", t);
                    }
                }
            }
        }
    }

    /**
     * Returns whether the coordinator has been shut down or suspended. Reads the volatile flag
     * without taking the coordinator lock.
     */
    public boolean isShutdown() {
        return this.shutdown;
    }

    /**
     * Triggers a checkpoint whose ID is drawn from the checkpoint ID counter (signalled to the
     * two-argument overload by passing {@code -1L}).
     *
     * @param timestamp timestamp to attach to the checkpoint
     * @return {@code true} if the checkpoint was triggered, {@code false} otherwise
     */
    public boolean triggerCheckpoint(long timestamp) throws Exception {
        return this.triggerCheckpoint(timestamp, -1L);
    }

    /**
     * Triggers a new checkpoint and sends the trigger messages to all trigger tasks.
     *
     * <p>The admission check (shutdown, queued request, concurrency limit, minimum pause) is
     * performed twice under the lock: once up front as a cheap early exit, and once more right
     * before the checkpoint is registered, because the task lookup and the ID counter access
     * in between run outside the lock.
     *
     * @param timestamp timestamp to attach to the checkpoint
     * @param nextCheckpointId checkpoint ID to use; a negative value means "take the next ID
     *        from the checkpoint ID counter"
     * @return {@code true} if the checkpoint was registered and the trigger messages were sent,
     *         {@code false} if it was rejected, postponed, or failed
     */
    public boolean triggerCheckpoint(long timestamp, long nextCheckpointId) {
        // Early exit before doing any work outside the lock.
        synchronized (this.lock) {
            if (!this.admitTriggerRequest()) {
                return false;
            }
        }

        // Every trigger task must currently be running.
        final ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[this.tasksToTrigger.length];
        for (int i = 0; i < this.tasksToTrigger.length; ++i) {
            Execution execution = this.tasksToTrigger[i].getCurrentExecutionAttempt();
            if (execution == null || execution.getState() != ExecutionState.RUNNING) {
                LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.", this.tasksToTrigger[i].getSimpleName());
                return false;
            }
            triggerIDs[i] = execution.getAttemptId();
        }

        // Every acknowledging task must at least have a current execution attempt.
        final HashMap<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<ExecutionAttemptID, ExecutionVertex>(this.tasksToWaitFor.length);
        for (ExecutionVertex vertex : this.tasksToWaitFor) {
            Execution execution = vertex.getCurrentExecutionAttempt();
            if (execution == null) {
                LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.", vertex.getSimpleName());
                return false;
            }
            ackTasks.put(execution.getAttemptId(), vertex);
        }

        // The counter is accessed outside the lock.
        final long checkpointID;
        if (nextCheckpointId < 0L) {
            try {
                checkpointID = this.checkpointIdCounter.getAndIncrement();
            }
            catch (Throwable t) {
                int numUnsuccessful = ++this.numUnsuccessfulCheckpointsTriggers;
                LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
                return false;
            }
        } else {
            checkpointID = nextCheckpointId;
        }

        LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
        final PendingCheckpoint checkpoint = new PendingCheckpoint(this.job, checkpointID, timestamp, ackTasks, this.executor);

        // Expires the checkpoint if it has not been completed or discarded by the timeout.
        TimerTask canceller = new TimerTask() {

            @Override
            public void run() {
                synchronized (CheckpointCoordinator.this.lock) {
                    if (!checkpoint.isDiscarded()) {
                        LOG.info("Checkpoint " + checkpointID + " expired before completing.");
                        checkpoint.discard(CheckpointCoordinator.this.userClassLoader);
                        CheckpointCoordinator.this.pendingCheckpoints.remove(checkpointID);
                        CheckpointCoordinator.this.rememberRecentCheckpointId(checkpointID);
                        CheckpointCoordinator.this.onCancelCheckpoint(checkpointID);
                        CheckpointCoordinator.this.triggerQueuedRequests();
                    }
                }
            }
        };

        try {
            synchronized (this.lock) {
                // Re-validate: the state may have changed while the lock was released.
                if (!this.admitTriggerRequest()) {
                    return false;
                }
                this.pendingCheckpoints.put(checkpointID, checkpoint);
                this.timer.schedule(canceller, this.checkpointTimeout);
            }

            for (int i = 0; i < this.tasksToTrigger.length; ++i) {
                ExecutionAttemptID id = triggerIDs[i];
                TriggerCheckpoint message = new TriggerCheckpoint(this.job, id, checkpointID, timestamp);
                this.tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
            }
            this.numUnsuccessfulCheckpointsTriggers = 0;
            return true;
        }
        catch (Throwable t) {
            // Roll back the registration and release whatever the checkpoint already holds.
            synchronized (this.lock) {
                this.pendingCheckpoints.remove(checkpointID);
            }
            int numUnsuccessful = ++this.numUnsuccessfulCheckpointsTriggers;
            LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
            if (!checkpoint.isDiscarded()) {
                checkpoint.discard(this.userClassLoader);
            }
            return false;
        }
    }

    /**
     * Decides, under the coordinator lock, whether a checkpoint may be triggered right now.
     *
     * <p>Side effects when the request is not admitted: if too many checkpoints are pending,
     * the request is marked as queued and the periodic trigger is cancelled; if the minimum
     * pause has not elapsed (and the base interval is not {@code Long.MAX_VALUE}), the periodic
     * trigger is re-scheduled to fire once the pause is over.
     *
     * @return {@code true} if the caller may proceed with triggering, {@code false} otherwise
     */
    private boolean admitTriggerRequest() {
        assert (Thread.holdsLock(this.lock));
        if (this.shutdown) {
            return false;
        }
        if (this.triggerRequestQueued) {
            LOG.warn("Trying to trigger another checkpoint while one was queued already");
            return false;
        }
        if (this.pendingCheckpoints.size() >= this.maxConcurrentCheckpointAttempts) {
            this.triggerRequestQueued = true;
            if (this.currentPeriodicTrigger != null) {
                this.currentPeriodicTrigger.cancel();
                this.currentPeriodicTrigger = null;
            }
            return false;
        }
        long earliestNext = this.lastCheckpointCompletionNanos + this.minPauseBetweenCheckpointsNanos;
        long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1000000L;
        if (durationTillNextMillis > 0L && this.baseInterval != Long.MAX_VALUE) {
            if (this.currentPeriodicTrigger != null) {
                this.currentPeriodicTrigger.cancel();
            }
            this.currentPeriodicTrigger = new ScheduledTrigger();
            this.timer.scheduleAtFixedRate((TimerTask)this.currentPeriodicTrigger, durationTillNextMillis, this.baseInterval);
            return false;
        }
        return true;
    }

    /**
     * Handles a task's decline of a checkpoint. A still-pending checkpoint is removed and
     * discarded; queued trigger requests are re-armed unless another non-discarded pending
     * checkpoint with an equal or higher ID exists.
     *
     * @param message the decline message; {@code null} is tolerated
     * @return {@code true} if the message referred to a pending or recently handled checkpoint,
     *         {@code false} if the coordinator is shut down, the message is null or for another
     *         job, or the checkpoint is unknown
     * @throws IllegalStateException if the checkpoint is still registered although discarded
     */
    public boolean receiveDeclineMessage(DeclineCheckpoint message) {
        if (this.shutdown || message == null) {
            return false;
        }
        if (!this.job.equals(message.getJob())) {
            LOG.error("Received DeclineCheckpoint message for wrong job: {}", message);
            return false;
        }

        final long checkpointId = message.getCheckpointId();
        final String reason;
        if (message.getReason() != null) {
            reason = message.getReason().getMessage();
        } else {
            reason = "";
        }

        synchronized (this.lock) {
            // Re-check under the lock: a shutdown may have happened in the meantime.
            if (this.shutdown) {
                return false;
            }

            final PendingCheckpoint declined = this.pendingCheckpoints.get(checkpointId);
            if (declined == null) {
                // Not pending any more: it is either a recently handled checkpoint or unknown.
                if (!this.recentPendingCheckpoints.contains(checkpointId)) {
                    return false;
                }
                LOG.info("Received another decline checkpoint message for now expired checkpoint attempt " + checkpointId);
                return true;
            }
            if (declined.isDiscarded()) {
                throw new IllegalStateException("Received message for discarded but non-removed checkpoint " + checkpointId);
            }

            LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}", new Object[]{checkpointId, message.getTaskExecutionId(), reason});
            this.pendingCheckpoints.remove(checkpointId);
            declined.discard(this.userClassLoader);
            this.rememberRecentCheckpointId(checkpointId);
            this.onCancelCheckpoint(checkpointId);

            boolean newerCheckpointPending = false;
            for (PendingCheckpoint other : this.pendingCheckpoints.values()) {
                if (!other.isDiscarded() && other.getCheckpointId() >= declined.getCheckpointId()) {
                    newerCheckpointPending = true;
                    break;
                }
            }
            if (!newerCheckpointPending) {
                this.triggerQueuedRequests();
            }
            return true;
        }
    }

    /**
     * Handles a task's acknowledgement of a checkpoint. If the acknowledgement makes the pending
     * checkpoint fully acknowledged, the checkpoint is completed. State handles that cannot be
     * attached to a checkpoint (unknown task, discarded or recently handled checkpoint) are
     * discarded.
     *
     * @param message the acknowledge message; {@code null} is tolerated
     * @return {@code true} if the message referred to a pending or recently handled checkpoint,
     *         {@code false} if the coordinator is shut down, the message is null or for another
     *         job, or the checkpoint is unknown
     * @throws CheckpointException if completing the fully acknowledged checkpoint fails
     * @throws IllegalStateException if the checkpoint is still registered although discarded
     */
    public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
        if (this.shutdown || message == null) {
            return false;
        }
        if (!this.job.equals(message.getJob())) {
            LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", this.job, message);
            return false;
        }

        final long checkpointId = message.getCheckpointId();
        synchronized (this.lock) {
            // Re-check under the lock: a shutdown may have happened in the meantime.
            if (this.shutdown) {
                return false;
            }

            final PendingCheckpoint checkpoint = this.pendingCheckpoints.get(checkpointId);
            if (checkpoint != null) {
                if (checkpoint.isDiscarded()) {
                    throw new IllegalStateException("Received message for discarded but non-removed checkpoint " + checkpointId);
                }
                switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize(), null)) {
                    case SUCCESS: {
                        if (checkpoint.isFullyAcknowledged()) {
                            this.completePendingCheckpoint(checkpoint);
                        }
                        break;
                    }
                    case DUPLICATE: {
                        LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.", new Object[]{message.getCheckpointId(), message.getTaskExecutionId(), message.getJob()});
                        break;
                    }
                    case UNKNOWN: {
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, because the task's execution attempt id was unknown. Discarding the state handle to avoid lingering state.", new Object[]{message.getCheckpointId(), message.getTaskExecutionId(), message.getJob()});
                        this.discardState(message.getJob(), message.getTaskExecutionId(), checkpointId, message.getState());
                        break;
                    }
                    case DISCARDED: {
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, because the pending checkpoint had been discarded. Discarding the state handle to avoid lingering state.", new Object[]{message.getCheckpointId(), message.getTaskExecutionId(), message.getJob()});
                        this.discardState(message.getJob(), message.getTaskExecutionId(), checkpointId, message.getState());
                        break;
                    }
                }
                return true;
            }

            if (this.recentPendingCheckpoints.contains(checkpointId)) {
                // Late message for a checkpoint that already left the pending set.
                LOG.warn("Received late message for now expired checkpoint attempt {} from task {} and job {}.", new Object[]{checkpointId, message.getTaskExecutionId(), message.getJob()});
                this.discardState(message.getJob(), message.getTaskExecutionId(), checkpointId, message.getState());
                return true;
            }

            // The third placeholder is the job, so log the job ID rather than the state handle.
            LOG.debug("Received message for an unknown checkpoint {} from task {} and job {}.", new Object[]{checkpointId, message.getTaskExecutionId(), message.getJob()});
            return false;
        }
    }

    /**
     * Turns a fully acknowledged pending checkpoint into a completed checkpoint: finalizes it,
     * adds it to the completed checkpoint store, drops older pending checkpoints, and notifies
     * the commit tasks and the statistics tracker. Must be called while holding the lock.
     *
     * <p>Regardless of the outcome, the checkpoint is removed from the pending set and queued
     * trigger requests are re-armed.
     *
     * @param pendingCheckpoint the fully acknowledged checkpoint to complete
     * @throws CheckpointException if finalizing or storing the checkpoint fails; the pending
     *         checkpoint is discarded and an already finalized checkpoint is discarded
     *         asynchronously on the executor
     */
    private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
        assert (Thread.holdsLock(this.lock));
        final long checkpointId = pendingCheckpoint.getCheckpointId();
        CompletedCheckpoint completedCheckpoint = null;
        try {
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
            this.completedCheckpointStore.addCheckpoint(completedCheckpoint);
            this.rememberRecentCheckpointId(checkpointId);
            this.dropSubsumedCheckpoints(completedCheckpoint.getTimestamp());
            this.onFullyAcknowledgedCheckpoint(completedCheckpoint);
        }
        catch (Exception exception) {
            if (!pendingCheckpoint.isDiscarded()) {
                pendingCheckpoint.discard(this.userClassLoader);
            }
            if (completedCheckpoint != null) {
                // Finalization succeeded but a later step failed: clean up off the calling thread.
                final CompletedCheckpoint cc = completedCheckpoint;
                this.executor.execute(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            cc.discard(CheckpointCoordinator.this.userClassLoader);
                        }
                        catch (Exception nestedException) {
                            LOG.warn("Could not properly discard completed checkpoint {}.", cc.getCheckpointID(), nestedException);
                        }
                    }
                });
            }
            throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
        }
        finally {
            this.pendingCheckpoints.remove(checkpointId);
            this.triggerQueuedRequests();
        }

        this.lastCheckpointCompletionNanos = System.nanoTime();
        LOG.info("Completed checkpoint {} (in {} ms).", checkpointId, completedCheckpoint.getDuration());
        if (LOG.isDebugEnabled()) {
            // Join with a leading separator instead of trimming a trailing one, so that an
            // empty set of task states does not cut into the "Checkpoint state: " prefix.
            StringBuilder builder = new StringBuilder("Checkpoint state: ");
            String separator = "";
            for (TaskState state : completedCheckpoint.getTaskStates().values()) {
                builder.append(separator).append(state);
                separator = ", ";
            }
            LOG.debug(builder.toString());
        }

        final long timestamp = completedCheckpoint.getTimestamp();
        for (ExecutionVertex ev : this.tasksToCommitTo) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee == null) {
                // No current attempt to notify.
                continue;
            }
            ExecutionAttemptID attemptId = ee.getAttemptId();
            NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(this.job, attemptId, checkpointId, timestamp);
            ev.sendMessageToCurrentExecution(notifyMessage, attemptId);
        }
        this.statsTracker.onCompletedCheckpoint(completedCheckpoint);
    }

    /**
     * Records a checkpoint ID in the bounded history of recently handled checkpoints, evicting
     * the oldest entry once {@code NUM_GHOST_CHECKPOINT_IDS} IDs are stored.
     *
     * @param id the checkpoint ID to remember
     */
    private void rememberRecentCheckpointId(long id) {
        if (this.recentPendingCheckpoints.size() >= NUM_GHOST_CHECKPOINT_IDS) {
            this.recentPendingCheckpoints.removeFirst();
        }
        this.recentPendingCheckpoints.addLast(id);
    }

    /**
     * Discards and removes every pending checkpoint whose timestamp is strictly older than the
     * given timestamp, remembering its ID and invoking the cancel hook for it.
     *
     * @param timestamp timestamp of the checkpoint that subsumes older ones
     */
    private void dropSubsumedCheckpoints(long timestamp) {
        for (Iterator<PendingCheckpoint> pending = this.pendingCheckpoints.values().iterator(); pending.hasNext(); ) {
            PendingCheckpoint candidate = pending.next();
            if (candidate.getCheckpointTimestamp() < timestamp) {
                this.rememberRecentCheckpointId(candidate.getCheckpointId());
                candidate.discard(this.userClassLoader);
                this.onCancelCheckpoint(candidate.getCheckpointId());
                // Remove through the iterator so the map is not modified behind its back.
                pending.remove();
            }
        }
    }

    /**
     * Fires a previously queued trigger request, if there is one. With periodic scheduling
     * enabled the current periodic trigger is replaced by a new fixed-rate task that starts
     * immediately; otherwise a one-shot task is scheduled. Must be called while holding the lock.
     */
    private void triggerQueuedRequests() {
        assert (Thread.holdsLock(this.lock));
        if (!this.triggerRequestQueued) {
            return;
        }
        this.triggerRequestQueued = false;

        final ScheduledTrigger replacement = new ScheduledTrigger();
        if (!this.periodicScheduling) {
            this.timer.schedule((TimerTask)replacement, 0L);
            return;
        }
        if (this.currentPeriodicTrigger != null) {
            this.currentPeriodicTrigger.cancel();
        }
        this.currentPeriodicTrigger = replacement;
        this.timer.scheduleAtFixedRate((TimerTask)replacement, 0L, this.baseInterval);
    }

    /**
     * Recovers the completed checkpoint store and assigns the state of its latest checkpoint
     * to the current execution attempts of the given job vertices.
     *
     * @param tasks job vertices to restore, keyed by job vertex ID
     * @param errorIfNoCheckpoint whether a missing checkpoint is an error rather than a no-op
     * @param allOrNothingState whether a vertex must have state for either all or none of its
     *        subtasks
     * @return {@code true} if state was restored, {@code false} if no checkpoint was available
     * @throws IllegalStateException if the coordinator is shut down, no checkpoint exists while
     *         {@code errorIfNoCheckpoint} is set, a checkpointed vertex is missing from
     *         {@code tasks}, or only a subset of subtasks has state while
     *         {@code allOrNothingState} is set
     * @throws RuntimeException if the parallelism of a vertex differs from the checkpointed one
     * @throws Exception propagated from the completed checkpoint store
     */
    public boolean restoreLatestCheckpointedState(Map<JobVertexID, ExecutionJobVertex> tasks, boolean errorIfNoCheckpoint, boolean allOrNothingState) throws Exception {
        synchronized (this.lock) {
            if (this.shutdown) {
                throw new IllegalStateException("CheckpointCoordinator is shut down");
            }
            this.completedCheckpointStore.recover();
            CompletedCheckpoint latest = this.completedCheckpointStore.getLatestCheckpoint();
            if (latest == null) {
                if (errorIfNoCheckpoint) {
                    throw new IllegalStateException("No completed checkpoint available");
                }
                return false;
            }

            for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : latest.getTaskStates().entrySet()) {
                TaskState taskState = taskGroupStateEntry.getValue();
                ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey());
                if (executionJobVertex == null) {
                    throw new IllegalStateException("There is no execution job vertex for the job vertex ID " + taskGroupStateEntry.getKey());
                }

                final int parallelism = executionJobVertex.getParallelism();
                if (taskState.getParallelism() != parallelism) {
                    throw new RuntimeException("Cannot restore the latest checkpoint because the parallelism changed. The operator " + executionJobVertex.getJobVertexId() + " has parallelism " + parallelism + " whereas the corresponding state object has a parallelism of " + taskState.getParallelism());
                }

                // Number of subtasks for which the checkpoint actually contains state.
                int counter = 0;
                List<Set<Integer>> keyGroupPartitions = this.createKeyGroupPartitions(this.numberKeyGroups, parallelism);
                for (int i = 0; i < parallelism; ++i) {
                    SubtaskState subtaskState = taskState.getState(i);
                    SerializedValue<StateHandle<?>> state = null;
                    if (subtaskState != null) {
                        ++counter;
                        state = subtaskState.getState();
                    }
                    Map<Integer, SerializedValue<StateHandle<?>>> kvStateForTaskMap = taskState.getUnwrappedKvStates(keyGroupPartitions.get(i));
                    Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt();
                    currentExecutionAttempt.setInitialState(state, kvStateForTaskMap);
                }

                if (allOrNothingState && counter > 0 && counter < parallelism) {
                    throw new IllegalStateException("The checkpoint contained state only for a subset of tasks for vertex " + executionJobVertex);
                }
            }
            return true;
        }
    }

    /**
     * Distributes the key group indices {@code 0 .. numberKeyGroups - 1} round-robin over
     * {@code parallelism} partitions: partition {@code p} receives the key groups
     * {@code p, p + parallelism, p + 2 * parallelism, ...}.
     *
     * @param numberKeyGroups total number of key groups
     * @param parallelism number of partitions to create
     * @return one set of key group indices per partition, in partition order
     */
    protected List<Set<Integer>> createKeyGroupPartitions(int numberKeyGroups, int parallelism) {
        ArrayList<Set<Integer>> partitions = new ArrayList<Set<Integer>>(parallelism);
        int partition = 0;
        while (partition < parallelism) {
            HashSet<Integer> assignedKeyGroups = new HashSet<Integer>();
            int keyGroup = partition;
            while (keyGroup < numberKeyGroups) {
                assignedKeyGroups.add(keyGroup);
                keyGroup += parallelism;
            }
            partitions.add(assignedKeyGroups);
            ++partition;
        }
        return partitions;
    }

    /**
     * Returns the current number of pending checkpoints.
     *
     * <p>NOTE(review): unlike the neighbouring getters this reads the map without synchronizing
     * on the coordinator lock, so the value may be stale when called concurrently - confirm
     * whether that is intended.
     */
    public int getNumberOfPendingCheckpoints() {
        return this.pendingCheckpoints.size();
    }

    /**
     * Returns the number of checkpoints currently retained by the completed checkpoint store,
     * queried while holding the coordinator lock.
     */
    public int getNumberOfRetainedSuccessfulCheckpoints() {
        synchronized (this.lock) {
            final int retained = this.completedCheckpointStore.getNumberOfRetainedCheckpoints();
            return retained;
        }
    }

    /**
     * Returns a snapshot copy of the pending checkpoints keyed by checkpoint ID, taken while
     * holding the coordinator lock. Changes to the returned map do not affect the coordinator.
     */
    public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
        synchronized (this.lock) {
            HashMap<Long, PendingCheckpoint> snapshot = new HashMap<Long, PendingCheckpoint>(this.pendingCheckpoints);
            return snapshot;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns all checkpoints currently held by the completed checkpoint store.
     *
     * <p>The store is queried while holding the coordinator lock. The list is handed back
     * exactly as the store returns it; no copy is made here.
     *
     * @return the result of {@code completedCheckpointStore.getAllCheckpoints()}
     * @throws Exception if the store fails to provide its checkpoints
     */
    public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
        synchronized (this.lock) {
            List<CompletedCheckpoint> completed = this.completedCheckpointStore.getAllCheckpoints();
            return completed;
        }
    }

    /**
     * Obtains the next checkpoint ID from the checkpoint ID counter.
     *
     * <p>Any {@link Throwable} raised by the counter is caught: the count of unsuccessful
     * trigger attempts is incremented, a warning is logged, and {@code -1} is returned
     * instead of propagating the failure.
     *
     * @return the ID handed out by the counter, or {@code -1} if the counter failed
     */
    protected long getAndIncrementCheckpointId() {
        long checkpointId;
        try {
            checkpointId = this.checkpointIdCounter.getAndIncrement();
        } catch (Throwable failure) {
            // Count the failure first so the logged number already includes this attempt.
            final int failedAttempts = ++this.numUnsuccessfulCheckpointsTriggers;
            LOG.warn("Failed to trigger checkpoint (" + failedAttempts + " consecutive failed attempts so far)", failure);
            checkpointId = -1L;
        }
        return checkpointId;
    }

    /**
     * Returns the gateway currently stored as the job status listener.
     *
     * <p>The field is assigned by {@code setJobStatusListener} and, lazily, by
     * {@code createActivatorDeactivator}. It is read here without acquiring the coordinator lock.
     *
     * @return the current value of the {@code jobStatusListener} field
     */
    protected ActorGateway getJobStatusListener() {
        final ActorGateway listener = this.jobStatusListener;
        return listener;
    }

    /**
     * Replaces the stored job status listener gateway.
     *
     * <p>The assignment happens without acquiring the coordinator lock. Once a non-null
     * gateway is stored, {@code createActivatorDeactivator} returns it instead of creating
     * a new actor.
     *
     * @param jobStatusListener the gateway to store in the {@code jobStatusListener} field
     */
    protected void setJobStatusListener(ActorGateway jobStatusListener) {
        this.jobStatusListener = jobStatusListener;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts periodic checkpoint triggering.
     *
     * <p>Under the coordinator lock this method first stops any previously running scheduler
     * via {@link #stopCheckpointScheduler()}, then starts the checkpoint ID counter, and
     * finally registers a fresh {@code ScheduledTrigger} with the timer at a fixed rate,
     * using {@code baseInterval} both as initial delay and as period.
     *
     * @throws IllegalArgumentException if the coordinator has already been shut down
     * @throws RuntimeException if the checkpoint ID counter cannot be started (the original
     *         exception is attached as cause)
     * @throws Exception if stopping the previous scheduler fails
     */
    public void startCheckpointScheduler() throws Exception {
        synchronized (this.lock) {
            if (this.shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            // Cancel a previously registered trigger and drop its pending checkpoints before restarting.
            this.stopCheckpointScheduler();

            try {
                this.checkpointIdCounter.start();
            } catch (Exception startFailure) {
                throw new RuntimeException("Failed to start checkpoint ID counter: " + startFailure.getMessage(), startFailure);
            }

            this.periodicScheduling = true;
            final ScheduledTrigger trigger = new ScheduledTrigger();
            this.currentPeriodicTrigger = trigger;
            this.timer.scheduleAtFixedRate(trigger, this.baseInterval, this.baseInterval);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops periodic checkpoint triggering and drops all pending checkpoints.
     *
     * <p>Under the coordinator lock this method clears the queued-trigger and
     * periodic-scheduling flags, cancels the current periodic trigger (if any), discards every
     * pending checkpoint with the user class loader, empties the pending checkpoint map, and
     * resets the counter of unsuccessful trigger attempts to zero.
     *
     * @throws Exception if discarding a pending checkpoint fails; in that case the remaining
     *         pending checkpoints are not discarded and the map is not cleared
     */
    public void stopCheckpointScheduler() throws Exception {
        synchronized (this.lock) {
            this.triggerRequestQueued = false;
            this.periodicScheduling = false;

            if (this.currentPeriodicTrigger != null) {
                this.currentPeriodicTrigger.cancel();
                this.currentPeriodicTrigger = null;
            }

            // Release whatever state the in-flight checkpoints have collected so far.
            final Iterator<PendingCheckpoint> pending = this.pendingCheckpoints.values().iterator();
            while (pending.hasNext()) {
                pending.next().discard(this.userClassLoader);
            }
            this.pendingCheckpoints.clear();

            this.numUnsuccessfulCheckpointsTriggers = 0;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the job status listener gateway, creating the backing actor on first use.
     *
     * <p>If no listener is stored yet, a {@code CheckpointCoordinatorDeActivator} actor is
     * created in the given actor system (constructed with this coordinator and the leader
     * session ID) and wrapped in an {@code AkkaActorGateway}. If a listener is already
     * stored, it is returned as is and both arguments are ignored.
     *
     * @param actorSystem actor system in which the listener actor is created when needed
     * @param leaderSessionID leader session ID passed to the actor and to the gateway
     * @return the stored (possibly newly created) job status listener gateway
     * @throws IllegalArgumentException if the coordinator has already been shut down
     */
    public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
        synchronized (this.lock) {
            if (this.shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            if (this.jobStatusListener != null) {
                return this.jobStatusListener;
            }

            final Props listenerProps = Props.create(CheckpointCoordinatorDeActivator.class, (Object[])new Object[]{this, leaderSessionID});
            this.jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(listenerProps), leaderSessionID);
            return this.jobStatusListener;
        }
    }

    /**
     * Discards a serialized state handle asynchronously on the coordinator's executor.
     *
     * <p>Does nothing when {@code stateObject} is {@code null}. Otherwise a task is submitted
     * that deserializes the handle with the user class loader and calls {@code discardState()}
     * on it. An {@link Exception} thrown inside that task is logged as a warning and not
     * propagated.
     *
     * @param jobId job ID, used only in the warning message
     * @param executionAttemptID execution attempt ID, used only in the warning message
     * @param checkpointId checkpoint ID, used only in the warning message
     * @param stateObject the serialized state handle to discard; may be {@code null}
     */
    private void discardState(final JobID jobId, final ExecutionAttemptID executionAttemptID, final long checkpointId, final SerializedValue<StateHandle<?>> stateObject) {
        if (stateObject == null) {
            return;
        }

        final Runnable discardTask = new Runnable(){

            @Override
            public void run() {
                try {
                    final StateHandle handle = (StateHandle)stateObject.deserializeValue(CheckpointCoordinator.this.userClassLoader);
                    handle.discardState();
                } catch (Exception failure) {
                    LOG.warn("Could not properly discard state object for checkpoint {} belonging to task {} of job {}.", new Object[]{checkpointId, executionAttemptID, jobId, failure});
                }
            }
        };
        this.executor.execute(discardTask);
    }

    private class ScheduledTrigger
    extends TimerTask {
        private ScheduledTrigger() {
        }

        @Override
        public void run() {
            try {
                CheckpointCoordinator.this.triggerCheckpoint(System.currentTimeMillis());
            }
            catch (Exception e) {
                LOG.error("Exception while triggering checkpoint", (Throwable)e);
            }
        }
    }
}

