/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.scheduler.impl;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.kie.kogito.jobs.service.executor.JobExecutorResolver;
import org.kie.kogito.jobs.service.job.DelegateJob;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobDetailsContext;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.model.ManageableJobHandle;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.BaseTimerJobScheduler;
import org.kie.kogito.jobs.service.scheduler.impl.VertxTimerServiceScheduler;
import org.kie.kogito.jobs.service.stream.JobStreams;
import org.kie.kogito.jobs.service.utils.ErrorHandling;
import org.kie.kogito.timer.JobContext;
import org.kie.kogito.timer.Trigger;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class TimerDelegateJobScheduler
extends BaseTimerJobScheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger(TimerDelegateJobScheduler.class);
    private JobExecutorResolver jobExecutorResolver;
    private VertxTimerServiceScheduler delegate;
    private JobStreams jobStreams;

    protected TimerDelegateJobScheduler() {
    }

    @Inject
    public TimerDelegateJobScheduler(ReactiveJobRepository jobRepository, @ConfigProperty(name="kogito.jobs-service.backoffRetryMillis", defaultValue="1000") long backoffRetryMillis, @ConfigProperty(name="kogito.jobs-service.maxIntervalLimitToRetryMillis", defaultValue="60000") long maxIntervalLimitToRetryMillis, @ConfigProperty(name="kogito.jobs-service.schedulerChunkInMinutes", defaultValue="10") long schedulerChunkInMinutes, @ConfigProperty(name="kogito.jobs-service.forceExecuteExpiredJobs", defaultValue="true") boolean forceExecuteExpiredJobs, JobExecutorResolver jobExecutorResolver, VertxTimerServiceScheduler delegate, JobStreams jobStreams) {
        super(jobRepository, backoffRetryMillis, maxIntervalLimitToRetryMillis, schedulerChunkInMinutes, forceExecuteExpiredJobs);
        this.jobExecutorResolver = jobExecutorResolver;
        this.delegate = delegate;
        this.jobStreams = jobStreams;
    }

    @Override
    public PublisherBuilder<ManageableJobHandle> doSchedule(JobDetails job, Optional<Trigger> trigger) {
        LOGGER.debug("Job Scheduling {}", (Object)job);
        return ReactiveStreams.of((Object)job).map(j -> this.delegate.scheduleJob(new DelegateJob(this.jobExecutorResolver, this.jobStreams), (JobContext)new JobDetailsContext(j), trigger.orElse(j.getTrigger())));
    }

    @Override
    public Publisher<ManageableJobHandle> doCancel(JobDetails scheduledJob) {
        return ReactiveStreams.of((Object)scheduledJob).map(JobDetails::getScheduledId).filter(Objects::nonNull).map(scheduledId -> {
            ManageableJobHandle handle = new ManageableJobHandle(scheduledId);
            handle.setCancel(this.delegate.removeJob(handle));
            return handle;
        }).buildRs();
    }

    @Incoming(value="job-error-events")
    @Acknowledgment(value=Acknowledgment.Strategy.PRE_PROCESSING)
    public CompletionStage<Boolean> jobErrorProcessor(JobExecutionResponse response) {
        LOGGER.warn("Error received {}", (Object)response);
        return ErrorHandling.skipErrorPublisherBuilder(this::handleJobExecutionError, response).findFirst().run().thenApply(Optional::isPresent).exceptionally(e -> {
            LOGGER.error("Error handling error {}", (Object)response, e);
            return false;
        });
    }

    @Incoming(value="job-success-events")
    @Acknowledgment(value=Acknowledgment.Strategy.PRE_PROCESSING)
    public CompletionStage<Boolean> jobSuccessProcessor(JobExecutionResponse response) {
        LOGGER.debug("Success received to be processed {}", (Object)response);
        return ErrorHandling.skipErrorPublisherBuilder(this::handleJobExecutionSuccess, response).findFirst().run().thenApply(Optional::isPresent).exceptionally(e -> {
            LOGGER.error("Error handling error {}", (Object)response, e);
            return false;
        });
    }
}

