package org.apache.flink.statefun.sdk.java.handler;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.flink.statefun.sdk.java.Address;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.ValueSpec;
import org.apache.flink.statefun.sdk.java.annotations.Internal;
import org.apache.flink.statefun.sdk.java.message.MessageWrapper;
import org.apache.flink.statefun.sdk.java.slice.Slice;
import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
import org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage;
import org.apache.flink.statefun.sdk.java.storage.StateValueContexts;
import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;

@Internal
/* loaded from: input_file:org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandler.class */
public final class ConcurrentRequestReplyHandler implements RequestReplyHandler {
    private final Map<TypeName, StatefulFunctionSpec> functionSpecs;

    public ConcurrentRequestReplyHandler(Map<TypeName, StatefulFunctionSpec> map) {
        this.functionSpecs = (Map) Objects.requireNonNull(map);
    }

    @Override // org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler
    public CompletableFuture<Slice> handle(Slice slice) {
        try {
            return handleInternally(ToFunction.parseFrom(SliceProtobufUtil.asByteString(slice))).thenApply(fromFunction -> {
                return SliceProtobufUtil.asSlice(fromFunction.toByteString());
            });
        } catch (Throwable th) {
            return MoreFutures.exceptional(th);
        }
    }

    CompletableFuture<FromFunction> handleInternally(ToFunction toFunction) {
        if (!toFunction.hasInvocation()) {
            return CompletableFuture.completedFuture(FromFunction.getDefaultInstance());
        }
        ToFunction.InvocationBatchRequest invocation = toFunction.getInvocation();
        Address sdkAddressFromProto = ProtoUtils.sdkAddressFromProto(invocation.getTarget());
        StatefulFunctionSpec statefulFunctionSpec = this.functionSpecs.get(sdkAddressFromProto.type());
        if (statefulFunctionSpec == null) {
            throw new IllegalStateException("Unknown target type " + sdkAddressFromProto);
        }
        Supplier<? extends StatefulFunction> supplier = statefulFunctionSpec.supplier();
        if (supplier == null) {
            throw new NullPointerException("missing function supplier for " + sdkAddressFromProto);
        }
        StatefulFunction statefulFunction = supplier.get();
        if (statefulFunction == null) {
            throw new NullPointerException("supplier for " + sdkAddressFromProto + " supplied NULL function.");
        }
        StateValueContexts.ResolutionResult resolve = StateValueContexts.resolve(statefulFunctionSpec.knownValues(), invocation.getStateList());
        return resolve.hasMissingValues() ? CompletableFuture.completedFuture(buildIncompleteInvocationResponse(resolve.missingValues())) : executeBatch(invocation, sdkAddressFromProto, new ConcurrentAddressScopedStorage(resolve.resolved()), statefulFunction);
    }

    private CompletableFuture<FromFunction> executeBatch(ToFunction.InvocationBatchRequest invocationBatchRequest, Address address, ConcurrentAddressScopedStorage concurrentAddressScopedStorage, StatefulFunction statefulFunction) {
        ConcurrentContext concurrentContext = new ConcurrentContext(address, FromFunction.InvocationResponse.newBuilder(), concurrentAddressScopedStorage);
        return MoreFutures.applySequentially(invocationBatchRequest.getInvocationsList(), invocation -> {
            return apply(statefulFunction, concurrentContext, invocation);
        }).thenApply(r5 -> {
            return finalizeResponse(concurrentAddressScopedStorage, concurrentContext.finalBuilder());
        });
    }

    private static FromFunction buildIncompleteInvocationResponse(List<ValueSpec<?>> list) {
        FromFunction.IncompleteInvocationContext.Builder newBuilder = FromFunction.IncompleteInvocationContext.newBuilder();
        Iterator<ValueSpec<?>> it = list.iterator();
        while (it.hasNext()) {
            newBuilder.addMissingValues(ProtoUtils.protoFromValueSpec(it.next()));
        }
        return FromFunction.newBuilder().setIncompleteInvocationContext(newBuilder).build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static CompletableFuture<Void> apply(StatefulFunction statefulFunction, ConcurrentContext concurrentContext, ToFunction.Invocation invocation) throws Throwable {
        MessageWrapper messageWrapper = new MessageWrapper(concurrentContext.self(), invocation.getArgument());
        concurrentContext.setCaller(ProtoUtils.sdkAddressFromProto(invocation.getCaller()));
        CompletableFuture<Void> apply = statefulFunction.apply(concurrentContext, messageWrapper);
        if (apply == null) {
            throw new IllegalStateException("User function " + concurrentContext.self() + " has returned a NULL future.");
        }
        return apply;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static FromFunction finalizeResponse(ConcurrentAddressScopedStorage concurrentAddressScopedStorage, FromFunction.InvocationResponse.Builder builder) {
        builder.getClass();
        concurrentAddressScopedStorage.addMutations(builder::addStateMutations);
        return FromFunction.newBuilder().setInvocationResult(builder).build();
    }
}
