package org.kie.kogito.jobs.service.stream;

import io.smallrye.reactive.messaging.annotations.Broadcast;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
import org.kie.kogito.jobs.service.utils.ErrorHandling;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
/* loaded from: input_file:org/kie/kogito/jobs/service/stream/JobStreamsEventPublisher.class */
public class JobStreamsEventPublisher implements JobEventPublisher {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobStreamsEventPublisher.class);

    @Inject
    ReactiveJobScheduler scheduler;

    @Inject
    @Channel(AvailableStreams.JOB_ERROR)
    @OnOverflow(OnOverflow.Strategy.NONE)
    Emitter<JobExecutionResponse> jobErrorEmitter;

    @Inject
    @Channel(AvailableStreams.JOB_SUCCESS)
    @OnOverflow(OnOverflow.Strategy.NONE)
    Emitter<JobExecutionResponse> jobSuccessEmitter;

    @Inject
    @Channel(AvailableStreams.JOB_STATUS_CHANGE)
    @OnOverflow(OnOverflow.Strategy.NONE)
    Emitter<JobDetails> jobStatusChangeEmitter;

    @Override // org.kie.kogito.jobs.service.stream.JobEventPublisher
    public JobExecutionResponse publishJobError(JobExecutionResponse jobExecutionResponse) {
        this.jobErrorEmitter.send(jobExecutionResponse);
        return jobExecutionResponse;
    }

    @Override // org.kie.kogito.jobs.service.stream.JobEventPublisher
    public JobExecutionResponse publishJobSuccess(JobExecutionResponse jobExecutionResponse) {
        this.jobSuccessEmitter.send(jobExecutionResponse);
        return jobExecutionResponse;
    }

    @Override // org.kie.kogito.jobs.service.stream.JobEventPublisher
    public JobDetails publishJobStatusChange(JobDetails jobDetails) {
        this.jobStatusChangeEmitter.send(jobDetails);
        return jobDetails;
    }

    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    @Incoming(AvailableStreams.JOB_ERROR_EVENTS)
    public CompletionStage<Boolean> jobErrorProcessor(JobExecutionResponse jobExecutionResponse) {
        LOGGER.warn("Error received {}", jobExecutionResponse);
        ReactiveJobScheduler reactiveJobScheduler = this.scheduler;
        Objects.requireNonNull(reactiveJobScheduler);
        return ErrorHandling.skipErrorPublisherBuilder(reactiveJobScheduler::handleJobExecutionError, jobExecutionResponse).findFirst().run().thenApply((v0) -> {
            return v0.isPresent();
        }).exceptionally(th -> {
            LOGGER.error("Error handling error {}", jobExecutionResponse, th);
            return false;
        });
    }

    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    @Incoming(AvailableStreams.JOB_SUCCESS_EVENTS)
    public CompletionStage<Boolean> jobSuccessProcessor(JobExecutionResponse jobExecutionResponse) {
        LOGGER.debug("Success received to be processed {}", jobExecutionResponse);
        ReactiveJobScheduler reactiveJobScheduler = this.scheduler;
        Objects.requireNonNull(reactiveJobScheduler);
        return ErrorHandling.skipErrorPublisherBuilder(reactiveJobScheduler::handleJobExecutionSuccess, jobExecutionResponse).findFirst().run().thenApply((v0) -> {
            return v0.isPresent();
        }).exceptionally(th -> {
            LOGGER.error("Error handling error {}", jobExecutionResponse, th);
            return false;
        });
    }

    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    @Outgoing(AvailableStreams.JOB_ERROR_EVENTS)
    @Incoming(AvailableStreams.JOB_ERROR)
    public JobExecutionResponse jobErrorBroadcast(JobExecutionResponse jobExecutionResponse) {
        LOGGER.debug("Error broadcast published {}", jobExecutionResponse);
        return jobExecutionResponse;
    }

    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    @Outgoing(AvailableStreams.JOB_SUCCESS_EVENTS)
    @Broadcast
    @Incoming(AvailableStreams.JOB_SUCCESS)
    public JobExecutionResponse jobSuccessBroadcast(JobExecutionResponse jobExecutionResponse) {
        LOGGER.debug("Success broadcast published {}", jobExecutionResponse);
        return jobExecutionResponse;
    }

    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    @Outgoing(AvailableStreams.JOB_STATUS_CHANGE_EVENTS)
    @Broadcast
    @Incoming(AvailableStreams.JOB_STATUS_CHANGE)
    public JobDetails jobStatusChangeBroadcast(JobDetails jobDetails) {
        LOGGER.debug("Status change broadcast for Job {}", jobDetails);
        return jobDetails;
    }
}
