/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializer;
import org.apache.flink.runtime.checkpoint.metadata.MetadataSerializers;
import org.apache.flink.runtime.checkpoint.metadata.MetadataV6Serializer;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLoader;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Checkpoints {
    private static final Logger LOG = LoggerFactory.getLogger(Checkpoints.class);
    public static final int HEADER_MAGIC_NUMBER = 1231054637;

    public static void storeCheckpointMetadata(CheckpointMetadata checkpointMetadata, OutputStream out) throws IOException {
        DataOutputStream dos = new DataOutputStream(out);
        Checkpoints.storeCheckpointMetadata(checkpointMetadata, dos);
    }

    public static void storeCheckpointMetadata(CheckpointMetadata checkpointMetadata, DataOutputStream out) throws IOException {
        Checkpoints.storeCheckpointMetadata(checkpointMetadata, out, MetadataV6Serializer.INSTANCE);
    }

    public static void storeCheckpointMetadata(CheckpointMetadata checkpointMetadata, DataOutputStream out, MetadataSerializer serializer) throws IOException {
        out.writeInt(1231054637);
        out.writeInt(serializer.getVersion());
        serializer.serialize(checkpointMetadata, out);
    }

    public static CheckpointMetadata loadCheckpointMetadata(DataInputStream in, ClassLoader classLoader, String externalPointer) throws IOException {
        Preconditions.checkNotNull((Object)in, (String)"input stream");
        Preconditions.checkNotNull((Object)classLoader, (String)"classLoader");
        int magicNumber = in.readInt();
        if (magicNumber == 1231054637) {
            int version = in.readInt();
            MetadataSerializer serializer = MetadataSerializers.getSerializer(version);
            return serializer.deserialize(in, classLoader, externalPointer);
        }
        throw new IOException("Unexpected magic number. This can have multiple reasons: (1) You are trying to load a Flink 1.0 savepoint, which is not supported by this version of Flink. (2) The file you were pointing to is not a savepoint at all. (3) The savepoint file has been corrupted.");
    }

    public static CompletedCheckpoint loadAndValidateCheckpoint(JobID jobId, Map<JobVertexID, ExecutionJobVertex> tasks, CompletedCheckpointStorageLocation location, ClassLoader classLoader, boolean allowNonRestoredState, CheckpointProperties checkpointProperties) throws IOException {
        CheckpointMetadata checkpointMetadata;
        Preconditions.checkNotNull((Object)jobId, (String)"jobId");
        Preconditions.checkNotNull(tasks, (String)"tasks");
        Preconditions.checkNotNull((Object)location, (String)"location");
        Preconditions.checkNotNull((Object)classLoader, (String)"classLoader");
        StreamStateHandle metadataHandle = location.getMetadataHandle();
        String checkpointPointer = location.getExternalPointer();
        try (FSDataInputStream in = metadataHandle.openInputStream();){
            DataInputStream dis = new DataInputStream((InputStream)in);
            checkpointMetadata = Checkpoints.loadCheckpointMetadata(dis, classLoader, checkpointPointer);
        }
        HashMap<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<OperatorID, ExecutionJobVertex>();
        for (ExecutionJobVertex task : tasks.values()) {
            for (OperatorIDPair operatorIDPair : task.getOperatorIDs()) {
                operatorToJobVertexMapping.put(operatorIDPair.getGeneratedOperatorID(), task);
                operatorIDPair.getUserDefinedOperatorID().ifPresent(id -> operatorToJobVertexMapping.put((OperatorID)((Object)id), task));
            }
        }
        HashMap operatorStates = CollectionUtil.newHashMapWithExpectedSize((int)checkpointMetadata.getOperatorStates().size());
        for (OperatorState operatorState : checkpointMetadata.getOperatorStates()) {
            ExecutionJobVertex executionJobVertex = (ExecutionJobVertex)operatorToJobVertexMapping.get((Object)operatorState.getOperatorID());
            if (executionJobVertex != null) {
                if (executionJobVertex.getMaxParallelism() == operatorState.getMaxParallelism() || executionJobVertex.canRescaleMaxParallelism(operatorState.getMaxParallelism())) {
                    operatorStates.put(operatorState.getOperatorID(), operatorState);
                    continue;
                }
                String msg = String.format("Failed to rollback to checkpoint/savepoint %s. Max parallelism mismatch between checkpoint/savepoint state and new program. Cannot map operator %s with max parallelism %d to new program with max parallelism %d. This indicates that the program has been changed in a non-compatible way after the checkpoint/savepoint.", new Object[]{checkpointMetadata, operatorState.getOperatorID(), operatorState.getMaxParallelism(), executionJobVertex.getMaxParallelism()});
                throw new IllegalStateException(msg);
            }
            if (allowNonRestoredState) {
                LOG.info("Skipping savepoint state for operator {}.", (Object)operatorState.getOperatorID());
                continue;
            }
            if (operatorState.getCoordinatorState() != null) {
                Checkpoints.throwNonRestoredStateException(checkpointPointer, operatorState.getOperatorID());
            }
            for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
                if (!operatorSubtaskState.hasState()) continue;
                Checkpoints.throwNonRestoredStateException(checkpointPointer, operatorState.getOperatorID());
            }
            LOG.info("Skipping empty savepoint state for operator {}.", (Object)operatorState.getOperatorID());
        }
        return new CompletedCheckpoint(jobId, checkpointMetadata.getCheckpointId(), 0L, 0L, operatorStates, checkpointMetadata.getMasterStates(), checkpointProperties, location, null, checkpointMetadata.getCheckpointProperties());
    }

    private static void throwNonRestoredStateException(String checkpointPointer, OperatorID operatorId) {
        String msg = String.format("Failed to rollback to checkpoint/savepoint %s. Cannot map checkpoint/savepoint state for operator %s to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.", new Object[]{checkpointPointer, operatorId});
        throw new IllegalStateException(msg);
    }

    public static void disposeSavepoint(String pointer, CheckpointStorage checkpointStorage, ClassLoader classLoader) throws IOException, FlinkException {
        CheckpointMetadata metadata;
        Preconditions.checkNotNull((Object)pointer, (String)"location");
        Preconditions.checkNotNull((Object)checkpointStorage, (String)"stateBackend");
        Preconditions.checkNotNull((Object)classLoader, (String)"classLoader");
        CompletedCheckpointStorageLocation checkpointLocation = checkpointStorage.resolveCheckpoint(pointer);
        StreamStateHandle metadataHandle = checkpointLocation.getMetadataHandle();
        try (FSDataInputStream in = metadataHandle.openInputStream();
             DataInputStream dis = new DataInputStream((InputStream)in);){
            metadata = Checkpoints.loadCheckpointMetadata(dis, classLoader, pointer);
        }
        Exception exception = null;
        try {
            metadataHandle.discardState();
        }
        catch (Exception e) {
            exception = e;
        }
        try {
            metadata.dispose();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        try {
            checkpointLocation.disposeStorageLocation();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        if (exception != null) {
            ExceptionUtils.rethrowIOException((Throwable)exception);
        }
    }

    public static void disposeSavepoint(String pointer, Configuration configuration, ClassLoader classLoader, @Nullable Logger logger) throws IOException, FlinkException {
        Preconditions.checkNotNull((Object)pointer, (String)"location");
        Preconditions.checkNotNull((Object)configuration, (String)"configuration");
        Preconditions.checkNotNull((Object)classLoader, (String)"classLoader");
        CheckpointStorage storage = Checkpoints.loadCheckpointStorage(new Configuration(), configuration, classLoader, logger);
        Checkpoints.disposeSavepoint(pointer, storage, classLoader);
    }

    @Nonnull
    public static StateBackend loadStateBackend(Configuration jobConfig, Configuration clusterConfig, ClassLoader classLoader, @Nullable Logger logger) {
        if (logger != null) {
            logger.info("Attempting to load configured state backend for savepoint disposal");
        }
        Configuration mergedConfig = new Configuration(clusterConfig);
        mergedConfig.addAll(jobConfig);
        try {
            return StateBackendLoader.loadStateBackendFromConfig((ReadableConfig)mergedConfig, classLoader, null);
        }
        catch (Throwable t) {
            if (logger != null) {
                logger.info("Could not load configured state backend.");
                logger.debug("Detailed exception:", t);
            }
            return new HashMapStateBackend();
        }
    }

    @Nonnull
    public static CheckpointStorage loadCheckpointStorage(Configuration jobConfig, Configuration clusterConfig, ClassLoader classLoader, @Nullable Logger logger) {
        CheckpointStorage checkpointStorage;
        block4: {
            StateBackend backend = Checkpoints.loadStateBackend(jobConfig, clusterConfig, classLoader, logger);
            if (logger != null) {
                logger.info("Attempting to load configured checkpoint storage for savepoint disposal");
            }
            checkpointStorage = null;
            try {
                checkpointStorage = CheckpointStorageLoader.load(null, backend, jobConfig, clusterConfig, classLoader, null);
            }
            catch (Throwable t) {
                if (logger == null) break block4;
                logger.info("Could not load configured state backend.");
                logger.debug("Detailed exception:", t);
            }
        }
        if (checkpointStorage == null) {
            return new JobManagerCheckpointStorage();
        }
        return checkpointStorage;
    }

    private Checkpoints() {
    }
}

