package org.apache.flink.runtime.state.filesystem;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Objects;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.class */
public abstract class AbstractFileCheckpointStorageAccessTestBase {

    @TempDir
    protected Path tmp;

    protected abstract CheckpointStorageAccess createCheckpointStorage(org.apache.flink.core.fs.Path path, boolean z) throws Exception;

    protected abstract CheckpointStorageAccess createCheckpointStorageWithSavepointDir(org.apache.flink.core.fs.Path path, org.apache.flink.core.fs.Path path2, boolean z) throws Exception;

    @Test
    void testPointerPathResolution() throws Exception {
        FileSystem localFileSystem = FileSystem.getLocalFileSystem();
        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(org.apache.flink.core.fs.Path.fromLocalFile(TempDirUtils.newFolder(this.tmp)), "_metadata");
        String path2 = path.getParent().toString();
        String path3 = path.toString();
        String path4 = path.getParent().toString();
        String str = path.getParent().toString() + "/";
        CheckpointStorageAccess createCheckpointStorage = createCheckpointStorage(randomTempPath(), true);
        byte[] bArr = new byte[23686];
        new Random().nextBytes(bArr);
        FSDataOutputStream create = localFileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);
        try {
            create.write(bArr);
            if (create != null) {
                create.close();
            }
            CompletedCheckpointStorageLocation resolveCheckpoint = createCheckpointStorage.resolveCheckpoint(path3);
            CompletedCheckpointStorageLocation resolveCheckpoint2 = createCheckpointStorage.resolveCheckpoint(path4);
            CompletedCheckpointStorageLocation resolveCheckpoint3 = createCheckpointStorage.resolveCheckpoint(str);
            Assertions.assertThat(resolveCheckpoint.getExternalPointer()).isEqualTo(path2);
            Assertions.assertThat(resolveCheckpoint2.getExternalPointer()).isEqualTo(path2);
            Assertions.assertThat(resolveCheckpoint3.getExternalPointer()).isEqualTo(path2);
            StreamStateHandle metadataHandle = resolveCheckpoint.getMetadataHandle();
            StreamStateHandle metadataHandle2 = resolveCheckpoint2.getMetadataHandle();
            StreamStateHandle metadataHandle3 = resolveCheckpoint3.getMetadataHandle();
            Assertions.assertThat(metadataHandle).isNotNull();
            Assertions.assertThat(metadataHandle2).isNotNull();
            Assertions.assertThat(metadataHandle3).isNotNull();
            validateContents(metadataHandle, bArr);
            validateContents(metadataHandle2, bArr);
            validateContents(metadataHandle3, bArr);
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testFailingPointerPathResolution() throws Exception {
        CheckpointStorageAccess createCheckpointStorage = createCheckpointStorage(randomTempPath(), true);
        Assertions.assertThatThrownBy(() -> {
            createCheckpointStorage.resolveCheckpoint((String) null);
        }).isInstanceOf(NullPointerException.class);
        Assertions.assertThatThrownBy(() -> {
            createCheckpointStorage.resolveCheckpoint("");
        }).isInstanceOf(IllegalArgumentException.class);
        Assertions.assertThatThrownBy(() -> {
            createCheckpointStorage.resolveCheckpoint("this-is_not/a#filepath.at.all");
        }).isInstanceOf(IOException.class);
        Assertions.assertThatThrownBy(() -> {
            createCheckpointStorage.resolveCheckpoint(TempDirUtils.newFile(this.tmp).toURI().toString() + "_not_existing");
        }).isInstanceOf(IOException.class);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "create checkpoint job-id sub-directory: {0}")
    public void testCreateCheckpointSubDirs(boolean z) throws Exception {
        FileSystem localFileSystem = FileSystem.getLocalFileSystem();
        org.apache.flink.core.fs.Path fromLocalFile = org.apache.flink.core.fs.Path.fromLocalFile(TempDirUtils.newFolder(this.tmp));
        CheckpointStorageAccess createCheckpointStorage = createCheckpointStorage(fromLocalFile, z);
        createCheckpointStorage.initializeBaseLocationsForCheckpoint();
        createCheckpointStorage.initializeLocationForCheckpoint(42L);
        FileStatus[] listStatus = localFileSystem.listStatus(fromLocalFile);
        if (z) {
            for (FileStatus fileStatus : listStatus) {
                Assertions.assertThat(localFileSystem.listStatus(fileStatus.getPath()).length > 0).isTrue();
            }
            return;
        }
        for (FileStatus fileStatus2 : listStatus) {
            Assertions.assertThat(localFileSystem.listStatus(fileStatus2.getPath()).length).isEqualTo(0);
        }
    }

    @Test
    void testPersistMultipleMetadataOnlyCheckpoints() throws Exception {
        FileSystem localFileSystem = FileSystem.getLocalFileSystem();
        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI());
        CheckpointStorageAccess createCheckpointStorage = createCheckpointStorage(path, true);
        createCheckpointStorage.initializeBaseLocationsForCheckpoint();
        CheckpointStorageAccess createCheckpointStorage2 = createCheckpointStorage(path, true);
        createCheckpointStorage2.initializeBaseLocationsForCheckpoint();
        CheckpointStorageLocation initializeLocationForCheckpoint = createCheckpointStorage.initializeLocationForCheckpoint(177L);
        CheckpointStorageLocation initializeLocationForCheckpoint2 = createCheckpointStorage2.initializeLocationForCheckpoint(177L);
        byte[] bArr = {77, 66, 55, 99, 88};
        byte[] bArr2 = {1, 3, 2, 5, 4};
        CheckpointMetadataOutputStream createMetadataOutputStream = initializeLocationForCheckpoint.createMetadataOutputStream();
        try {
            createMetadataOutputStream.write(bArr);
            CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint = createMetadataOutputStream.closeAndFinalizeCheckpoint();
            if (createMetadataOutputStream != null) {
                createMetadataOutputStream.close();
            }
            String externalPointer = closeAndFinalizeCheckpoint.getExternalPointer();
            createMetadataOutputStream = initializeLocationForCheckpoint2.createMetadataOutputStream();
            try {
                createMetadataOutputStream.write(bArr2);
                CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint2 = createMetadataOutputStream.closeAndFinalizeCheckpoint();
                if (createMetadataOutputStream != null) {
                    createMetadataOutputStream.close();
                }
                String externalPointer2 = closeAndFinalizeCheckpoint2.getExternalPointer();
                FileStatus[] listStatus = localFileSystem.listStatus(path);
                Assertions.assertThat(listStatus).hasSize(2);
                FileStatus[] listStatus2 = localFileSystem.listStatus(listStatus[0].getPath());
                FileStatus[] listStatus3 = localFileSystem.listStatus(listStatus[1].getPath());
                Assertions.assertThat(listStatus2).hasSizeGreaterThanOrEqualTo(1);
                Assertions.assertThat(listStatus3).hasSizeGreaterThanOrEqualTo(1);
                Assertions.assertThat(localFileSystem.exists(new org.apache.flink.core.fs.Path(externalPointer, "_metadata"))).isTrue();
                Assertions.assertThat(localFileSystem.exists(new org.apache.flink.core.fs.Path(externalPointer2, "_metadata"))).isTrue();
                validateContents(createCheckpointStorage.resolveCheckpoint(externalPointer).getMetadataHandle(), bArr);
                validateContents(createCheckpointStorage.resolveCheckpoint(externalPointer2).getMetadataHandle(), bArr2);
                validateContents(createCheckpointStorage2.resolveCheckpoint(externalPointer).getMetadataHandle(), bArr);
                validateContents(createCheckpointStorage2.resolveCheckpoint(externalPointer2).getMetadataHandle(), bArr2);
            } finally {
            }
        } finally {
        }
    }

