package org.apache.flink.runtime.state.v2;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.MockStateRequestContainer;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.Test;

/** Tests for {@link AbstractReducingState}. */
public class AbstractReducingStateTest extends AbstractKeyedStateTestBase {

    /**
     * In-memory {@link StateExecutor} used by the tests in this class.
     *
     * <p>Only {@link StateRequestType#REDUCING_GET} and {@link StateRequestType#REDUCING_ADD}
     * requests are handled; any other request type fails the batch with an
     * {@link UnsupportedOperationException}. Synchronous execution is not supported.
     */
    static class ReducingStateExecutor implements StateExecutor {
        /**
         * Backing store keyed by (record key, namespace). It is static so the tests can inspect it
         * through {@code ReducingStateExecutor.hashMap}; creating a new executor clears it.
         */
        private static final HashMap<Tuple2<String, String>, Integer> hashMap = new HashMap<>();

        /** Creates the executor and empties the shared backing store. */
        public ReducingStateExecutor() {
            hashMap.clear();
        }

        /**
         * Applies every request of the given container to the backing store and completes each
         * request's future before returning.
         *
         * @param stateRequestContainer must be a {@link MockStateRequestContainer}
         * @return an already completed future
         * @throws IllegalArgumentException if the container is not a {@link MockStateRequestContainer}
         * @throws UnsupportedOperationException if a request is neither REDUCING_GET nor REDUCING_ADD
         */
        // The requests are iterated with the raw StateRequest type on purpose: with a
        // StateRequest<?, ?, ?, ?> variable the future's result type is a wildcard capture, and
        // completing it with an Integer (or an Object-typed null) does not type-check.
        @SuppressWarnings({"unchecked", "rawtypes"})
        public CompletableFuture<Void> executeBatchRequests(StateRequestContainer stateRequestContainer) {
            Preconditions.checkArgument(stateRequestContainer instanceof MockStateRequestContainer);
            MockStateRequestContainer container = (MockStateRequestContainer) stateRequestContainer;
            for (StateRequest request : container.getStateRequestList()) {
                StateRequestType type = request.getRequestType();
                // Reject unknown request types before touching the key or namespace.
                if (type != StateRequestType.REDUCING_GET && type != StateRequestType.REDUCING_ADD) {
                    throw new UnsupportedOperationException("Unsupported request type");
                }
                Tuple2<String, String> stateKey =
                        Tuple2.of((String) request.getRecordContext().getKey(), (String) request.getNamespace());
                if (type == StateRequestType.REDUCING_GET) {
                    request.getFuture().complete(hashMap.get(stateKey));
                    continue;
                }
                // REDUCING_ADD: a null payload removes the entry, anything else overwrites it.
                Object payload = request.getPayload();
                if (payload == null) {
                    hashMap.remove(stateKey);
                } else {
                    hashMap.put(stateKey, (Integer) payload);
                }
                request.getFuture().complete(null);
            }
            return CompletableFuture.completedFuture(null);
        }

        /** Returns a fresh {@link MockStateRequestContainer}. */
        public StateRequestContainer createStateRequestContainer() {
            return new MockStateRequestContainer();
        }

        /**
         * Always fails; this executor only supports batch execution.
         *
         * @throws UnsupportedOperationException always
         */
        public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) {
            throw new UnsupportedOperationException("Unsupported synchronous execution");
        }

        /** Always reports {@code false}. */
        public boolean fullyLoaded() {
            return false;
        }

