package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.MethodForwardingTestUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.class */
class CheckpointStreamWithResultProviderTest {

    @TempDir
    private Path temporaryFolder;

    CheckpointStreamWithResultProviderTest() {
    }

    @Test
    void testFactory() throws Exception {
        CheckpointStreamWithResultProvider createDuplicatingStream;
        Throwable th;
        CheckpointStreamFactory createCheckpointStreamFactory = createCheckpointStreamFactory();
        CheckpointStreamWithResultProvider createSimpleStream = CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE, createCheckpointStreamFactory);
        Throwable th2 = null;
        try {
            try {
                Assertions.assertThat(createSimpleStream).isInstanceOf(CheckpointStreamWithResultProvider.PrimaryStreamOnly.class);
                if (createSimpleStream != null) {
                    if (0 != 0) {
                        try {
                            createSimpleStream.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createSimpleStream.close();
                    }
                }
                createDuplicatingStream = CheckpointStreamWithResultProvider.createDuplicatingStream(42L, CheckpointedStateScope.EXCLUSIVE, createCheckpointStreamFactory, createLocalRecoveryDirectoryProvider());
                th = null;
            } catch (Throwable th4) {
                th2 = th4;
                throw th4;
            }
            try {
                try {
                    Assertions.assertThat(createDuplicatingStream).isInstanceOf(CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream.class);
                    if (createDuplicatingStream != null) {
                        if (0 == 0) {
                            createDuplicatingStream.close();
                            return;
                        }
                        try {
                            createDuplicatingStream.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th = th6;
                    throw th6;
                }
            } catch (Throwable th7) {
                if (createDuplicatingStream != null) {
                    if (th != null) {
                        try {
                            createDuplicatingStream.close();
                        } catch (Throwable th8) {
                            th.addSuppressed(th8);
                        }
                    } else {
                        createDuplicatingStream.close();
                    }
                }
                throw th7;
            }
        } catch (Throwable th9) {
            if (createSimpleStream != null) {
                if (th2 != null) {
                    try {
                        createSimpleStream.close();
                    } catch (Throwable th10) {
                        th2.addSuppressed(th10);
                    }
                } else {
                    createSimpleStream.close();
                }
            }
            throw th9;
        }
    }

    @Test
    void testCloseAndFinalizeCheckpointStreamResultPrimaryOnly() throws Exception {
        SnapshotResult<StreamStateHandle> writeCheckpointTestData = writeCheckpointTestData(CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE, createCheckpointStreamFactory()));
        Assertions.assertThat(writeCheckpointTestData.getJobManagerOwnedSnapshot()).isNotNull();
        Assertions.assertThat(writeCheckpointTestData.getTaskLocalSnapshot()).isNull();
        FSDataInputStream openInputStream = writeCheckpointTestData.getJobManagerOwnedSnapshot().openInputStream();
        Throwable th = null;
        try {
            try {
                Assertions.assertThat(openInputStream.read()).isEqualTo(66);
                Assertions.assertThat(openInputStream.read()).isEqualTo(-1);
                if (openInputStream != null) {
                    if (0 == 0) {
                        openInputStream.close();
                        return;
                    }
                    try {
                        openInputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (openInputStream != null) {
                if (th != null) {
                    try {
                        openInputStream.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    openInputStream.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testCloseAndFinalizeCheckpointStreamResultPrimaryAndSecondary() throws Exception {
        SnapshotResult<StreamStateHandle> writeCheckpointTestData = writeCheckpointTestData(CheckpointStreamWithResultProvider.createDuplicatingStream(42L, CheckpointedStateScope.EXCLUSIVE, createCheckpointStreamFactory(), createLocalRecoveryDirectoryProvider()));
        Assertions.assertThat(writeCheckpointTestData.getJobManagerOwnedSnapshot()).isNotNull();
        Assertions.assertThat(writeCheckpointTestData.getTaskLocalSnapshot()).isNotNull();
        FSDataInputStream openInputStream = writeCheckpointTestData.getJobManagerOwnedSnapshot().openInputStream();
        Throwable th = null;
        try {
            Assertions.assertThat(openInputStream.read()).isEqualTo(66);
            Assertions.assertThat(openInputStream.read()).isEqualTo(-1);
            if (openInputStream != null) {
                if (0 != 0) {
                    try {
                        openInputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    openInputStream.close();
                }
            }
            FSDataInputStream openInputStream2 = writeCheckpointTestData.getTaskLocalSnapshot().openInputStream();
            Throwable th3 = null;
            try {
                try {
                    Assertions.assertThat(openInputStream2.read()).isEqualTo(66);
                    Assertions.assertThat(openInputStream2.read()).isEqualTo(-1);
                    if (openInputStream2 != null) {
                        if (0 == 0) {
                            openInputStream2.close();
                            return;
                        }
                        try {
                            openInputStream2.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th3 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (openInputStream2 != null) {
                    if (th3 != null) {
                        try {
                            openInputStream2.close();
                        } catch (Throwable th7) {
                            th3.addSuppressed(th7);
                        }
                    } else {
                        openInputStream2.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (openInputStream != null) {
                if (0 != 0) {
                    try {
                        openInputStream.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    openInputStream.close();
                }
            }
            throw th8;
        }
    }

    @Test
    void testCompletedAndCloseStateHandling() throws Exception {
        CheckpointStreamFactory createCheckpointStreamFactory = createCheckpointStreamFactory();
        testCloseBeforeComplete(new CheckpointStreamWithResultProvider.PrimaryStreamOnly(createCheckpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)));
        testCompleteBeforeClose(new CheckpointStreamWithResultProvider.PrimaryStreamOnly(createCheckpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)));
        testCloseBeforeComplete(new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(createCheckpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE), createCheckpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)));
        testCompleteBeforeClose(new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(createCheckpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE), createCheckpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)));
    }

    @Test
    void testCloseMethodForwarding() throws Exception {
        CheckpointStreamFactory createCheckpointStreamFactory = createCheckpointStreamFactory();
        MethodForwardingTestUtil.testMethodForwarding(Closeable.class, CheckpointStreamWithResultProvider.PrimaryStreamOnly::new, () -> {
            try {
                return createCheckpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        MethodForwardingTestUtil.testMethodForwarding(Closeable.class, CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream::new, () -> {
            try {
                return new DuplicatingCheckpointOutputStream(createCheckpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE), createCheckpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    private SnapshotResult<StreamStateHandle> writeCheckpointTestData(CheckpointStreamWithResultProvider checkpointStreamWithResultProvider) throws IOException {
        checkpointStreamWithResultProvider.getCheckpointOutputStream().write(66);
        return checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
    }

    private CheckpointStreamFactory createCheckpointStreamFactory() {
        return new MemCheckpointStreamFactory(16384);
    }

    private void testCloseBeforeComplete(CheckpointStreamWithResultProvider checkpointStreamWithResultProvider) throws IOException {
        checkpointStreamWithResultProvider.getCheckpointOutputStream().write(66);
        checkpointStreamWithResultProvider.close();
        checkpointStreamWithResultProvider.getClass();
        Assertions.assertThatThrownBy(checkpointStreamWithResultProvider::closeAndFinalizeCheckpointStreamResult).isInstanceOf(IOException.class);
    }

    private void testCompleteBeforeClose(CheckpointStreamWithResultProvider checkpointStreamWithResultProvider) throws IOException {
        checkpointStreamWithResultProvider.getCheckpointOutputStream().write(66);
        Assertions.assertThat(checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult()).isNotNull();
        checkpointStreamWithResultProvider.close();
    }

    private LocalRecoveryDirectoryProvider createLocalRecoveryDirectoryProvider() throws IOException {
        return new LocalRecoveryDirectoryProviderImpl(TempDirUtils.newFolder(this.temporaryFolder), new JobID(), new JobVertexID(), 0);
    }
}
