package org.kie.kogito.jobs.service.job;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import mutiny.zero.flow.adapters.AdaptersToFlow;
import org.kie.kogito.jobs.service.exception.JobExecutionException;
import org.kie.kogito.jobs.service.executor.JobExecutor;
import org.kie.kogito.jobs.service.executor.JobExecutorResolver;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobDetailsContext;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
import org.kie.kogito.jobs.service.utils.ErrorHandling;
import org.kie.kogito.timer.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/kogito/jobs/service/job/DelegateJob.class */
public class DelegateJob implements Job<JobDetailsContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DelegateJob.class);
    private final JobExecutorResolver jobExecutorResolver;
    ReactiveJobScheduler scheduler;

    public DelegateJob(JobExecutorResolver jobExecutorResolver, ReactiveJobScheduler reactiveJobScheduler) {
        this.jobExecutorResolver = jobExecutorResolver;
        this.scheduler = reactiveJobScheduler;
    }

    public void execute(JobDetailsContext jobDetailsContext) {
        AtomicReference atomicReference = new AtomicReference();
        JobDetails jobDetails = (JobDetails) Objects.requireNonNull(jobDetailsContext.getJobDetails(), (Supplier<String>) () -> {
            return String.format("JobDetails cannot be null for context: %s", jobDetailsContext);
        });
        JobExecutor jobExecutor = (JobExecutor) Objects.requireNonNull(this.jobExecutorResolver.get(jobDetails), (Supplier<String>) () -> {
            return String.format("No JobExecutor was found for jobDetails: %s", jobDetails);
        });
        LOGGER.info("Executing job for context: {}", jobDetails);
        jobExecutor.execute(jobDetails).flatMap(jobExecutionResponse -> {
            atomicReference.set(jobExecutionResponse);
            return handleJobExecutionSuccess(jobExecutionResponse);
        }).onFailure(JobExecutionException.class).recoverWithUni(th -> {
            atomicReference.set(JobExecutionResponse.builder().message(th.getMessage()).now().jobId(((JobExecutionException) th).getJobId()).build());
            return handleJobExecutionError((JobExecutionResponse) atomicReference.get());
        }).runSubscriptionOn(Infrastructure.getDefaultWorkerPool()).subscribe().with(jobDetails2 -> {
            LOGGER.info("Job execution response processing has finished: {}", atomicReference.get());
        });
    }

    public Uni<JobDetails> handleJobExecutionSuccess(JobExecutionResponse jobExecutionResponse) {
        LOGGER.debug("Job execution success response received: {}", jobExecutionResponse);
        UniCreate createFrom = Uni.createFrom();
        ReactiveJobScheduler reactiveJobScheduler = this.scheduler;
        Objects.requireNonNull(reactiveJobScheduler);
        return createFrom.publisher(AdaptersToFlow.publisher(ErrorHandling.skipErrorPublisherBuilder(reactiveJobScheduler::handleJobExecutionSuccess, jobExecutionResponse).buildRs()));
    }

    public Uni<JobDetails> handleJobExecutionError(JobExecutionResponse jobExecutionResponse) {
        LOGGER.error("Job execution error response received: {}", jobExecutionResponse);
        UniCreate createFrom = Uni.createFrom();
        ReactiveJobScheduler reactiveJobScheduler = this.scheduler;
        Objects.requireNonNull(reactiveJobScheduler);
        return createFrom.publisher(AdaptersToFlow.publisher(ErrorHandling.skipErrorPublisherBuilder(reactiveJobScheduler::handleJobExecutionError, jobExecutionResponse).buildRs()));
    }
}
