package org.apache.flink.runtime.checkpoint;

import akka.actor.ActorSystem;
import akka.actor.Props;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.ApplicationID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.types.IntValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.impl.Promise;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/SavepointCoordinator.class */
public class SavepointCoordinator extends CheckpointCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointCoordinator.class);
    private ApplicationID appId;
    private StateStore<Savepoint> savepointStore;
    private final Map<Long, Promise<String>> savepointPromises;

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/SavepointCoordinator$IgnoreCompletedCheckpointsStore.class */
    private static class IgnoreCompletedCheckpointsStore implements CompletedCheckpointStore {
        private static final CompletedCheckpointStore INSTANCE = new IgnoreCompletedCheckpointsStore();

        private IgnoreCompletedCheckpointsStore() {
        }

        @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
        public void recover() throws Exception {
        }

        @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
        public void addCheckpoint(CompletedCheckpoint completedCheckpoint) throws Exception {
        }

        @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
        public CompletedCheckpoint getLatestCheckpoint() throws Exception {
            return null;
        }

        @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
        public void discardAllCheckpoints() throws Exception {
        }

        @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
        public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
            return Collections.emptyList();
        }

        @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStore
        public int getNumberOfRetainedCheckpoints() {
            return 0;
        }
    }

    public SavepointCoordinator(ApplicationID applicationID, JobID jobID, long j, long j2, ExecutionVertex[] executionVertexArr, ExecutionVertex[] executionVertexArr2, ExecutionVertex[] executionVertexArr3, ClassLoader classLoader, CheckpointIDCounter checkpointIDCounter, StateStore<Savepoint> stateStore, CheckpointStatsTracker checkpointStatsTracker) throws Exception {
        super(jobID, j, j2, 0L, Integer.MAX_VALUE, executionVertexArr, executionVertexArr2, executionVertexArr3, classLoader, checkpointIDCounter, IgnoreCompletedCheckpointsStore.INSTANCE, RecoveryMode.STANDALONE, checkpointStatsTracker);
        this.appId = (ApplicationID) Preconditions.checkNotNull(applicationID);
        this.savepointStore = (StateStore) Preconditions.checkNotNull(stateStore);
        this.savepointPromises = new ConcurrentHashMap();
    }

    public Future<String> triggerSavepoint(long j) throws Exception {
        long andIncrementCheckpointId;
        Promise<String> defaultPromise = new Promise.DefaultPromise<>();
        try {
            andIncrementCheckpointId = getAndIncrementCheckpointId();
        } catch (Throwable th) {
            defaultPromise.failure(new Exception("Failed to trigger savepoint", th));
        }
        if (andIncrementCheckpointId == -1) {
            throw new IllegalStateException("Failed to get checkpoint Id");
        }
        if (this.savepointPromises.put(Long.valueOf(andIncrementCheckpointId), defaultPromise) != null) {
            throw new IllegalStateException("Duplicate checkpoint ID");
        }
        boolean z = false;
        try {
            z = triggerCheckpoint(j, andIncrementCheckpointId);
            if (!z) {
                this.savepointPromises.remove(Long.valueOf(andIncrementCheckpointId));
                defaultPromise.failure(new Exception("Failed to trigger savepoint"));
            }
            return defaultPromise.future();
        } catch (Throwable th2) {
            if (!z) {
                this.savepointPromises.remove(Long.valueOf(andIncrementCheckpointId));
                defaultPromise.failure(new Exception("Failed to trigger savepoint"));
            }
            throw th2;
        }
    }

    public ApplicationID restoreSavepoint(Map<JobVertexID, ExecutionJobVertex> map, String str) throws Exception {
        ApplicationID applicationID;
        Preconditions.checkNotNull(str, "Savepoint path");
        synchronized (this.lock) {
            if (isShutdown()) {
                throw new IllegalStateException("CheckpointCoordinator is shut down");
            }
            long currentTimeMillis = System.currentTimeMillis();
            LOG.info("Rolling back to savepoint '{}'.", str);
            Savepoint state = this.savepointStore.getState(str);
            CompletedCheckpoint completedCheckpoint = state.getCompletedCheckpoint();
            LOG.info("Savepoint: {}@{}", Long.valueOf(completedCheckpoint.getCheckpointID()), Long.valueOf(completedCheckpoint.getTimestamp()));
            HashMap hashMap = new HashMap();
            for (StateForTask stateForTask : completedCheckpoint.getStates()) {
                ExecutionJobVertex executionJobVertex = map.get(stateForTask.getOperatorId());
                if (executionJobVertex == null) {
                    throw new IllegalStateException(String.format("Failed to rollback to savepoint %s. Cannot map old state for task %s to the new program. This indicates that the program has been changed after the savepoint.", state, stateForTask.getOperatorId()));
                }
                IntValue intValue = (IntValue) hashMap.get(executionJobVertex.getJobVertexId());
                if (intValue == null) {
                    intValue = new IntValue(executionJobVertex.getParallelism());
                    hashMap.put(executionJobVertex.getJobVertexId(), intValue);
                }
                executionJobVertex.getTaskVertices()[stateForTask.getSubtask()].getCurrentExecutionAttempt().setInitialState(stateForTask.getState(), currentTimeMillis);
                intValue.setValue(intValue.getValue() - 1);
            }
            Iterator it = hashMap.values().iterator();
            while (it.hasNext()) {
                if (((IntValue) it.next()).getValue() != 0) {
                    throw new IllegalStateException("Parallelism mismatch between savepoint tasks and new program. This indicates that the program has been changed after the savepoint.");
                }
            }
            long checkpointID = completedCheckpoint.getCheckpointID();
            this.checkpointIdCounter.setCount(checkpointID + 1);
            LOG.info("Reset the checkpoint ID to {}", Long.valueOf(checkpointID));
            this.appId = state.getApplicationId();
            LOG.info("Reset the application ID to {}", this.appId);
            applicationID = this.appId;
        }
        return applicationID;
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    protected void onShutdown() {
        Iterator<scala.concurrent.Promise<String>> it = this.savepointPromises.values().iterator();
        while (it.hasNext()) {
            it.next().failure(new Exception("Checkpoint coordinator shutdown"));
        }
        this.savepointPromises.clear();
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    protected void onCancelCheckpoint(long j) {
        scala.concurrent.Promise<String> remove = this.savepointPromises.remove(Long.valueOf(j));
        if (remove != null) {
            remove.failure(new Exception("Savepoint expired before completing"));
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    protected void onFullyAcknowledgedCheckpoint(CompletedCheckpoint completedCheckpoint) {
        scala.concurrent.Promise promise = (scala.concurrent.Promise) Preconditions.checkNotNull(this.savepointPromises.remove(Long.valueOf(completedCheckpoint.getCheckpointID())));
        if (promise.isCompleted()) {
            throw new IllegalStateException("Savepoint promise completed");
        }
        try {
            promise.success(this.savepointStore.putState(new Savepoint(this.appId, completedCheckpoint)));
        } catch (Exception e) {
            LOG.warn("Failed to store savepoint.", (Throwable) e);
            promise.failure(e);
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID uuid) {
        ActorGateway jobStatusListener;
        synchronized (this.lock) {
            if (isShutdown()) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            if (getJobStatusListener() == null) {
                setJobStatusListener(new AkkaActorGateway(actorSystem.actorOf(Props.create(SavepointCoordinatorDeActivator.class, new Object[]{this, uuid})), uuid));
            }
            jobStatusListener = getJobStatusListener();
        }
        return jobStatusListener;
    }
}
