package org.apache.beam.repackaged.direct_java.runners.fnexecution.jobsubmission;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.repackaged.direct_java.runners.core.construction.graph.PipelineValidator;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.FnService;
import org.apache.beam.repackaged.direct_java.sdk.fn.stream.SynchronizedStreamObserver;
import org.apache.beam.sdk.function.ThrowingConsumer;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusException;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/fnexecution/jobsubmission/InMemoryJobService.class */
/**
 * A Job Management gRPC service that keeps all of its bookkeeping in memory.
 *
 * <p>Job preparations, running invocations and staging session tokens are held in
 * {@link ConcurrentMap}s owned by this instance; nothing is persisted. Actual pipeline execution
 * is delegated to the {@link JobInvoker} supplied at construction time.
 */
public class InMemoryJobService extends JobServiceGrpc.JobServiceImplBase implements FnService {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobService.class);

    /** Prepared (not yet run) jobs, keyed by preparation id. */
    private final ConcurrentMap<String, JobPreparation> preparations = new ConcurrentHashMap<>();

    /** Started job invocations, keyed by the id reported by {@link JobInvocation#getId()}. */
    private final ConcurrentMap<String, JobInvocation> invocations = new ConcurrentHashMap<>();

    /** Staging session tokens handed out by {@link #prepare}, keyed by preparation id. */
    private final ConcurrentMap<String, String> stagingSessionTokens = new ConcurrentHashMap<>();

    /** Endpoint returned to clients as the artifact staging endpoint. */
    private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor;

    /** Maps a preparation id to the staging session token returned to the client. */
    private final Function<String, String> stagingServiceTokenProvider;

    /** Optional callback invoked with the staging session token once a job terminates. */
    private final ThrowingConsumer<Exception, String> cleanupJobFn;

    /** Creates the {@link JobInvocation} for a prepared job. */
    private final JobInvoker invoker;

    /**
     * Creates a new service instance.
     *
     * @param apiServiceDescriptor endpoint reported to clients for artifact staging
     * @param function produces a staging session token for a given preparation id
     * @param throwingConsumer cleanup callback that receives the staging session token when a
     *     job reaches a terminal state; may be {@code null}, in which case no cleanup is run
     * @param jobInvoker used to create job invocations from prepared pipelines
     * @return the new service
     */
    public static InMemoryJobService create(Endpoints.ApiServiceDescriptor apiServiceDescriptor, Function<String, String> function, ThrowingConsumer<Exception, String> throwingConsumer, JobInvoker jobInvoker) {
        return new InMemoryJobService(apiServiceDescriptor, function, throwingConsumer, jobInvoker);
    }

    private InMemoryJobService(Endpoints.ApiServiceDescriptor apiServiceDescriptor, Function<String, String> function, ThrowingConsumer<Exception, String> throwingConsumer, JobInvoker jobInvoker) {
        this.stagingServiceDescriptor = apiServiceDescriptor;
        this.stagingServiceTokenProvider = function;
        this.cleanupJobFn = throwingConsumer;
        this.invoker = jobInvoker;
    }

    /**
     * Registers a job preparation under a fresh id of the form {@code <jobName>_<uuid>} and
     * replies with that id, the staging endpoint and a staging session token.
     *
     * <p>Any exception is logged and reported to the client as {@code INTERNAL}.
     */
    @Override
    public void prepare(JobApi.PrepareJobRequest prepareJobRequest, StreamObserver<JobApi.PrepareJobResponse> streamObserver) {
        try {
            LOG.trace("{} {}", JobApi.PrepareJobRequest.class.getSimpleName(), prepareJobRequest);
            String preparationId = String.format("%s_%s", prepareJobRequest.getJobName(), UUID.randomUUID().toString());
            Struct pipelineOptions = prepareJobRequest.getPipelineOptions();
            if (pipelineOptions == null) {
                throw new NullPointerException("Encountered null pipeline options.");
            }
            LOG.trace("PIPELINE OPTIONS {} {}", pipelineOptions.getClass(), pipelineOptions);

            JobPreparation preparation = JobPreparation.builder()
                    .setId(preparationId)
                    .setPipeline(prepareJobRequest.getPipeline())
                    .setOptions(pipelineOptions)
                    .build();
            if (this.preparations.putIfAbsent(preparationId, preparation) != null) {
                // The id is already taken, so ALREADY_EXISTS (not NOT_FOUND) is the matching status.
                String description = String.format("A job with the preparation ID \"%s\" already exists.", preparationId);
                streamObserver.onError(Status.ALREADY_EXISTS.withDescription(description).asException());
                return;
            }

            String stagingSessionToken = this.stagingServiceTokenProvider.apply(preparationId);
            this.stagingSessionTokens.putIfAbsent(preparationId, stagingSessionToken);
            streamObserver.onNext(JobApi.PrepareJobResponse.newBuilder()
                    .setPreparationId(preparationId)
                    .setArtifactStagingEndpoint(this.stagingServiceDescriptor)
                    .setStagingSessionToken(stagingSessionToken)
                    .build());
            streamObserver.onCompleted();
        } catch (Exception e) {
            LOG.error("Could not prepare job with name {}", prepareJobRequest.getJobName(), e);
            streamObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    }

    /**
     * Validates and starts the job that was previously prepared under the request's preparation
     * id, then replies with the id of the resulting invocation.
     *
     * <p>Error mapping: an unknown preparation id yields {@code NOT_FOUND}; a pipeline rejected by
     * {@link PipelineValidator} yields {@code INVALID_ARGUMENT}; a {@link StatusRuntimeException}
     * raised while invoking is forwarded unchanged; anything else yields {@code INTERNAL}.
     */
    @Override
    public void run(JobApi.RunJobRequest runJobRequest, StreamObserver<JobApi.RunJobResponse> streamObserver) {
        LOG.trace("{} {}", JobApi.RunJobRequest.class.getSimpleName(), runJobRequest);
        String preparationId = runJobRequest.getPreparationId();
        try {
            JobPreparation jobPreparation = this.preparations.get(preparationId);
            if (jobPreparation == null) {
                streamObserver.onError(Status.NOT_FOUND.withDescription(String.format("Unknown Preparation Id \"%s\".", preparationId)).asException());
                return;
            }

            // Only validation failures are the client's fault; keep this try narrow so that
            // invocation/start failures are not misreported as INVALID_ARGUMENT.
            try {
                PipelineValidator.validate(jobPreparation.pipeline());
            } catch (Exception e) {
                LOG.warn("Encountered Unexpected Exception during validation", e);
                streamObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT.withCause(e)));
                return;
            }

            JobInvocation invocation = this.invoker.invoke(jobPreparation.pipeline(), jobPreparation.options(), runJobRequest.getRetrievalToken());
            String invocationId = invocation.getId();
            // Release the staging session as soon as the job reports a terminal state.
            invocation.addStateListener(jobStateEvent -> {
                if (JobInvocation.isTerminated(jobStateEvent.getState()).booleanValue()) {
                    cleanUpStagingSession(preparationId);
                }
            });
            invocation.start();
            this.invocations.put(invocationId, invocation);
            streamObserver.onNext(JobApi.RunJobResponse.newBuilder().setJobId(invocationId).build());
            streamObserver.onCompleted();
        } catch (StatusRuntimeException e) {
            LOG.warn("Encountered Status Exception", e);
            streamObserver.onError(e);
        } catch (Exception e) {
            LOG.error(String.format("Encountered Unexpected Exception for Preparation %s", preparationId), e);
            streamObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    }

    /**
     * Forgets the staging session token of the given preparation and, if a cleanup callback was
     * configured, hands the token to it. Cleanup failures are logged and otherwise ignored.
     */
    private void cleanUpStagingSession(String preparationId) {
        // remove() returns the previous mapping, so lookup and removal happen in one step.
        String stagingSessionToken = this.stagingSessionTokens.remove(preparationId);
        if (this.cleanupJobFn == null) {
            return;
        }
        try {
            this.cleanupJobFn.accept(stagingSessionToken);
        } catch (Exception e) {
            // Pass the exception as the trailing argument so SLF4J logs its stack trace.
            LOG.warn("Failed to remove job staging directory for token {}", stagingSessionToken, e);
        }
    }

    /** Replies with the proto description of every invocation currently registered. */
    @Override
    public void getJobs(JobApi.GetJobsRequest getJobsRequest, StreamObserver<JobApi.GetJobsResponse> streamObserver) {
        LOG.trace("{} {}", JobApi.GetJobsRequest.class.getSimpleName(), getJobsRequest);
        try {
            JobApi.GetJobsResponse.Builder response = JobApi.GetJobsResponse.newBuilder();
            for (JobInvocation invocation : this.invocations.values()) {
                response.addJobInfo(invocation.toProto());
            }
            streamObserver.onNext(response.build());
            streamObserver.onCompleted();
        } catch (Exception e) {
            LOG.error("Encountered Unexpected Exception", e);
            streamObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    }

    /**
     * Replies with the current state event of the requested job.
     *
     * <p>An unknown job id yields {@code NOT_FOUND}; unexpected failures yield {@code INTERNAL}.
     */
    @Override
    public void getState(JobApi.GetJobStateRequest getJobStateRequest, StreamObserver<JobApi.JobStateEvent> streamObserver) {
        LOG.trace("{} {}", JobApi.GetJobStateRequest.class.getSimpleName(), getJobStateRequest);
        String jobId = getJobStateRequest.getJobId();
        try {
            streamObserver.onNext(getInvocation(jobId).getStateEvent());
            streamObserver.onCompleted();
        } catch (StatusRuntimeException | StatusException e) {
            streamObserver.onError(e);
        } catch (Exception e) {
            LOG.error(String.format("Encountered Unexpected Exception for Invocation %s", jobId), e);
            streamObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    }

    /**
     * Replies with the pipeline of the requested job.
     *
     * <p>An unknown job id yields {@code NOT_FOUND}; unexpected failures yield {@code INTERNAL}.
     */
    @Override
    public void getPipeline(JobApi.GetJobPipelineRequest getJobPipelineRequest, StreamObserver<JobApi.GetJobPipelineResponse> streamObserver) {
        LOG.trace("{} {}", JobApi.GetJobPipelineRequest.class.getSimpleName(), getJobPipelineRequest);
        String jobId = getJobPipelineRequest.getJobId();
        try {
            streamObserver.onNext(JobApi.GetJobPipelineResponse.newBuilder().setPipeline(getInvocation(jobId).getPipeline()).build());
            streamObserver.onCompleted();
        } catch (StatusRuntimeException | StatusException e) {
            // Status exceptions must be handled before the generic handler so that
            // NOT_FOUND reaches the client instead of being re-wrapped as INTERNAL.
            streamObserver.onError(e);
        } catch (Exception e) {
            LOG.error(String.format("Encountered Unexpected Exception for Invocation %s", jobId), e);
            streamObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    }

    /**
     * Requests cancellation of the given job and replies with the state the invocation reports
     * afterwards.
     *
     * <p>An unknown job id yields {@code NOT_FOUND}; unexpected failures yield {@code INTERNAL}.
     */
    @Override
    public void cancel(JobApi.CancelJobRequest cancelJobRequest, StreamObserver<JobApi.CancelJobResponse> streamObserver) {
        LOG.trace("{} {}", JobApi.CancelJobRequest.class.getSimpleName(), cancelJobRequest);
        String jobId = cancelJobRequest.getJobId();
        try {
            JobInvocation invocation = getInvocation(jobId);
            invocation.cancel();
            streamObserver.onNext(JobApi.CancelJobResponse.newBuilder().setState(invocation.getState()).build());
            streamObserver.onCompleted();
        } catch (StatusRuntimeException | StatusException e) {
            streamObserver.onError(e);
        } catch (Exception e) {
            LOG.error(String.format("Encountered Unexpected Exception for Invocation %s", jobId), e);
            streamObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    }

    /**
     * Streams state events of the requested job to the client. The stream is completed once an
     * event carrying a terminal state has been forwarded.
     *
     * <p>An unknown job id yields {@code NOT_FOUND}; unexpected failures yield {@code INTERNAL}.
     */
    @Override
    public void getStateStream(JobApi.GetJobStateRequest getJobStateRequest, StreamObserver<JobApi.JobStateEvent> streamObserver) {
        LOG.trace("{} {}", JobApi.GetJobStateRequest.class.getSimpleName(), getJobStateRequest);
        String jobId = getJobStateRequest.getJobId();
        try {
            getInvocation(jobId).addStateListener(jobStateEvent -> {
                streamObserver.onNext(jobStateEvent);
                if (JobInvocation.isTerminated(jobStateEvent.getState()).booleanValue()) {
                    streamObserver.onCompleted();
                }
            });
        } catch (StatusRuntimeException | StatusException e) {
            // Handle status exceptions first; see getPipeline for the rationale.
            streamObserver.onError(e);
        } catch (Exception e) {
            LOG.error(String.format("Encountered Unexpected Exception for Invocation %s", jobId), e);
            streamObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    }

    /**
     * Streams both state events and job messages of the requested job to the client, each wrapped
     * in a {@code JobMessagesResponse}. The stream is completed once an event carrying a terminal
     * state has been forwarded.
     *
     * <p>An unknown job id yields {@code NOT_FOUND}; unexpected failures yield {@code INTERNAL}.
     */
    @Override
    public void getMessageStream(JobApi.JobMessagesRequest jobMessagesRequest, StreamObserver<JobApi.JobMessagesResponse> streamObserver) {
        String jobId = jobMessagesRequest.getJobId();
        try {
            JobInvocation invocation = getInvocation(jobId);
            // Two independent listeners write to the same stream, so every call made from a
            // listener (including onCompleted) goes through the synchronized wrapper.
            StreamObserver<JobApi.JobMessagesResponse> syncObserver = SynchronizedStreamObserver.wrapping(streamObserver);
            Consumer<JobApi.JobStateEvent> stateListener = jobStateEvent -> {
                syncObserver.onNext(JobApi.JobMessagesResponse.newBuilder().setStateResponse(jobStateEvent).build());
                if (JobInvocation.isTerminated(jobStateEvent.getState()).booleanValue()) {
                    syncObserver.onCompleted();
                }
            };
            Consumer<JobApi.JobMessage> messageListener = jobMessage ->
                    syncObserver.onNext(JobApi.JobMessagesResponse.newBuilder().setMessageResponse(jobMessage).build());
            invocation.addStateListener(stateListener);
            invocation.addMessageListener(messageListener);
        } catch (StatusRuntimeException | StatusException e) {
            streamObserver.onError(e);
        } catch (Exception e) {
            LOG.error(String.format("Encountered Unexpected Exception for Invocation %s", jobId), e);
            streamObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    }

    /**
     * Replies with the metrics reported by the requested job.
     *
     * <p>An unknown job id yields {@code NOT_FOUND}; unexpected failures yield {@code INTERNAL}.
     */
    @Override
    public void getJobMetrics(JobApi.GetJobMetricsRequest getJobMetricsRequest, StreamObserver<JobApi.GetJobMetricsResponse> streamObserver) {
        String jobId = getJobMetricsRequest.getJobId();
        LOG.info("Getting job metrics for {}", jobId);
        try {
            streamObserver.onNext(JobApi.GetJobMetricsResponse.newBuilder().setMetrics(getInvocation(jobId).getMetrics()).build());
            streamObserver.onCompleted();
        } catch (StatusRuntimeException | StatusException e) {
            // Handle status exceptions first; see getPipeline for the rationale.
            streamObserver.onError(e);
        } catch (Exception e) {
            LOG.error(String.format("Encountered exception for job invocation %s", jobId), e);
            streamObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
        LOG.info("Finished getting job metrics for {}", jobId);
    }

    /** Replies with a description of every pipeline option registered with {@link PipelineOptionsFactory}. */
    @Override
    public void describePipelineOptions(JobApi.DescribePipelineOptionsRequest describePipelineOptionsRequest, StreamObserver<JobApi.DescribePipelineOptionsResponse> streamObserver) {
        LOG.trace("{} {}", JobApi.DescribePipelineOptionsRequest.class.getSimpleName(), describePipelineOptionsRequest);
        try {
            streamObserver.onNext(JobApi.DescribePipelineOptionsResponse.newBuilder().addAllOptions(PipelineOptionsFactory.describe(PipelineOptionsFactory.getRegisteredOptions())).build());
            streamObserver.onCompleted();
        } catch (Exception e) {
            LOG.error("Error describing pipeline options", e);
            streamObserver.onError(Status.INTERNAL.withCause(e).asException());
        }
    }

    /** No-op: this service performs no cleanup on close. */
    @Override // org.apache.beam.repackaged.direct_java.runners.fnexecution.FnService, java.lang.AutoCloseable
    public void close() throws Exception {
    }

    /**
     * Looks up a started invocation by job id.
     *
     * @param str the job id
     * @return the matching invocation, never {@code null}
     * @throws StatusException with status {@code NOT_FOUND} if no invocation has that id
     */
    private JobInvocation getInvocation(String str) throws StatusException {
        JobInvocation jobInvocation = this.invocations.get(str);
        if (jobInvocation == null) {
            throw Status.NOT_FOUND.withDescription(String.format("Unknown job id \"%s\".", str)).asException();
        }
        return jobInvocation;
    }
}
