package org.apache.flink.runtime.asyncprocessing;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.state.StateFutureImpl;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.EpochManager;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendTestUtils;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.v2.AbstractValueState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.class */
class AsyncExecutionControllerTest {
    /** Controller under test; (re)assigned by every call to {@code setup(...)}. */
    AsyncExecutionController<String> aec;
    /** Receives the final value of each state-access chain; re-created by {@code setup(...)}. */
    AtomicInteger output;
    /** Value state obtained from the test async keyed state backend in {@code setup(...)}. */
    TestValueState valueState;
    /**
     * Simulated user function: reads the current value, writes back the value plus one
     * (or 1 when nothing is stored yet) and finally publishes the written value to
     * {@link #output}.
     */
    final Runnable userCode = () ->
            this.valueState
                    .asyncValue()
                    .thenCompose(current -> {
                        // A missing value counts as zero, so the first access stores 1.
                        int next = (current != null) ? current.intValue() + 1 : 1;
                        return this.valueState
                                .asyncUpdate(Integer.valueOf(next))
                                .thenCompose(ignored -> StateFutureUtils.completedFuture(Integer.valueOf(next)));
                    })
                    .thenAccept(result -> this.output.set(result.intValue()));
    /**
     * Gauges captured from the metric group handed to the controller, keyed as
     * {@code "<group name>.<gauge name>"} (see the anonymous metric group in {@code setup}).
     */
    final Map<String, Gauge> registeredGauges = new HashMap<>();

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest$TestAsyncFrameworkExceptionHandler.class */
    /**
     * Exception handler that only remembers the most recent message and throwable it was
     * given, so a test can inspect them afterwards. Both fields stay {@code null} until
     * {@link #handleException} is called.
     */
    static class TestAsyncFrameworkExceptionHandler implements StateFutureImpl.AsyncFrameworkExceptionHandler {
        String message;
        Throwable exception;