        /** No resources to release. */
        public void shutdown() {
        }
    }

    /**
     * Checks which state request each {@link AbstractReducingState} operation hands to the
     * controller, for both the asynchronous and the synchronous API.
     *
     * <p>As asserted below, {@code clear} issues a CLEAR request, {@code get} issues a
     * REDUCING_GET request, and {@code add(1)} issues a REDUCING_GET followed by a REDUCING_ADD
     * carrying the payload {@code 1}. The trailing int is passed to {@code validateRequestRun}
     * unchanged (presumably the number of requests still pending afterwards; confirm in
     * AbstractKeyedStateTestBase).
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"}) // the state under test is used through its raw type
    public void testEachOperation() {
        // The descriptor must be typed: against a raw descriptor the reducer's parameters are
        // Objects and cannot be passed to Integer.sum(int, int).
        ReducingStateDescriptor<Integer> descriptor =
                new ReducingStateDescriptor<>("testState", Integer::sum, BasicTypeInfo.INT_TYPE_INFO);
        AbstractReducingState reducingState = new AbstractReducingState(this.aec, descriptor);
        this.aec.setCurrentContext(this.aec.buildContext("test", "test"));

        // Asynchronous API.
        reducingState.asyncClear();
        validateRequestRun(reducingState, StateRequestType.CLEAR, null, 0);
        reducingState.asyncGet();
        validateRequestRun(reducingState, StateRequestType.REDUCING_GET, null, 0);
        reducingState.asyncAdd(1);
        validateRequestRun(reducingState, StateRequestType.REDUCING_GET, null, 1);
        validateRequestRun(reducingState, StateRequestType.REDUCING_ADD, 1, 0);

        // Synchronous API: the same requests are expected.
        reducingState.clear();
        validateRequestRun(reducingState, StateRequestType.CLEAR, null, 0);
        reducingState.get();
        validateRequestRun(reducingState, StateRequestType.REDUCING_GET, null, 0);
        reducingState.add(1);
        validateRequestRun(reducingState, StateRequestType.REDUCING_GET, null, 1);
        validateRequestRun(reducingState, StateRequestType.REDUCING_ADD, 1, 0);
    }

    /**
     * Checks {@code asyncMergeNamespaces} against the in-memory {@link ReducingStateExecutor}.
     *
     * <p>Values 1, 2 and 3 are added under namespaces "1", "2" and "3" and merged into "0"
     * (expected 6, sources removed). Then 4 is added under "4" and merged into the already
     * populated "0" (expected 10, source removed).
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"}) // controller and state are used through their raw types
    public void testMergeNamespace() throws Exception {
        // The descriptor must be typed: against a raw descriptor the reducer's parameters are
        // Objects and cannot be passed to Integer.sum(int, int).
        ReducingStateDescriptor<Integer> descriptor =
                new ReducingStateDescriptor<>("testState", Integer::sum, BasicTypeInfo.INT_TYPE_INFO);
        AsyncExecutionController controller = new AsyncExecutionController(
                new SyncMailboxExecutor(),
                (message, error) -> {
                    // Failures reported by the controller are deliberately ignored in this test.
                },
                new ReducingStateExecutor(),
                new DeclarationManager(),
                1,
                100,
                10000L,
                1,
                (AsyncExecutionController.SwitchContextListener) null,
                (MetricGroup) null);
        AbstractReducingState reducingState = new AbstractReducingState(controller, descriptor);
        controller.setCurrentContext(controller.buildContext("test", "test"));

        controller.setCurrentNamespaceForState(reducingState, "1");
        reducingState.asyncAdd(1);
        controller.drainInflightRecords(0);
        assertStoredEntryCount(1);
        assertStoredValue("1", 1);

        controller.setCurrentNamespaceForState(reducingState, "2");
        reducingState.asyncAdd(2);
        controller.drainInflightRecords(0);
        assertStoredEntryCount(2);
        assertStoredValue("1", 1);
        assertStoredValue("2", 2);

        controller.setCurrentNamespaceForState(reducingState, "3");
        reducingState.asyncAdd(3);
        controller.drainInflightRecords(0);
        assertStoredEntryCount(3);
        assertStoredValue("1", 1);
        assertStoredValue("2", 2);
        assertStoredValue("3", 3);

        // Merge into a namespace that has no value yet.
        reducingState.asyncMergeNamespaces("0", new ArrayList<>(Arrays.asList("1", "2", "3")));
        controller.drainInflightRecords(0);
        assertStoredEntryCount(1);
        assertStoredValue("0", 6);
        assertStoredValue("1", null);
        assertStoredValue("2", null);
        assertStoredValue("3", null);

        controller.setCurrentNamespaceForState(reducingState, "4");
        reducingState.asyncAdd(4);
        controller.drainInflightRecords(0);
        assertStoredEntryCount(2);
        assertStoredValue("0", 6);
        assertStoredValue("4", 4);

        // Merge into a namespace that already holds a value.
        reducingState.asyncMergeNamespaces("0", new ArrayList<>(Arrays.asList("4")));
        controller.drainInflightRecords(0);
        assertStoredEntryCount(1);
        assertStoredValue("0", 10);
        assertStoredValue("1", null);
        assertStoredValue("2", null);
        assertStoredValue("3", null);
        assertStoredValue("4", null);
    }

    /** Asserts how many (key, namespace) entries the {@link ReducingStateExecutor} store holds. */
    private static void assertStoredEntryCount(int expectedCount) {
        AssertionsForClassTypes.assertThat(ReducingStateExecutor.hashMap.size()).isEqualTo(expectedCount);
    }

    /**
     * Asserts the value stored for record key "test" under the given namespace; a {@code null}
     * expectation means that no value is stored.
     */
    private static void assertStoredValue(String namespace, Integer expectedValue) {
        AssertionsForClassTypes.assertThat(ReducingStateExecutor.hashMap.get(Tuple2.of("test", namespace)))
                .isEqualTo(expectedValue);
    }

    /*
     * Lambda deserialization hook: maps a SerializedLambda back to the reducer lambda used by the
     * tests in this class, or throws IllegalArgumentException("Invalid lambda deserialization")
     * when the description does not match.
     *
     * NOTE(review): the method is marked synthetic and reads like decompiler output of the hook
     * javac generates for serializable lambdas. As written it is not valid Java source:
     * `boolean z = -1`, a `switch` over a boolean with `case false`, and returning a lambda as
     * Object. Presumably `z` was an int selector in the bytecode. Consider deleting this method
     * from hand-maintained source and letting the compiler regenerate it (TODO confirm).
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First stage: select a case by the implementation method's name (114251 == "sum".hashCode()).
        switch (implMethodName.hashCode()) {
            case 114251:
                if (implMethodName.equals("sum")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the full lambda shape before recreating it.
        switch (z) {
            case false:
                // Kind 6 is MethodHandleInfo.REF_invokeStatic; the remaining checks pin the
                // functional interface to ReduceFunction.reduce(Object, Object) and the
                // implementation to the (II)I method "sum" in java/lang/Integer.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/ReduceFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("reduce") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("(II)I")) {
                    return (v0, v1) -> {
                        return Integer.sum(v0, v1);
                    };
                }
                // Identical to the check above, so this branch can never be the first to match.
                // Presumably there is one branch per reducer lambda in the two test methods.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/ReduceFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("reduce") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("(II)I")) {
                    return (v0, v1) -> {
                        return Integer.sum(v0, v1);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
