package org.apache.flink.runtime.state.v2;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Objects;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.MapStateDescriptor;
import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.runtime.state.v2.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.v2.internal.InternalReducingState;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/v2/AsyncKeyedStateBackendAdaptorTest.class */
/**
 * Tests for {@link AsyncKeyedStateBackendAdaptor}.
 *
 * <p>Each test builds a heap keyed state backend, wraps it in the adaptor, obtains one kind of
 * v2 state from the adaptor and then exercises first the synchronous and afterwards the
 * asynchronous accessors of that state, asserting the same expectations for both.
 *
 * <p>NOTE(review): the asynchronous chains are never explicitly awaited before the adaptor is
 * closed. This presumably relies on the adaptor handing out already-completed futures so that the
 * callbacks run inline -- confirm against the adaptor implementation.
 */
public class AsyncKeyedStateBackendAdaptorTest {

    /** Exercises {@code clear/update/value} and their async counterparts on a value state. */
    @Test
    public void testValueStateAdaptor() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedStateBackend = createBackend();
        AsyncKeyedStateBackendAdaptor<String> adaptor =
                new AsyncKeyedStateBackendAdaptor<>(keyedStateBackend);
        keyedStateBackend.setCurrentKey("test");
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<>("testState", BasicTypeInfo.INT_TYPE_INFO);
        ValueState<Integer> valueState =
                adaptor.getOrCreateKeyedState(
                        VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

        // Synchronous API.
        valueState.clear();
        Assertions.assertThat(valueState.value()).isNull();
        valueState.update(10);
        Assertions.assertThat(valueState.value()).isEqualTo(10);

        // Asynchronous API; intermediate results are cross-checked through the sync getter.
        valueState
                .asyncClear()
                .thenCompose(
                        ignored -> {
                            Assertions.assertThat(valueState.value()).isNull();
                            return valueState.asyncUpdate(20);
                        })
                .thenCompose(
                        ignored -> {
                            Assertions.assertThat(valueState.value()).isEqualTo(20);
                            return valueState.asyncValue();
                        })
                .thenAccept(value -> Assertions.assertThat(value).isEqualTo(20));

        adaptor.close();
    }

    /** Exercises {@code clear/add/addAll/update/get} and their async counterparts on a list state. */
    @Test
    public void testListStateAdaptor() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedStateBackend = createBackend();
        AsyncKeyedStateBackendAdaptor<String> adaptor =
                new AsyncKeyedStateBackendAdaptor<>(keyedStateBackend);
        keyedStateBackend.setCurrentKey("test");
        ListStateDescriptor<Integer> descriptor =
                new ListStateDescriptor<>("testState", BasicTypeInfo.INT_TYPE_INFO);
        ListState<Integer> listState =
                adaptor.getOrCreateKeyedState(
                        VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

        // Synchronous API.
        listState.clear();
        Assertions.assertThat(listState.get()).isNull();
        listState.add(10);
        Assertions.assertThat(listState.get())
                .containsExactlyInAnyOrderElementsOf(Arrays.asList(10));
        listState.addAll(Arrays.asList(20, 30));
        Assertions.assertThat(listState.get())
                .containsExactlyInAnyOrderElementsOf(Arrays.asList(10, 20, 30));
        listState.update(Arrays.asList(40, 50));
        Assertions.assertThat(listState.get())
                .containsExactlyInAnyOrderElementsOf(Arrays.asList(40, 50));

        // Asynchronous API; each step is verified through the sync getter.
        listState
                .asyncClear()
                .thenCompose(
                        ignored -> {
                            Assertions.assertThat(listState.get()).isNull();
                            return listState.asyncAdd(10);
                        })
                .thenCompose(
                        ignored -> {
                            Assertions.assertThat(listState.get())
                                    .containsExactlyInAnyOrderElementsOf(Arrays.asList(10));
                            return listState.asyncAddAll(Arrays.asList(20, 30));
                        })
                .thenCompose(
                        ignored -> {
                            Assertions.assertThat(listState.get())
                                    .containsExactlyInAnyOrderElementsOf(
                                            Arrays.asList(10, 20, 30));
                            return listState.asyncUpdate(Arrays.asList(40, 50));
                        })
                .thenAccept(
                        ignored ->
                                Assertions.assertThat(listState.get())
                                        .containsExactlyInAnyOrderElementsOf(
                                                Arrays.asList(40, 50)));

        adaptor.close();
    }

