/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import junit.framework.TestCase;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.state.ChangelogTestUtils;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestStreamStateHandle;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.junit.Assert;
import org.junit.Test;

public class SharedStateRegistryTest {
    private static final String RESTORED_STATE_ID = "restored-state";

    /**
     * Walks one registry through the normal life cycle of shared state: first registration,
     * replacement of a handle under an existing key before the checkpoint is completed, rejection
     * of a new handle under that key after completion, and finally cleanup of unused state.
     */
    @Test
    public void testRegistryNormal() {
        SharedStateRegistry registry = new SharedStateRegistryImpl();

        // Register one state: the registry hands back the very same handle.
        TestSharedState firstState = new TestSharedState("first");
        StreamStateHandle result =
                registry.registerReference(firstState.getRegistrationKey(), firstState, 0L);
        Assert.assertSame(firstState, result);
        Assert.assertFalse(firstState.isDiscarded());

        // Register another state under a different key; neither handle is discarded.
        TestSharedState secondState = new TestSharedState("second");
        result = registry.registerReference(secondState.getRegistrationKey(), secondState, 0L);
        Assert.assertSame(secondState, result);
        Assert.assertFalse(firstState.isDiscarded());
        Assert.assertFalse(secondState.isDiscarded());

        // Register a different handle under the existing key BEFORE checkpoint completion:
        // the new handle replaces the old one and the old one is discarded.
        TestSharedState firstStatePrime =
                new TestSharedState(firstState.getRegistrationKey().getKeyString());
        result = registry.registerReference(firstState.getRegistrationKey(), firstStatePrime, 0L);
        Assert.assertSame(firstStatePrime, result);
        Assert.assertFalse(firstStatePrime.isDiscarded());
        Assert.assertNotSame(firstState, result);
        Assert.assertTrue(firstState.isDiscarded());

        // Register yet another handle under the existing key AFTER checkpoint completion:
        // the registered handle is kept and the newcomer is discarded.
        registry.checkpointCompleted(0L);
        TestSharedState firstStateDPrime =
                new TestSharedState(firstState.getRegistrationKey().getKeyString());
        result = registry.registerReference(firstState.getRegistrationKey(), firstStateDPrime, 0L);
        Assert.assertNotSame(firstStateDPrime, result);
        Assert.assertTrue(firstStateDPrime.isDiscarded());
        Assert.assertSame(firstStatePrime, result);
        Assert.assertFalse(firstStatePrime.isDiscarded());

        // Referencing the original (already discarded) handle again still yields the kept one.
        result = registry.registerReference(firstState.getRegistrationKey(), firstState, 0L);
        Assert.assertSame(firstStatePrime, result);
        Assert.assertFalse(firstStatePrime.isDiscarded());

        // Every entry was last used by checkpoint 0, so cleaning up below 1 drops them all.
        registry.unregisterUnusedState(1L);
        Assert.assertTrue(secondState.isDiscarded());
        Assert.assertTrue(firstState.isDiscarded());
        // firstState was already discarded when it got replaced, so the check above cannot
        // detect a missing cleanup; firstStatePrime is the handle held for the "first" key.
        Assert.assertTrue(firstStatePrime.isDiscarded());
    }

    /**
     * Calls {@code unregisterUnusedState} on a registry that holds no state, once with a negative
     * bound and once with {@link Long#MAX_VALUE}. The test passes as long as neither call throws.
     */
    @Test
    public void testUnregisterWithUnexistedKey() {
        SharedStateRegistry emptyRegistry = new SharedStateRegistryImpl();
        long[] lowestCheckpointIds = {-1L, Long.MAX_VALUE};
        for (long lowestCheckpointId : lowestCheckpointIds) {
            emptyRegistry.unregisterUnusedState(lowestCheckpointId);
        }
    }

