package org.apache.flink.runtime.state.filesystem;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.class */
/**
 * Tests for {@link FsMergingCheckpointStorageLocation} when it is backed by a {@link
 * FileMergingSnapshotManager} built with {@link FileMergingType#MERGE_WITHIN_CHECKPOINT} on the
 * local file system.
 *
 * <p>All directories used by the tests live below the JUnit {@link TemporaryFolder}, so they are
 * removed by the rule after each test.
 */
public class FsMergingCheckpointStorageLocationTest {
    public static Path checkpointBaseDir;
    public static Path sharedStateDir;
    public static Path taskOwnedStateDir;

    private static final String SNAPSHOT_MGR_ID = "snapshotMgrId";

    // NOTE(review): the two sizes below were previously read from
    // HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE, which is assumed to be 1024 as well -
    // confirm. Using the local constants removes the dependency on an unrelated test class.
    private static final int FILE_STATE_SIZE_THRESHOLD = 1024;
    private static final int WRITE_BUFFER_SIZE = 1024;

    private static final JobID jobId = JobID.generate();
    private static final OperatorID opId = new OperatorID();
    private static final FileMergingSnapshotManager.SubtaskKey SUBTASK_KEY =
            new FileMergingSnapshotManager.SubtaskKey(
                    jobId.toHexString(), opId.toHexString(), 1, 1);

    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();

    private final Random random = new Random();

    /** Points the checkpoint, shared and task-owned directories below the temporary folder. */
    @Before
    public void prepareDirectories() {
        // TemporaryFolder does not override toString(); the directory itself is getRoot().
        checkpointBaseDir = Path.fromLocalFile(this.tmpFolder.getRoot());
        sharedStateDir = new Path(checkpointBaseDir, "shared");
        taskOwnedStateDir = new Path(checkpointBaseDir, "taskowned");
    }

    @Test
    public void testWriteMultipleStateFilesWithinCheckpoint() throws Exception {
        testWriteMultipleStateFiles();
    }

    /**
     * Uploads several random states through one storage location for checkpoint 1 and verifies
     * that the managed directories exist and that all returned handles reference the same file.
     */
    private void testWriteMultipleStateFiles() throws Exception {
        final long checkpointId = 1L;
        final int numStates = 3;

        FileMergingSnapshotManager snapshotManager = createFileMergingSnapshotManager();
        FsMergingCheckpointStorageLocation storageLocation =
                createFsMergingCheckpointStorageLocation(checkpointId, snapshotManager);

        FileSystem fileSystem = storageLocation.getFileSystem();
        Assertions.assertThat(
                        fileSystem.exists(
                                snapshotManager.getManagedDir(
                                        SUBTASK_KEY, CheckpointedStateScope.EXCLUSIVE)))
                .isTrue();
        Assertions.assertThat(
                        fileSystem.exists(
                                snapshotManager.getManagedDir(
                                        SUBTASK_KEY, CheckpointedStateScope.SHARED)))
                .isTrue();

        List<SegmentFileStateHandle> stateHandles = new ArrayList<>(numStates);
        for (byte[] state : generateRandomByteStates(numStates, 2, 16)) {
            stateHandles.add(
                    uploadOneStateFileAndGetStateHandle(checkpointId, storageLocation, state));
        }
        verifyStateHandlesAllPointToTheSameFile(stateHandles);
    }

    /**
     * Flushes a stream to its file, simulates a failure while the stream is still open, and
     * asserts that the file existed before the failure and no longer exists after the stream
     * has been closed.
     */
    @Test
    public void testCheckpointStreamClosedExceptionally() throws Exception {
        try (FileMergingSnapshotManager snapshotManager = createFileMergingSnapshotManager()) {
            // Captured inside the inner try so it can still be checked after the stream closed.
            Path filePath = null;
            try (FileMergingCheckpointStateOutputStream stream =
                    snapshotManager.createCheckpointStateOutputStream(
                            SUBTASK_KEY, 1L, CheckpointedStateScope.EXCLUSIVE)) {
                stream.flushToFile();
                filePath = stream.getFilePath();
                assertPathNotNullAndCheckExistence(filePath, true);
                throw new IOException();
            } catch (IOException expected) {
                // The simulated failure; try-with-resources has already closed the stream.
            }
            assertPathNotNullAndCheckExistence(filePath, false);
        }
    }

    /**
     * Asserts that {@code path} is not null and that its existence on its file system matches
     * {@code shouldExist}.
     *
     * @param path the path to check
     * @param shouldExist the expected result of the existence check
     * @throws IOException if the file system cannot be obtained or queried
     */
    private void assertPathNotNullAndCheckExistence(Path path, boolean shouldExist)
            throws IOException {
        Assertions.assertThat(path).isNotNull();
        Assertions.assertThat(path.getFileSystem().exists(path)).isEqualTo(shouldExist);
    }

