package org.apache.flink.runtime.checkpoint;

import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinator.class */
public class CheckpointCoordinator {
    /** Logger shared by the coordinator and its nested timer tasks. */
    static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
    // Presumably the number of recently removed checkpoint IDs kept in recentPendingCheckpoints
    // (that deque is sized and capped at the same value) -- TODO confirm.
    private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
    /** Monitor guarding the coordinator's mutable state; most methods synchronize on it. */
    protected final Object lock;
    /** ID of the job this coordinator belongs to; acknowledge messages for other jobs are rejected. */
    private final JobID job;
    /** Vertices that are sent a TriggerCheckpoint message when a checkpoint starts. */
    private final ExecutionVertex[] tasksToTrigger;
    /** Vertices whose current execution attempts are registered with each new pending checkpoint. */
    private final ExecutionVertex[] tasksToWaitFor;
    /** Vertices that are sent a NotifyCheckpointComplete message once a checkpoint is complete. */
    private final ExecutionVertex[] tasksToCommitTo;
    /** Checkpoints that have been triggered but not yet completed or discarded, keyed by checkpoint ID. */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;
    /** Store that receives every fully acknowledged checkpoint and is queried on restore. */
    private final CompletedCheckpointStore completedCheckpointStore;
    /** IDs of checkpoints recently removed from pendingCheckpoints; used to recognise late acknowledgements. */
    private final ArrayDeque<Long> recentPendingCheckpoints;
    /** Source of checkpoint IDs; started in the constructor and stopped on shutdown. */
    protected final CheckpointIDCounter checkpointIdCounter;
    /** Class loader handed to PendingCheckpoint.discard(...). */
    private final ClassLoader userClassLoader;
    /** Period, in milliseconds, of the periodic checkpoint trigger. */
    private final long baseInterval;
    /** Delay, in milliseconds, after which a still-pending checkpoint is expired by the timer. */
    private final long checkpointTimeout;
    // Validated and stored by the constructor; not read anywhere else in this class as visible here.
    private final long minPauseBetweenCheckpoints;
    /** Once this many checkpoints are pending, a further trigger request is queued instead of started. */
    private final int maxConcurrentCheckpointAttempts;
    /** Daemon timer ("Checkpoint Timer") that runs the periodic triggers and the per-checkpoint timeouts. */
    private final Timer timer;
    /** Actor gateway created lazily by createActivatorDeactivator; receives a PoisonPill on shutdown. */
    private ActorGateway jobStatusListener;
    /** Count of consecutive failed trigger attempts; reset to zero after a successful trigger. */
    private int numUnsuccessfulCheckpointsTriggers;
    /** The currently scheduled periodic trigger task, or null when none is scheduled. */
    private ScheduledTrigger currentPeriodicTrigger;
    /** True between startCheckpointScheduler() and stopCheckpointScheduler()/shutdown(). */
    private boolean periodicScheduling;
    /** True when a trigger request was deferred because too many checkpoints were pending. */
    private boolean triggerRequestQueued;
    /** Set once by shutdown(); volatile so that it can be read without holding the lock. */
    private volatile boolean shutdown;
    /** JVM shutdown hook that calls shutdown(); null unless the recovery mode is STANDALONE. */
    private final Thread shutdownHook;
    /** Receives a callback for every completed checkpoint. */
    private final CheckpointStatsTracker statsTracker;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinator$ScheduledTrigger.class */
    public class ScheduledTrigger extends TimerTask {
        private ScheduledTrigger() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            try {
                CheckpointCoordinator.this.triggerCheckpoint(System.currentTimeMillis());
            } catch (Exception e) {
                CheckpointCoordinator.LOG.error("Exception while triggering checkpoint", (Throwable) e);
            }
        }
    }

    /**
     * Convenience constructor that delegates to the full constructor with a minimum pause of
     * zero, {@code Integer.MAX_VALUE} concurrent checkpoint attempts, and a
     * {@link DisabledCheckpointStatsTracker}.
     *
     * @param jobID ID of the job to coordinate checkpoints for
     * @param j base interval of the periodic trigger, in milliseconds
     * @param j2 checkpoint timeout, in milliseconds
     * @param executionVertexArr vertices that receive the trigger message
     * @param executionVertexArr2 vertices that must acknowledge a checkpoint
     * @param executionVertexArr3 vertices that are notified of completed checkpoints
     * @param classLoader class loader used when discarding pending checkpoints
     * @param checkpointIDCounter source of checkpoint IDs
     * @param completedCheckpointStore store for completed checkpoints
     * @param recoveryMode recovery mode; decides whether a JVM shutdown hook is registered
     * @throws Exception if the full constructor fails
     */
    public CheckpointCoordinator(JobID jobID, long j, long j2, ExecutionVertex[] executionVertexArr, ExecutionVertex[] executionVertexArr2, ExecutionVertex[] executionVertexArr3, ClassLoader classLoader, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, RecoveryMode recoveryMode) throws Exception {
        this(jobID, j, j2, 0L, Integer.MAX_VALUE, executionVertexArr, executionVertexArr2, executionVertexArr3, classLoader, checkpointIDCounter, completedCheckpointStore, recoveryMode, new DisabledCheckpointStatsTracker());
    }

    /**
     * Creates a checkpoint coordinator, starts the checkpoint ID counter, creates the daemon
     * checkpoint timer and, in {@code STANDALONE} recovery mode only, registers a JVM shutdown
     * hook that calls {@link #shutdown()}.
     *
     * @param jobID ID of the job to coordinate checkpoints for; must not be null
     * @param j base interval of the periodic trigger, in milliseconds; must be &gt; 0
     * @param j2 checkpoint timeout, in milliseconds; must be &gt;= 1
     * @param j3 minimum pause between checkpoints; must be &gt;= 0
     * @param i maximum number of concurrently pending checkpoints; must be &gt;= 1
     * @param executionVertexArr vertices that receive the trigger message; must not be null
     * @param executionVertexArr2 vertices that must acknowledge a checkpoint; must not be null
     * @param executionVertexArr3 vertices notified of completed checkpoints; must not be null
     * @param classLoader class loader used when discarding pending checkpoints
     * @param checkpointIDCounter source of checkpoint IDs; must not be null
     * @param completedCheckpointStore store for completed checkpoints; must not be null
     * @param recoveryMode recovery mode; a shutdown hook is registered only for STANDALONE
     * @param checkpointStatsTracker tracker informed about completed checkpoints; must not be null
     * @throws IllegalArgumentException if one of the numeric arguments is out of range
     * @throws NullPointerException if one of the required references is null
     * @throws Exception if starting the checkpoint ID counter fails
     */
    public CheckpointCoordinator(JobID jobID, long j, long j2, long j3, int i, ExecutionVertex[] executionVertexArr, ExecutionVertex[] executionVertexArr2, ExecutionVertex[] executionVertexArr3, ClassLoader classLoader, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, RecoveryMode recoveryMode, CheckpointStatsTracker checkpointStatsTracker) throws Exception {
        this.lock = new Object();

        // Each message names the argument it actually validates.
        Preconditions.checkArgument(j > 0, "Checkpoint base interval must be larger than zero");
        Preconditions.checkArgument(j2 >= 1, "Checkpoint timeout must be larger than zero");
        Preconditions.checkArgument(j3 >= 0, "minPauseBetweenCheckpoints must be >= 0");
        Preconditions.checkArgument(i >= 1, "maxConcurrentCheckpointAttempts must be >= 1");

        this.job = (JobID) Preconditions.checkNotNull(jobID);
        this.baseInterval = j;
        this.checkpointTimeout = j2;
        this.minPauseBetweenCheckpoints = j3;
        this.maxConcurrentCheckpointAttempts = i;
        this.tasksToTrigger = (ExecutionVertex[]) Preconditions.checkNotNull(executionVertexArr);
        this.tasksToWaitFor = (ExecutionVertex[]) Preconditions.checkNotNull(executionVertexArr2);
        this.tasksToCommitTo = (ExecutionVertex[]) Preconditions.checkNotNull(executionVertexArr3);
        // Insertion-ordered map of the checkpoints that are currently in flight.
        this.pendingCheckpoints = new LinkedHashMap<>();
        this.completedCheckpointStore = (CompletedCheckpointStore) Preconditions.checkNotNull(completedCheckpointStore);
        this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
        this.userClassLoader = classLoader;
        this.checkpointIdCounter = (CheckpointIDCounter) Preconditions.checkNotNull(checkpointIDCounter);
        checkpointIDCounter.start();
        // Daemon timer, so it never keeps the JVM alive on its own.
        this.timer = new Timer("Checkpoint Timer", true);
        this.statsTracker = (CheckpointStatsTracker) Preconditions.checkNotNull(checkpointStatsTracker);

        if (recoveryMode != RecoveryMode.STANDALONE) {
            // No shutdown hook outside of standalone mode.
            this.shutdownHook = null;
            return;
        }

        this.shutdownHook = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    CheckpointCoordinator.this.shutdown();
                } catch (Throwable th) {
                    CheckpointCoordinator.LOG.error("Error during shutdown of checkpoint coordinator via JVM shutdown hook: " + th.getMessage(), th);
                }
            }
        });
        try {
            Runtime.getRuntime().addShutdownHook(this.shutdownHook);
        } catch (IllegalStateException ignored) {
            // The JVM is already shutting down, so a hook can no longer be registered.
        } catch (Throwable th) {
            LOG.error("Cannot register checkpoint coordinator shutdown hook.", th);
        }
    }

    /**
     * Extension hook invoked at the end of the first {@link #shutdown()} call, while the
     * coordinator lock is held. Does nothing by default.
     */
    protected void onShutdown() {
    }

    /**
     * Extension hook invoked when a pending checkpoint is dropped, either because it timed out
     * or because a newer checkpoint completed first. Does nothing by default.
     *
     * @param j ID of the checkpoint that was dropped
     */
    protected void onCancelCheckpoint(long j) {
    }

    /**
     * Extension hook invoked from {@code receiveAcknowledgeMessage} after a checkpoint has been
     * fully acknowledged and added to the completed-checkpoint store. Does nothing by default.
     *
     * @param completedCheckpoint the checkpoint that has just completed
     */
    protected void onFullyAcknowledgedCheckpoint(CompletedCheckpoint completedCheckpoint) {
    }

    /**
     * Shuts the coordinator down. The first call cancels the timer, stops the job status
     * listener and the checkpoint ID counter, discards every pending checkpoint, discards all
     * completed checkpoints in the store and finally calls {@link #onShutdown()}. Later calls
     * skip that work.
     *
     * <p>The JVM shutdown hook (if one was registered) is removed in a {@code finally} block so
     * that it is unregistered even when one of the teardown steps throws. The removal is skipped
     * when this method is running on the hook thread itself.
     *
     * @throws Exception if one of the teardown steps fails
     */
    public void shutdown() throws Exception {
        synchronized (this.lock) {
            try {
                if (!this.shutdown) {
                    this.shutdown = true;
                    LOG.info("Stopping checkpoint coordinator for job " + this.job);

                    this.periodicScheduling = false;
                    this.triggerRequestQueued = false;
                    // Cancels the periodic trigger as well as all scheduled timeout tasks.
                    this.timer.cancel();

                    if (this.jobStatusListener != null) {
                        this.jobStatusListener.tell(PoisonPill.getInstance());
                        this.jobStatusListener = null;
                    }

                    this.checkpointIdCounter.stop();

                    for (PendingCheckpoint pending : this.pendingCheckpoints.values()) {
                        pending.discard(this.userClassLoader);
                    }
                    this.pendingCheckpoints.clear();

                    this.completedCheckpointStore.discardAllCheckpoints();
                    onShutdown();
                }
            } finally {
                // Always unregister the hook, unless we are being run by the hook itself.
                if (this.shutdownHook != null && this.shutdownHook != Thread.currentThread()) {
                    try {
                        Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
                    } catch (IllegalStateException ignored) {
                        // The JVM is already shutting down; hooks can no longer be removed.
                    } catch (Throwable th) {
                        LOG.warn("Error unregistering checkpoint coordinator shutdown hook.", th);
                    }
                }
            }
        }
    }

    /**
     * Reports whether {@link #shutdown()} has been called. Reads the volatile flag without
     * taking the coordinator lock.
     *
     * @return true once the coordinator has been shut down
     */
    public boolean isShutdown() {
        return this.shutdown;
    }

    /**
     * Triggers a checkpoint with the given timestamp and a checkpoint ID drawn from the
     * checkpoint ID counter (the two-argument variant does so for a negative ID).
     *
     * @param j timestamp to associate with the checkpoint
     * @return true if the checkpoint was triggered, false otherwise
     * @throws Exception if the two-argument variant throws
     */
    public boolean triggerCheckpoint(long j) throws Exception {
        return triggerCheckpoint(j, -1L);
    }

    /**
     * Attempts to start a new checkpoint: validates the coordinator and task state, registers a
     * {@link PendingCheckpoint}, schedules its timeout and sends a {@link TriggerCheckpoint}
     * message to every task in {@code tasksToTrigger}.
     *
     * @param j timestamp to associate with the checkpoint
     * @param j2 checkpoint ID to use; a negative value means "take the next ID from the
     *     checkpoint ID counter"
     * @return true if the pending checkpoint was registered and all trigger messages were sent;
     *     false if the coordinator is shut down, a request is already queued, too many
     *     checkpoints are pending (the request is then queued), a required task is not
     *     available, or any step failed
     * @throws Exception declared by the signature; failures inside the method body are caught
     *     and reported through the return value
     */
    public boolean triggerCheckpoint(long j, long j2) throws Exception {
        long andIncrement;
        // The coordinator lock is held for the entire method body.
        synchronized (this.lock) {
            if (this.shutdown) {
                return false;
            }
            // At most one deferred trigger request is kept at a time.
            if (this.triggerRequestQueued) {
                LOG.warn("Trying to trigger another checkpoint while one was queued already");
                return false;
            }
            // Too many checkpoints in flight: remember the request and stop the periodic trigger.
            // triggerQueuedRequests() re-issues it once a pending checkpoint leaves the map.
            if (this.pendingCheckpoints.size() >= this.maxConcurrentCheckpointAttempts) {
                this.triggerRequestQueued = true;
                if (this.currentPeriodicTrigger != null) {
                    this.currentPeriodicTrigger.cancel();
                    this.currentPeriodicTrigger = null;
                }
                return false;
            }
            // Every task that must be triggered needs a current execution attempt in state RUNNING.
            ExecutionAttemptID[] executionAttemptIDArr = new ExecutionAttemptID[this.tasksToTrigger.length];
            for (int i = 0; i < this.tasksToTrigger.length; i++) {
                Execution currentExecutionAttempt = this.tasksToTrigger[i].getCurrentExecutionAttempt();
                if (currentExecutionAttempt == null || currentExecutionAttempt.getState() != ExecutionState.RUNNING) {
                    LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.", this.tasksToTrigger[i].getSimpleName());
                    return false;
                }
                executionAttemptIDArr[i] = currentExecutionAttempt.getAttemptId();
            }
            // Tasks that must acknowledge only need a current execution attempt; its state is not checked.
            HashMap hashMap = new HashMap(this.tasksToWaitFor.length);
            for (ExecutionVertex executionVertex : this.tasksToWaitFor) {
                Execution currentExecutionAttempt2 = executionVertex.getCurrentExecutionAttempt();
                if (currentExecutionAttempt2 == null) {
                    LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.", executionVertex.getSimpleName());
                    return false;
                }
                hashMap.put(currentExecutionAttempt2.getAttemptId(), executionVertex);
            }
            // A negative requested ID means the ID comes from the counter; a counter failure
            // aborts the attempt and is counted as an unsuccessful trigger.
            if (j2 < 0) {
                try {
                    andIncrement = this.checkpointIdCounter.getAndIncrement();
                } catch (Throwable th) {
                    int i2 = this.numUnsuccessfulCheckpointsTriggers + 1;
                    this.numUnsuccessfulCheckpointsTriggers = i2;
                    LOG.warn("Failed to trigger checkpoint (" + i2 + " consecutive failed attempts so far)", th);
                    return false;
                }
            } else {
                andIncrement = j2;
            }
            LOG.info("Triggering checkpoint " + andIncrement + " @ " + j);
            final PendingCheckpoint pendingCheckpoint = new PendingCheckpoint(this.job, andIncrement, j, hashMap);
            final long j3 = andIncrement;
            // Timeout task: if the checkpoint has not been discarded by the time it fires, it is
            // discarded, removed from the pending map, remembered as a recent ID, reported via
            // onCancelCheckpoint, and any queued trigger request is released.
            TimerTask timerTask = new TimerTask() { // from class: org.apache.flink.runtime.checkpoint.CheckpointCoordinator.2
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    try {
                        synchronized (CheckpointCoordinator.this.lock) {
                            if (!pendingCheckpoint.isDiscarded()) {
                                CheckpointCoordinator.LOG.info("Checkpoint " + j3 + " expired before completing.");
                                pendingCheckpoint.discard(CheckpointCoordinator.this.userClassLoader);
                                CheckpointCoordinator.this.pendingCheckpoints.remove(Long.valueOf(j3));
                                CheckpointCoordinator.this.rememberRecentCheckpointId(j3);
                                CheckpointCoordinator.this.onCancelCheckpoint(j3);
                                CheckpointCoordinator.this.triggerQueuedRequests();
                            }
                        }
                    } catch (Throwable th2) {
                        CheckpointCoordinator.LOG.error("Exception while handling checkpoint timeout", th2);
                    }
                }
            };
            try {
                // Re-entrant acquisition of the monitor that is already held by this thread.
                // NOTE(review): the three checks below repeat the pre-checks at the top of the
                // method although the lock has not been released in between; this looks like a
                // decompilation artifact (originally two separate lock scopes?) -- confirm
                // against the original source before simplifying.
                synchronized (this.lock) {
                    if (this.shutdown) {
                        return false;
                    }
                    if (this.triggerRequestQueued) {
                        LOG.warn("Trying to trigger another checkpoint while one was queued already");
                        return false;
                    }
                    if (this.pendingCheckpoints.size() >= this.maxConcurrentCheckpointAttempts) {
                        this.triggerRequestQueued = true;
                        if (this.currentPeriodicTrigger != null) {
                            this.currentPeriodicTrigger.cancel();
                            this.currentPeriodicTrigger = null;
                        }
                        return false;
                    }
                    // Register the checkpoint and arm its timeout before any message is sent.
                    this.pendingCheckpoints.put(Long.valueOf(andIncrement), pendingCheckpoint);
                    this.timer.schedule(timerTask, this.checkpointTimeout);
                    // Each message is addressed to the attempt ID captured during the RUNNING check above.
                    for (int i3 = 0; i3 < this.tasksToTrigger.length; i3++) {
                        ExecutionAttemptID executionAttemptID = executionAttemptIDArr[i3];
                        this.tasksToTrigger[i3].sendMessageToCurrentExecution(new TriggerCheckpoint(this.job, executionAttemptID, andIncrement, j), executionAttemptID);
                    }
                    // A successful trigger resets the consecutive-failure counter.
                    this.numUnsuccessfulCheckpointsTriggers = 0;
                    return true;
                }
            } catch (Throwable th2) {
                // Roll back: unregister the checkpoint, count the failure, and discard the
                // checkpoint unless that has already happened.
                synchronized (this.lock) {
                    this.pendingCheckpoints.remove(Long.valueOf(andIncrement));
                    int i4 = this.numUnsuccessfulCheckpointsTriggers + 1;
                    this.numUnsuccessfulCheckpointsTriggers = i4;
                    LOG.warn("Failed to trigger checkpoint (" + i4 + " consecutive failed attempts so far)", th2);
                    if (pendingCheckpoint.isDiscarded()) {
                        return false;
                    }
                    pendingCheckpoint.discard(this.userClassLoader);
                    return false;
                }
            }
        }
    }

    /**
     * Processes a task's acknowledgement of a checkpoint. When the acknowledgement completes a
     * pending checkpoint, the checkpoint is added to the completed-checkpoint store, older
     * pending checkpoints are dropped, queued trigger requests are released, every task in
     * {@code tasksToCommitTo} that has a current execution attempt is notified, and the stats
     * tracker is informed.
     *
     * @param acknowledgeCheckpoint the acknowledge message; may be null
     * @return true if the message referred to a pending checkpoint or to a recently removed
     *     one; false if the coordinator is shut down, the message is null, belongs to another
     *     job, or refers to an unknown checkpoint
     * @throws IllegalStateException if the referenced checkpoint is still registered as pending
     *     although it has already been discarded
     * @throws Exception if completing the checkpoint or releasing queued requests fails
     */
    public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint acknowledgeCheckpoint) throws Exception {
        if (this.shutdown || acknowledgeCheckpoint == null) {
            return false;
        }
        if (!this.job.equals(acknowledgeCheckpoint.getJob())) {
            LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", acknowledgeCheckpoint);
            return false;
        }

        final long checkpointId = acknowledgeCheckpoint.getCheckpointId();

        synchronized (this.lock) {
            // Re-check under the lock: shutdown may have happened since the unlocked check above.
            if (this.shutdown) {
                return false;
            }

            final PendingCheckpoint pendingCheckpoint = this.pendingCheckpoints.get(Long.valueOf(checkpointId));

            if (pendingCheckpoint == null) {
                // Not pending (any more): only a recently removed checkpoint counts as known.
                if (!this.recentPendingCheckpoints.contains(Long.valueOf(checkpointId))) {
                    return false;
                }
                LOG.warn("Received late message for now expired checkpoint attempt " + checkpointId);
                return true;
            }

            if (pendingCheckpoint.isDiscarded()) {
                // Discarded checkpoints are always removed from the map, so this is a bug.
                throw new IllegalStateException("Received message for discarded but non-removed checkpoint " + checkpointId);
            }

            if (!pendingCheckpoint.acknowledgeTask(acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getState(), acknowledgeCheckpoint.getStateSize())) {
                LOG.error("Received duplicate or invalid acknowledge message for checkpoint " + checkpointId + " , task " + acknowledgeCheckpoint.getTaskExecutionId());
                return true;
            }

            if (!pendingCheckpoint.isFullyAcknowledged()) {
                // Still waiting for acknowledgements from other tasks.
                return true;
            }

            // This acknowledgement completed the checkpoint.
            final CompletedCheckpoint completedCheckpoint = pendingCheckpoint.toCompletedCheckpoint();
            this.completedCheckpointStore.addCheckpoint(completedCheckpoint);
            LOG.info("Completed checkpoint " + checkpointId + " (in " + completedCheckpoint.getDuration() + " ms)");
            // Only render the task states when the debug output is actually written.
            if (LOG.isDebugEnabled()) {
                LOG.debug(completedCheckpoint.getStates().toString());
            }

            this.pendingCheckpoints.remove(Long.valueOf(checkpointId));
            rememberRecentCheckpointId(checkpointId);
            dropSubsumedCheckpoints(completedCheckpoint.getTimestamp());
            onFullyAcknowledgedCheckpoint(completedCheckpoint);
            triggerQueuedRequests();

            // Tell every committing task that currently has an execution attempt.
            final long timestamp = completedCheckpoint.getTimestamp();
            for (ExecutionVertex executionVertex : this.tasksToCommitTo) {
                final Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
                if (currentExecutionAttempt != null) {
                    executionVertex.sendMessageToCurrentExecution(new NotifyCheckpointComplete(this.job, currentExecutionAttempt.getAttemptId(), checkpointId, timestamp), currentExecutionAttempt.getAttemptId());
                }
            }
            this.statsTracker.onCompletedCheckpoint(completedCheckpoint);
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the ID of a checkpoint that has just left the pending map so that late
     * acknowledgements for it can still be recognised. At most
     * {@code NUM_GHOST_CHECKPOINT_IDS} IDs are kept; the oldest one is evicted first.
     *
     * <p>This method does not synchronize; the callers in this class invoke it while holding
     * the coordinator lock.
     *
     * @param j ID of the checkpoint to remember
     */
    public void rememberRecentCheckpointId(long j) {
        if (this.recentPendingCheckpoints.size() >= NUM_GHOST_CHECKPOINT_IDS) {
            this.recentPendingCheckpoints.removeFirst();
        }
        this.recentPendingCheckpoints.addLast(Long.valueOf(j));
    }

    /**
     * Discards and removes every pending checkpoint whose timestamp is strictly older than the
     * given one. Each dropped checkpoint's ID is remembered as a recent ID and reported through
     * {@link #onCancelCheckpoint(long)}.
     *
     * @param j timestamp of the checkpoint that subsumes the older ones
     */
    private void dropSubsumedCheckpoints(long j) {
        for (Iterator<PendingCheckpoint> pendingIt = this.pendingCheckpoints.values().iterator(); pendingIt.hasNext(); ) {
            final PendingCheckpoint candidate = pendingIt.next();
            if (candidate.getCheckpointTimestamp() >= j) {
                // Not older than the completed checkpoint: keep it.
                continue;
            }
            rememberRecentCheckpointId(candidate.getCheckpointId());
            candidate.discard(this.userClassLoader);
            onCancelCheckpoint(candidate.getCheckpointId());
            pendingIt.remove();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Releases a deferred trigger request, if there is one, by scheduling a new trigger task
     * with zero delay. With periodic scheduling active, the new task replaces the current
     * periodic trigger and repeats at the base interval; otherwise it runs once.
     *
     * @throws Exception declared by the signature
     */
    public void triggerQueuedRequests() throws Exception {
        if (!this.triggerRequestQueued) {
            return;
        }
        this.triggerRequestQueued = false;

        final ScheduledTrigger nextTrigger = new ScheduledTrigger();
        if (this.periodicScheduling) {
            if (this.currentPeriodicTrigger != null) {
                this.currentPeriodicTrigger.cancel();
            }
            this.currentPeriodicTrigger = nextTrigger;
            this.timer.scheduleAtFixedRate(nextTrigger, 0L, this.baseInterval);
        } else {
            this.timer.schedule(nextTrigger, 0L);
        }
    }

    /**
     * Recovers the completed-checkpoint store and, if it holds a checkpoint, hands each task
     * state of the latest checkpoint to the current execution attempt of the matching subtask.
     *
     * @param map execution job vertices of the job, keyed by job vertex ID
     * @param z if true, a missing checkpoint is an error; if false, the method returns without
     *     restoring anything when no checkpoint exists
     * @param z2 if true, verifies for every vertex that received state that the number of
     *     restored subtasks equals the vertex's parallelism
     * @throws IllegalStateException if the coordinator is shut down, if {@code z} is set and no
     *     checkpoint exists, if the checkpoint holds state for a vertex that is not in
     *     {@code map}, or if {@code z2} is set and a vertex received state for only some of its
     *     subtasks
     * @throws Exception if recovering or reading the completed-checkpoint store fails
     */
    public void restoreLatestCheckpointedState(Map<JobVertexID, ExecutionJobVertex> map, boolean z, boolean z2) throws Exception {
        synchronized (this.lock) {
            if (this.shutdown) {
                throw new IllegalStateException("CheckpointCoordinator is shut down");
            }

            this.completedCheckpointStore.recover();
            final CompletedCheckpoint latestCheckpoint = this.completedCheckpointStore.getLatestCheckpoint();
            if (latestCheckpoint == null) {
                if (z) {
                    throw new IllegalStateException("No completed checkpoint available");
                }
                return;
            }

            final long recoveryTimestamp = System.currentTimeMillis();
            // Per-vertex count of restored subtasks; only tracked when completeness is verified.
            final Map<ExecutionJobVertex, Integer> restoredSubtasks = z2 ? new HashMap<ExecutionJobVertex, Integer>() : null;

            for (StateForTask stateForTask : latestCheckpoint.getStates()) {
                final ExecutionJobVertex vertex = map.get(stateForTask.getOperatorId());
                if (vertex == null) {
                    // Fail with a descriptive error instead of a bare NullPointerException.
                    throw new IllegalStateException("There is no execution job vertex for the job vertex ID " + stateForTask.getOperatorId());
                }
                vertex.getTaskVertices()[stateForTask.getSubtask()].getCurrentExecutionAttempt().setInitialState(stateForTask.getState(), recoveryTimestamp);

                if (restoredSubtasks != null) {
                    final Integer seenSoFar = restoredSubtasks.get(vertex);
                    restoredSubtasks.put(vertex, Integer.valueOf(seenSoFar == null ? 1 : seenSoFar.intValue() + 1));
                }
            }

            if (restoredSubtasks != null) {
                for (Map.Entry<ExecutionJobVertex, Integer> restored : restoredSubtasks.entrySet()) {
                    final ExecutionJobVertex vertex = restored.getKey();
                    if (restored.getValue().intValue() != vertex.getParallelism()) {
                        throw new IllegalStateException("The checkpoint contained state only for a subset of tasks for vertex " + vertex);
                    }
                }
            }
        }
    }

    /**
     * Returns the number of checkpoints that are currently pending. The pending map is a plain
     * {@code LinkedHashMap} that other threads modify under the coordinator lock, so it is read
     * under the same lock here, like the other accessors of this class.
     *
     * @return the current number of pending checkpoints
     */
    public int getNumberOfPendingCheckpoints() {
        synchronized (this.lock) {
            return this.pendingCheckpoints.size();
        }
    }

    /**
     * Returns the number of checkpoints retained by the completed-checkpoint store, read while
     * holding the coordinator lock.
     *
     * @return the number of retained completed checkpoints
     */
    public int getNumberOfRetainedSuccessfulCheckpoints() {
        synchronized (this.lock) {
            return this.completedCheckpointStore.getNumberOfRetainedCheckpoints();
        }
    }

    /**
     * Returns a snapshot of the pending checkpoints, copied while holding the coordinator lock.
     * Changes to the returned map do not affect the coordinator.
     *
     * @return a new map from checkpoint ID to pending checkpoint
     */
    public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
        synchronized (this.lock) {
            return new HashMap<Long, PendingCheckpoint>(this.pendingCheckpoints);
        }
    }

    /**
     * Returns all checkpoints held by the completed-checkpoint store, fetched while holding the
     * coordinator lock.
     *
     * @return the list returned by the store's {@code getAllCheckpoints()}
     * @throws Exception if the store fails to provide its checkpoints
     */
    public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
        synchronized (this.lock) {
            return this.completedCheckpointStore.getAllCheckpoints();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Draws the next checkpoint ID from the checkpoint ID counter. A failure of the counter is
     * counted as an unsuccessful trigger attempt, logged, and reported as {@code -1}.
     *
     * @return the next checkpoint ID, or -1 if the counter threw
     */
    public long getAndIncrementCheckpointId() {
        long nextId;
        try {
            nextId = this.checkpointIdCounter.getAndIncrement();
        } catch (Throwable failure) {
            final int failedAttempts = ++this.numUnsuccessfulCheckpointsTriggers;
            LOG.warn("Failed to trigger checkpoint (" + failedAttempts + " consecutive failed attempts so far)", failure);
            nextId = -1L;
        }
        return nextId;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the job status listener gateway, or null if none has been created or set, or if
     * it was cleared by shutdown. The field is read without taking the coordinator lock.
     *
     * @return the current job status listener gateway; may be null
     */
    public ActorGateway getJobStatusListener() {
        return this.jobStatusListener;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Replaces the job status listener gateway. The field is written without taking the
     * coordinator lock, and a previously set gateway is not stopped.
     *
     * @param actorGateway the gateway to use from now on; may be null
     */
    public void setJobStatusListener(ActorGateway actorGateway) {
        this.jobStatusListener = actorGateway;
    }

    /**
     * Starts periodic checkpoint triggering. Any running scheduler is stopped first (which also
     * discards all pending checkpoints); a new trigger is then scheduled at a fixed rate, with
     * the base interval as both the initial delay and the period.
     *
     * @throws IllegalArgumentException if the coordinator has been shut down
     */
    public void startCheckpointScheduler() {
        synchronized (this.lock) {
            if (this.shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            // Start from a clean state.
            stopCheckpointScheduler();

            this.periodicScheduling = true;
            final ScheduledTrigger periodicTrigger = new ScheduledTrigger();
            this.currentPeriodicTrigger = periodicTrigger;
            this.timer.scheduleAtFixedRate(periodicTrigger, this.baseInterval, this.baseInterval);
        }
    }

    /**
     * Stops periodic checkpoint triggering: drops any queued trigger request, cancels the
     * current periodic trigger, discards and forgets every pending checkpoint, and resets the
     * counter of unsuccessful trigger attempts.
     */
    public void stopCheckpointScheduler() {
        synchronized (this.lock) {
            this.triggerRequestQueued = false;
            this.periodicScheduling = false;

            final ScheduledTrigger activeTrigger = this.currentPeriodicTrigger;
            if (activeTrigger != null) {
                activeTrigger.cancel();
                this.currentPeriodicTrigger = null;
            }

            for (PendingCheckpoint pending : this.pendingCheckpoints.values()) {
                pending.discard(this.userClassLoader);
            }
            this.pendingCheckpoints.clear();

            this.numUnsuccessfulCheckpointsTriggers = 0;
        }
    }

    /**
     * Returns the job status listener gateway, creating it on first use. Creation spawns a
     * {@code CheckpointCoordinatorDeActivator} actor in the given actor system, constructed with
     * this coordinator and the given UUID, and wraps it in an {@link AkkaActorGateway}.
     *
     * @param actorSystem actor system in which the actor is created
     * @param uuid UUID passed to the actor and to the gateway
     * @return the existing or newly created job status listener gateway
     * @throws IllegalArgumentException if the coordinator has been shut down
     */
    public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID uuid) {
        synchronized (this.lock) {
            if (this.shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            if (this.jobStatusListener != null) {
                return this.jobStatusListener;
            }

            final Props deActivatorProps = Props.create(CheckpointCoordinatorDeActivator.class, new Object[]{this, uuid});
            this.jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(deActivatorProps), uuid);
            return this.jobStatusListener;
        }
    }
}
