package org.apache.flink.runtime.state.v2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

/* loaded from: input_file:org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.class */
public class AbstractKeyedStateTestBase {
    AsyncExecutionController aec;
    TestStateExecutor testStateExecutor;
    AtomicReference<Throwable> exception;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase$TestAsyncStateBackend.class */
    public static class TestAsyncStateBackend implements StateBackend {
        TestAsyncStateBackend() {
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> keyedStateBackendParameters) throws Exception {
            throw new UnsupportedOperationException("Don't support createKeyedStateBackend yet");
        }

        public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters operatorStateBackendParameters) throws Exception {
            throw new UnsupportedOperationException("Don't support createOperatorStateBackend yet");
        }

        public boolean supportsAsyncKeyedStateBackend() {
            return true;
        }

        public <K> AsyncKeyedStateBackend<K> createAsyncKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> keyedStateBackendParameters) {
            return new AsyncKeyedStateBackend<K>() { // from class: org.apache.flink.runtime.state.v2.AbstractKeyedStateTestBase.TestAsyncStateBackend.1
                @Nonnull
                public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String str, @Nonnull TypeSerializer<T> typeSerializer) {
                    throw new UnsupportedOperationException("Not support for test yet.");
                }

                @Nonnull
                public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
                    throw new UnsupportedOperationException("Not support yet");
                }

                public void notifyCheckpointComplete(long j) throws Exception {
                    throw new UnsupportedOperationException("Not support yet");
                }

                public void notifyCheckpointSubsumed(long j) throws Exception {
                    throw new UnsupportedOperationException("Not support yet");
                }

                public void close() throws IOException {
                }

                public void setup(@Nonnull StateRequestHandler stateRequestHandler) {
                }

                public <N, S extends State, SV> S getOrCreateKeyedState(N n, TypeSerializer<N> typeSerializer, StateDescriptor<SV> stateDescriptor) throws Exception {
                    return null;
                }

                @Nonnull
                public <N, S extends InternalKeyedState, SV> S createStateInternal(@Nonnull N n, @Nonnull TypeSerializer<N> typeSerializer, @Nonnull StateDescriptor<SV> stateDescriptor) throws Exception {
                    return null;
                }

                public StateExecutor createStateExecutor() {
                    return new TestStateExecutor();
                }

                public KeyGroupRange getKeyGroupRange() {
                    return new KeyGroupRange(0, 127);
                }

                public void dispose() {
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase$TestStateExecutor.class */
    static class TestStateExecutor implements StateExecutor {
        private Deque<StateRequest<?, ?, ?, ?>> receivedRequest = new ConcurrentLinkedDeque();

        /* loaded from: input_file:org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase$TestStateExecutor$TestStateRequestContainer.class */
        static class TestStateRequestContainer implements StateRequestContainer {
            ArrayList<StateRequest<?, ?, ?, ?>> requests = new ArrayList<>();

            TestStateRequestContainer() {
            }

            public void offer(StateRequest<?, ?, ?, ?> stateRequest) {
                this.requests.add(stateRequest);
            }

            public boolean isEmpty() {
                return this.requests.isEmpty();
            }
        }

        TestStateExecutor() {
        }

        <IN> void validate(@Nullable State state, StateRequestType stateRequestType, @Nullable IN in) {
            AssertionsForClassTypes.assertThat(this.receivedRequest.isEmpty()).isFalse();
            StateRequest<?, ?, ?, ?> pop = this.receivedRequest.pop();
            AssertionsForClassTypes.assertThat(pop.getState()).isEqualTo(state);
            AssertionsForClassTypes.assertThat(pop.getRequestType()).isEqualTo(stateRequestType);
            AssertionsForClassTypes.assertThat(pop.getPayload()).isEqualTo(in);
        }

        public CompletableFuture<Void> executeBatchRequests(StateRequestContainer stateRequestContainer) {
            Iterator<StateRequest<?, ?, ?, ?>> it = ((TestStateRequestContainer) stateRequestContainer).requests.iterator();
            while (it.hasNext()) {
                executeRequestSync(it.next());
            }
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            completableFuture.complete(null);
            return completableFuture;
        }

        public StateRequestContainer createStateRequestContainer() {
            return new TestStateRequestContainer();
        }

        public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) {
            this.receivedRequest.add(stateRequest);
            if (stateRequest.getRequestType() == StateRequestType.MAP_CONTAINS || stateRequest.getRequestType() == StateRequestType.MAP_IS_EMPTY) {
                stateRequest.getFuture().complete(true);
            } else {
                stateRequest.getFuture().complete((Object) null);
            }
        }

        public boolean fullyLoaded() {
            return false;
        }

        public void shutdown() {
        }
    }

    @BeforeEach
    void setup() {
        this.testStateExecutor = (TestStateExecutor) createStateExecutor();
        this.aec = new AsyncExecutionController(new SyncMailboxExecutor(), (str, th) -> {
            this.exception.set(th);
        }, this.testStateExecutor, new DeclarationManager(), 1, 1, 1000L, 1, (AsyncExecutionController.SwitchContextListener) null, (MetricGroup) null);
        this.exception = new AtomicReference<>(null);
    }

    @AfterEach
    void after() {
        AssertionsForClassTypes.assertThat(this.exception.get()).isNull();
    }

    private StateExecutor createStateExecutor() {
        TestAsyncStateBackend testAsyncStateBackend = new TestAsyncStateBackend();
        AssertionsForClassTypes.assertThat(testAsyncStateBackend.supportsAsyncKeyedStateBackend()).isTrue();
        return testAsyncStateBackend.createAsyncKeyedStateBackend(null).createStateExecutor();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <IN> void validateRequestRun(@Nullable State state, StateRequestType stateRequestType, @Nullable IN in, int i) {
        this.aec.triggerIfNeeded(true);
        this.testStateExecutor.validate(state, stateRequestType, in);
        AssertionsForClassTypes.assertThat(this.testStateExecutor.receivedRequest.size()).isEqualTo(i);
    }
}
