package ca.uhn.fhir.batch2.coordinator;

import ca.uhn.fhir.batch2.api.ChunkExecutionDetails;
import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.IJobStepWorker;
import ca.uhn.fhir.batch2.api.IReductionStepWorker;
import ca.uhn.fhir.batch2.api.JobExecutionFailedException;
import ca.uhn.fhir.batch2.api.JobStepFailedException;
import ca.uhn.fhir.batch2.api.ReductionStepExecutionDetails;
import ca.uhn.fhir.batch2.api.RunOutcome;
import ca.uhn.fhir.batch2.api.StepExecutionDetails;
import ca.uhn.fhir.batch2.api.VoidModel;
import ca.uhn.fhir.batch2.channel.BatchJobSender;
import ca.uhn.fhir.batch2.model.ChunkOutcome;
import ca.uhn.fhir.batch2.model.JobDefinition;
import ca.uhn.fhir.batch2.model.JobDefinitionStep;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.JobWorkCursor;
import ca.uhn.fhir.batch2.model.MarkWorkChunkAsErrorRequest;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.model.api.IModelJson;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ca/uhn/fhir/batch2/coordinator/StepExecutionSvc.class */
public class StepExecutionSvc {
    private static final Logger ourLog = LoggerFactory.getLogger(StepExecutionSvc.class);
    public static final int MAX_CHUNK_ERROR_COUNT = 3;
    private final IJobPersistence myJobPersistence;
    private final BatchJobSender myBatchJobSender;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: ca.uhn.fhir.batch2.coordinator.StepExecutionSvc$1, reason: invalid class name */
    /* loaded from: input_file:ca/uhn/fhir/batch2/coordinator/StepExecutionSvc$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$ca$uhn$fhir$batch2$model$ChunkOutcome$Status = new int[ChunkOutcome.Status.values().length];

        static {
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$ChunkOutcome$Status[ChunkOutcome.Status.SUCCESS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$ChunkOutcome$Status[ChunkOutcome.Status.ABORT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$ca$uhn$fhir$batch2$model$ChunkOutcome$Status[ChunkOutcome.Status.FAIL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public StepExecutionSvc(IJobPersistence iJobPersistence, BatchJobSender batchJobSender) {
        this.myJobPersistence = iJobPersistence;
        this.myBatchJobSender = batchJobSender;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> JobStepExecutorOutput<PT, IT, OT> doExecution(JobWorkCursor<PT, IT, OT> jobWorkCursor, JobInstance jobInstance, @Nullable WorkChunk workChunk) {
        JobDefinitionStep<PT, IT, OT> currentStep = jobWorkCursor.getCurrentStep();
        JobDefinition<PT> jobDefinition = jobWorkCursor.getJobDefinition();
        String instanceId = jobInstance.getInstanceId();
        Class<IT> inputType = currentStep.getInputType();
        IModelJson parameters = jobInstance.getParameters(jobDefinition.getParametersType());
        IJobStepWorker<PT, IT, OT> jobStepWorker = currentStep.getJobStepWorker();
        BaseDataSink dataSink = getDataSink(jobWorkCursor, jobDefinition, instanceId);
        if (currentStep.isReductionStep()) {
            return new JobStepExecutorOutput<>(executeReductionStep(jobInstance, currentStep, inputType, parameters, dataSink), dataSink);
        }
        Validate.notNull(workChunk);
        return new JobStepExecutorOutput<>(executeStep(getExecutionDetailsForNonReductionStep(workChunk, jobInstance, inputType, parameters), jobStepWorker, dataSink), dataSink);
    }

    protected <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> BaseDataSink<PT, IT, OT> getDataSink(JobWorkCursor<PT, IT, OT> jobWorkCursor, JobDefinition<PT> jobDefinition, String str) {
        return jobWorkCursor.isReductionStep() ? new ReductionStepDataSink(str, jobWorkCursor, jobDefinition, this.myJobPersistence) : jobWorkCursor.isFinalStep() ? new FinalStepDataSink(jobDefinition.getJobDefinitionId(), str, jobWorkCursor.asFinalCursor()) : new JobDataSink(this.myBatchJobSender, this.myJobPersistence, jobDefinition, str, jobWorkCursor);
    }

    private <PT extends IModelJson, IT extends IModelJson> StepExecutionDetails<PT, IT> getExecutionDetailsForNonReductionStep(WorkChunk workChunk, JobInstance jobInstance, Class<IT> cls, PT pt) {
        IModelJson iModelJson = null;
        if (!cls.equals(VoidModel.class)) {
            iModelJson = workChunk.getData(cls);
        }
        return new StepExecutionDetails<>(pt, iModelJson, jobInstance, workChunk.getId());
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:11:0x00a1. Please report as an issue. */
    private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> boolean executeReductionStep(JobInstance jobInstance, JobDefinitionStep<PT, IT, OT> jobDefinitionStep, Class<IT> cls, PT pt, BaseDataSink<PT, IT, OT> baseDataSink) {
        IReductionStepWorker iReductionStepWorker = (IReductionStepWorker) jobDefinitionStep.getJobStepWorker();
        Iterator<WorkChunk> fetchAllWorkChunksForStepIterator = this.myJobPersistence.fetchAllWorkChunksForStepIterator(jobInstance.getInstanceId(), jobDefinitionStep.getStepId());
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        boolean z = false;
        while (fetchAllWorkChunksForStepIterator.hasNext()) {
            WorkChunk next = fetchAllWorkChunksForStepIterator.next();
            if (StatusEnum.getIncompleteStatuses().contains(next.getStatus())) {
                if (arrayList.isEmpty()) {
                    try {
                        switch (AnonymousClass1.$SwitchMap$ca$uhn$fhir$batch2$model$ChunkOutcome$Status[iReductionStepWorker.consume(new ChunkExecutionDetails<>(next.getData(cls), pt, jobInstance.getInstanceId(), next.getId())).getStatuss().ordinal()]) {
                            case 1:
                                arrayList2.add(next.getId());
                                break;
                            case 2:
                                ourLog.error("Processing of work chunk {} resulted in aborting job.", next.getId());
                                arrayList.add(next.getId());
                                z = true;
                                break;
                            case MAX_CHUNK_ERROR_COUNT /* 3 */:
                                this.myJobPersistence.markWorkChunkAsFailed(next.getId(), "Step worker failed to process work chunk " + next.getId());
                                z = true;
                                break;
                        }
                    } catch (Exception e) {
                        String format = String.format("Reduction step failed to execute chunk reduction for chunk %s with exception: %s.", next.getId(), e.getMessage());
                        ourLog.error(format, e);
                        z = true;
                        this.myJobPersistence.markWorkChunkAsFailed(next.getId(), format);
                    }
                } else {
                    arrayList.add(next.getId());
                }
            }
        }
        if (!arrayList2.isEmpty()) {
            this.myJobPersistence.markWorkChunksWithStatusAndWipeData(jobInstance.getInstanceId(), arrayList2, StatusEnum.COMPLETED, null);
        }
        if (!arrayList.isEmpty()) {
            this.myJobPersistence.markWorkChunksWithStatusAndWipeData(jobInstance.getInstanceId(), arrayList, StatusEnum.FAILED, "JOB ABORTED");
        }
        return (arrayList2.isEmpty() || !executeStep(new ReductionStepExecutionDetails(pt, null, jobInstance), iReductionStepWorker, baseDataSink) || z) ? false : true;
    }