    /**
     * Registers three {@code ChangelogStateBackendHandleImpl} instances at checkpoints 41, 42 and
     * 43, completing each checkpoint and unregistering unused state right after, and checks which
     * of the wrapped handles report being discarded at each stage.
     */
    @Test
    public void testRegisterChangelogStateBackendHandles() throws InterruptedException {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        long materializationId1 = 1L;
        // Base handle for materialization 1; the handles registered below are obtained from it
        // through deserialize().
        // NOTE(review): presumably deserialize() yields a separate copy of the same logical state
        // - confirm in ChangelogTestUtils.
        ChangelogTestUtils.IncrementalStateHandleWrapper materializedStateBase1 = ChangelogTestUtils.createDummyIncrementalStateHandle(materializationId1);
        ChangelogTestUtils.IncrementalStateHandleWrapper materializedState1 = materializedStateBase1.deserialize();
        ChangelogTestUtils.ChangelogStateHandleWrapper nonMaterializedState1 = ChangelogTestUtils.createDummyChangelogStateHandle(1L, 2L);
        // Same value as materializationId1; this variable is the one passed to the first two handles.
        long materializationId = 1L;
        long checkpointId1 = 41L;
        // Checkpoint 41: materializedState1 + nonMaterializedState1.
        ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl changelogStateBackendHandle1 = new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(Collections.singletonList(materializedState1), Collections.singletonList(nonMaterializedState1), materializedStateBase1.getKeyGroupRange(), checkpointId1, materializationId, nonMaterializedState1.getStateSize());
        changelogStateBackendHandle1.registerSharedStates((SharedStateRegistry)sharedStateRegistry, checkpointId1);
        sharedStateRegistry.checkpointCompleted(checkpointId1);
        sharedStateRegistry.unregisterUnusedState(checkpointId1);
        // Checkpoint 42: a second handle obtained from the same base, plus a new changelog handle.
        ChangelogTestUtils.IncrementalStateHandleWrapper materializedState2 = materializedStateBase1.deserialize();
        ChangelogTestUtils.ChangelogStateHandleWrapper nonMaterializedState2 = ChangelogTestUtils.createDummyChangelogStateHandle(2L, 3L);
        long checkpointId2 = 42L;
        ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl changelogStateBackendHandle2 = new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(Collections.singletonList(materializedState2), Collections.singletonList(nonMaterializedState2), materializedStateBase1.getKeyGroupRange(), checkpointId2, materializationId, nonMaterializedState2.getStateSize());
        changelogStateBackendHandle2.registerSharedStates((SharedStateRegistry)sharedStateRegistry, checkpointId2);
        sharedStateRegistry.checkpointCompleted(checkpointId2);
        sharedStateRegistry.unregisterUnusedState(checkpointId2);
        // Expected after cleaning up to checkpoint 42: both handles obtained from the
        // materialization-1 base are still alive, while the changelog handle that was only part
        // of checkpoint 41 has been discarded.
        TestCase.assertFalse((boolean)materializedState1.isDiscarded());
        TestCase.assertFalse((boolean)materializedState2.isDiscarded());
        Assert.assertTrue((boolean)nonMaterializedState1.isDiscarded());
        // Checkpoint 43: a handle from a new base (materialization 2), still combined with
        // nonMaterializedState2; the last constructor argument is 0L here.
        long materializationId2 = 2L;
        ChangelogTestUtils.IncrementalStateHandleWrapper materializedStateBase2 = ChangelogTestUtils.createDummyIncrementalStateHandle(materializationId2);
        ChangelogTestUtils.IncrementalStateHandleWrapper materializedState3 = materializedStateBase2.deserialize();
        long checkpointId3 = 43L;
        ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl changelogStateBackendHandle3 = new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(Collections.singletonList(materializedState3), Collections.singletonList(nonMaterializedState2), materializedState3.getKeyGroupRange(), checkpointId3, materializationId2, 0L);
        changelogStateBackendHandle3.registerSharedStates((SharedStateRegistry)sharedStateRegistry, checkpointId3);
        sharedStateRegistry.checkpointCompleted(checkpointId3);
        sharedStateRegistry.unregisterUnusedState(checkpointId3);
        // Expected after cleaning up to checkpoint 43: the materialization-1 handle is discarded,
        // while nonMaterializedState2 survives because the third handle was registered with it.
        Assert.assertTrue((boolean)materializedState1.isDiscarded());
        TestCase.assertFalse((boolean)nonMaterializedState2.isDiscarded());
    }

