package org.apache.flink.runtime.state.filesystem;

import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.class */
public class FsCheckpointStateOutputStreamTest {

    @Parameterized.Parameter
    public boolean relativePaths;

    @Rule
    public final TemporaryFolder tempDir = new TemporaryFolder();

    @Parameterized.Parameters(name = "relativePaths = {0}")
    public static List<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    @Test(expected = IllegalArgumentException.class)
    public void testWrongParameters() throws Exception {
        new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile(this.tempDir.newFolder()), FileSystem.getLocalFileSystem(), 4000, 5000, this.relativePaths);
    }

    @Test
    public void testEmptyState() throws Exception {
        Assert.assertNull(new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile(this.tempDir.newFolder()), FileSystem.getLocalFileSystem(), HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE, 512, this.relativePaths).closeAndGetHandle());
    }

    @Test
    public void testStateBelowMemThreshold() throws Exception {
        runTest(999, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE, HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS, false);
    }

    @Test
    public void testStateOneBufferAboveThreshold() throws Exception {
        runTest(896, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE, 15, true);
    }

    @Test
    public void testStateAboveMemThreshold() throws Exception {
        runTest(576446, 259, 17, true);
    }

    @Test
    public void testZeroThreshold() throws Exception {
        runTest(16678, 4096, 0, true);
    }

    @Test
    public void testGetPos() throws Exception {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream fsCheckpointStateOutputStream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile(this.tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, this.relativePaths);
        for (int i = 0; i < 64; i++) {
            Assert.assertEquals(i, fsCheckpointStateOutputStream.getPos());
            fsCheckpointStateOutputStream.write(66);
        }
        fsCheckpointStateOutputStream.closeAndGetHandle();
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream fsCheckpointStateOutputStream2 = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile(this.tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, this.relativePaths);
        byte[] bytes = "testme!".getBytes(ConfigConstants.DEFAULT_CHARSET);
        for (int i2 = 0; i2 < 7; i2++) {
            Assert.assertEquals(i2 * (1 + bytes.length), fsCheckpointStateOutputStream2.getPos());
            fsCheckpointStateOutputStream2.write(66);
            fsCheckpointStateOutputStream2.write(bytes);
        }
        fsCheckpointStateOutputStream2.closeAndGetHandle();
    }

    @Test
    public void testCleanupWhenClosingStream() throws IOException {
        FileSystem fileSystem = (FileSystem) Mockito.mock(FileSystem.class);
        FSDataOutputStream fSDataOutputStream = (FSDataOutputStream) Mockito.mock(FSDataOutputStream.class);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Path.class);
        Mockito.when(fileSystem.create((Path) forClass.capture(), (FileSystem.WriteMode) Matchers.any(FileSystem.WriteMode.class))).thenReturn(fSDataOutputStream);
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream fsCheckpointStateOutputStream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile(this.tempDir.newFolder()), fileSystem, 4, 0, this.relativePaths);
        fsCheckpointStateOutputStream.write(new byte[]{1, 2, 3, 4, 5});
        ((FileSystem) Mockito.verify(fileSystem)).create((Path) Matchers.any(Path.class), (FileSystem.WriteMode) Matchers.any(FileSystem.WriteMode.class));
        fsCheckpointStateOutputStream.close();
        ((FileSystem) Mockito.verify(fileSystem)).delete((Path) Matchers.eq(forClass.getValue()), Matchers.anyBoolean());
    }

    @Test
    public void testCleanupWhenFailingCloseAndGetHandle() throws IOException {
        FileSystem fileSystem = (FileSystem) Mockito.mock(FileSystem.class);
        FSDataOutputStream fSDataOutputStream = (FSDataOutputStream) Mockito.mock(FSDataOutputStream.class);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Path.class);
        Mockito.when(fileSystem.create((Path) forClass.capture(), (FileSystem.WriteMode) Matchers.any(FileSystem.WriteMode.class))).thenReturn(fSDataOutputStream);
        ((FSDataOutputStream) Mockito.doThrow(new Throwable[]{new IOException("Test IOException.")}).when(fSDataOutputStream)).close();
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream fsCheckpointStateOutputStream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile(this.tempDir.newFolder()), fileSystem, 4, 0, this.relativePaths);
        fsCheckpointStateOutputStream.write(new byte[]{1, 2, 3, 4, 5});
        ((FileSystem) Mockito.verify(fileSystem)).create((Path) Matchers.any(Path.class), (FileSystem.WriteMode) Matchers.any(FileSystem.WriteMode.class));
        try {
            fsCheckpointStateOutputStream.closeAndGetHandle();
            Assert.fail("Expected IOException");
        } catch (IOException e) {
        }
        ((FileSystem) Mockito.verify(fileSystem)).delete((Path) Matchers.eq(forClass.getValue()), Matchers.anyBoolean());
    }

    private void runTest(int i, int i2, int i3, boolean z) throws Exception {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream fsCheckpointStateOutputStream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile(this.tempDir.newFolder()), FileSystem.getLocalFileSystem(), i2, i3, this.relativePaths);
        Random random = new Random();
        byte[] bArr = new byte[i];
        byte[] bArr2 = new byte[bArr.length];
        random.nextBytes(bArr);
        System.arraycopy(bArr, 0, bArr2, 0, bArr.length);
        int i4 = 0;
        while (i4 < bArr2.length) {
            if (random.nextBoolean()) {
                int i5 = i4;
                i4++;
                fsCheckpointStateOutputStream.write(bArr2[i5]);
            } else {
                int length = random.nextBoolean() ? bArr2.length - i4 : random.nextInt(bArr2.length - i4);
                fsCheckpointStateOutputStream.write(bArr2, i4, length);
                i4 += length;
            }
        }
        StreamStateHandle closeAndGetHandle = fsCheckpointStateOutputStream.closeAndGetHandle();
        if (z) {
            Assert.assertTrue(closeAndGetHandle instanceof FileStateHandle);
        } else {
            Assert.assertTrue(closeAndGetHandle instanceof ByteStreamStateHandle);
        }
        Assert.assertArrayEquals(bArr, bArr2);
        FSDataInputStream openInputStream = closeAndGetHandle.openInputStream();
        Throwable th = null;
        try {
            try {
                byte[] bArr3 = new byte[bArr2.length];
                new DataInputStream(openInputStream).readFully(bArr3);
                Assert.assertArrayEquals(bArr2, bArr3);
                if (openInputStream != null) {
                    if (0 != 0) {
                        try {
                            openInputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        openInputStream.close();
                    }
                }
                closeAndGetHandle.discardState();
            } finally {
            }
        } catch (Throwable th3) {
            if (openInputStream != null) {
                if (th != null) {
                    try {
                        openInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    openInputStream.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testWriteFailsFastWhenClosed() throws Exception {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream fsCheckpointStateOutputStream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile(this.tempDir.newFolder()), FileSystem.getLocalFileSystem(), HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE, 512, this.relativePaths);
        Assert.assertFalse(fsCheckpointStateOutputStream.isClosed());
        fsCheckpointStateOutputStream.close();
        Assert.assertTrue(fsCheckpointStateOutputStream.isClosed());
        try {
            fsCheckpointStateOutputStream.write(1);
            Assert.fail();
        } catch (IOException e) {
        }
        try {
            fsCheckpointStateOutputStream.write(new byte[4], 1, 2);
            Assert.fail();
        } catch (IOException e2) {
        }
    }

    @Test
    public void testMixedBelowAndAboveThreshold() throws Exception {
        byte[] bArr = new byte[1274673];
        byte[] bArr2 = new byte[1];
        byte[] bArr3 = new byte[0];
        byte[] bArr4 = new byte[177];
        Random random = new Random();
        random.nextBytes(bArr);
        random.nextBytes(bArr2);
        random.nextBytes(bArr3);
        random.nextBytes(bArr4);
        File newFolder = this.tempDir.newFolder();
        Path fromLocalFile = Path.fromLocalFile(newFolder);
        Supplier supplier = () -> {
            return new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(fromLocalFile, FileSystem.getLocalFileSystem(), HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE, 15, this.relativePaths);
        };
        CheckpointStateOutputStream checkpointStateOutputStream = (CheckpointStateOutputStream) supplier.get();
        CheckpointStateOutputStream checkpointStateOutputStream2 = (CheckpointStateOutputStream) supplier.get();
        CheckpointStateOutputStream checkpointStateOutputStream3 = (CheckpointStateOutputStream) supplier.get();
        checkpointStateOutputStream.write(bArr);
        checkpointStateOutputStream2.write(bArr2);
        checkpointStateOutputStream3.write(bArr3);
        FileStateHandle closeAndGetHandle = checkpointStateOutputStream.closeAndGetHandle();
        ByteStreamStateHandle closeAndGetHandle2 = checkpointStateOutputStream2.closeAndGetHandle();
        ByteStreamStateHandle closeAndGetHandle3 = checkpointStateOutputStream3.closeAndGetHandle();
        CheckpointStateOutputStream checkpointStateOutputStream4 = (CheckpointStateOutputStream) supplier.get();
        Throwable th = null;
        try {
            checkpointStateOutputStream4.write(bArr4);
            StreamStateHandle closeAndGetHandle4 = checkpointStateOutputStream4.closeAndGetHandle();
            if (checkpointStateOutputStream4 != null) {
                if (0 != 0) {
                    try {
                        checkpointStateOutputStream4.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    checkpointStateOutputStream4.close();
                }
            }
            CheckpointStateOutputStream checkpointStateOutputStream5 = (CheckpointStateOutputStream) supplier.get();
            checkpointStateOutputStream5.write(bArr4);
            checkpointStateOutputStream5.close();
            try {
                checkpointStateOutputStream5.closeAndGetHandle();
                Assert.fail();
            } catch (IOException e) {
            }
            validateBytesInStream(closeAndGetHandle.openInputStream(), bArr);
            closeAndGetHandle.discardState();
            Assert.assertFalse(isDirectoryEmpty(newFolder));
            ensureLocalFileDeleted(closeAndGetHandle.getFilePath());
            validateBytesInStream(closeAndGetHandle2.openInputStream(), bArr2);
            closeAndGetHandle2.discardState();
            Assert.assertFalse(isDirectoryEmpty(newFolder));
            Assert.assertNull(closeAndGetHandle3);
            Assert.assertFalse(isDirectoryEmpty(newFolder));
            validateBytesInStream(closeAndGetHandle4.openInputStream(), bArr4);
            closeAndGetHandle4.discardState();
            Assert.assertTrue(isDirectoryEmpty(newFolder));
        } catch (Throwable th3) {
            if (checkpointStateOutputStream4 != null) {
                if (0 != 0) {
                    try {
                        checkpointStateOutputStream4.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    checkpointStateOutputStream4.close();
                }
            }
            throw th3;
        }
    }

    @Test
    @Category({FailsInGHAContainerWithRootUser.class})
    public void testStreamDoesNotTryToCleanUpParentOnError() throws Exception {
        File newFolder = this.tempDir.newFolder();
        Assume.assumeTrue(newFolder.setWritable(false, true));
        checkDirectoryNotWritable(newFolder);
        FileSystem fileSystem = (FileSystem) Mockito.spy(FileSystem.getLocalFileSystem());
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream fsCheckpointStateOutputStream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile(newFolder), fileSystem, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE, 1, this.relativePaths);
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream fsCheckpointStateOutputStream2 = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(Path.fromLocalFile(newFolder), fileSystem, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE, 1, this.relativePaths);
        fsCheckpointStateOutputStream.write(new byte[61]);
        fsCheckpointStateOutputStream2.write(new byte[61]);
        try {
            fsCheckpointStateOutputStream.closeAndGetHandle();
            Assert.fail("this should fail with an exception");
        } catch (IOException e) {
        }
        fsCheckpointStateOutputStream2.close();
        ((FileSystem) Mockito.verify(fileSystem, Mockito.times(0))).delete((Path) Matchers.any(Path.class), Matchers.anyBoolean());
        Assert.assertTrue(newFolder.exists());
        Assert.assertTrue(newFolder.isDirectory());
    }

    private static void ensureLocalFileDeleted(Path path) {
        URI uri = path.toUri();
        if (!"file".equals(uri.getScheme())) {
            throw new IllegalArgumentException("not a local path");
        }
        Assert.assertFalse("file not properly deleted", new File(uri.getPath()).exists());
    }

    private static boolean isDirectoryEmpty(File file) {
        String[] list;
        return !file.exists() || (list = file.list()) == null || list.length == 0;
    }

    private static void validateBytesInStream(InputStream inputStream, byte[] bArr) throws IOException {
        int read;
        try {
            byte[] bArr2 = new byte[bArr.length];
            int i = 0;
            while (i < bArr2.length && (read = inputStream.read(bArr2, i, bArr2.length - i)) != -1) {
                i += read;
            }
            Assert.assertEquals("not enough data", bArr2.length, i);
            Assert.assertEquals("too much data", -1L, inputStream.read());
            Assert.assertArrayEquals("wrong data", bArr, bArr2);
            inputStream.close();
        } catch (Throwable th) {
            inputStream.close();
            throw th;
        }
    }

    private static void checkDirectoryNotWritable(File file) {
        try {
            FileOutputStream fileOutputStream = new FileOutputStream(new File(file, "temp"));
            Throwable th = null;
            try {
                fileOutputStream.write(42);
                fileOutputStream.flush();
                if (fileOutputStream != null) {
                    if (0 != 0) {
                        try {
                            fileOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fileOutputStream.close();
                    }
                }
                Assert.fail("this should fail when writing is properly prevented");
            } finally {
            }
        } catch (IOException e) {
        }
    }
}