    private <PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> boolean executeStep(StepExecutionDetails<PT, IT> stepExecutionDetails, IJobStepWorker<PT, IT, OT> iJobStepWorker, BaseDataSink<PT, IT, OT> baseDataSink) {
        String jobDefinitionId = baseDataSink.getJobDefinitionId();
        String stepId = baseDataSink.getTargetStep().getStepId();
        String chunkId = stepExecutionDetails.getChunkId();
        try {
            RunOutcome run = iJobStepWorker.run(stepExecutionDetails, baseDataSink);
            Validate.notNull(run, "Step theWorker returned null: %s", new Object[]{iJobStepWorker.getClass()});
            if (!stepExecutionDetails.hasAssociatedWorkChunk()) {
                return true;
            }
            int recordsProcessed = run.getRecordsProcessed();
            int recoveredErrorCount = baseDataSink.getRecoveredErrorCount();
            this.myJobPersistence.markWorkChunkAsCompletedAndClearData(chunkId, recordsProcessed);
            if (recoveredErrorCount <= 0) {
                return true;
            }
            this.myJobPersistence.incrementWorkChunkErrorCount(chunkId, recoveredErrorCount);
            return true;
        } catch (JobExecutionFailedException e) {
            ourLog.error("Unrecoverable failure executing job {} step {}", new Object[]{jobDefinitionId, stepId, e});
            if (!stepExecutionDetails.hasAssociatedWorkChunk()) {
                return false;
            }
            this.myJobPersistence.markWorkChunkAsFailed(chunkId, e.toString());
            return false;
        } catch (Exception e2) {
            ourLog.error("Failure executing job {} step {}", new Object[]{jobDefinitionId, stepId, e2});
            if (stepExecutionDetails.hasAssociatedWorkChunk()) {
                MarkWorkChunkAsErrorRequest markWorkChunkAsErrorRequest = new MarkWorkChunkAsErrorRequest();
                markWorkChunkAsErrorRequest.setChunkId(chunkId);
                markWorkChunkAsErrorRequest.setErrorMsg(e2.getMessage());
                Optional<WorkChunk> markWorkChunkAsErroredAndIncrementErrorCount = this.myJobPersistence.markWorkChunkAsErroredAndIncrementErrorCount(markWorkChunkAsErrorRequest);
                if (markWorkChunkAsErroredAndIncrementErrorCount.isPresent()) {
                    WorkChunk workChunk = markWorkChunkAsErroredAndIncrementErrorCount.get();
                    if (workChunk.getErrorCount() > 3) {
                        this.myJobPersistence.markWorkChunkAsFailed(chunkId, "Too many errors: " + workChunk.getErrorCount());
                        return false;
                    }
                }
            }
            throw new JobStepFailedException(Msg.code(2041) + e2.getMessage(), e2);
        } catch (Throwable th) {
            ourLog.error("Unexpected failure executing job {} step {}", new Object[]{jobDefinitionId, stepId, th});
            if (!stepExecutionDetails.hasAssociatedWorkChunk()) {
                return false;
            }
            this.myJobPersistence.markWorkChunkAsFailed(chunkId, th.toString());
            return false;
        }
    }
}
