package org.kie.kogito.jobs.service.stream;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter;
import org.kie.kogito.jobs.service.events.JobDataEvent;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kie/kogito/jobs/service/stream/AbstractJobStreams.class */
public abstract class AbstractJobStreams implements JobStreams {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJobStreams.class);
    protected ObjectMapper objectMapper;
    protected boolean enabled;
    protected Emitter<String> emitter;
    protected String url;

    protected AbstractJobStreams() {
    }

    protected AbstractJobStreams(ObjectMapper objectMapper, boolean z, Emitter<String> emitter, String str) {
        this.objectMapper = objectMapper;
        this.enabled = z;
        this.emitter = emitter;
        this.url = str;
    }

    @Override // org.kie.kogito.jobs.service.stream.JobStreams
    public boolean isEnabled() {
        return this.enabled;
    }

    @Override // org.kie.kogito.jobs.service.stream.JobStreams
    public void jobStatusChange(JobDetails jobDetails) {
        if (isEnabled()) {
            try {
                JobDataEvent buildEvent = buildEvent(jobDetails);
                LOGGER.debug("emit jobStatusChange, hasRequests: {}, eventId: {}, jobDetails: {}", new Object[]{Boolean.valueOf(this.emitter.hasRequests()), buildEvent.getId(), jobDetails});
                this.emitter.send(decorate(ContextAwareMessage.of(this.objectMapper.writeValueAsString(buildEvent)).withAck(() -> {
                    return onAck(buildEvent.getId(), jobDetails);
                }).withNack(th -> {
                    return onNack(th, jobDetails);
                }), buildEvent));
            } catch (Exception e) {
                LOGGER.error(String.format("An unexpected error was produced while processing a Job status change for the job: %s", jobDetails), e);
            }
        }
    }

    protected JobDataEvent buildEvent(JobDetails jobDetails) {
        return JobDataEvent.builder().source(this.url + "/jobs").data(ScheduledJobAdapter.of(jobDetails)).build();
    }

    protected CompletionStage<Void> onAck(String str, JobDetails jobDetails) {
        LOGGER.debug("Job Status change emitted successfully, eventId: {}, jobDetails: {}", str, jobDetails);
        return CompletableFuture.completedFuture(null);
    }

    protected CompletionStage<Void> onNack(Throwable th, JobDetails jobDetails) {
        LOGGER.error(String.format("An error was produced while emitting a Job status change for the job: %s", jobDetails), th);
        return CompletableFuture.completedFuture(null);
    }

    protected Message<String> decorate(Message<String> message, JobDataEvent jobDataEvent) {
        return message;
    }
}
