package org.apache.flink.runtime.asyncprocessing;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.class */
public class AbstractStateIteratorTest {

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest$TestIteratorStateExecutor.class */
    static class TestIteratorStateExecutor implements StateExecutor {
        final int limit;
        final int step;
        AsyncExecutionController aec;
        int current = 0;
        AtomicInteger processedCount = new AtomicInteger(0);

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest$TestIteratorStateExecutor$TestIterator.class */
        public static class TestIterator extends AbstractStateIterator<Integer> {
            final int current;
            final int limit;

            public TestIterator(State state, StateRequestType stateRequestType, AsyncExecutionController asyncExecutionController, Collection<Integer> collection, int i, int i2) {
                super(state, stateRequestType, asyncExecutionController, collection);
                this.current = i;
                this.limit = i2;
            }

            public boolean hasNextLoading() {
                return this.current < this.limit;
            }

            protected Object nextPayloadForContinuousLoading() {
                return this;
            }
        }

        public TestIteratorStateExecutor(int i, int i2) {
            this.limit = i;
            this.step = i2;
        }

        public void bindAec(AsyncExecutionController asyncExecutionController) {
            this.aec = asyncExecutionController;
        }

        public CompletableFuture<Void> executeBatchRequests(StateRequestContainer stateRequestContainer) {
            Preconditions.checkArgument(stateRequestContainer instanceof MockStateRequestContainer);
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            Iterator<StateRequest<?, ?, ?, ?>> it = ((MockStateRequestContainer) stateRequestContainer).getStateRequestList().iterator();
            while (it.hasNext()) {
                executeRequestSync(it.next());
            }
            completableFuture.complete(null);
            return completableFuture;
        }

        public StateRequestContainer createStateRequestContainer() {
            return new MockStateRequestContainer();
        }

        public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) {
            if (stateRequest.getRequestType() == StateRequestType.MAP_ITER) {
                ArrayList arrayList = new ArrayList(this.step);
                for (int i = 0; this.current < this.limit && i < this.step; i++) {
                    int i2 = this.current;
                    this.current = i2 + 1;
                    arrayList.add(Integer.valueOf(i2));
                }
                stateRequest.getFuture().complete(new TestIterator(stateRequest.getState(), stateRequest.getRequestType(), this.aec, arrayList, this.current, this.limit));
            } else if (stateRequest.getRequestType() == StateRequestType.ITERATOR_LOADING) {
                Assertions.assertThat(stateRequest.getPayload()).isInstanceOf(TestIterator.class);
                Assertions.assertThat(((TestIterator) stateRequest.getPayload()).current).isEqualTo(this.current);
                ArrayList arrayList2 = new ArrayList(this.step);
                for (int i3 = 0; this.current < this.limit && i3 < this.step; i3++) {
                    int i4 = this.current;
                    this.current = i4 + 1;
                    arrayList2.add(Integer.valueOf(i4));
                }
                stateRequest.getFuture().complete(new TestIterator(stateRequest.getState(), ((TestIterator) stateRequest.getPayload()).getRequestType(), this.aec, arrayList2, this.current, this.limit));
            } else {
                org.junit.jupiter.api.Assertions.fail("Unsupported request type " + stateRequest.getRequestType());
            }
            this.processedCount.incrementAndGet();
        }

        public boolean fullyLoaded() {
            return false;
        }

        public void shutdown() {
        }
    }

    @Test
    public void testPartialLoading() {
        TestIteratorStateExecutor testIteratorStateExecutor = new TestIteratorStateExecutor(100, 3);
        AsyncExecutionController asyncExecutionController = new AsyncExecutionController(new SyncMailboxExecutor(), (str, th) -> {
        }, testIteratorStateExecutor, new DeclarationManager(), 1, 100, 1000L, 1, (AsyncExecutionController.SwitchContextListener) null, (MetricGroup) null);
        testIteratorStateExecutor.bindAec(asyncExecutionController);
        asyncExecutionController.setCurrentContext(asyncExecutionController.buildContext("1", "key1"));
        AtomicInteger atomicInteger = new AtomicInteger();
        asyncExecutionController.handleRequest((State) null, StateRequestType.MAP_ITER, (Object) null).thenAccept(obj -> {
            Assertions.assertThat(obj).isInstanceOf(StateIterator.class);
            ((StateIterator) obj).onNext(num -> {
                Assertions.assertThat(num).isEqualTo(atomicInteger.getAndIncrement());
            }).thenAccept(r4 -> {
                Assertions.assertThat(atomicInteger.get()).isEqualTo(100);
            });
        });
        asyncExecutionController.drainInflightRecords(0);
    }

    @Test
    public void testPartialLoadingWithReturnValue() {
        TestIteratorStateExecutor testIteratorStateExecutor = new TestIteratorStateExecutor(100, 3);
        AsyncExecutionController asyncExecutionController = new AsyncExecutionController(new SyncMailboxExecutor(), (str, th) -> {
        }, testIteratorStateExecutor, new DeclarationManager(), 1, 100, 1000L, 1, (AsyncExecutionController.SwitchContextListener) null, (MetricGroup) null);
        testIteratorStateExecutor.bindAec(asyncExecutionController);
        asyncExecutionController.setCurrentContext(asyncExecutionController.buildContext("1", "key1"));
        AtomicInteger atomicInteger = new AtomicInteger();
        asyncExecutionController.handleRequest((State) null, StateRequestType.MAP_ITER, (Object) null).thenAccept(obj -> {
            Assertions.assertThat(obj).isInstanceOf(StateIterator.class);
            ((StateIterator) obj).onNext(num -> {
                Assertions.assertThat(num).isEqualTo(atomicInteger.getAndIncrement());
                return StateFutureUtils.completedFuture(String.valueOf(num));
            }).thenAccept(collection -> {
                Assertions.assertThat(atomicInteger.get()).isEqualTo(100);
                int i = 0;
                Iterator it = collection.iterator();
                while (it.hasNext()) {
                    int i2 = i;
                    i++;
                    Assertions.assertThat((String) it.next()).isEqualTo(String.valueOf(i2));
                }
            });
        });
        asyncExecutionController.drainInflightRecords(0);
    }

    @Test
    public void testPartialLoadingWithConversionToIterable() {
        TestIteratorStateExecutor testIteratorStateExecutor = new TestIteratorStateExecutor(100, 3);
        AsyncExecutionController asyncExecutionController = new AsyncExecutionController(new SyncMailboxExecutor(), (str, th) -> {
        }, testIteratorStateExecutor, new DeclarationManager(), 1, 100, 1000L, 1, (AsyncExecutionController.SwitchContextListener) null, (MetricGroup) null);
        testIteratorStateExecutor.bindAec(asyncExecutionController);
        asyncExecutionController.setCurrentContext(asyncExecutionController.buildContext("1", "key1"));
        AtomicInteger atomicInteger = new AtomicInteger();
        StateFutureUtils.toIterable(asyncExecutionController.handleRequest((State) null, StateRequestType.MAP_ITER, (Object) null)).thenAccept(obj -> {
            Assertions.assertThat(obj instanceof Iterable);
            ((Iterable) obj).forEach(num -> {
                Assertions.assertThat(num).isEqualTo(atomicInteger.getAndIncrement());
            });
            Assertions.assertThat(atomicInteger.get()).isEqualTo(100);
        });
        asyncExecutionController.drainInflightRecords(0);
    }
}
