package org.apache.flink.runtime.state;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.RunnableFuture;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/state/StateSnapshotCompressionTest.class */
/**
 * Tests for the snapshot compression setting of a {@link HeapKeyedStateBackend}.
 *
 * <p>Covers two aspects: that the compression flag of the {@link ExecutionConfig} selects the
 * expected key-group compression decorator, and that keyed value state written to a snapshot can
 * be read back from a backend restored from that snapshot, both with and without compression.
 */
public class StateSnapshotCompressionTest extends TestLogger {

    /**
     * Verifies that enabling snapshot compression yields the Snappy decorator and disabling it
     * yields the uncompressed decorator.
     *
     * @throws BackendBuildingException if one of the backends cannot be built
     */
    @Test
    public void testCompressionConfiguration() throws BackendBuildingException {
        ExecutionConfig compressedConfig = new ExecutionConfig();
        compressedConfig.setUseSnapshotCompression(true);
        HeapKeyedStateBackend<String> compressedBackend = getStringHeapKeyedStateBackend(compressedConfig);
        try {
            Assert.assertEquals(
                    SnappyStreamCompressionDecorator.INSTANCE,
                    compressedBackend.getKeyGroupCompressionDecorator());
        } finally {
            // Release the backend even when the assertion fails.
            closeAndDispose(compressedBackend);
        }

        ExecutionConfig uncompressedConfig = new ExecutionConfig();
        uncompressedConfig.setUseSnapshotCompression(false);
        HeapKeyedStateBackend<String> uncompressedBackend = getStringHeapKeyedStateBackend(uncompressedConfig);
        try {
            Assert.assertEquals(
                    UncompressedStreamCompressionDecorator.INSTANCE,
                    uncompressedBackend.getKeyGroupCompressionDecorator());
        } finally {
            closeAndDispose(uncompressedBackend);
        }
    }

    /** Runs the snapshot/restore round trip with snapshot compression enabled. */
    @Test
    public void snapshotRestoreRoundtripWithCompression() throws Exception {
        snapshotRestoreRoundtrip(true);
    }

    /** Runs the snapshot/restore round trip with snapshot compression disabled. */
    @Test
    public void snapshotRestoreRoundtripUncompressed() throws Exception {
        snapshotRestoreRoundtrip(false);
    }

    /**
     * Builds a string-keyed heap backend without any state to restore.
     *
     * @param executionConfig configuration that also determines the compression decorator
     * @return the freshly built backend; the caller is responsible for closing and disposing it
     * @throws BackendBuildingException if the backend cannot be built
     */
    private HeapKeyedStateBackend<String> getStringHeapKeyedStateBackend(ExecutionConfig executionConfig) throws BackendBuildingException {
        return getStringHeapKeyedStateBackend(executionConfig, Collections.emptyList());
    }

    /**
     * Builds a string-keyed heap backend, passing the given state handles to the builder.
     *
     * @param executionConfig configuration that also determines the compression decorator
     * @param collection keyed state handles handed to the builder (may be empty)
     * @return the freshly built backend; the caller is responsible for closing and disposing it
     * @throws BackendBuildingException if the backend cannot be built
     */
    private HeapKeyedStateBackend<String> getStringHeapKeyedStateBackend(ExecutionConfig executionConfig, Collection<KeyedStateHandle> collection) throws BackendBuildingException {
        return new HeapKeyedStateBackendBuilder<>(
                        Mockito.mock(TaskKvStateRegistry.class),
                        StringSerializer.INSTANCE,
                        StateSnapshotCompressionTest.class.getClassLoader(),
                        16,
                        new KeyGroupRange(0, 15),
                        executionConfig,
                        TtlTimeProvider.DEFAULT,
                        LatencyTrackingStateConfig.disabled(),
                        collection,
                        AbstractStateBackend.getCompressionDecorator(executionConfig),
                        TestLocalRecoveryConfig.disabled(),
                        Mockito.mock(HeapPriorityQueueSetFactory.class),
                        true,
                        new CloseableRegistry())
                .build();
    }

    /**
     * Writes four key/value pairs, snapshots the backend, restores a second backend from that
     * snapshot and checks that all four values are readable again.
     *
     * @param useCompression whether the writing backend uses snapshot compression
     * @throws Exception if building, snapshotting or restoring a backend fails
     */
    private void snapshotRestoreRoundtrip(boolean useCompression) throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setUseSnapshotCompression(useCompression);

        ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("test", String.class);
        stateDescriptor.initializeSerializerUnlessSet(executionConfig);

        KeyedStateHandle stateHandle;
        HeapKeyedStateBackend<String> writingBackend = getStringHeapKeyedStateBackend(executionConfig);
        try {
            InternalValueState<String, VoidNamespace, String> state =
                    writingBackend.createInternalState(new VoidNamespaceSerializer(), stateDescriptor);

            putValue(writingBackend, state, "A", "42");
            putValue(writingBackend, state, "B", "43");
            putValue(writingBackend, state, "C", "44");
            putValue(writingBackend, state, "D", "45");

            RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot =
                    writingBackend.snapshot(
                            0L,
                            0L,
                            new MemCheckpointStreamFactory(4 * 1024 * 1024),
                            CheckpointOptions.forCheckpointWithDefaultLocation());
            // The future is not scheduled anywhere, so run it on this thread before reading it.
            snapshot.run();
            stateHandle = snapshot.get().getJobManagerOwnedSnapshot();
        } finally {
            // Cleanup happens exactly once here; the restore phase below is deliberately outside
            // this try so that a restore failure cannot dispose this backend a second time.
            closeAndDispose(writingBackend);
        }

        // The restoring backend is built from a default ExecutionConfig, independent of the
        // compression flag used for writing.
        // NOTE(review): presumably the snapshot itself records how it was compressed - confirm.
        HeapKeyedStateBackend<String> restoredBackend =
                getStringHeapKeyedStateBackend(new ExecutionConfig(), StateObjectCollection.singleton(stateHandle));
        try {
            InternalValueState<String, VoidNamespace, String> restoredState =
                    restoredBackend.createInternalState(new VoidNamespaceSerializer(), stateDescriptor);

            assertValue(restoredBackend, restoredState, "A", "42");
            assertValue(restoredBackend, restoredState, "B", "43");
            assertValue(restoredBackend, restoredState, "C", "44");
            assertValue(restoredBackend, restoredState, "D", "45");
        } finally {
            closeAndDispose(restoredBackend);
        }
    }

    /** Selects {@code key} on the backend and stores {@code value} in the void namespace. */
    private static void putValue(
            HeapKeyedStateBackend<String> backend,
            InternalValueState<String, VoidNamespace, String> state,
            String key,
            String value) throws Exception {
        backend.setCurrentKey(key);
        state.setCurrentNamespace(VoidNamespace.INSTANCE);
        state.update(value);
    }

    /** Selects {@code key} on the backend and asserts that the void-namespace value is {@code expected}. */
    private static void assertValue(
            HeapKeyedStateBackend<String> backend,
            InternalValueState<String, VoidNamespace, String> state,
            String key,
            String expected) throws Exception {
        backend.setCurrentKey(key);
        state.setCurrentNamespace(VoidNamespace.INSTANCE);
        Assert.assertEquals(expected, state.value());
    }

    /** Closes the backend, ignoring failures from closing, and then disposes it. */
    private static void closeAndDispose(HeapKeyedStateBackend<String> backend) {
        IOUtils.closeQuietly(backend);
        backend.dispose();
    }
}
