package org.apache.flink.runtime.state.v2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.MockStateRequestContainer;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.class */
class AbstractAggregatingStateTest extends AbstractKeyedStateTestBase {

    /* loaded from: input_file:org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest$AggregatingStateExecutor.class */
    static class AggregatingStateExecutor implements StateExecutor {
        private static final HashMap<Tuple2<String, String>, Integer> hashMap = new HashMap<>();

        AggregatingStateExecutor() {
        }

        public CompletableFuture<Void> executeBatchRequests(StateRequestContainer stateRequestContainer) {
            Iterator<StateRequest<?, ?, ?, ?>> it = ((MockStateRequestContainer) stateRequestContainer).getStateRequestList().iterator();
            while (it.hasNext()) {
                executeRequestSync(it.next());
            }
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            completableFuture.complete(null);
            return completableFuture;
        }

        public StateRequestContainer createStateRequestContainer() {
            return new MockStateRequestContainer();
        }

        public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) {
            String str = (String) stateRequest.getRecordContext().getKey();
            String str2 = (String) stateRequest.getNamespace();
            if (stateRequest.getRequestType() != StateRequestType.AGGREGATING_ADD) {
                if (stateRequest.getRequestType() != StateRequestType.AGGREGATING_GET) {
                    throw new UnsupportedOperationException("Unsupported type");
                }
                stateRequest.getFuture().complete(hashMap.get(Tuple2.of(str, str2)));
                return;
            }
            if (stateRequest.getPayload() == null) {
                hashMap.remove(Tuple2.of(str, str2));
                stateRequest.getFuture().complete((Object) null);
            } else {
                hashMap.put(Tuple2.of(str, str2), (Integer) stateRequest.getPayload());
                stateRequest.getFuture().complete((Object) null);
            }
        }

        public boolean fullyLoaded() {
            return false;
        }

        public void shutdown() {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest$SumAggregator.class */
    static class SumAggregator implements AggregateFunction<Integer, Integer, Integer> {
        private final int init;

        public SumAggregator(int i) {
            this.init = i;
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public Integer m646createAccumulator() {
            return Integer.valueOf(this.init);
        }

        public Integer add(Integer num, Integer num2) {
            return Integer.valueOf(num2.intValue() + num.intValue());
        }

        public Integer getResult(Integer num) {
            return num;
        }

        public Integer merge(Integer num, Integer num2) {
            return Integer.valueOf(num.intValue() + num2.intValue());
        }
    }

    AbstractAggregatingStateTest() {
    }

    @Test
    public void testAggregating() {
        AbstractAggregatingState abstractAggregatingState = new AbstractAggregatingState(this.aec, new AggregatingStateDescriptor("testAggState", new SumAggregator(1), BasicTypeInfo.INT_TYPE_INFO));
        this.aec.setCurrentContext(this.aec.buildContext("test", "test"));
        abstractAggregatingState.asyncClear();
        validateRequestRun(abstractAggregatingState, StateRequestType.CLEAR, null, 0);
        abstractAggregatingState.asyncGet();
        validateRequestRun(abstractAggregatingState, StateRequestType.AGGREGATING_GET, null, 0);
        abstractAggregatingState.asyncAdd(1);
        validateRequestRun(abstractAggregatingState, StateRequestType.AGGREGATING_GET, null, 1);
        validateRequestRun(abstractAggregatingState, StateRequestType.AGGREGATING_ADD, 2, 0);
        abstractAggregatingState.asyncAdd(5);
        validateRequestRun(abstractAggregatingState, StateRequestType.AGGREGATING_GET, null, 1);
        validateRequestRun(abstractAggregatingState, StateRequestType.AGGREGATING_ADD, 6, 0);
    }

    @Test
    public void testMergeNamespace() throws Exception {
        AggregatingStateDescriptor aggregatingStateDescriptor = new AggregatingStateDescriptor("testState", new SumAggregator(0), BasicTypeInfo.INT_TYPE_INFO);
        AsyncExecutionController asyncExecutionController = new AsyncExecutionController(new SyncMailboxExecutor(), (str, th) -> {
        }, new AggregatingStateExecutor(), new DeclarationManager(), 1, 100, 10000L, 1, (AsyncExecutionController.SwitchContextListener) null, (MetricGroup) null);
        AbstractAggregatingState abstractAggregatingState = new AbstractAggregatingState(asyncExecutionController, aggregatingStateDescriptor);
        asyncExecutionController.setCurrentContext(asyncExecutionController.buildContext("test", "test"));
        asyncExecutionController.setCurrentNamespaceForState(abstractAggregatingState, "1");
        abstractAggregatingState.asyncAdd(1);
        asyncExecutionController.drainInflightRecords(0);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.size()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "1"))).isEqualTo(1);
        asyncExecutionController.setCurrentNamespaceForState(abstractAggregatingState, "2");
        abstractAggregatingState.asyncAdd(2);
        asyncExecutionController.drainInflightRecords(0);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.size()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "1"))).isEqualTo(1);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "2"))).isEqualTo(2);
        asyncExecutionController.setCurrentNamespaceForState(abstractAggregatingState, "3");
        abstractAggregatingState.asyncAdd(3);
        asyncExecutionController.drainInflightRecords(0);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.size()).isEqualTo(3);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "1"))).isEqualTo(1);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "2"))).isEqualTo(2);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "3"))).isEqualTo(3);
        abstractAggregatingState.asyncMergeNamespaces("0", new ArrayList(Arrays.asList("1", "2", "3")));
        asyncExecutionController.drainInflightRecords(0);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.size()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "0"))).isEqualTo(6);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "1"))).isNull();
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "2"))).isNull();
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "3"))).isNull();
        asyncExecutionController.setCurrentNamespaceForState(abstractAggregatingState, "4");
        abstractAggregatingState.asyncAdd(4);
        asyncExecutionController.drainInflightRecords(0);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.size()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "0"))).isEqualTo(6);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "4"))).isEqualTo(4);
        abstractAggregatingState.asyncMergeNamespaces("0", new ArrayList(Arrays.asList("4")));
        asyncExecutionController.drainInflightRecords(0);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.size()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "0"))).isEqualTo(10);
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "1"))).isNull();
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "2"))).isNull();
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "3"))).isNull();
        AssertionsForClassTypes.assertThat(AggregatingStateExecutor.hashMap.get(Tuple2.of("test", "4"))).isNull();
    }
}
