package org.apache.flink.runtime.state.filesystem;

import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.PathsCopyingFileSystem;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.NotDuplicatingCheckpointStateToolset;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.testutils.TestFileSystem;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Tests for {@link FsCheckpointStorageAccess}.
 *
 * <p>Covers the directory layout used for savepoints, task-owned state streams, the separation of
 * exclusive and shared state directories, when storage directories get created, which file system
 * a resolved storage location uses, and which checkpoint state toolset is handed out.
 */
class FsCheckpointStorageAccessTest extends AbstractFileCheckpointStorageAccessTestBase {

    /** File size threshold handed to the storage under test unless a test overrides it. */
    private static final int FILE_SIZE_THRESHOLD = 1024;

    /** Write buffer size handed to the storage under test. */
    private static final int WRITE_BUFFER_SIZE = 4096;

    // ------------------------------------------------------------------------
    //  Factories required by the test base
    // ------------------------------------------------------------------------

    /**
     * Creates the storage under test without a default savepoint directory.
     *
     * @param checkpointDir directory passed as the first constructor argument
     * @param createCheckpointSubDirs flag forwarded unchanged to the constructor
     *     (NOTE(review): parameter name assumed from the Flink constructor - confirm)
     */
    @Override
    protected CheckpointStorageAccess createCheckpointStorage(
            Path checkpointDir, boolean createCheckpointSubDirs) throws Exception {
        return new FsCheckpointStorageAccess(
                checkpointDir,
                (Path) null,
                createCheckpointSubDirs,
                new JobID(),
                FILE_SIZE_THRESHOLD,
                WRITE_BUFFER_SIZE);
    }

    /**
     * Creates the storage under test with an explicit savepoint directory.
     *
     * @param checkpointDir directory passed as the first constructor argument
     * @param savepointDir directory passed as the second constructor argument
     * @param createCheckpointSubDirs flag forwarded unchanged to the constructor
     *     (NOTE(review): parameter name assumed from the Flink constructor - confirm)
     */
    @Override
    protected CheckpointStorageAccess createCheckpointStorageWithSavepointDir(
            Path checkpointDir, Path savepointDir, boolean createCheckpointSubDirs)
            throws Exception {
        return new FsCheckpointStorageAccess(
                checkpointDir,
                savepointDir,
                createCheckpointSubDirs,
                new JobID(),
                FILE_SIZE_THRESHOLD,
                WRITE_BUFFER_SIZE);
    }

    // ------------------------------------------------------------------------
    //  Tests
    // ------------------------------------------------------------------------

    /**
     * With a default savepoint directory configured and no explicit target, the checkpoint,
     * shared-state and task-owned-state directories of the savepoint location must all be direct
     * children of that default directory.
     */
    @Test
    void testSavepointsInOneDirectoryDefaultLocation() throws Exception {
        final Path defaultSavepointDir = Path.fromLocalFile(TempDirUtils.newFolder(this.tmp));
        final Path checkpointDir = Path.fromLocalFile(TempDirUtils.newFolder(this.tmp));

        final FsCheckpointStorageAccess storage =
                new FsCheckpointStorageAccess(
                        checkpointDir,
                        defaultSavepointDir,
                        true,
                        new JobID(),
                        FILE_SIZE_THRESHOLD,
                        WRITE_BUFFER_SIZE);

        // The explicit cast pins the concrete location type the assertions below rely on.
        final FsCheckpointStorageLocation savepointLocation =
                (FsCheckpointStorageLocation)
                        storage.initializeLocationForSavepoint(52452L, (String) null);

        assertParent(defaultSavepointDir, savepointLocation.getCheckpointDirectory());
        assertParent(defaultSavepointDir, savepointLocation.getSharedStateDirectory());
        assertParent(defaultSavepointDir, savepointLocation.getTaskOwnedStateDirectory());

        // Clean up what the savepoint location created.
        savepointLocation.disposeOnFailure();
    }

    /**
     * With no default savepoint directory but an explicit target passed to the call, the
     * checkpoint, shared-state and task-owned-state directories of the savepoint location must
     * all be direct children of that target.
     */
    @Test
    void testSavepointsInOneDirectoryCustomLocation() throws Exception {
        final Path customSavepointDir = Path.fromLocalFile(TempDirUtils.newFolder(this.tmp));
        final Path checkpointDir = Path.fromLocalFile(TempDirUtils.newFolder(this.tmp));

        final FsCheckpointStorageAccess storage =
                new FsCheckpointStorageAccess(
                        checkpointDir,
                        (Path) null,
                        true,
                        new JobID(),
                        FILE_SIZE_THRESHOLD,
                        WRITE_BUFFER_SIZE);

        // The explicit cast pins the concrete location type the assertions below rely on.
        final FsCheckpointStorageLocation savepointLocation =
                (FsCheckpointStorageLocation)
                        storage.initializeLocationForSavepoint(
                                52452L, customSavepointDir.toString());

        assertParent(customSavepointDir, savepointLocation.getCheckpointDirectory());
        assertParent(customSavepointDir, savepointLocation.getSharedStateDirectory());
        assertParent(customSavepointDir, savepointLocation.getTaskOwnedStateDirectory());

        // Clean up what the savepoint location created.
        savepointLocation.disposeOnFailure();
    }