    /**
     * Verifies that {@code flushToFile()} on a stream that was already closed via {@code
     * closeAndGetHandle()} fails with the expected {@link IOException} message.
     */
    @Test
    public void testWritingToClosedStream() {
        FsMergingCheckpointStorageLocation storageLocation =
                createFsMergingCheckpointStorageLocation(1L, createFileMergingSnapshotManager());
        try (FileMergingCheckpointStateOutputStream stream =
                storageLocation.createCheckpointStateOutputStream(
                        CheckpointedStateScope.EXCLUSIVE)) {
            stream.flushToFile();
            stream.closeAndGetHandle();
            stream.flushToFile();
            Assertions.fail("Expected IOException");
        } catch (IOException e) {
            Assertions.assertThat(e.getMessage())
                    .isEqualTo("Cannot call flushToFile() to a closed stream.");
        }
    }

    /**
     * Uploads one batch of states for checkpoint 1 and then runs the position round-trip check
     * for checkpoints 2 to 9 against the same snapshot manager.
     */
    @Test
    public void testWriteAndReadPositionInformation() throws Exception {
        final long maxFileSize = 128L;
        FileMergingSnapshotManager snapshotManager = createFileMergingSnapshotManager(maxFileSize);

        uploadCheckpointStates(
                1L,
                generateRandomByteStates(1, 10, 10),
                createFsMergingCheckpointStorageLocation(1L, snapshotManager));

        for (int checkpointId = 2; checkpointId < 10; checkpointId++) {
            testWriteAndReadPositionInformationInCheckpoint(
                    checkpointId, maxFileSize, snapshotManager);
        }
    }