    /**
     * Exercises {@code put/putAll/get/contains/entries/keys/values/iterator} and their async
     * counterparts on a map state.
     */
    @Test
    public void testMapStateAdaptor() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedStateBackend = createBackend();
        AsyncKeyedStateBackendAdaptor<String> adaptor =
                new AsyncKeyedStateBackendAdaptor<>(keyedStateBackend);
        keyedStateBackend.setCurrentKey("test");
        MapStateDescriptor<Integer, Integer> descriptor =
                new MapStateDescriptor<>(
                        "testState", BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        MapState<Integer, Integer> mapState =
                adaptor.getOrCreateKeyedState(
                        VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

        // Expected final content of the state in both the sync and the async phase.
        HashMap<Integer, Integer> expected = new HashMap<>();
        expected.put(10, 100);
        expected.put(20, 200);

        // Synchronous API.
        mapState.clear();
        Assertions.assertThat(mapState.isEmpty()).isTrue();
        mapState.put(10, 100);
        Assertions.assertThat(mapState.get(10)).isEqualTo(100);
        mapState.putAll(Collections.singletonMap(20, 200));
        Assertions.assertThat(mapState.get(20)).isEqualTo(200);
        Assertions.assertThat(mapState.contains(20)).isTrue();
        Assertions.assertThat(mapState.entries())
                .containsExactlyInAnyOrderElementsOf(expected.entrySet());
        Assertions.assertThat(mapState.keys())
                .containsExactlyInAnyOrderElementsOf(expected.keySet());
        Assertions.assertThat(mapState.values())
                .containsExactlyInAnyOrderElementsOf(expected.values());
        Assertions.assertThat(mapState.iterator())
                .toIterable()
                .containsExactlyInAnyOrderElementsOf(expected.entrySet());

        // Asynchronous API.
        mapState.asyncClear()
                .thenCompose(
                        ignored -> {
                            Assertions.assertThat(mapState.isEmpty()).isTrue();
                            return mapState.asyncPut(10, 100);
                        })
                .thenCompose(
                        ignored -> {
                            Assertions.assertThat(mapState.get(10)).isEqualTo(100);
                            return mapState.asyncPutAll(expected);
                        })
                .thenCompose(
                        ignored -> {
                            Assertions.assertThat(mapState.get(20)).isEqualTo(200);
                            return mapState.asyncContains(20);
                        })
                .thenCompose(
                        contained -> {
                            Assertions.assertThat(contained).isTrue();
                            return mapState.asyncEntries();
                        })
                .thenCompose(
                        entries -> {
                            // Drain the state iterator into a plain map for comparison.
                            HashMap<Integer, Integer> actualEntries = new HashMap<>();
                            entries.onNext(
                                    entry -> {
                                        actualEntries.put(entry.getKey(), entry.getValue());
                                    });
                            Assertions.assertThat(actualEntries.entrySet())
                                    .containsExactlyInAnyOrderElementsOf(expected.entrySet());
                            return mapState.asyncKeys();
                        })
                .thenCompose(
                        keys -> {
                            HashSet<Integer> actualKeys = new HashSet<>();
                            keys.onNext(actualKeys::add);
                            Assertions.assertThat(actualKeys)
                                    .containsExactlyInAnyOrderElementsOf(expected.keySet());
                            return mapState.asyncValues();
                        })
                .thenAccept(
                        values -> {
                            HashSet<Integer> actualValues = new HashSet<>();
                            values.onNext(actualValues::add);
                            Assertions.assertThat(actualValues)
                                    .containsExactlyInAnyOrderElementsOf(expected.values());
                        });

        adaptor.close();
    }

    /**
     * Exercises {@code add/get}, namespace switching and namespace merging on a reducing state
     * (sum of integers), synchronously and asynchronously.
     */
    @Test
    public void testReducingStateAdaptor() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedStateBackend = createBackend();
        AsyncKeyedStateBackendAdaptor<String> adaptor =
                new AsyncKeyedStateBackendAdaptor<>(keyedStateBackend);
        keyedStateBackend.setCurrentKey("test");
        ReducingStateDescriptor<Integer> descriptor =
                new ReducingStateDescriptor<>(
                        "testState", Integer::sum, BasicTypeInfo.INT_TYPE_INFO);
        InternalReducingState<String, Long, Integer> reducingState =
                adaptor.getOrCreateKeyedState(0L, LongSerializer.INSTANCE, descriptor);

        // Synchronous API: accumulate 10 + 20 in namespace 0, 30 in namespace 1, then merge.
        reducingState.clear();
        Assertions.assertThat(reducingState.get()).isNull();
        reducingState.add(10);
        Assertions.assertThat(reducingState.get()).isEqualTo(10);
        reducingState.add(20);
        Assertions.assertThat(reducingState.get()).isEqualTo(30);
        reducingState.setCurrentNamespace(1L);
        Assertions.assertThat(reducingState.get()).isNull();
        reducingState.add(30);
        reducingState.mergeNamespaces(0L, Arrays.asList(0L, 1L));
        reducingState.setCurrentNamespace(0L);
        Assertions.assertThat(reducingState.get()).isEqualTo(60);
        // The merged source namespace is expected to be empty afterwards.
        reducingState.setCurrentNamespace(1L);
        Assertions.assertThat(reducingState.get()).isNull();

        // Asynchronous API: same scenario, starting again from namespace 0.
        reducingState.setCurrentNamespace(0L);
        reducingState
                .asyncClear()
                .thenCompose(
                        ignored -> {
                            Assertions.assertThat(reducingState.get()).isNull();
                            return reducingState.asyncAdd(10);
                        })
                .thenCompose(ignored -> reducingState.asyncGet())
                .thenCompose(
                        sum -> {
                            Assertions.assertThat(sum).isEqualTo(10);
                            return reducingState.asyncAdd(20);
                        })
                .thenCompose(ignored -> reducingState.asyncGet())
                .thenCompose(
                        sum -> {
                            Assertions.assertThat(sum).isEqualTo(30);
                            reducingState.setCurrentNamespace(1L);
                            return reducingState.asyncGet();
                        })
                .thenCompose(
                        sum -> {
                            Assertions.assertThat(sum).isNull();
                            return reducingState.asyncAdd(30);
                        })
                .thenCompose(
                        ignored ->
                                reducingState.asyncMergeNamespaces(0L, Arrays.asList(0L, 1L)))
                .thenCompose(
                        ignored -> {
                            reducingState.setCurrentNamespace(0L);
                            return reducingState.asyncGet();
                        })
                .thenCompose(
                        sum -> {
                            Assertions.assertThat(sum).isEqualTo(60);
                            reducingState.setCurrentNamespace(1L);
                            return reducingState.asyncGet();
                        })
                .thenAccept(sum -> Assertions.assertThat(sum).isNull());

        adaptor.close();
    }