    /**
     * Writes a small list through a task-owned state stream, then checks that the resulting
     * handle points into a directory named {@code taskowned} and that reading the handle back
     * yields the same list.
     */
    @Test
    void testTaskOwnedStateStream() throws Exception {
        final List<String> state = Arrays.asList("Flopsy", "Mopsy", "Cotton Tail", "Peter");

        // A threshold of 10 instead of FILE_SIZE_THRESHOLD: presumably small enough that the
        // serialized list is written to a file, since a FileStateHandle is expected below.
        final FsCheckpointStorageAccess storage =
                new FsCheckpointStorageAccess(
                        Path.fromLocalFile(TempDirUtils.newFolder(this.tmp)),
                        (Path) null,
                        true,
                        new JobID(),
                        10,
                        WRITE_BUFFER_SIZE);

        final FileStateHandle fileStateHandle;
        try (CheckpointStateOutputStream stream = storage.createTaskOwnedStateStream()) {
            Assertions.assertThat(stream)
                    .isInstanceOf(FsCheckpointStreamFactory.FsCheckpointStateOutputStream.class);

            new ObjectOutputStream(stream).writeObject(state);

            // The cast fails the test if the state did not end up in a file-backed handle.
            fileStateHandle = (FileStateHandle) stream.closeAndGetHandle();
        }

        final String parentDirName = fileStateHandle.getFilePath().getParent().getName();
        Assertions.assertThat(parentDirName).isEqualTo("taskowned");

        try (ObjectInputStream in = new ObjectInputStream(fileStateHandle.openInputStream())) {
            Assertions.assertThat(in.readObject()).isEqualTo(state);
        }
    }

    /**
     * Checks that streams opened with {@link CheckpointedStateScope#EXCLUSIVE} and {@link
     * CheckpointedStateScope#SHARED} each put their file into a different directory of the
     * storage location.
     */
    @Test
    void testDirectoriesForExclusiveAndSharedState() throws Exception {
        final LocalFileSystem fs = LocalFileSystem.getSharedInstance();
        final Path checkpointDir = randomTempPath();
        final Path sharedStateDir = randomTempPath();
        final Path taskOwnedStateDir = randomTempPath();

        final FsCheckpointStorageLocation storageLocation =
                new FsCheckpointStorageLocation(
                        fs,
                        checkpointDir,
                        sharedStateDir,
                        taskOwnedStateDir,
                        CheckpointStorageLocationReference.getDefault(),
                        FILE_SIZE_THRESHOLD,
                        WRITE_BUFFER_SIZE);

        Assertions.assertThat(storageLocation.getSharedStateDirectory())
                .isNotEqualTo(storageLocation.getCheckpointDirectory());

        // Nothing has been written yet.
        Assertions.assertThat(fs.listStatus(storageLocation.getCheckpointDirectory())).isEmpty();
        Assertions.assertThat(fs.listStatus(storageLocation.getSharedStateDirectory())).isEmpty();

        // Exclusive state: exactly one file, and only in the checkpoint directory.
        final FsCheckpointStreamFactory.FsCheckpointStateOutputStream exclusiveStream =
                storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        exclusiveStream.write(42);
        exclusiveStream.flushToFile();
        final StreamStateHandle exclusiveHandle = exclusiveStream.closeAndGetHandle();

        Assertions.assertThat(fs.listStatus(storageLocation.getCheckpointDirectory())).hasSize(1);
        Assertions.assertThat(fs.listStatus(storageLocation.getSharedStateDirectory())).isEmpty();

        // Shared state: one more file, and only in the shared-state directory.
        final FsCheckpointStreamFactory.FsCheckpointStateOutputStream sharedStream =
                storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
        sharedStream.write(42);
        sharedStream.flushToFile();
        final StreamStateHandle sharedHandle = sharedStream.closeAndGetHandle();

        Assertions.assertThat(fs.listStatus(storageLocation.getCheckpointDirectory())).hasSize(1);
        Assertions.assertThat(fs.listStatus(storageLocation.getSharedStateDirectory())).hasSize(1);

        exclusiveHandle.discardState();
        sharedHandle.discardState();
    }

