package org.apache.flink.runtime.shuffle;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/shuffle/ShuffleMasterSnapshotUtil.class */
/**
 * Static helpers that persist a {@link ShuffleMasterSnapshot} to, and restore it from, a
 * directory named {@code shuffleMaster-snapshot} below the cluster high-availability storage path.
 *
 * <p>On-disk layout of a snapshot file: a 4-byte big-endian length followed by that many bytes
 * produced by {@link InstantiationUtil#serializeObject(Object)}.
 */
public class ShuffleMasterSnapshotUtil {
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleMasterSnapshotUtil.class);

    /** Name of the directory (below the HA storage path) that holds the snapshot file. */
    private static final String SNAPSHOT_DIR_NAME = "shuffleMaster-snapshot";

    /**
     * Restores the shuffle master from an existing snapshot file or, when none exists, initializes
     * it with a {@code null} state and schedules the creation of a new snapshot.
     *
     * <p>Nothing happens unless {@link BatchExecutionOptions#JOB_RECOVERY_ENABLED} is set and the
     * shuffle master reports {@code supportsBatchSnapshot()}.
     *
     * @param shuffleMaster the shuffle master to restore or to take a snapshot of
     * @param configuration configuration providing the recovery flag, the HA cluster id (used as
     *     the snapshot file name) and the HA storage path
     * @param executor executor on which the snapshot is requested
     * @throws IOException if checking for or reading an existing snapshot fails; failures while
     *     taking or writing a new snapshot are only logged
     */
    public static void restoreOrSnapshotShuffleMaster(ShuffleMaster<?> shuffleMaster, Configuration configuration, Executor executor) throws IOException {
        boolean jobRecoveryEnabled = configuration.get(BatchExecutionOptions.JOB_RECOVERY_ENABLED);
        if (!jobRecoveryEnabled || !shuffleMaster.supportsBatchSnapshot()) {
            return;
        }

        String clusterId = configuration.get(HighAvailabilityOptions.HA_CLUSTER_ID);
        Path path = new Path(HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath(configuration), SNAPSHOT_DIR_NAME);

        if (isShuffleMasterSnapshotExist(path, clusterId)) {
            ShuffleMasterSnapshot restoredSnapshot = readSnapshot(path, clusterId);
            LOG.info("Restore shuffle master state from cluster level snapshot.");
            shuffleMaster.restoreState(restoredSnapshot);
            return;
        }

        // No snapshot yet: restoreState is still invoked (with null) before a snapshot is requested.
        shuffleMaster.restoreState(null);
        CompletableFuture<ShuffleMasterSnapshot> snapshotFuture = new CompletableFuture<>();
        executor.execute(() -> {
            LOG.info("Take a cluster level shuffle master snapshot.");
            shuffleMaster.snapshotState(snapshotFuture);
            snapshotFuture.whenComplete((shuffleMasterSnapshot, failure) -> {
                if (failure != null) {
                    // Surface a failed snapshot instead of dropping it silently.
                    LOG.warn("Take cluster level shuffle master snapshot failed.", failure);
                    return;
                }
                try {
                    writeSnapshot(shuffleMasterSnapshot, path, clusterId);
                } catch (IOException e) {
                    // Best effort: a failed write must not fail the caller.
                    LOG.warn("Write cluster level shuffle master snapshot failed.", e);
                }
            });
        });
    }

    /**
     * Creates the snapshot directory and writes the serialized snapshot into a new file in it.
     *
     * @param shuffleMasterSnapshot the snapshot to serialize
     * @param snapshotDir directory to create; must not exist yet
     * @param fileName name of the snapshot file inside {@code snapshotDir}
     * @throws IOException if the directory already exists or any file system operation fails
     */
    private static void writeSnapshot(ShuffleMasterSnapshot shuffleMasterSnapshot, Path snapshotDir, String fileName) throws IOException {
        FileSystem fileSystem = snapshotDir.getFileSystem();
        if (fileSystem.exists(snapshotDir)) {
            throw new IOException("Shuffle master dir " + snapshotDir + " already exists.");
        }
        fileSystem.mkdirs(snapshotDir);
        LOG.info("Create shuffle master snapshot dir {}.", snapshotDir);

        try (FSDataOutputStream out = fileSystem.create(new Path(snapshotDir, fileName), FileSystem.WriteMode.NO_OVERWRITE)) {
            byte[] serialized = InstantiationUtil.serializeObject(shuffleMasterSnapshot);
            writeInt(out, serialized.length);
            out.write(serialized);
        }
    }

    /**
     * Writes {@code value} as four bytes, most significant byte first, which is the format
     * {@link DataInputStream#readInt()} reads back in {@link #readSnapshot(Path, String)}.
     *
     * @param out stream to write to
     * @param value the int to write
     * @throws IOException if writing to the stream fails
     */
    private static void writeInt(FSDataOutputStream out, int value) throws IOException {
        out.write((value >>> 24) & 0xFF);
        out.write((value >>> 16) & 0xFF);
        out.write((value >>> 8) & 0xFF);
        out.write(value & 0xFF);
    }

    /**
     * Checks whether the snapshot file exists.
     *
     * @param path the snapshot directory
     * @param str name of the snapshot file inside {@code path}
     * @return {@code true} if the file system reports that {@code path/str} exists
     * @throws IOException if the file system cannot be accessed
     */
    @VisibleForTesting
    static boolean isShuffleMasterSnapshotExist(Path path, String str) throws IOException {
        Path snapshotFile = new Path(path, str);
        return path.getFileSystem().exists(snapshotFile);
    }

    /**
     * Reads and deserializes the snapshot file. The file is expected to start with a 4-byte
     * length followed by that many bytes of serialized snapshot.
     *
     * @param path the snapshot directory
     * @param str name of the snapshot file inside {@code path}
     * @return the deserialized snapshot
     * @throws IOException if the file cannot be read, or if a class needed for deserialization is
     *     not found (wrapped with the {@link ClassNotFoundException} as cause)
     */
    @VisibleForTesting
    static ShuffleMasterSnapshot readSnapshot(Path path, String str) throws IOException {
        // try-with-resources guarantees the stream is closed on failure paths as well.
        try (DataInputStream in = new DataInputStream(path.getFileSystem().open(new Path(path, str)))) {
            byte[] serialized = new byte[in.readInt()];
            in.readFully(serialized);
            return (ShuffleMasterSnapshot) InstantiationUtil.deserializeObject(serialized, ClassLoader.getSystemClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IOException("Deserialize ShuffleMasterSnapshot failed.", e);
        }
    }
}