    /**
     * Exercises {@code add/get}, namespace switching and namespace merging on an aggregating state
     * (integer sum rendered as a string), synchronously and asynchronously.
     */
    @Test
    public void testAggregatingStateAdaptor() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedStateBackend = createBackend();
        AsyncKeyedStateBackendAdaptor<String> adaptor =
                new AsyncKeyedStateBackendAdaptor<>(keyedStateBackend);
        keyedStateBackend.setCurrentKey("test");
        AggregatingStateDescriptor<Integer, Integer, String> descriptor =
                new AggregatingStateDescriptor<>(
                        "testState",
                        new AggregateFunction<Integer, Integer, String>() {
                            @Override
                            public Integer createAccumulator() {
                                return 0;
                            }

                            @Override
                            public Integer add(Integer value, Integer accumulator) {
                                return accumulator + value;
                            }

                            @Override
                            public String getResult(Integer accumulator) {
                                return accumulator.toString();
                            }

                            @Override
                            public Integer merge(Integer a, Integer b) {
                                return a + b;
                            }
                        },
                        BasicTypeInfo.INT_TYPE_INFO);
        InternalAggregatingState<String, Long, Integer, Integer, String> aggregatingState =
                adaptor.getOrCreateKeyedState(0L, LongSerializer.INSTANCE, descriptor);