    @Test
    void writeToAlreadyExistingCheckpointFails() throws Exception {
        byte[] bArr = {8, 8, 4, 5, 2, 6, 3};
        CheckpointStorageAccess createCheckpointStorage = createCheckpointStorage(randomTempPath(), true);
        createCheckpointStorage.initializeBaseLocationsForCheckpoint();
        CheckpointStorageLocation initializeLocationForCheckpoint = createCheckpointStorage.initializeLocationForCheckpoint(177L);
        CheckpointMetadataOutputStream createMetadataOutputStream = initializeLocationForCheckpoint.createMetadataOutputStream();
        try {
            createMetadataOutputStream.write(bArr);
            createMetadataOutputStream.closeAndFinalizeCheckpoint();
            if (createMetadataOutputStream != null) {
                createMetadataOutputStream.close();
            }
            Objects.requireNonNull(initializeLocationForCheckpoint);
            Assertions.assertThatThrownBy(initializeLocationForCheckpoint::createMetadataOutputStream).isInstanceOf(IOException.class);
        } catch (Throwable th) {
            if (createMetadataOutputStream != null) {
                try {
                    createMetadataOutputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testSavepointPathConfiguredAndTarget() throws Exception {
        org.apache.flink.core.fs.Path randomTempPath = randomTempPath();
        org.apache.flink.core.fs.Path randomTempPath2 = randomTempPath();
        testSavepoint(randomTempPath, randomTempPath2, randomTempPath2);
    }

    @Test
    void testSavepointPathConfiguredNoTarget() throws Exception {
        org.apache.flink.core.fs.Path randomTempPath = randomTempPath();
        testSavepoint(randomTempPath, null, randomTempPath);
    }

    @Test
    void testNoSavepointPathConfiguredAndTarget() throws Exception {
        org.apache.flink.core.fs.Path fromLocalFile = org.apache.flink.core.fs.Path.fromLocalFile(TempDirUtils.newFolder(this.tmp));
        testSavepoint(null, fromLocalFile, fromLocalFile);
    }

    @Test
    void testNoSavepointPathConfiguredNoTarget() throws Exception {
        CheckpointStorageAccess createCheckpointStorage = createCheckpointStorage(randomTempPath(), true);
        Assertions.assertThatThrownBy(() -> {
            createCheckpointStorage.initializeLocationForSavepoint(1337L, (String) null);
        }).isInstanceOf(IllegalArgumentException.class);
    }

    private void testSavepoint(@Nullable org.apache.flink.core.fs.Path path, @Nullable org.apache.flink.core.fs.Path path2, org.apache.flink.core.fs.Path path3) throws Exception {
        byte[] bArr = {77, 66, 55, 99, 88};
        CheckpointMetadataOutputStream createMetadataOutputStream = (path == null ? createCheckpointStorage(randomTempPath(), true) : createCheckpointStorageWithSavepointDir(randomTempPath(), path, true)).initializeLocationForSavepoint(52452L, path2 == null ? null : path2.toString()).createMetadataOutputStream();
        try {
            createMetadataOutputStream.write(bArr);
            CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint = createMetadataOutputStream.closeAndFinalizeCheckpoint();
            if (createMetadataOutputStream != null) {
                createMetadataOutputStream.close();
            }
            Assertions.assertThat(org.apache.flink.core.fs.Path.fromLocalFile(new File(new org.apache.flink.core.fs.Path(closeAndFinalizeCheckpoint.getExternalPointer()).getParent().getPath()))).isEqualTo(path3);
            validateContents(closeAndFinalizeCheckpoint.getMetadataHandle(), bArr);
            Assertions.assertThat(new org.apache.flink.core.fs.Path(new File(closeAndFinalizeCheckpoint.getMetadataHandle().getFilePath().getParent().getParent().getPath()).toURI())).isEqualTo(path3);
        } catch (Throwable th) {
            if (createMetadataOutputStream != null) {
                try {
                    createMetadataOutputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public org.apache.flink.core.fs.Path randomTempPath() throws IOException {
        return org.apache.flink.core.fs.Path.fromLocalFile(TempDirUtils.newFolder(this.tmp));
    }

    private static void validateContents(StreamStateHandle streamStateHandle, byte[] bArr) throws IOException {
        FSDataInputStream openInputStream = streamStateHandle.openInputStream();
        try {
            validateContents((InputStream) openInputStream, bArr);
            if (openInputStream != null) {
                openInputStream.close();
            }
        } catch (Throwable th) {
            if (openInputStream != null) {
                try {
                    openInputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static void validateContents(InputStream inputStream, byte[] bArr) throws IOException {
        byte[] bArr2 = new byte[bArr.length];
        int i = 0;
        int length = bArr.length;
        while (true) {
            int i2 = length;
            if (i2 <= 0) {
                Assertions.assertThat(bArr2).isEqualTo(bArr);
                return;
            }
            int read = inputStream.read(bArr2, i, i2);
            if (read == -1) {
                throw new EOFException();
            }
            i += read;
            length = i2 - read;
        }
    }
}
