package org.apache.flink.runtime.asyncprocessing;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Objects;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.core.state.StateFutureImpl;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.EpochManager;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.class */
public class ContextStateFutureImplTest {

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest$SingleStepRunner.class */
    /**
     * Test helper that only queues the callbacks handed to {@link #submit} and executes them
     * later, when the test explicitly calls {@link #runThrough()}.
     */
    public static class SingleStepRunner {
        /** Queued callbacks; appended at the tail and consumed from the head (FIFO). */
        private final LinkedList<ThrowingRunnable<? extends Exception>> pendingCallbacks = new LinkedList<>();

        /**
         * Queues a callback without running it.
         *
         * @param throwingRunnable the callback to run on a later {@link #runThrough()}
         */
        public void submit(ThrowingRunnable<? extends Exception> throwingRunnable) {
            pendingCallbacks.add(throwingRunnable);
        }

        /**
         * Drains the queue, running every callback wrapped by {@code ThrowingRunnable.unchecked}.
         * Callbacks that get submitted while draining are executed in the same pass, because the
         * queue is re-checked after every run.
         *
         * @return {@code true} if at least one callback was executed, {@code false} if the queue
         *     was already empty
         */
        public boolean runThrough() {
            boolean executedAny = false;
            while (!pendingCallbacks.isEmpty()) {
                // The callback is removed before it runs, so a throwing callback is not retried.
                ThrowingRunnable.unchecked(pendingCallbacks.poll()).run();
                executedAny = true;
            }
            return executedAny;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest$TestAsyncFrameworkExceptionHandler.class */
    static class TestAsyncFrameworkExceptionHandler implements StateFutureImpl.AsyncFrameworkExceptionHandler {
        String message;
        Throwable exception;

        TestAsyncFrameworkExceptionHandler() {
        }

        public void handleException(String str, Throwable th) {
            this.message = str;
            this.exception = th;
        }
    }

    /**
     * Checks the record context's reference count and the callback dispatching around
     * {@code thenApply}, for a callback registered before completion, a callback registered
     * after completion, and a callback that throws.
     */
    @Test
    public void testThenApply() {
        SingleStepRunner runner = new SingleStepRunner();
        RecordContext<String> recordContext = buildRecordContext("a", "b");
        TestAsyncFrameworkExceptionHandler exceptionHandler = new TestAsyncFrameworkExceptionHandler();

        // Callback registered first, then completed: the callback is expected to go through the
        // runner, and the reference taken by the future is released once it has run.
        ContextStateFutureImpl<Void> completedLater =
                new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(1);
        completedLater.thenApply(ignored -> 1L);
        completedLater.complete(null);
        Assertions.assertThat(runner.runThrough()).isTrue();
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(0);

        // Completed first, then callback registered: nothing is expected to reach the runner and
        // the reference is already released by complete().
        ContextStateFutureImpl<Void> completedFirst =
                new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(1);
        completedFirst.complete(null);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(0);
        completedFirst.thenApply(ignored -> 1L);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(0);
        Assertions.assertThat(runner.runThrough()).isFalse();

        // Throwing callback: the exception is expected to surface when the runner executes it.
        ContextStateFutureImpl<Void> failing =
                new ContextStateFutureImpl<>(
                        callback -> runner.submit(() -> callback.run()),
                        exceptionHandler,
                        recordContext);
        failing.thenApply(
                ignored -> {
                    throw new FlinkRuntimeException("Artificial exception for thenApply().");
                });
        failing.complete(null);
        // assertThatThrownBy reports "no exception thrown" directly, instead of routing a fail()
        // AssertionError through a catch (Throwable) block.
        Assertions.assertThatThrownBy(runner::runThrough)
                .isInstanceOf(FlinkRuntimeException.class)
                .hasMessage("Artificial exception for thenApply().");
    }

    /**
     * Checks the record context's reference count and the callback dispatching around
     * {@code thenAccept}, for a callback registered before completion, a callback registered
     * after completion, and a throwing callback registered on an already completed future.
     */
    @Test
    public void testThenAccept() {
        SingleStepRunner runner = new SingleStepRunner();
        RecordContext<String> recordContext = buildRecordContext("a", "b");
        TestAsyncFrameworkExceptionHandler exceptionHandler = new TestAsyncFrameworkExceptionHandler();

        // Callback registered first, then completed: the callback is expected to go through the
        // runner, and the reference taken by the future is released once it has run.
        ContextStateFutureImpl<Void> completedLater =
                new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(1);
        completedLater.thenAccept(ignored -> {});
        completedLater.complete(null);
        Assertions.assertThat(runner.runThrough()).isTrue();
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(0);

        // Completed first, then callback registered: nothing is expected to reach the runner and
        // the reference is already released by complete().
        ContextStateFutureImpl<Void> completedFirst =
                new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(1);
        completedFirst.complete(null);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(0);
        completedFirst.thenAccept(ignored -> {});
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(0);
        Assertions.assertThat(runner.runThrough()).isFalse();

        // Throwing callback on an already completed future: the exception is expected to be
        // thrown to the caller of complete()/thenAccept() without involving the runner.
        ContextStateFutureImpl<Void> failing =
                new ContextStateFutureImpl<>(
                        callback -> runner.submit(() -> callback.run()),
                        exceptionHandler,
                        recordContext);
        // assertThatThrownBy reports "no exception thrown" directly, instead of routing a fail()
        // AssertionError through a catch (Throwable) block.
        Assertions.assertThatThrownBy(
                        () -> {
                            failing.complete(null);
                            failing.thenAccept(
                                    ignored -> {
                                        throw new FlinkRuntimeException("Artificial exception for thenAccept().");
                                    });
                        })
                .isInstanceOf(FlinkRuntimeException.class)
                .hasMessage("Artificial exception for thenAccept().");
        // The exception must reach the caller directly and not be passed to the handler.
        Assertions.assertThat(exceptionHandler.exception).isNull();
    }

    /**
     * Checks the record context's reference count and the callback dispatching around
     * {@code thenCompose}, for a callback registered before completion, a callback registered
     * after completion, and a callback that throws a checked exception.
     */
    @Test
    public void testThenCompose() {
        SingleStepRunner runner = new SingleStepRunner();
        RecordContext<String> recordContext = buildRecordContext("a", "b");
        TestAsyncFrameworkExceptionHandler exceptionHandler = new TestAsyncFrameworkExceptionHandler();

        // Callback registered first, then completed: the callback is expected to go through the
        // runner, and the reference taken by the future is released once it has run.
        ContextStateFutureImpl<Void> completedLater =
                new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(1);
        completedLater.thenCompose(ignored -> StateFutureUtils.completedFuture(1L));
        completedLater.complete(null);
        Assertions.assertThat(runner.runThrough()).isTrue();
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(0);

        // Completed first, then callback registered: nothing is expected to reach the runner and
        // the reference is already released by complete().
        ContextStateFutureImpl<Void> completedFirst =
                new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(1);
        completedFirst.complete(null);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(0);
        completedFirst.thenCompose(ignored -> StateFutureUtils.completedFuture(1L));
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(0);
        Assertions.assertThat(runner.runThrough()).isFalse();

        // Callback throwing a checked FlinkException: the runner is expected to surface it
        // wrapped in a RuntimeException whose message is the string form of the original.
        ContextStateFutureImpl<Void> failing =
                new ContextStateFutureImpl<>(
                        callback -> runner.submit(() -> callback.run()),
                        exceptionHandler,
                        recordContext);
        failing.thenCompose(
                ignored -> {
                    throw new FlinkException("Artificial exception for thenCompose().");
                });
        failing.complete(null);
        // assertThatThrownBy reports "no exception thrown" directly, instead of routing a fail()
        // AssertionError through a catch (Throwable) block.
        Assertions.assertThatThrownBy(runner::runThrough)
                .isInstanceOf(RuntimeException.class)
                .hasMessage("org.apache.flink.util.FlinkException: Artificial exception for thenCompose().");
    }

    /**
     * Checks the record context's reference count around {@code thenCombine} for the four
     * orderings of completing the two input futures relative to registering the combiner:
     * neither, only the left, only the right, or both completed beforehand.
     */
    @Test
    public void testThenCombine() {
        SingleStepRunner runner = new SingleStepRunner();
        RecordContext<String> recordContext = buildRecordContext("a", "b");
        TestAsyncFrameworkExceptionHandler exceptionHandler = new TestAsyncFrameworkExceptionHandler();

        // Neither input completed when the combiner is registered.
        ContextStateFutureImpl<Void> left =
                new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        ContextStateFutureImpl<Void> right =
                new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(2);
        left.thenCombine(right, (l, r) -> 1L);
        left.complete(null);
        right.complete(null);
        Assertions.assertThat(runner.runThrough()).isTrue();
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(0);

        // Only the left input completed beforehand.
        left = new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        right = new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(2);
        left.complete(null);
        left.thenCombine(right, (l, r) -> 1L);
        Assertions.assertThat(recordContext.getReferenceCount()).isGreaterThan(1);
        right.complete(null);
        Assertions.assertThat(runner.runThrough()).isTrue();
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(0);

        // Only the right input completed beforehand.
        left = new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        right = new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(2);
        right.complete(null);
        left.thenCombine(right, (l, r) -> 1L);
        Assertions.assertThat(recordContext.getReferenceCount()).isGreaterThan(1);
        left.complete(null);
        Assertions.assertThat(runner.runThrough()).isTrue();
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(0);

        // Both inputs completed beforehand: nothing is expected to reach the runner.
        left = new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        right = new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(2);
        left.complete(null);
        right.complete(null);
        left.thenCombine(right, (l, r) -> 1L);
        Assertions.assertThat(runner.runThrough()).isFalse();
        Assertions.assertThat(recordContext.getReferenceCount()).isEqualTo(0);
    }

    /**
     * Exercises a chain of {@code combineAll -> thenCombine -> thenCompose -> thenApply ->
     * thenAccept} over five futures, once for every combination ("profile") of which futures are
     * completed before the chain is built. Bit {@code n} of the profile id decides whether future
     * {@code n} is completed up front; all remaining futures are completed after the chain exists.
     */
    @Test
    public void testComplex() {
        final int futureCount = 5;
        // One profile per subset of futures completed up front: 2^futureCount combinations.
        final int profileCount = 1 << futureCount;
        final String failMessage = "The reference counted tests fail for profile id %d";

        SingleStepRunner runner = new SingleStepRunner();
        RecordContext<String> recordContext = buildRecordContext("a", "b");
        TestAsyncFrameworkExceptionHandler exceptionHandler = new TestAsyncFrameworkExceptionHandler();

        for (int profile = 0; profile < profileCount; profile++) {
            ArrayList<ContextStateFutureImpl<Void>> futures = new ArrayList<>(futureCount);
            for (int index = 0; index < futureCount; index++) {
                ContextStateFutureImpl<Void> future =
                        new ContextStateFutureImpl<>(runner::submit, exceptionHandler, recordContext);
                futures.add(future);
                if (((profile >>> index) & 1) == 1) {
                    future.complete(null);
                }
            }

            StateFutureUtils.combineAll(Arrays.asList(futures.get(0), futures.get(1), futures.get(2)))
                    .thenCombine(futures.get(3), (combined, ignored) -> 1L)
                    .thenCompose(ignored -> futures.get(4))
                    .thenApply(ignored -> 2L)
                    .thenAccept(ignored -> {});

            // Complete whatever this profile left pending.
            for (int index = 0; index < futureCount; index++) {
                if (((profile >>> index) & 1) == 0) {
                    futures.get(index).complete(null);
                }
            }

            boolean allCompletedUpFront = profile == profileCount - 1;
            if (allCompletedUpFront) {
                // Every future was done before the chain was built, so no callback is expected
                // to be queued and no reference is expected to remain.
                Assertions.assertThat(recordContext.getReferenceCount())
                        .withFailMessage(failMessage, profile)
                        .isEqualTo(0);
                Assertions.assertThat(runner.runThrough())
                        .withFailMessage(failMessage, profile)
                        .isFalse();
            } else {
                // At least one callback is still queued and holds a reference until it has run.
                Assertions.assertThat(recordContext.getReferenceCount())
                        .withFailMessage(failMessage, profile)
                        .isGreaterThan(0);
                Assertions.assertThat(runner.runThrough())
                        .withFailMessage(failMessage, profile)
                        .isTrue();
                Assertions.assertThat(recordContext.getReferenceCount())
                        .withFailMessage(failMessage, profile)
                        .isEqualTo(0);
            }
        }
    }

    /**
     * Creates a {@link RecordContext} for the given record and key, using a no-op callback, the
     * key group computed by {@code KeyGroupRangeAssignment.assignToKeyGroup(k, 128)}, a fresh
     * {@code Epoch(0)} and {@code 0} as the final constructor argument.
     *
     * @param obj the record to wrap
     * @param k the key of the record
     * @return the newly created record context
     */
    private <K> RecordContext<K> buildRecordContext(Object obj, K k) {
        final int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(k, 128);
        final EpochManager.Epoch epoch = new EpochManager.Epoch(0L);
        return new RecordContext<>(obj, k, ignored -> {}, keyGroup, epoch, 0);
    }
}