    /**
     * Writes a block of random bytes followed by ten 8-byte encoded positions into that block,
     * then reads the positions back through the returned handle, seeks to each of them and
     * checks that the byte found there equals the byte that was written at that position.
     *
     * @param checkpointId the checkpoint the storage location is created for
     * @param maxFileSize not used by this method
     * @param snapshotManager the manager backing the storage location
     * @throws IOException if writing to or reading from the state file fails
     */
    private void testWriteAndReadPositionInformationInCheckpoint(
            long checkpointId, long maxFileSize, FileMergingSnapshotManager snapshotManager)
            throws IOException {
        final int stateSize = 64;
        final int numPositions = 10;

        FsMergingCheckpointStorageLocation storageLocation =
                createFsMergingCheckpointStorageLocation(checkpointId, snapshotManager);
        try (FileMergingCheckpointStateOutputStream stream =
                storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED)) {
            byte[] expectedBytes = new byte[numPositions];
            byte[] stateData = generateRandomBytes(stateSize);
            stream.write(stateData);
            for (int i = 0; i < numPositions; i++) {
                int position = this.random.nextInt(stateSize);
                expectedBytes[i] = stateData[position];
                stream.write(longToBytes(position));
            }

            SegmentFileStateHandle stateHandle = stream.closeAndGetHandle();
            Assertions.assertThat(stateHandle).isNotNull();

            byte[] actualBytes = new byte[numPositions];
            byte[] singleByte = new byte[1];
            try (FSDataInputStream inputStream = stateHandle.openInputStream()) {
                Assertions.assertThat(inputStream).isNotNull();
                // The encoded positions start right after the random block.
                inputStream.seek(stateSize);
                for (int i = 0; i < numPositions; i++) {
                    byte[] positionBytes = new byte[8];
                    Assertions.assertThat(inputStream.read(positionBytes)).isEqualTo(8);
                    // Remember where the next encoded position starts before jumping away.
                    long nextPositionOffset = inputStream.getPos();
                    inputStream.seek(bytesToLong(positionBytes));
                    Assertions.assertThat(inputStream.read(singleByte))
                            .isGreaterThanOrEqualTo(0);
                    actualBytes[i] = singleByte[0];
                    inputStream.seek(nextPositionOffset);
                }
            }
            Assertions.assertThat(actualBytes).isEqualTo(expectedBytes);
        }
    }

    private FileMergingSnapshotManager createFileMergingSnapshotManager() {
        return createFileMergingSnapshotManager(-1L);
    }

    /**
     * Builds a snapshot manager, initializes it on the shared local file system with the test
     * directories and registers {@link #SUBTASK_KEY} for shared states.
     *
     * @param maxFileSize NOTE(review): currently not passed to the builder, so it has no effect
     *     on the created manager - confirm whether the builder should be configured with it
     * @return the initialized snapshot manager
     */
    private FileMergingSnapshotManager createFileMergingSnapshotManager(long maxFileSize) {
        FileMergingSnapshotManager snapshotManager =
                new FileMergingSnapshotManagerBuilder(
                                jobId,
                                new ResourceID(SNAPSHOT_MGR_ID),
                                FileMergingType.MERGE_WITHIN_CHECKPOINT)
                        .build();
        snapshotManager.initFileSystem(
                LocalFileSystem.getSharedInstance(),
                checkpointBaseDir,
                sharedStateDir,
                taskOwnedStateDir,
                WRITE_BUFFER_SIZE);
        snapshotManager.registerSubtaskForSharedStates(SUBTASK_KEY);
        return snapshotManager;
    }

    /**
     * Creates a storage location for {@link #SUBTASK_KEY} on the shared local file system.
     *
     * @param checkpointId the checkpoint id handed to the storage location
     * @param fileMergingSnapshotManager the manager backing the location; asserted non-null
     * @return the new storage location
     */
    public FsMergingCheckpointStorageLocation createFsMergingCheckpointStorageLocation(
            long checkpointId, @Nonnull FileMergingSnapshotManager fileMergingSnapshotManager) {
        CheckpointStorageLocationReference reference =
                AbstractFsCheckpointStorageAccess.encodePathAsReference(
                        Path.fromLocalFile(
                                LocalFileSystem.getSharedInstance()
                                        .pathToFile(checkpointBaseDir)));
        Assertions.assertThat(fileMergingSnapshotManager).isNotNull();
        return new FsMergingCheckpointStorageLocation(
                SUBTASK_KEY,
                LocalFileSystem.getSharedInstance(),
                checkpointBaseDir,
                sharedStateDir,
                taskOwnedStateDir,
                reference,
                FILE_STATE_SIZE_THRESHOLD,
                WRITE_BUFFER_SIZE,
                fileMergingSnapshotManager,
                checkpointId);
    }

    /**
     * Writes {@code state} to a new {@link CheckpointedStateScope#SHARED} stream of the given
     * location and returns the handle produced by closing it.
     *
     * @param checkpointId not used by this method
     * @param storageLocation the location to create the stream from
     * @param state the bytes to write
     * @return the handle returned by {@code closeAndGetHandle()}
     * @throws IOException if writing or closing the stream fails
     */
    private SegmentFileStateHandle uploadOneStateFileAndGetStateHandle(
            long checkpointId, FsMergingCheckpointStorageLocation storageLocation, byte[] state)
            throws IOException {
        try (FileMergingCheckpointStateOutputStream stream =
                storageLocation.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED)) {
            stream.write(state);
            return stream.closeAndGetHandle();
        }
    }

    /**
     * Compares two byte arrays element by element.
     *
     * @return {@code true} only if both arrays are non-null, of equal length and hold the same
     *     bytes; {@code false} otherwise, including when both are {@code null}
     */
    private boolean bytesEqual(byte[] left, byte[] right) {
        if (left == null || right == null || left.length != right.length) {
            return false;
        }
        for (int i = 0; i < left.length; i++) {
            if (left[i] != right[i]) {
                return false;
            }
        }
        return true;
    }

    /**
     * Uploads every element of {@code states} as its own state file through the given location.
     *
     * @return the resulting handles, in the order of {@code states}
     * @throws IOException if any upload fails
     */
    private List<SegmentFileStateHandle> uploadCheckpointStates(
            long checkpointId,
            List<byte[]> states,
            FsMergingCheckpointStorageLocation storageLocation)
            throws IOException {
        List<SegmentFileStateHandle> stateHandles = new ArrayList<>(states.size());
        for (byte[] state : states) {
            stateHandles.add(
                    uploadOneStateFileAndGetStateHandle(checkpointId, storageLocation, state));
        }
        return stateHandles;
    }

    /** Returns a new array of {@code size} random bytes. */
    private byte[] generateRandomBytes(int size) {
        byte[] bytes = new byte[size];
        this.random.nextBytes(bytes);
        return bytes;
    }

    /**
     * Generates {@code numStates} random byte arrays whose lengths are drawn from the inclusive
     * range {@code [minSize, maxSize]}.
     */
    private List<byte[]> generateRandomByteStates(int numStates, int minSize, int maxSize) {
        List<byte[]> states = new ArrayList<>(numStates);
        for (int i = 0; i < numStates; i++) {
            int size = this.random.nextInt((maxSize - minSize) + 1) + minSize;
            states.add(generateRandomBytes(size));
        }
        return states;
    }

    /** Asserts that every handle in {@code stateHandles} reports the same file path. */
    private void verifyStateHandlesAllPointToTheSameFile(
            List<SegmentFileStateHandle> stateHandles) {
        Path expectedPath = null;
        for (SegmentFileStateHandle stateHandle : stateHandles) {
            Path actualPath = stateHandle.getFilePath();
            if (expectedPath == null) {
                expectedPath = actualPath;
            } else {
                Assertions.assertThat(actualPath).isEqualTo(expectedPath);
            }
        }
    }

    /** Encodes {@code value} as 8 bytes in {@link ByteBuffer}'s default (big-endian) order. */
    private byte[] longToBytes(long value) {
        return ByteBuffer.allocate(8).putLong(value).array();
    }

    /** Decodes the first 8 bytes of {@code bytes} as written by {@link #longToBytes(long)}. */
    private long bytesToLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}