    /**
     * Checks that the checkpoints directory does not exist until the base locations are
     * initialized, and that merely resolving a storage location leaves its checkpoint directory
     * absent.
     */
    @Test
    void testStorageLocationMkdirs() throws Exception {
        final FsCheckpointStorageAccess initializedStorage =
                new FsCheckpointStorageAccess(
                        randomTempPath(),
                        (Path) null,
                        true,
                        new JobID(),
                        FILE_SIZE_THRESHOLD,
                        WRITE_BUFFER_SIZE);

        final File baseDir = new File(initializedStorage.getCheckpointsDirectory().getPath());
        Assertions.assertThat(baseDir).doesNotExist();

        // The directory is expected to appear only once the base locations are initialized.
        initializedStorage.initializeBaseLocationsForCheckpoint();
        Assertions.assertThat(baseDir).exists();

        // A fresh storage that only resolves a location must not have created the directory.
        final FsCheckpointStorageAccess resolvingStorage =
                new FsCheckpointStorageAccess(
                        randomTempPath(),
                        (Path) null,
                        true,
                        new JobID(),
                        FILE_SIZE_THRESHOLD,
                        WRITE_BUFFER_SIZE);

        final FsCheckpointStorageLocation location =
                (FsCheckpointStorageLocation)
                        resolvingStorage.resolveCheckpointStorageLocation(
                                177L, CheckpointStorageLocationReference.getDefault());

        final File checkpointDir = new File(location.getCheckpointDirectory().getPath());
        Assertions.assertThat(checkpointDir).doesNotExist();
    }

    /**
     * Checks which file system a resolved storage location reports: the one supplied by the
     * checkpoint base path for the default reference, and a {@link LocalFileSystem} for a
     * reference that encodes a {@code file://} path.
     */
    @Test
    void testResolveCheckpointStorageLocation() throws Exception {
        final FileSystem checkpointFileSystem = Mockito.mock(FileSystem.class);

        final FsCheckpointStorageAccess storage =
                new FsCheckpointStorageAccess(
                        new TestingPath("hdfs:///checkpoint/", checkpointFileSystem),
                        (Path) null,
                        true,
                        new JobID(),
                        FILE_SIZE_THRESHOLD,
                        WRITE_BUFFER_SIZE);

        final FsCheckpointStorageLocation checkpointLocation =
                (FsCheckpointStorageLocation)
                        storage.resolveCheckpointStorageLocation(
                                1L, CheckpointStorageLocationReference.getDefault());
        Assertions.assertThat(checkpointLocation.getFileSystem()).isEqualTo(checkpointFileSystem);

        final CheckpointStorageLocationReference savepointReference =
                AbstractFsCheckpointStorageAccess.encodePathAsReference(
                        new Path("file:///savepoint/"));
        final FsCheckpointStorageLocation savepointLocation =
                (FsCheckpointStorageLocation)
                        storage.resolveCheckpointStorageLocation(2L, savepointReference);
        Assertions.assertThat(savepointLocation.getFileSystem())
                .isInstanceOf(LocalFileSystem.class);
    }

    /**
     * A storage created through {@link #createCheckpointStorage(Path, boolean)} must hand out a
     * {@link NotDuplicatingCheckpointStateToolset}.
     */
    @Test
    void testNotDuplicationCheckpointStateToolset() throws Exception {
        final CheckpointStorageAccess storage = createCheckpointStorage(randomTempPath(), true);

        Assertions.assertThat(storage.createTaskOwnedCheckpointStateToolset())
                .isInstanceOf(NotDuplicatingCheckpointStateToolset.class);
    }

    /**
     * A storage backed by a {@link PathsCopyingFileSystem} must hand out an {@link
     * FsCheckpointStateToolset}.
     */
    @Test
    void testDuplicationCheckpointStateToolset() throws Exception {
        final FsCheckpointStorageAccess storage =
                new FsCheckpointStorageAccess(
                        new TestDuplicatingFileSystem(),
                        randomTempPath(),
                        (Path) null,
                        true,
                        new JobID(),
                        FILE_SIZE_THRESHOLD,
                        WRITE_BUFFER_SIZE);

        Assertions.assertThat(storage.createTaskOwnedCheckpointStateToolset())
                .isInstanceOf(FsCheckpointStateToolset.class);
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /**
     * Asserts that {@code child} equals the path built from {@code parent} and the last name
     * component of {@code child}.
     */
    private void assertParent(Path parent, Path child) {
        final Path expected = new Path(parent, child.getName());
        Assertions.assertThat(child).isEqualTo(expected);
    }

    /**
     * Test file system that also implements {@link PathsCopyingFileSystem}: it reports that any
     * two non-equal paths can be copied, and its copy operation does nothing.
     */
    private static final class TestDuplicatingFileSystem extends TestFileSystem
            implements PathsCopyingFileSystem {

        @Override
        public boolean canCopyPaths(Path source, Path destination) throws IOException {
            return !source.equals(destination);
        }

        @Override
        public void copyFiles(
                List<PathsCopyingFileSystem.CopyRequest> requests,
                ICloseableRegistry closeableRegistry)
                throws IOException {
            // Intentionally empty: the tests only look at the type of the created toolset.
        }
    }

    /** A {@link Path} that always returns the file system it was constructed with. */
    private static final class TestingPath extends Path {

        private static final long serialVersionUID = 2560119808844230488L;

        @Nonnull
        private final transient FileSystem fileSystem;

        TestingPath(String pathString, @Nonnull FileSystem fileSystem) {
            super(pathString);
            this.fileSystem = fileSystem;
        }

        @Override
        public FileSystem getFileSystem() {
            return this.fileSystem;
        }
    }
}
