package org.apache.flink.core.state;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;

@Internal
/* loaded from: input_file:org/apache/flink/core/state/StateFutureImpl.class */
public class StateFutureImpl<T> implements InternalStateFuture<T> {
    private final CompletableFuture<T> completableFuture = new CompletableFuture<>();
    protected final CallbackRunner callbackRunner;
    protected final AsyncFrameworkExceptionHandler exceptionHandler;

    /* loaded from: input_file:org/apache/flink/core/state/StateFutureImpl$AsyncFrameworkExceptionHandler.class */
    public interface AsyncFrameworkExceptionHandler {
        void handleException(String str, Throwable th);
    }

    /* loaded from: input_file:org/apache/flink/core/state/StateFutureImpl$CallbackRunner.class */
    public interface CallbackRunner {
        void submit(ThrowingRunnable<? extends Exception> throwingRunnable);
    }

    public StateFutureImpl(CallbackRunner callbackRunner, AsyncFrameworkExceptionHandler asyncFrameworkExceptionHandler) {
        this.callbackRunner = callbackRunner;
        this.exceptionHandler = asyncFrameworkExceptionHandler;
    }

    public <U> StateFuture<U> thenApply(FunctionWithException<? super T, ? extends U, ? extends Exception> functionWithException) {
        callbackRegistered();
        if (!this.completableFuture.isDone()) {
            StateFutureImpl makeNewStateFuture = makeNewStateFuture();
            this.completableFuture.thenAccept((Consumer) obj -> {
                this.callbackRunner.submit(() -> {
                    makeNewStateFuture.completeInCallbackRunner(functionWithException.apply(obj));
                    callbackFinished();
                });
            }).exceptionally(th -> {
                this.exceptionHandler.handleException("Caught exception when submitting StateFuture's callback.", th);
                return null;
            });
            return makeNewStateFuture;
        }
        try {
            Object apply = FunctionWithException.unchecked(functionWithException).apply(this.completableFuture.get());
            callbackFinished();
            return StateFutureUtils.completedFuture(apply);
        } catch (Exception e) {
            this.exceptionHandler.handleException("Caught exception when processing completed StateFuture's callback.", e);
            return null;
        }
    }

    public StateFuture<Void> thenAccept(ThrowingConsumer<? super T, ? extends Exception> throwingConsumer) {
        callbackRegistered();
        if (!this.completableFuture.isDone()) {
            StateFutureImpl<A> makeNewStateFuture = makeNewStateFuture();
            this.completableFuture.thenAccept((Consumer) obj -> {
                this.callbackRunner.submit(() -> {
                    throwingConsumer.accept(obj);
                    makeNewStateFuture.completeInCallbackRunner(null);
                    callbackFinished();
                });
            }).exceptionally(th -> {
                this.exceptionHandler.handleException("Caught exception when submitting StateFuture's callback.", th);
                return null;
            });
            return makeNewStateFuture;
        }
        try {
            ThrowingConsumer.unchecked(throwingConsumer).accept(this.completableFuture.get());
            callbackFinished();
            return StateFutureUtils.completedVoidFuture();
        } catch (Exception e) {
            this.exceptionHandler.handleException("Caught exception when processing completed StateFuture's callback.", e);
            return null;
        }
    }

    public <U> StateFuture<U> thenCompose(FunctionWithException<? super T, ? extends StateFuture<U>, ? extends Exception> functionWithException) {
        callbackRegistered();
        if (!this.completableFuture.isDone()) {
            StateFutureImpl makeNewStateFuture = makeNewStateFuture();
            this.completableFuture.thenAccept((Consumer) obj -> {
                this.callbackRunner.submit(() -> {
                    StateFuture stateFuture = (StateFuture) functionWithException.apply(obj);
                    Objects.requireNonNull(makeNewStateFuture);
                    stateFuture.thenAccept(makeNewStateFuture::completeInCallbackRunner);
                    callbackFinished();
                });
            }).exceptionally(th -> {
                this.exceptionHandler.handleException("Caught exception when submitting StateFuture's callback.", th);
                return null;
            });
            return makeNewStateFuture;
        }
        try {
            T t = this.completableFuture.get();
            callbackFinished();
            return (StateFuture) FunctionWithException.unchecked(functionWithException).apply(t);
        } catch (Throwable th2) {
            this.exceptionHandler.handleException("Caught exception when processing completed StateFuture's callback.", th2);
            return null;
        }
    }

    public <U, V> StateFuture<V> thenCombine(StateFuture<? extends U> stateFuture, BiFunctionWithException<? super T, ? super U, ? extends V, ? extends Exception> biFunctionWithException) {
        callbackRegistered();
        if (!this.completableFuture.isDone()) {
            StateFutureImpl makeNewStateFuture = makeNewStateFuture();
            ((InternalStateFuture) stateFuture).thenSyncAccept(obj -> {
                this.completableFuture.thenAccept((Consumer) obj -> {
                    this.callbackRunner.submit(() -> {
                        makeNewStateFuture.completeInCallbackRunner(biFunctionWithException.apply(obj, obj));
                        callbackFinished();
                    });
                }).exceptionally(th -> {
                    this.exceptionHandler.handleException("Caught exception when submitting StateFuture's callback.", th);
                    return null;
                });
            });
            return makeNewStateFuture;
        }
        try {
            T t = this.completableFuture.get();
            return stateFuture.thenCompose(obj2 -> {
                Object apply = biFunctionWithException.apply(t, obj2);
                callbackFinished();
                return StateFutureUtils.completedFuture(apply);
            });
        } catch (Throwable th) {
            this.exceptionHandler.handleException("Caught exception when submitting StateFuture's callback.", th);
            return null;
        }
    }

    public <A> StateFutureImpl<A> makeNewStateFuture() {
        return new StateFutureImpl<>(this.callbackRunner, this.exceptionHandler);
    }

    @Override // org.apache.flink.core.state.InternalStateFuture
    public void complete(T t) {
        if (this.completableFuture.isCompletedExceptionally()) {
            throw new IllegalStateException("StateFuture already failed !");
        }
        this.completableFuture.complete(t);
        postComplete(false);
    }

    @Override // org.apache.flink.core.state.InternalStateFuture
    public void completeExceptionally(String str, Throwable th) {
        this.exceptionHandler.handleException(str, th);
    }

    private void completeInCallbackRunner(T t) {
        this.completableFuture.complete(t);
        postComplete(true);
    }

    public void callbackRegistered() {
    }

    public void postComplete(boolean z) {
    }

    public void callbackFinished() {
    }

    @Override // org.apache.flink.core.state.InternalStateFuture
    public void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> throwingConsumer) {
        this.completableFuture.thenAccept((Consumer) ThrowingConsumer.unchecked(throwingConsumer)).exceptionally(th -> {
            this.exceptionHandler.handleException("Caught exception when processing completed StateFuture's callback.", th);
            return null;
        });
    }
}
