package org.apache.flink.runtime.state.memory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFileCheckpointStorageAccessTestBase;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/memory/MemoryCheckpointStorageAccessTest.class */
public class MemoryCheckpointStorageAccessTest extends AbstractFileCheckpointStorageAccessTestBase {
    private static final int DEFAULT_MAX_STATE_SIZE = 5242880;

    @Override // org.apache.flink.runtime.state.filesystem.AbstractFileCheckpointStorageAccessTestBase
    protected CheckpointStorageAccess createCheckpointStorage(Path path, boolean z) throws Exception {
        return new MemoryBackendCheckpointStorageAccess(new JobID(), path, (Path) null, z, DEFAULT_MAX_STATE_SIZE);
    }

    @Override // org.apache.flink.runtime.state.filesystem.AbstractFileCheckpointStorageAccessTestBase
    protected CheckpointStorageAccess createCheckpointStorageWithSavepointDir(Path path, Path path2, boolean z) throws Exception {
        return new MemoryBackendCheckpointStorageAccess(new JobID(), path, path2, z, DEFAULT_MAX_STATE_SIZE);
    }

    @Test
    void testParametrizationDefault() throws Exception {
        MemoryBackendCheckpointStorageAccess createCheckpointStorage = new JobManagerCheckpointStorage().createCheckpointStorage(new JobID());
        Assertions.assertThat(createCheckpointStorage.supportsHighlyAvailableStorage()).isFalse();
        Assertions.assertThat(createCheckpointStorage.hasDefaultSavepointLocation()).isFalse();
        Assertions.assertThat(createCheckpointStorage.getDefaultSavepointDirectory()).isNull();
        Assertions.assertThat(createCheckpointStorage.getMaxStateSize()).isEqualTo(DEFAULT_MAX_STATE_SIZE);
    }

    @Test
    void testParametrizationDirectories() throws Exception {
        Assertions.assertThat(new JobManagerCheckpointStorage(TempDirUtils.newFolder(this.tmp).toURI().toString()).createCheckpointStorage(new JobID()).supportsHighlyAvailableStorage()).isTrue();
    }

    @Test
    void testParametrizationStateSize() throws Exception {
        Assertions.assertThat(new JobManagerCheckpointStorage(17).createCheckpointStorage(new JobID()).getMaxStateSize()).isEqualTo(17);
    }

    @Test
    void testNonPersistentCheckpointLocation() throws Exception {
        MemoryBackendCheckpointStorageAccess memoryBackendCheckpointStorageAccess = new MemoryBackendCheckpointStorageAccess(new JobID(), (Path) null, (Path) null, true, DEFAULT_MAX_STATE_SIZE);
        CheckpointMetadataOutputStream createMetadataOutputStream = memoryBackendCheckpointStorageAccess.initializeLocationForCheckpoint(9L).createMetadataOutputStream();
        createMetadataOutputStream.write(99);
        CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint = createMetadataOutputStream.closeAndFinalizeCheckpoint();
        Assertions.assertThat(closeAndFinalizeCheckpoint.getMetadataHandle()).isInstanceOf(ByteStreamStateHandle.class);
        Assertions.assertThatThrownBy(() -> {
            memoryBackendCheckpointStorageAccess.resolveCheckpoint(closeAndFinalizeCheckpoint.getExternalPointer());
        }).isInstanceOf(FileNotFoundException.class);
    }

    @Test
    void testLocationReference() throws Exception {
        Assertions.assertThat(new MemoryBackendCheckpointStorageAccess(new JobID(), (Path) null, (Path) null, true, DEFAULT_MAX_STATE_SIZE).initializeLocationForCheckpoint(42L).getLocationReference().isDefaultReference()).isTrue();
        Assertions.assertThat(new MemoryBackendCheckpointStorageAccess(new JobID(), randomTempPath(), (Path) null, true, DEFAULT_MAX_STATE_SIZE).initializeLocationForCheckpoint(42L).getLocationReference().isDefaultReference()).isTrue();
        Assertions.assertThat(new MemoryBackendCheckpointStorageAccess(new JobID(), (Path) null, (Path) null, true, DEFAULT_MAX_STATE_SIZE).initializeLocationForSavepoint(1337L, randomTempPath().toString()).getLocationReference().isDefaultReference()).isTrue();
    }

    @Test
    void testTaskOwnedStateStream() throws Exception {
        List asList = Arrays.asList("Flopsy", "Mopsy", "Cotton Tail", "Peter");
        CheckpointStateOutputStream createTaskOwnedStateStream = new MemoryBackendCheckpointStorageAccess(new JobID(), (Path) null, (Path) null, true, DEFAULT_MAX_STATE_SIZE).createTaskOwnedStateStream();
        try {
            Assertions.assertThat(createTaskOwnedStateStream).isInstanceOf(MemCheckpointStreamFactory.MemoryCheckpointOutputStream.class);
            new ObjectOutputStream(createTaskOwnedStateStream).writeObject(asList);
            StreamStateHandle closeAndGetHandle = createTaskOwnedStateStream.closeAndGetHandle();
            if (createTaskOwnedStateStream != null) {
                createTaskOwnedStateStream.close();
            }
            ObjectInputStream objectInputStream = new ObjectInputStream(closeAndGetHandle.openInputStream());
            try {
                Assertions.assertThat(objectInputStream.readObject()).isEqualTo(asList);
                objectInputStream.close();
            } catch (Throwable th) {
                try {
                    objectInputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (createTaskOwnedStateStream != null) {
                try {
                    createTaskOwnedStateStream.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    void testStorageLocationMkdirs() throws Exception {
        MemoryBackendCheckpointStorageAccess memoryBackendCheckpointStorageAccess = new MemoryBackendCheckpointStorageAccess(new JobID(), new Path(randomTempPath(), "chk"), (Path) null, true, DEFAULT_MAX_STATE_SIZE);
        File file = new File(memoryBackendCheckpointStorageAccess.getCheckpointsDirectory().getPath());
        Assertions.assertThat(file).doesNotExist();
        memoryBackendCheckpointStorageAccess.initializeLocationForCheckpoint(177L);
        Assertions.assertThat(file).exists();
    }
}