        // Synchronous API: accumulate 10 + 20 in namespace 0, 30 in namespace 1, then merge.
        aggregatingState.clear();
        Assertions.assertThat(aggregatingState.get()).isNull();
        aggregatingState.add(10);
        Assertions.assertThat(aggregatingState.get()).isEqualTo("10");
        aggregatingState.add(20);
        Assertions.assertThat(aggregatingState.get()).isEqualTo("30");
        aggregatingState.setCurrentNamespace(1L);
        Assertions.assertThat(aggregatingState.get()).isNull();
        aggregatingState.add(30);
        aggregatingState.mergeNamespaces(0L, Arrays.asList(0L, 1L));
        aggregatingState.setCurrentNamespace(0L);
        Assertions.assertThat(aggregatingState.get()).isEqualTo("60");
        // The merged source namespace is expected to be empty afterwards.
        aggregatingState.setCurrentNamespace(1L);
        Assertions.assertThat(aggregatingState.get()).isNull();

        // Asynchronous API: same scenario, starting again from namespace 0.
        aggregatingState.setCurrentNamespace(0L);
        aggregatingState
                .asyncClear()
                .thenCompose(
                        ignored -> {
                            Assertions.assertThat(aggregatingState.get()).isNull();
                            return aggregatingState.asyncAdd(10);
                        })
                .thenCompose(ignored -> aggregatingState.asyncGet())
                .thenCompose(
                        result -> {
                            Assertions.assertThat(result).isEqualTo("10");
                            return aggregatingState.asyncAdd(20);
                        })
                .thenCompose(ignored -> aggregatingState.asyncGet())
                .thenCompose(
                        result -> {
                            Assertions.assertThat(result).isEqualTo("30");
                            aggregatingState.setCurrentNamespace(1L);
                            return aggregatingState.asyncGet();
                        })
                .thenCompose(
                        result -> {
                            Assertions.assertThat(result).isNull();
                            return aggregatingState.asyncAdd(30);
                        })
                .thenCompose(
                        ignored ->
                                aggregatingState.asyncMergeNamespaces(
                                        0L, Arrays.asList(0L, 1L)))
                .thenCompose(
                        ignored -> {
                            aggregatingState.setCurrentNamespace(0L);
                            return aggregatingState.asyncGet();
                        })
                .thenCompose(
                        result -> {
                            Assertions.assertThat(result).isEqualTo("60");
                            aggregatingState.setCurrentNamespace(1L);
                            return aggregatingState.asyncGet();
                        })
                .thenAccept(result -> Assertions.assertThat(result).isNull());

        adaptor.close();
    }

    /**
     * Builds a fresh heap keyed state backend with {@code String} keys covering key groups
     * {@code [0, 127]} out of 128, with latency tracking and local recovery disabled and no
     * restored state handles.
     *
     * @return the newly built backend
     * @throws Exception if the builder fails to create the backend
     */
    private static CheckpointableKeyedStateBackend<String> createBackend() throws Exception {
        final int numberOfKeyGroups = 128;
        final KeyGroupRange keyGroupRange = new KeyGroupRange(0, numberOfKeyGroups - 1);
        final ExecutionConfig executionConfig = new ExecutionConfig();
        return new HeapKeyedStateBackendBuilder<>(
                        new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()),
                        StringSerializer.INSTANCE,
                        ClassLoader.getSystemClassLoader(),
                        numberOfKeyGroups,
                        keyGroupRange,
                        executionConfig,
                        TtlTimeProvider.DEFAULT,
                        LatencyTrackingStateConfig.disabled(),
                        Collections.emptyList(),
                        AbstractStateBackend.getCompressionDecorator(executionConfig),
                        TestLocalRecoveryConfig.disabled(),
                        new HeapPriorityQueueSetFactory(
                                keyGroupRange, numberOfKeyGroups, numberOfKeyGroups),
                        true,
                        new CloseableRegistry())
                .build();
    }
}