    /**
     * Restores an initial checkpoint (id 1) with native-savepoint properties handed to the helper,
     * re-registers its state under checkpoints 2 and 3, and registers an unrelated state under
     * checkpoint 4. Expects {@code unregisterUnusedState(3)} to return exactly {@code {1}} and
     * {@code unregisterUnusedState(4)} to return an empty set.
     */
    @Test
    public void testUnregisterUnusedSavepointState() {
        SharedStateRegistry registry = new SharedStateRegistryImpl();
        TestingStreamStateHandle restoredHandle = new TestingStreamStateHandle();
        CheckpointProperties savepointProperties =
                CheckpointProperties.forSavepoint(false, SavepointFormatType.NATIVE);
        registerInitialCheckpoint(registry, RESTORED_STATE_ID, savepointProperties);

        // The restored state stays referenced by checkpoints 2 and 3 ...
        for (long checkpointId = 2L; checkpointId <= 3L; checkpointId++) {
            registry.registerReference(
                    new SharedStateRegistryKey(RESTORED_STATE_ID), restoredHandle, checkpointId);
        }
        // ... while checkpoint 4 only references a brand-new state.
        registry.registerReference(
                new SharedStateRegistryKey("new-state"), new TestingStreamStateHandle(), 4L);

        Assert.assertEquals(
                "Only the initial checkpoint should be retained because its state is in use",
                Collections.singleton(1L),
                registry.unregisterUnusedState(3L));
        boolean nothingInUse = registry.unregisterUnusedState(4L).isEmpty();
        Assert.assertTrue(
                "The initial checkpoint state is unused so it could be discarded", nothingInUse);
    }

    /**
     * Registers the same key under checkpoints 1 and 2, each time with a fresh handle and without
     * restoring any initial checkpoint, and expects {@code unregisterUnusedState(2)} to return an
     * empty set.
     */
    @Test
    public void testUnregisterNonInitialCheckpoint() {
        SharedStateRegistry registry = new SharedStateRegistryImpl();
        for (long checkpointId = 1L; checkpointId <= 2L; checkpointId++) {
            registry.registerReference(
                    new SharedStateRegistryKey("stateId"),
                    new TestingStreamStateHandle(),
                    checkpointId);
        }
        boolean nothingInUse = registry.unregisterUnusedState(2L).isEmpty();
        Assert.assertTrue("First (non-initial) checkpoint could be discarded", nothingInUse);
    }

    /**
     * Restores an initial checkpoint with {@code RETAIN_ON_CANCELLATION} checkpoint properties
     * handed to the helper, re-registers its state under checkpoint 2, and expects
     * {@code unregisterUnusedState(2)} to return an empty set.
     */
    @Test
    public void testUnregisterInitialCheckpoint() {
        SharedStateRegistry registry = new SharedStateRegistryImpl();
        TestingStreamStateHandle restoredHandle = new TestingStreamStateHandle();
        CheckpointProperties retainedProperties =
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
        registerInitialCheckpoint(registry, RESTORED_STATE_ID, retainedProperties);

        SharedStateRegistryKey restoredKey = new SharedStateRegistryKey(RESTORED_STATE_ID);
        registry.registerReference(restoredKey, restoredHandle, 2L);

        boolean nothingInUse = registry.unregisterUnusedState(2L).isEmpty();
        Assert.assertTrue(
                "(retained) checkpoint - should NOT be considered in use even if its state is in use",
                nothingInUse);
    }

    /**
     * Restores an initial checkpoint with {@code RETAIN_ON_CANCELLATION} checkpoint properties
     * handed to the helper, then re-registers its state under checkpoint 2 with the boolean flag
     * set and under checkpoint 3 with the flag cleared. Expects {@code unregisterUnusedState(3)}
     * to return exactly {@code {1}}.
     *
     * <p>NOTE(review): judging by the assertion message, the flag marks a reference coming from
     * the changelog - confirm against {@code SharedStateRegistry#registerReference}.
     */
    @Test
    public void testUnregisterInitialCheckpointUsedInChangelog() {
        SharedStateRegistry registry = new SharedStateRegistryImpl();
        TestingStreamStateHandle restoredHandle = new TestingStreamStateHandle();
        CheckpointProperties retainedProperties =
                CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
        registerInitialCheckpoint(registry, RESTORED_STATE_ID, retainedProperties);

        registry.registerReference(
                new SharedStateRegistryKey(RESTORED_STATE_ID), restoredHandle, 2L, true);
        registry.registerReference(
                new SharedStateRegistryKey(RESTORED_STATE_ID), restoredHandle, 3L, false);

        Assert.assertEquals(
                "(retained) checkpoint - should be considered in use as long as its state is in use by changelog",
                Collections.singleton(1L),
                registry.unregisterUnusedState(3L));
    }