        /** Stores the reported failure, overwriting any previously recorded one. */
        public void handleException(String str, Throwable th) {
            message = str;
            exception = th;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest$TestMailboxExecutor.class */
    /**
     * Mailbox executor that runs every mail inline on the calling thread. It can be
     * configured to reject all mails, and it records (instead of propagating) the last
     * exception thrown by a mail.
     */
    static class TestMailboxExecutor implements MailboxExecutor {
        /** Last exception thrown by an executed mail; {@code null} if none has failed. */
        Exception lastException;
        /** When {@code true}, {@link #execute} throws instead of running the mail. */
        boolean failWhenExecute;

        public TestMailboxExecutor(boolean z) {
            failWhenExecute = z;
        }

        /**
         * Runs the mail immediately. Throws a {@link RuntimeException} when this executor
         * was created with {@code failWhenExecute == true}; otherwise any exception from
         * the mail is captured in {@link #lastException}.
         */
        public void execute(MailboxExecutor.MailOptions mailOptions, ThrowingRunnable<? extends Exception> throwingRunnable, String str, Object... objArr) {
            if (!failWhenExecute) {
                try {
                    throwingRunnable.run();
                } catch (Exception e) {
                    // Deliberately not rethrown: tests read the failure from lastException.
                    lastException = e;
                }
                return;
            }
            throw new RuntimeException("Fail to execute.");
        }

        /** No-op: mails are executed inline, so there is never anything to yield to. */
        public void yield() throws InterruptedException, FlinkRuntimeException {
        }

        /** Always reports that no mail was processed. */
        public boolean tryYield() throws FlinkRuntimeException {
            return false;
        }

        public boolean shouldInterrupt() {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest$TestStateExecutor.class */
    /**
     * State executor that serves every request synchronously against the
     * {@link TestUnderlyingState} held by the request's {@link TestValueState}. Only
     * {@code VALUE_GET} and {@code VALUE_UPDATE} requests are supported.
     */
    public static class TestStateExecutor implements StateExecutor {
        /**
         * Executes all requests of the given container one after another on the calling
         * thread and returns an already-completed future.
         */
        public CompletableFuture<Void> executeBatchRequests(StateRequestContainer stateRequestContainer) {
            Preconditions.checkArgument(stateRequestContainer instanceof MockStateRequestContainer);
            MockStateRequestContainer container = (MockStateRequestContainer) stateRequestContainer;
            for (StateRequest<?, ?, ?, ?> request : container.getStateRequestList()) {
                executeRequestSync(request);
            }
            return CompletableFuture.completedFuture(null);
        }

        public StateRequestContainer createStateRequestContainer() {
            return new MockStateRequestContainer();
        }

        /**
         * Serves a single request and completes its future: a get completes with the stored
         * value (possibly {@code null}), an update stores the payload and completes with
         * {@code null}.
         *
         * @throws UnsupportedOperationException for any other request type
         */
        public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) {
            if (stateRequest.getRequestType() == StateRequestType.VALUE_GET) {
                Preconditions.checkState(stateRequest.getState() != null);
                TestValueState state = stateRequest.getState();
                stateRequest.getFuture().complete(state.underlyingState.get((String) stateRequest.getRecordContext().getKey(), (String) stateRequest.getRecordContext().getNamespace(state)));
            } else if (stateRequest.getRequestType() == StateRequestType.VALUE_UPDATE) {
                Preconditions.checkState(stateRequest.getState() != null);
                TestValueState state2 = stateRequest.getState();
                state2.underlyingState.update((String) stateRequest.getRecordContext().getKey(), (String) stateRequest.getRecordContext().getNamespace(state2), (Integer) stateRequest.getPayload());
                stateRequest.getFuture().complete((Object) null);
            } else {
                throw new UnsupportedOperationException("Unsupported request type");
            }
        }

        public boolean fullyLoaded() {
            return false;
        }

        /** Nothing to release: this executor owns no threads or resources. */
        public void shutdown() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest$TestUnderlyingState.class */
    /** In-memory store backing the test value state; entries are addressed by a pair of strings. */
    public static class TestUnderlyingState {
        private final HashMap<Tuple2<String, String>, Integer> hashMap = new HashMap<>();

        /** Returns the value stored under the given pair, or {@code null} if there is none. */
        public Integer get(String str, String str2) {
            Tuple2<String, String> compositeKey = Tuple2.of(str, str2);
            return hashMap.get(compositeKey);
        }

        /** Stores {@code num} under the given pair, replacing any previous value. */
        public void update(String str, String str2, Integer num) {
            Tuple2<String, String> compositeKey = Tuple2.of(str, str2);
            hashMap.put(compositeKey, num);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest$TestValueState.class */
    /**
     * Value state used by the tests. It carries a reference to the {@link TestUnderlyingState}
     * that {@link TestStateExecutor} reads from and writes to when serving requests.
     */
    public static class TestValueState extends AbstractValueState<String, String, Integer> {
        // In-memory store accessed directly by TestStateExecutor.
        private final TestUnderlyingState underlyingState;

        /**
         * Creates the state and asserts that the serializer exposed by
         * {@code getValueSerializer()} is {@link IntSerializer#INSTANCE}.
         */
        public TestValueState(StateRequestHandler stateRequestHandler, TestUnderlyingState testUnderlyingState, ValueStateDescriptor<Integer> valueStateDescriptor) {
            super(stateRequestHandler, valueStateDescriptor);
            this.underlyingState = testUnderlyingState;
            AssertionsForClassTypes.assertThat(getValueSerializer()).isEqualTo(IntSerializer.INSTANCE);
        }
    }

    /** Package-private no-argument constructor; the fixture itself is built by {@code setup(...)}. */
    AsyncExecutionControllerTest() {
    }

    /**
     * Builds the test fixture: a {@link TestStateExecutor}, a test async state backend, the
     * {@link AsyncExecutionController} under test, the {@link #valueState} and the
     * {@link #output} counter.
     *
     * <p>Everything inside the outer {@code try} is wrapped in a {@link RuntimeException} on
     * failure. A failure of {@code getOrCreateKeyedState} is wrapped twice, because the
     * inner handler's {@code RuntimeException} is caught again by the outer handler.
     *
     * @param i forwarded to the controller constructor — presumably the batch size; TODO confirm
     * @param j forwarded to the controller constructor — presumably the buffer timeout; TODO confirm
     * @param i2 forwarded to the controller constructor — presumably the in-flight record limit; TODO confirm
     * @param mailboxExecutor mailbox executor handed to the controller
     * @param asyncFrameworkExceptionHandler exception handler handed to the controller
     * @param closeableRegistry registry that receives the created backend and its {@code dispose} method
     */
    void setup(int i, long j, int i2, MailboxExecutor mailboxExecutor, StateFutureImpl.AsyncFrameworkExceptionHandler asyncFrameworkExceptionHandler, CloseableRegistry closeableRegistry) throws IOException {
        TestStateExecutor testStateExecutor = new TestStateExecutor();
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("test-value-state", BasicTypeInfo.INT_TYPE_INFO);
        // The supplier reads this.aec when it is invoked, not when it is created, so it sees
        // the controller assigned further below as long as it is only called after that point.
        StateBackend buildAsyncStateBackend = StateBackendTestUtils.buildAsyncStateBackend(() -> {
            return new TestValueState(this.aec, new TestUnderlyingState(), valueStateDescriptor);
        }, testStateExecutor);
        AssertionsForClassTypes.assertThat(buildAsyncStateBackend.supportsAsyncKeyedStateBackend()).isTrue();
        try {
            AsyncKeyedStateBackend createAsyncKeyedStateBackend = buildAsyncStateBackend.createAsyncKeyedStateBackend((StateBackend.KeyedStateBackendParameters) null);
            // Register both close() and dispose() of the backend with the caller's registry.
            closeableRegistry.registerCloseable(createAsyncKeyedStateBackend);
            Objects.requireNonNull(createAsyncKeyedStateBackend);
            closeableRegistry.registerCloseable(createAsyncKeyedStateBackend::dispose);
            // The anonymous metric group records every gauge in registeredGauges under
            // "<prefix>.<name>", where prefix is the name passed to the latest addGroup call.
            this.aec = new AsyncExecutionController<>(mailboxExecutor, asyncFrameworkExceptionHandler, testStateExecutor, new DeclarationManager(), 128, i, j, i2, (AsyncExecutionController.SwitchContextListener) null, new UnregisteredMetricsGroup() { // from class: org.apache.flink.runtime.asyncprocessing.AsyncExecutionControllerTest.1
                String prefix = "";

                public <T, G extends Gauge<T>> G gauge(String str, G g) {
                    AsyncExecutionControllerTest.this.registeredGauges.put(this.prefix + "." + str, g);
                    return g;
                }

                public MetricGroup addGroup(String str) {
                    this.prefix = str;
                    return this;
                }
            }.addGroup("asyncStateProcessing"));
            // Must come after this.aec is assigned: the backend is wired to the new controller.
            createAsyncKeyedStateBackend.setup(this.aec);
            try {
                this.valueState = createAsyncKeyedStateBackend.getOrCreateKeyedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
                this.output = new AtomicInteger();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }

    /**
     * Walks single records and two same-key records through the controller, checking queue
     * sizes, occupied keys, in-flight counts, gauges and reference counts after each manual
     * trigger.
     *
     * <p>NOTE(review): {@code HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS} belongs to an
     * unrelated test class; it looks like a decompiler substitution for a plain numeric
     * literal — confirm and replace.
     */
    @Test
    void testBasicRun() throws IOException {
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        setup(100, 10000L, HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), closeableRegistry);
        // ---- Record 1 (key1): the first state request is buffered, nothing has executed yet.
        RecordContext buildContext = this.aec.buildContext("key1-r1", "key1");
        this.aec.setCurrentContext(buildContext);
        this.userCode.run();
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(1);
        // The registered gauges must mirror the same numbers.
        AssertionsForClassTypes.assertThat(this.registeredGauges.get("asyncStateProcessing.numInFlightRecords").getValue()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.registeredGauges.get("asyncStateProcessing.activeBufferSize").getValue()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.registeredGauges.get("asyncStateProcessing.blockingBufferSize").getValue()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.registeredGauges.get("asyncStateProcessing.numBlockingKeys").getValue()).isEqualTo(0);
        // First trigger: one request is still active afterwards (presumably the follow-up
        // update issued by userCode's callback).
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        // Second trigger: the record is finished, its key and context are released.
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(buildContext.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(0);
        // ---- Records 2 and 3 share key1: one request is active, the other is blocked.
        RecordContext buildContext2 = this.aec.buildContext("key1-r2", "key1");
        this.aec.setCurrentContext(buildContext2);
        this.userCode.run();
        RecordContext buildContext3 = this.aec.buildContext("key1-r3", "key1");
        this.aec.setCurrentContext(buildContext3);
        this.userCode.run();
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(2);
        // Trigger 1 for record 2: counts are unchanged, record 3 stays blocked.
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(this.registeredGauges.get("asyncStateProcessing.numInFlightRecords").getValue()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(this.registeredGauges.get("asyncStateProcessing.activeBufferSize").getValue()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.registeredGauges.get("asyncStateProcessing.blockingBufferSize").getValue()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.registeredGauges.get("asyncStateProcessing.numBlockingKeys").getValue()).isEqualTo(1);
        // Trigger 2: record 2 is done (output 2); the blocked request of record 3 becomes active.
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.output.get()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(buildContext2.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        // Trigger 3: record 3 still has one active request.
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(1);
        // Trigger 4: record 3 is done (output 3) and everything is drained.
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.output.get()).isEqualTo(3);
        AssertionsForClassTypes.assertThat(buildContext3.getReferenceCount()).isEqualTo(0);
        // ---- Record 4 uses a fresh key (key3), so its counter starts at 1 again.
        RecordContext buildContext4 = this.aec.buildContext("key3-r3", "key3");
        this.aec.setCurrentContext(buildContext4);
        this.userCode.run();
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(buildContext4.getReferenceCount()).isEqualTo(0);
        closeableRegistry.close();
    }

    /**
     * Same scenario as the basic run, but every access first selects a namespace on the
     * value state. Values are kept per (key, namespace): a fresh namespace starts at 1 even
     * when the key was used before, while re-using a namespace continues its counter.
     *
     * <p>NOTE(review): {@code HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS} belongs to an
     * unrelated test class; it looks like a decompiler substitution for a plain numeric
     * literal — confirm and replace.
     */
    @Test
    void testNamespace() throws IOException {
        // Typed as Consumer<String>: with a raw Consumer the lambda parameter is Object and
        // cannot be passed to setCurrentNamespace on a state whose namespace type is String.
        Consumer<String> consumer = str -> {
            this.valueState.setCurrentNamespace(str);
            this.valueState.asyncValue().thenCompose(num -> {
                int intValue = num == null ? 1 : num.intValue() + 1;
                return this.valueState.asyncUpdate(Integer.valueOf(intValue)).thenCompose(r3 -> {
                    return StateFutureUtils.completedFuture(Integer.valueOf(intValue));
                });
            }).thenAccept(num2 -> {
                this.output.set(num2.intValue());
            });
        };
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        setup(100, 10000L, HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), closeableRegistry);
        // ---- Record 1: key1 in namespace "key1-r1".
        RecordContext buildContext = this.aec.buildContext("key1-r1", "key1");
        this.aec.setCurrentContext(buildContext);
        consumer.accept("key1-r1");
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(buildContext.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(0);
        // ---- Records 2 and 3: same key; record 2 uses a new namespace, record 3 re-uses
        // the namespace of record 1. The second one is blocked behind the first.
        RecordContext buildContext2 = this.aec.buildContext("key1-r2", "key1");
        this.aec.setCurrentContext(buildContext2);
        consumer.accept("key1-r2");
        RecordContext buildContext3 = this.aec.buildContext("key1-r1", "key1");
        this.aec.setCurrentContext(buildContext3);
        consumer.accept("key1-r1");
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(2);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(2);
        // Record 2 finishes: its namespace was empty, so the result is 1 again.
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(buildContext2.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(1);
        // Record 3 finishes: namespace "key1-r1" already held 1, so the result is 2.
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.output.get()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(buildContext3.getReferenceCount()).isEqualTo(0);
        closeableRegistry.close();
    }

    /**
     * Submits three records (key1, key2, key1) before triggering anything and checks that
     * the second key1 record stays blocked until the first key1 record has finished.
     *
     * <p>NOTE(review): {@code HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS} belongs to an
     * unrelated test class; it looks like a decompiler substitution for a plain numeric
     * literal — confirm and replace.
     */
    @Test
    void testRecordsRunInOrder() throws IOException {
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        setup(100, 10000L, HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), closeableRegistry);
        RecordContext buildContext = this.aec.buildContext("key1-r1", "key1");
        this.aec.setCurrentContext(buildContext);
        this.userCode.run();
        RecordContext buildContext2 = this.aec.buildContext("key2-r1", "key2");
        this.aec.setCurrentContext(buildContext2);
        this.userCode.run();
        RecordContext buildContext3 = this.aec.buildContext("key1-r2", "key1");
        this.aec.setCurrentContext(buildContext3);
        this.userCode.run();
        // Two distinct keys are active; the second key1 record waits in the blocking queue.
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(3);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        // First trigger: the counts do not change.
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(3);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        // Second trigger: the first two records finish and the blocked record becomes active.
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(buildContext.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(buildContext2.getReferenceCount()).isEqualTo(0);
        // Two more triggers complete the remaining key1 record, which sees the value 1
        // written by the first key1 record and produces 2.
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(this.output.get()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(buildContext3.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(0);
        closeableRegistry.close();
    }

    /**
     * Checks how many records the controller keeps in flight without any manual trigger.
     *
     * <ol>
     *   <li>Ten rounds of five records with distinct keys: after each round nothing is in
     *       flight and both queues are empty.
     *   <li>Ten records with the same key: all ten stay in flight, one request active and
     *       nine blocked.
     *   <li>Ninety more same-key records: after each one, eleven records are in flight, one
     *       request is active and ten are blocked.
     * </ol>
     */
    @Test
    void testInFlightRecordControl() throws IOException {
        // NOTE(review): the names are inferred from how the values are asserted on below;
        // confirm them against the AsyncExecutionController constructor parameters.
        final int batchSize = 5;
        final int maxInFlight = 10;
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        setup(batchSize, 10000L, maxInFlight, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), closeableRegistry);
        // Phase 1: distinct keys, batchSize records per round.
        for (int round = 0; round < 10; round++) {
            for (int i = 0; i < batchSize; i++) {
                int id = round * batchSize + i;
                this.aec.setCurrentContext(this.aec.buildContext(String.format("key%d-r%d", id, id), String.format("key%d", id)));
                this.userCode.run();
            }
            AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(0);
            AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
            AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        }
        // Phase 2: a single key, so only one request can be active and the rest block.
        for (int i = 0; i < maxInFlight; i++) {
            this.aec.setCurrentContext(this.aec.buildContext(String.format("sameKey-r%d", i), "sameKey"));
            this.userCode.run();
        }
        AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(maxInFlight);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(maxInFlight - 1);
        // Phase 3: keep feeding the same key; the in-flight count must stay at the limit plus one.
        for (int i = maxInFlight; i < 10 * maxInFlight; i++) {
            this.aec.setCurrentContext(this.aec.buildContext(String.format("sameKey-r%d", i), "sameKey"));
            this.userCode.run();
            AssertionsForClassTypes.assertThat(this.aec.inFlightRecordNum.get()).isEqualTo(maxInFlight + 1);
            AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
            AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(maxInFlight);
        }
        closeableRegistry.close();
    }

    /**
     * Exercises {@code syncPointRequestWithCallback}: on a free key the callback runs
     * immediately; on a key that is already occupied by another record it runs only after
     * that record has finished.
     *
     * <p>NOTE(review): {@code HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS} belongs to an
     * unrelated test class; it looks like a decompiler substitution for a plain numeric
     * literal — confirm and replace.
     */
    @Test
    public void testSyncPoint() throws IOException {
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        setup(HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS, 10000L, 6000, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), closeableRegistry);
        // Counts how often the sync-point callback has run.
        AtomicInteger atomicInteger = new AtomicInteger(0);
        // ---- Case 1: the key is free, so the callback runs right away.
        RecordContext buildContext = this.aec.buildContext("record", "key");
        this.aec.setCurrentContext(buildContext);
        // Extra reference held by the test so the context survives the sync point.
        buildContext.retain();
        AsyncExecutionController<String> asyncExecutionController = this.aec;
        Objects.requireNonNull(atomicInteger);
        asyncExecutionController.syncPointRequestWithCallback(atomicInteger::incrementAndGet, false);
        AssertionsForClassTypes.assertThat(atomicInteger.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(buildContext.getReferenceCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        // The key stays occupied until the test drops its own reference.
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        buildContext.release();
        AssertionsForClassTypes.assertThat(this.aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        // ---- Case 2: the key is occupied by record1, so record2's sync point must wait.
        atomicInteger.set(0);
        this.aec.setCurrentContext(this.aec.buildContext("record1", "occupied"));
        this.userCode.run();
        RecordContext buildContext2 = this.aec.buildContext("record2", "occupied");
        this.aec.setCurrentContext(buildContext2);
        AsyncExecutionController<String> asyncExecutionController2 = this.aec;
        Objects.requireNonNull(atomicInteger);
        asyncExecutionController2.syncPointRequestWithCallback(atomicInteger::incrementAndGet, false);
        buildContext2.retain();
        AssertionsForClassTypes.assertThat(atomicInteger.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(buildContext2.getReferenceCount()).isGreaterThan(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        // First trigger: record1 is not finished yet, the callback has still not run.
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(atomicInteger.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(buildContext2.getReferenceCount()).isGreaterThan(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        // Second trigger: record1 finishes, the callback runs once and the queues drain.
        this.aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(atomicInteger.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(buildContext2.getReferenceCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        buildContext2.release();
        closeableRegistry.close();
    }

    /**
     * Sync point on an occupied key with the second argument of
     * {@code syncPointRequestWithCallback} set to {@code true} and both integer limits of
     * {@code setup} set to 1. The callback must not run while the first record's context is
     * still retained, and must run as soon as that context is released.
     *
     * <p>NOTE(review): the exact meaning of the boolean flag is not visible here — judging
     * by the test name it allows an overdraft of the in-flight limit; confirm.
     */
    @Test
    public void testSyncPointWithOverdraft() throws IOException {
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        setup(1, 10000L, 1, new SyncMailboxExecutor(), new TestAsyncFrameworkExceptionHandler(), closeableRegistry);
        // Counts how often the sync-point callback has run.
        AtomicInteger atomicInteger = new AtomicInteger(0);
        // record1 occupies the key; the extra retain() keeps its context alive after userCode.
        RecordContext buildContext = this.aec.buildContext("record1", "occupied");
        this.aec.setCurrentContext(buildContext);
        buildContext.retain();
        this.userCode.run();
        // record2 requests a sync point on the same key and is blocked behind record1.
        RecordContext buildContext2 = this.aec.buildContext("record2", "occupied");
        this.aec.setCurrentContext(buildContext2);
        AsyncExecutionController<String> asyncExecutionController = this.aec;
        Objects.requireNonNull(atomicInteger);
        asyncExecutionController.syncPointRequestWithCallback(atomicInteger::incrementAndGet, true);
        buildContext2.retain();
        AssertionsForClassTypes.assertThat(atomicInteger.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(buildContext2.getReferenceCount()).isGreaterThan(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        // Releasing record1's last reference frees the key; the blocked callback then runs.
        buildContext.release();
        AssertionsForClassTypes.assertThat(atomicInteger.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(buildContext2.getReferenceCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(this.aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        buildContext2.release();
        closeableRegistry.close();
    }

    /**
     * Exercises draining of the state request buffer in two ways: by waiting (the test sleeps for
     * two seconds) and by submitting a fifth request after four are already buffered.
     *
     * <p>After each drain the test expects the buffer's {@code currentSeq} to advance by one,
     * {@code seqAndTimeout} to be cleared, and both the active queue and the in-flight record
     * count to be back at zero.
     *
     * <p>NOTE(review): the first {@code setup} argument is presumably the batch size; the second
     * and third reference a constant of the unrelated {@code HashBufferAccumulatorTest}, which
     * looks like a numeric literal resolved to the wrong symbol - confirm the intended values.
     *
     * @throws Exception if the sleep is interrupted or closing the resource registry fails
     */
    @Test
    void testBufferTimeout() throws Exception {
        final int batchSize = 5;
        final int belowBatch = batchSize - 1;
        // try-with-resources closes the registry even when one of the assertions below fails.
        try (CloseableRegistry resourceRegistry = new CloseableRegistry()) {
            setup(
                    batchSize,
                    HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS,
                    HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS,
                    new SyncMailboxExecutor(),
                    new TestAsyncFrameworkExceptionHandler(),
                    resourceRegistry);
            Runnable readValue = () -> valueState.asyncValue();

            // Nothing has been submitted yet: no sequence number handed out, no timeout armed.
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(0L);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.seqAndTimeout).isNull();

            // Round one: four requests stay buffered until the sleep below has elapsed.
            for (int index = 0; index < belowBatch; index++) {
                String record = String.format("key%d-r%d", index, index);
                String key = String.format("key%d", batchSize + index);
                aec.setCurrentContext(aec.buildContext(record, key));
                readValue.run();
            }
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(0L);
            AssertionsForClassTypes.assertThat((Long) aec.stateRequestsBuffer.seqAndTimeout.f0).isEqualTo(0L);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isFalse();
            AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(belowBatch);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(belowBatch);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);

            Thread.sleep(2000L);

            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isFalse();
            AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(0);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1L);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.seqAndTimeout).isNull();

            // Round two: four requests are buffered again, this time under sequence number 1.
            for (int index = 0; index < belowBatch; index++) {
                String record = String.format("key%d-r%d", index, index);
                String key = String.format("key%d", batchSize + index);
                aec.setCurrentContext(aec.buildContext(record, key));
                readValue.run();
            }
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(1L);
            AssertionsForClassTypes.assertThat((Long) aec.stateRequestsBuffer.seqAndTimeout.f0).isEqualTo(1L);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isFalse();
            AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(belowBatch);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(belowBatch);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);

            // The fifth request is expected to empty the buffer without any waiting.
            int lastIndex = batchSize - 1;
            String lastRecord = String.format("key%d-r%d", lastIndex, lastIndex);
            String lastKey = String.format("key%d", batchSize + lastIndex);
            aec.setCurrentContext(aec.buildContext(lastRecord, lastKey));
            readValue.run();

            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.currentScheduledFuture.isDone()).isFalse();
            AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(0);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.currentSeq.get()).isEqualTo(2L);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.seqAndTimeout).isNull();
        }
    }

    /**
     * Checks where an exception thrown from a callback chained on {@code asyncValue()} ends up.
     *
     * <p>After {@code triggerIfNeeded(true)} the test expects the {@link FlinkRuntimeException} to
     * be recorded as {@code lastException} of the test mailbox executor, while the framework
     * exception handler still holds neither an exception nor a message. The record is expected to
     * remain in flight.
     *
     * <p>NOTE(review): the {@code false} passed to {@code TestMailboxExecutor} presumably means
     * "do not fail on submission" - confirm against that class.
     *
     * @throws IOException if closing the resource registry fails
     */
    @Test
    void testUserCodeException() throws IOException {
        TestAsyncFrameworkExceptionHandler frameworkHandler = new TestAsyncFrameworkExceptionHandler();
        TestMailboxExecutor mailbox = new TestMailboxExecutor(false);
        // try-with-resources closes the registry even when one of the assertions below fails.
        try (CloseableRegistry resourceRegistry = new CloseableRegistry()) {
            setup(
                    HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS,
                    10000L,
                    6000,
                    mailbox,
                    frameworkHandler,
                    resourceRegistry);
            Runnable failingCallback = () -> {
                valueState.asyncValue().thenAccept(num -> {
                    throw new FlinkRuntimeException("Artificial exception in user code");
                });
            };

            aec.setCurrentContext(aec.buildContext("record", "key"));
            failingCallback.run();

            // The request is only buffered so far; nothing has failed yet.
            AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(1);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
            AssertionsForClassTypes.assertThat(frameworkHandler.exception).isNull();
            AssertionsForClassTypes.assertThat(frameworkHandler.message).isNull();

            aec.triggerIfNeeded(true);

            AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(1);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
            AssertionsForClassTypes.assertThat(mailbox.lastException).isInstanceOf(FlinkRuntimeException.class);
            AssertionsForClassTypes.assertThat(mailbox.lastException.getMessage())
                    .isEqualTo("Artificial exception in user code");
            AssertionsForClassTypes.assertThat(frameworkHandler.exception).isNull();
            AssertionsForClassTypes.assertThat(frameworkHandler.message).isNull();
        }
    }

    /**
     * Checks that a failure raised while running a state future's callback is reported to the
     * framework exception handler rather than recorded by the mailbox executor.
     *
     * <p>After {@code triggerIfNeeded(true)} the test expects the mailbox executor's {@code
     * lastException} to stay {@code null}, and the handler to hold a {@link RuntimeException}
     * together with the message "Caught exception when submitting StateFuture's callback.".
     *
     * <p>NOTE(review): the {@code true} passed to {@code TestMailboxExecutor} presumably makes it
     * fail on submission - confirm against that class.
     *
     * @throws IOException if closing the resource registry fails
     */
    @Test
    void testFrameworkException() throws IOException {
        TestAsyncFrameworkExceptionHandler frameworkHandler = new TestAsyncFrameworkExceptionHandler();
        TestMailboxExecutor mailbox = new TestMailboxExecutor(true);
        // try-with-resources closes the registry even when one of the assertions below fails.
        try (CloseableRegistry resourceRegistry = new CloseableRegistry()) {
            setup(
                    HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS,
                    10000L,
                    6000,
                    mailbox,
                    frameworkHandler,
                    resourceRegistry);
            Runnable noOpCallback = () -> {
                valueState.asyncValue().thenAccept(num -> {
                });
            };

            aec.setCurrentContext(aec.buildContext("record", "key"));
            noOpCallback.run();

            // The request is only buffered so far; nothing has been reported yet.
            AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(1);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
            AssertionsForClassTypes.assertThat(frameworkHandler.exception).isNull();
            AssertionsForClassTypes.assertThat(frameworkHandler.message).isNull();

            aec.triggerIfNeeded(true);

            AssertionsForClassTypes.assertThat(mailbox.lastException).isNull();
            AssertionsForClassTypes.assertThat(frameworkHandler.exception).isInstanceOf(RuntimeException.class);
            AssertionsForClassTypes.assertThat(frameworkHandler.exception.getMessage())
                    .isEqualTo("java.lang.RuntimeException: Fail to execute.");
            AssertionsForClassTypes.assertThat(frameworkHandler.message)
                    .isEqualTo("Caught exception when submitting StateFuture's callback.");
        }
    }

    /**
     * Checks epoch bookkeeping for two records followed by one non-record.
     *
     * <p>Both record contexts are expected to share one epoch whose {@code ongoingRecordCount} is
     * 2. After {@code processNonRecord} the counter is expected to be 3 (two state callbacks plus
     * the final action of the non-record) and the epoch's {@code ongoingRecordCount} to be 0.
     *
     * @throws Exception if processing the non-record or closing the resource registry fails
     */
    @Test
    void testEpochManager() throws Exception {
        // try-with-resources closes the registry even when one of the assertions below fails.
        try (CloseableRegistry resourceRegistry = new CloseableRegistry()) {
            setup(
                    HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS,
                    10000L,
                    6000,
                    new SyncMailboxExecutor(),
                    new TestAsyncFrameworkExceptionHandler(),
                    resourceRegistry);
            AtomicInteger callbackCount = new AtomicInteger(0);
            Runnable countingRead = () -> {
                valueState.asyncValue().thenAccept(num -> {
                    callbackCount.incrementAndGet();
                });
            };

            RecordContext<String> firstContext = aec.buildContext("key1-r1", "key1");
            EpochManager.Epoch firstEpoch = firstContext.getEpoch();
            aec.setCurrentContext(firstContext);
            countingRead.run();

            RecordContext<String> secondContext = aec.buildContext("key2-r2", "key2");
            EpochManager.Epoch secondEpoch = secondContext.getEpoch();
            aec.setCurrentContext(secondContext);
            countingRead.run();

            AssertionsForClassTypes.assertThat(firstEpoch).isEqualTo(secondEpoch);
            AssertionsForClassTypes.assertThat(firstEpoch.ongoingRecordCount).isEqualTo(2);

            // No trigger action; only the final action bumps the counter.
            aec.processNonRecord((ThrowingRunnable) null, () -> {
                callbackCount.incrementAndGet();
            });

            AssertionsForClassTypes.assertThat(callbackCount.get()).isEqualTo(3);
            AssertionsForClassTypes.assertThat(firstEpoch.ongoingRecordCount).isEqualTo(0);
        }
    }

    /**
     * Checks epoch bookkeeping when non-records with different parallel modes are interleaved
     * with records.
     *
     * <p>Three records are submitted, each followed by a non-record: the first two non-records use
     * {@code PARALLEL_BETWEEN_EPOCH}, the last one {@code SERIAL_BETWEEN_EPOCH}. Until the serial
     * non-record arrives, every epoch is expected to keep an {@code ongoingRecordCount} of 1 and
     * the counter to stay at 0. Afterwards all three counts are expected to be 0 and the counter
     * to be 6 (three state callbacks plus three final actions).
     *
     * @throws Exception if closing the resource registry fails
     */
    @Test
    void testMixEpochMode() throws Exception {
        // try-with-resources closes the registry even when one of the assertions below fails.
        try (CloseableRegistry resourceRegistry = new CloseableRegistry()) {
            setup(
                    HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS,
                    10000L,
                    6000,
                    new SyncMailboxExecutor(),
                    new TestAsyncFrameworkExceptionHandler(),
                    resourceRegistry);
            AtomicInteger callbackCount = new AtomicInteger(0);
            Runnable countingRead = () -> {
                valueState.asyncValue().thenAccept(num -> {
                    callbackCount.incrementAndGet();
                });
            };
            // Final action shared by all three non-records below.
            Runnable bumpCounter = () -> {
                callbackCount.incrementAndGet();
            };

            // Record 1, then a non-record that may run in parallel with other epochs.
            RecordContext<String> firstContext = aec.buildContext("key1-r1", "key1");
            EpochManager.Epoch firstEpoch = firstContext.getEpoch();
            aec.setCurrentContext(firstContext);
            countingRead.run();
            aec.epochManager.onNonRecord(
                    (Runnable) null, bumpCounter, EpochManager.ParallelMode.PARALLEL_BETWEEN_EPOCH);
            AssertionsForClassTypes.assertThat(firstEpoch.ongoingRecordCount).isEqualTo(1);

            // Record 2, again followed by a parallel-mode non-record.
            RecordContext<String> secondContext = aec.buildContext("key2-r2", "key2");
            EpochManager.Epoch secondEpoch = secondContext.getEpoch();
            aec.setCurrentContext(secondContext);
            countingRead.run();
            AssertionsForClassTypes.assertThat(firstEpoch.ongoingRecordCount).isEqualTo(1);
            AssertionsForClassTypes.assertThat(secondEpoch.ongoingRecordCount).isEqualTo(1);
            aec.epochManager.onNonRecord(
                    (Runnable) null, bumpCounter, EpochManager.ParallelMode.PARALLEL_BETWEEN_EPOCH);
            AssertionsForClassTypes.assertThat(firstEpoch.ongoingRecordCount).isEqualTo(1);
            AssertionsForClassTypes.assertThat(secondEpoch.ongoingRecordCount).isEqualTo(1);
            AssertionsForClassTypes.assertThat(callbackCount.get()).isEqualTo(0);

            // Record 3, followed by the serial-mode non-record.
            RecordContext<String> thirdContext = aec.buildContext("key3-r3", "key3");
            EpochManager.Epoch thirdEpoch = thirdContext.getEpoch();
            aec.setCurrentContext(thirdContext);
            countingRead.run();
            AssertionsForClassTypes.assertThat(firstEpoch.ongoingRecordCount).isEqualTo(1);
            AssertionsForClassTypes.assertThat(secondEpoch.ongoingRecordCount).isEqualTo(1);
            AssertionsForClassTypes.assertThat(thirdEpoch.ongoingRecordCount).isEqualTo(1);
            aec.epochManager.onNonRecord(
                    (Runnable) null, bumpCounter, EpochManager.ParallelMode.SERIAL_BETWEEN_EPOCH);

            AssertionsForClassTypes.assertThat(firstEpoch.ongoingRecordCount).isEqualTo(0);
            AssertionsForClassTypes.assertThat(secondEpoch.ongoingRecordCount).isEqualTo(0);
            AssertionsForClassTypes.assertThat(thirdEpoch.ongoingRecordCount).isEqualTo(0);
            AssertionsForClassTypes.assertThat(callbackCount.get()).isEqualTo(6);
        }
    }
}
