/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.util.MethodForwardingTestUtil;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link CheckpointStreamWithResultProvider}: the two static factory methods, the
 * snapshot result produced on finalization, the interplay between {@code close()} and
 * {@code closeAndFinalizeCheckpointStreamResult()}, and forwarding of {@link Closeable} calls.
 */
public class CheckpointStreamWithResultProviderTest
extends TestLogger {
    /** Single payload byte written to every checkpoint stream under test. */
    private static final int TEST_BYTE = 0x42;

    /** Value returned by {@code read()} once the stream is exhausted. */
    private static final int END_OF_STREAM = -1;

    /** Arbitrary checkpoint id passed to the duplicating-stream factory method. */
    private static final long CHECKPOINT_ID = 42L;

    /** Size argument handed to the {@link MemCheckpointStreamFactory} constructor. */
    private static final int MAX_STATE_SIZE = 16384;

    /** Root for the local recovery directories; created and deleted once per class. */
    private static TemporaryFolder temporaryFolder;

    @BeforeClass
    public static void beforeClass() throws IOException {
        temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
    }

    @AfterClass
    public static void afterClass() {
        temporaryFolder.delete();
    }

    /** Checks that each factory method returns the expected provider implementation. */
    @Test
    public void testFactory() throws Exception {
        CheckpointStreamFactory primaryFactory = this.createCheckpointStreamFactory();
        try (CheckpointStreamWithResultProvider primaryOnly =
                CheckpointStreamWithResultProvider.createSimpleStream(
                        CheckpointedStateScope.EXCLUSIVE, primaryFactory)) {
            Assert.assertTrue(
                    primaryOnly instanceof CheckpointStreamWithResultProvider.PrimaryStreamOnly);
        }

        LocalRecoveryDirectoryProvider directoryProvider =
                this.createLocalRecoveryDirectoryProvider();
        try (CheckpointStreamWithResultProvider primaryAndSecondary =
                CheckpointStreamWithResultProvider.createDuplicatingStream(
                        CHECKPOINT_ID,
                        CheckpointedStateScope.EXCLUSIVE,
                        primaryFactory,
                        directoryProvider)) {
            Assert.assertTrue(
                    primaryAndSecondary
                            instanceof CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream);
        }
    }

    /**
     * A simple (primary-only) stream must yield a job-manager-owned snapshot holding the written
     * data and no task-local snapshot.
     */
    @Test
    public void testCloseAndFinalizeCheckpointStreamResultPrimaryOnly() throws Exception {
        CheckpointStreamFactory primaryFactory = this.createCheckpointStreamFactory();
        CheckpointStreamWithResultProvider resultProvider =
                CheckpointStreamWithResultProvider.createSimpleStream(
                        CheckpointedStateScope.EXCLUSIVE, primaryFactory);

        SnapshotResult<StreamStateHandle> result = this.writeCheckpointTestData(resultProvider);

        Assert.assertNotNull(result.getJobManagerOwnedSnapshot());
        Assert.assertNull(result.getTaskLocalSnapshot());
        this.assertContainsTestData(result.getJobManagerOwnedSnapshot());
    }

    /**
     * A duplicating stream must yield both a job-manager-owned and a task-local snapshot, each
     * holding the written data.
     */
    @Test
    public void testCloseAndFinalizeCheckpointStreamResultPrimaryAndSecondary() throws Exception {
        CheckpointStreamFactory primaryFactory = this.createCheckpointStreamFactory();
        LocalRecoveryDirectoryProvider directoryProvider =
                this.createLocalRecoveryDirectoryProvider();
        CheckpointStreamWithResultProvider resultProvider =
                CheckpointStreamWithResultProvider.createDuplicatingStream(
                        CHECKPOINT_ID,
                        CheckpointedStateScope.EXCLUSIVE,
                        primaryFactory,
                        directoryProvider);

        SnapshotResult<StreamStateHandle> result = this.writeCheckpointTestData(resultProvider);

        Assert.assertNotNull(result.getJobManagerOwnedSnapshot());
        Assert.assertNotNull(result.getTaskLocalSnapshot());
        this.assertContainsTestData(result.getJobManagerOwnedSnapshot());
        this.assertContainsTestData(result.getTaskLocalSnapshot());
    }

    /**
     * Runs the close-before-complete and complete-before-close scenarios against both provider
     * implementations, each time with freshly created streams.
     */
    @Test
    public void testCompletedAndCloseStateHandling() throws Exception {
        CheckpointStreamFactory primaryFactory = this.createCheckpointStreamFactory();

        this.testCloseBeforeComplete(
                new CheckpointStreamWithResultProvider.PrimaryStreamOnly(
                        primaryFactory.createCheckpointStateOutputStream(
                                CheckpointedStateScope.EXCLUSIVE)));
        this.testCompleteBeforeClose(
                new CheckpointStreamWithResultProvider.PrimaryStreamOnly(
                        primaryFactory.createCheckpointStateOutputStream(
                                CheckpointedStateScope.EXCLUSIVE)));

        this.testCloseBeforeComplete(
                new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(
                        primaryFactory.createCheckpointStateOutputStream(
                                CheckpointedStateScope.EXCLUSIVE),
                        primaryFactory.createCheckpointStateOutputStream(
                                CheckpointedStateScope.EXCLUSIVE)));
        this.testCompleteBeforeClose(
                new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(
                        primaryFactory.createCheckpointStateOutputStream(
                                CheckpointedStateScope.EXCLUSIVE),
                        primaryFactory.createCheckpointStateOutputStream(
                                CheckpointedStateScope.EXCLUSIVE)));
    }

    /**
     * Uses {@link MethodForwardingTestUtil} to check {@link Closeable} method forwarding for both
     * provider implementations.
     */
    @Test
    public void testCloseMethodForwarding() throws Exception {
        CheckpointStreamFactory streamFactory = this.createCheckpointStreamFactory();

        MethodForwardingTestUtil.testMethodForwarding(Closeable.class, CheckpointStreamWithResultProvider.PrimaryStreamOnly::new, () -> {
            try {
                return streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
            }
            catch (IOException e) {
                // The supplier cannot throw checked exceptions; rethrow with the cause preserved.
                throw new RuntimeException(e);
            }
        });

        MethodForwardingTestUtil.testMethodForwarding(Closeable.class, CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream::new, () -> {
            try {
                return new DuplicatingCheckpointOutputStream(streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE), streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE));
            }
            catch (IOException e) {
                // The supplier cannot throw checked exceptions; rethrow with the cause preserved.
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Writes {@link #TEST_BYTE} to the provider's output stream and finalizes it.
     *
     * @param resultProvider provider whose stream receives the test byte
     * @return the snapshot result returned by the finalization call
     * @throws IOException if writing or finalizing fails
     */
    private SnapshotResult<StreamStateHandle> writeCheckpointTestData(CheckpointStreamWithResultProvider resultProvider) throws IOException {
        CheckpointStateOutputStream checkpointOutputStream = resultProvider.getCheckpointOutputStream();
        checkpointOutputStream.write(TEST_BYTE);
        return resultProvider.closeAndFinalizeCheckpointStreamResult();
    }

    /**
     * Asserts that the state behind {@code handle} consists of exactly one {@link #TEST_BYTE}.
     *
     * @param handle state handle to read back
     * @throws IOException if opening, reading, or closing the input stream fails
     */
    private void assertContainsTestData(StreamStateHandle handle) throws IOException {
        try (FSDataInputStream inputStream = handle.openInputStream()) {
            Assert.assertEquals(TEST_BYTE, inputStream.read());
            Assert.assertEquals(END_OF_STREAM, inputStream.read());
        }
    }

    /** Creates the in-memory stream factory used as the primary stream source in all tests. */
    private CheckpointStreamFactory createCheckpointStreamFactory() {
        return new MemCheckpointStreamFactory(MAX_STATE_SIZE);
    }

    /**
     * Closes the provider first and then expects finalization to fail with an
     * {@link IOException}.
     *
     * @param resultProvider freshly created provider to exercise
     * @throws IOException if the initial write or the close fails
     */
    private void testCloseBeforeComplete(CheckpointStreamWithResultProvider resultProvider) throws IOException {
        resultProvider.getCheckpointOutputStream().write(TEST_BYTE);
        resultProvider.close();
        try {
            resultProvider.closeAndFinalizeCheckpointStreamResult();
            Assert.fail("Finalizing an already closed stream should have thrown an IOException.");
        }
        catch (IOException expected) {
            // Expected outcome: the stream was closed before it could be finalized.
        }
    }

    /**
     * Finalizes the provider first, expects a non-null result, and then checks that a subsequent
     * {@code close()} completes without throwing.
     *
     * @param resultProvider freshly created provider to exercise
     * @throws IOException if writing, finalizing, or closing fails
     */
    private void testCompleteBeforeClose(CheckpointStreamWithResultProvider resultProvider) throws IOException {
        resultProvider.getCheckpointOutputStream().write(TEST_BYTE);
        Assert.assertNotNull(resultProvider.closeAndFinalizeCheckpointStreamResult());
        resultProvider.close();
    }

    /**
     * Builds a directory provider rooted in a new sub-folder of the shared temporary folder,
     * using a fresh job id, a fresh job vertex id, and subtask index 0.
     *
     * @throws IOException if the sub-folder cannot be created
     */
    private LocalRecoveryDirectoryProvider createLocalRecoveryDirectoryProvider() throws IOException {
        File localStateDir = temporaryFolder.newFolder();
        JobID jobID = new JobID();
        JobVertexID jobVertexID = new JobVertexID();
        int subtaskIdx = 0;
        return new LocalRecoveryDirectoryProviderImpl(localStateDir, jobID, jobVertexID, subtaskIdx);
    }
}