    /**
     * Builds a completed checkpoint with id 1 that holds a single incremental keyed state handle
     * whose {@link StateHandleID} is created from {@code stateId}, and hands it to
     * {@code registerAllAfterRestored} with {@link RestoreMode#DEFAULT}.
     *
     * @param sharedStateRegistry registry that receives the restored checkpoint
     * @param stateId string used to create the state handle id of the restored keyed state
     * @param properties passed as the last constructor argument of the {@link CompletedCheckpoint}
     */
    private void registerInitialCheckpoint(SharedStateRegistry sharedStateRegistry, String stateId, CheckpointProperties properties) {
        // One keyed state handle with empty state maps and a one-byte "meta" handle.
        IncrementalRemoteKeyedStateHandle restoredKeyedState =
                IncrementalRemoteKeyedStateHandle.restore(
                        UUID.randomUUID(),
                        KeyGroupRange.EMPTY_KEY_GROUP_RANGE,
                        1L,
                        Collections.emptyMap(),
                        Collections.emptyMap(),
                        new ByteStreamStateHandle("meta", new byte[1]),
                        1024L,
                        new StateHandleID(stateId));

        // Wrap it into an operator state with a single subtask at index 0.
        OperatorID operatorId = new OperatorID();
        OperatorState stateOfOperator = new OperatorState(operatorId, 1, 1);
        OperatorSubtaskState subtaskState =
                OperatorSubtaskState.builder().setManagedKeyedState(restoredKeyedState).build();
        stateOfOperator.putState(0, subtaskState);

        CompletedCheckpoint initialCheckpoint =
                new CompletedCheckpoint(
                        new JobID(),
                        1L,
                        1L,
                        1L,
                        Collections.singletonMap(operatorId, stateOfOperator),
                        Collections.emptyList(),
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        new TestCompletedCheckpointStorageLocation(),
                        null,
                        properties);
        sharedStateRegistry.registerAllAfterRestored(initialCheckpoint, RestoreMode.DEFAULT);
    }

    /**
     * Minimal state handle used by the tests in this class: it carries a registry key and only
     * remembers whether {@link #discardState()} has been called on it.
     *
     * <p>{@code equals} is intentionally not overridden, so two instances created with the same
     * key string remain distinct handles.
     */
    private static class TestSharedState implements TestStreamStateHandle {
        private static final long serialVersionUID = 4468635881465159780L;

        /** Key under which the tests register this handle. */
        private final SharedStateRegistryKey key;

        /** Set to {@code true} by {@link #discardState()} and never reset. */
        private boolean discarded;

        TestSharedState(String key) {
            this.key = new SharedStateRegistryKey(key);
        }

        /** Returns the registry key created from the constructor argument. */
        public SharedStateRegistryKey getRegistrationKey() {
            return key;
        }

        /** Only records the call; there is nothing to release. */
        public void discardState() throws Exception {
            discarded = true;
        }

        /** Reports the length of the key's string representation as the state size. */
        public long getStateSize() {
            String keyText = key.toString();
            return keyText.length();
        }

        /** Delegates to the key's hash code. */
        @Override
        public int hashCode() {
            return key.hashCode();
        }

        /** Not supported: this handle has no content to read. */
        public FSDataInputStream openInputStream() throws IOException {
            throw new UnsupportedOperationException();
        }

        /** Always empty: this handle holds no bytes. */
        public Optional<byte[]> asBytesIfInMemory() {
            return Optional.empty();
        }

        /** Tells whether {@link #discardState()} has been called at least once. */
        public boolean isDiscarded() {
            return discarded;
        }
    }
}

