package org.apache.flink.statefun.sdk.java.handler;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import org.apache.flink.statefun.sdk.java.Address;
import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.message.EgressMessage;
import org.apache.flink.statefun.sdk.java.message.Message;
import org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage;
import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/statefun/sdk/java/handler/ConcurrentContext.class */
public final class ConcurrentContext implements Context {
    private final Address self;
    private final FromFunction.InvocationResponse.Builder responseBuilder;
    private final ConcurrentAddressScopedStorage storage;
    private boolean noFurtherModificationsAllowed;
    private Address caller;

    public ConcurrentContext(Address address, FromFunction.InvocationResponse.Builder builder, ConcurrentAddressScopedStorage concurrentAddressScopedStorage) {
        this.self = (Address) Objects.requireNonNull(address);
        this.responseBuilder = (FromFunction.InvocationResponse.Builder) Objects.requireNonNull(builder);
        this.storage = (ConcurrentAddressScopedStorage) Objects.requireNonNull(concurrentAddressScopedStorage);
    }

    @Override // org.apache.flink.statefun.sdk.java.Context
    public Address self() {
        return this.self;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setCaller(Address address) {
        this.caller = address;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FromFunction.InvocationResponse.Builder finalBuilder() {
        FromFunction.InvocationResponse.Builder builder;
        synchronized (this.responseBuilder) {
            this.noFurtherModificationsAllowed = true;
            builder = this.responseBuilder;
        }
        return builder;
    }

    @Override // org.apache.flink.statefun.sdk.java.Context
    public Optional<Address> caller() {
        return Optional.ofNullable(this.caller);
    }

    @Override // org.apache.flink.statefun.sdk.java.Context
    public void send(Message message) {
        Objects.requireNonNull(message);
        FromFunction.Invocation build = FromFunction.Invocation.newBuilder().setArgument(ProtoUtils.getTypedValue(message)).setTarget(ProtoUtils.protoAddressFromSdk(message.targetAddress())).build();
        synchronized (this.responseBuilder) {
            checkNotDone();
            this.responseBuilder.addOutgoingMessages(build);
        }
    }

    @Override // org.apache.flink.statefun.sdk.java.Context
    public void sendAfter(Duration duration, Message message) {
        Objects.requireNonNull(duration);
        Objects.requireNonNull(message);
        FromFunction.DelayedInvocation build = FromFunction.DelayedInvocation.newBuilder().setArgument(ProtoUtils.getTypedValue(message)).setTarget(ProtoUtils.protoAddressFromSdk(message.targetAddress())).setDelayInMs(duration.toMillis()).build();
        synchronized (this.responseBuilder) {
            checkNotDone();
            this.responseBuilder.addDelayedInvocations(build);
        }
    }

    @Override // org.apache.flink.statefun.sdk.java.Context
    public void sendAfter(Duration duration, String str, Message message) {
        Objects.requireNonNull(duration);
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("message cancellation token can not be empty or null.");
        }
        Objects.requireNonNull(message);
        FromFunction.DelayedInvocation build = FromFunction.DelayedInvocation.newBuilder().setArgument(ProtoUtils.getTypedValue(message)).setTarget(ProtoUtils.protoAddressFromSdk(message.targetAddress())).setDelayInMs(duration.toMillis()).setCancellationToken(str).build();
        synchronized (this.responseBuilder) {
            checkNotDone();
            this.responseBuilder.addDelayedInvocations(build);
        }
    }

    @Override // org.apache.flink.statefun.sdk.java.Context
    public void cancelDelayedMessage(String str) {
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("message cancellation token can not be empty or null.");
        }
        FromFunction.DelayedInvocation build = FromFunction.DelayedInvocation.newBuilder().setIsCancellationRequest(true).setCancellationToken(str).build();
        synchronized (this.responseBuilder) {
            checkNotDone();
            this.responseBuilder.addDelayedInvocations(build);
        }
    }

    @Override // org.apache.flink.statefun.sdk.java.Context
    public void send(EgressMessage egressMessage) {
        Objects.requireNonNull(egressMessage);
        TypeName targetEgressId = egressMessage.targetEgressId();
        FromFunction.EgressMessage build = FromFunction.EgressMessage.newBuilder().setArgument(ProtoUtils.getTypedValue(egressMessage)).setEgressNamespace(targetEgressId.namespace()).setEgressType(targetEgressId.name()).build();
        synchronized (this.responseBuilder) {
            checkNotDone();
            this.responseBuilder.addOutgoingEgresses(build);
        }
    }

    @Override // org.apache.flink.statefun.sdk.java.Context
    public AddressScopedStorage storage() {
        return this.storage;
    }

    private void checkNotDone() {
        if (this.noFurtherModificationsAllowed) {
            throw new IllegalStateException("Function has already completed its execution.");
        }
    }
}
