package org.apache.flink.runtime.state.filesystem;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Objects;
import org.apache.flink.core.fs.EntropyInjectingFileSystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.filesystem.FsStorageEntropyTest;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.class */
class FsCheckpointStreamFactoryTest {

    @TempDir
    private Path exclusiveStateDir;

    @TempDir
    private Path sharedStateDir;

    /* loaded from: input_file:org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest$DisabledEntropyFS.class */
    private static final class DisabledEntropyFS extends LocalFileSystem implements EntropyInjectingFileSystem {
        private DisabledEntropyFS() {
        }

        public String getEntropyInjectionKey() {
            return null;
        }

        public String generateEntropy() {
            return null;
        }
    }

    FsCheckpointStreamFactoryTest() {
    }

    @Test
    void testWriteFlushesIfAboveThreshold() throws IOException {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream = createFactory(FileSystem.getLocalFileSystem(), 100, 100).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        createCheckpointStateOutputStream.write(new byte[100]);
        File[] listFiles = new File(this.exclusiveStateDir.toUri()).listFiles();
        Assertions.assertThat(listFiles).hasSize(1);
        File file = listFiles[0];
        Assertions.assertThat(file).hasSize(100);
        createCheckpointStateOutputStream.write(new byte[100 - 1]);
        createCheckpointStateOutputStream.write(127);
        Assertions.assertThat(file).hasSize(100);
    }

    @Test
    void testExclusiveStateHasRelativePathHandles() throws IOException {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream = createFactory(FileSystem.getLocalFileSystem(), 0).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        createCheckpointStateOutputStream.write(1657);
        RelativeFileStateHandle closeAndGetHandle = createCheckpointStateOutputStream.closeAndGetHandle();
        Assertions.assertThat(closeAndGetHandle).isInstanceOf(RelativeFileStateHandle.class);
        assertPathsEqual(this.exclusiveStateDir, closeAndGetHandle.getFilePath().getParent());
    }

    @Test
    void testSharedStateHasAbsolutePathHandles() throws IOException {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream = createFactory(FileSystem.getLocalFileSystem(), 0).createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
        createCheckpointStateOutputStream.write(0);
        FileStateHandle closeAndGetHandle = createCheckpointStateOutputStream.closeAndGetHandle();
        Assertions.assertThat(closeAndGetHandle).isInstanceOf(FileStateHandle.class);
        Assertions.assertThat(closeAndGetHandle).isNotInstanceOf(RelativeFileStateHandle.class);
        assertPathsEqual(this.sharedStateDir, closeAndGetHandle.getFilePath().getParent());
    }

    @Test
    void testEntropyMakesExclusiveStateAbsolutePaths() throws IOException {
        FsStorageEntropyTest.TestEntropyAwareFs testEntropyAwareFs = new FsStorageEntropyTest.TestEntropyAwareFs();
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream = createFactory(testEntropyAwareFs, 0).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        createCheckpointStateOutputStream.write(0);
        FileStateHandle closeAndGetHandle = createCheckpointStateOutputStream.closeAndGetHandle();
        Assertions.assertThat(closeAndGetHandle).isInstanceOf(FileStateHandle.class);
        Assertions.assertThat(closeAndGetHandle).isNotInstanceOf(RelativeFileStateHandle.class);
        assertPathsEqual(this.exclusiveStateDir.resolve(testEntropyAwareFs.generateEntropy()), closeAndGetHandle.getFilePath().getParent());
    }

    @Test
    void testFSWithDisabledEntropyHasRelativePaths() throws IOException {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream = createFactory((FileSystem) new DisabledEntropyFS(), 0).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        createCheckpointStateOutputStream.write(0);
        RelativeFileStateHandle closeAndGetHandle = createCheckpointStateOutputStream.closeAndGetHandle();
        Assertions.assertThat(closeAndGetHandle).isInstanceOf(RelativeFileStateHandle.class);
        assertPathsEqual(this.exclusiveStateDir, closeAndGetHandle.getFilePath().getParent());
    }

    @Test
    void testFlushUnderThreshold() throws IOException {
        flushAndVerify(10, 10, true);
    }

    @Test
    void testFlushAboveThreshold() throws IOException {
        flushAndVerify(10, 11, false);
    }

    private void flushAndVerify(int i, int i2, boolean z) throws IOException {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream = createFactory(new FsStorageEntropyTest.TestEntropyAwareFs(), i).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        createCheckpointStateOutputStream.write(new byte[i2], 0, i2);
        createCheckpointStateOutputStream.flush();
        Assertions.assertThat(new File(this.exclusiveStateDir.toUri()).listFiles()).hasSize(z ? 0 : 1);
    }

    private static void assertPathsEqual(Path path, org.apache.flink.core.fs.Path path2) {
        Assertions.assertThat(path2).isEqualTo(new org.apache.flink.core.fs.Path(new org.apache.flink.core.fs.Path(path.toUri()).toString()));
    }

    private FsCheckpointStreamFactory createFactory(FileSystem fileSystem, int i) {
        return createFactory(fileSystem, i, 4096);
    }

    private FsCheckpointStreamFactory createFactory(FsStorageEntropyTest.TestEntropyAwareFs testEntropyAwareFs, int i) {
        return new FsCheckpointStreamFactory(testEntropyAwareFs, new org.apache.flink.core.fs.Path(this.exclusiveStateDir.resolve((String) Objects.requireNonNull(testEntropyAwareFs.getEntropyInjectionKey())).toUri()), new org.apache.flink.core.fs.Path(this.sharedStateDir.toUri()), i, 4096);
    }

    private FsCheckpointStreamFactory createFactory(FileSystem fileSystem, int i, int i2) {
        return new FsCheckpointStreamFactory(fileSystem, new org.apache.flink.core.fs.Path(this.exclusiveStateDir.toUri()), new org.apache.flink.core.fs.Path(this.sharedStateDir.toUri()), i, i2);
    }
}
