package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner;
import org.apache.druid.indexing.common.task.batch.parallel.SubTaskReport;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;

/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.class */
public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTaskReportType extends SubTaskReport> implements ParallelIndexTaskRunner<SubTaskType, SubTaskReportType> {
    private static final Logger LOG = new Logger(ParallelIndexPhaseRunner.class);

    // Values captured at construction time and exposed through the getters further down.
    private final TaskToolbox toolbox;
    private final String taskId;
    private final String groupId;
    private final ParallelIndexTuningConfig tuningConfig;
    private final Map<String, Object> context;
    /** Cached copy of {@code tuningConfig.getMaxNumConcurrentSubTasks()}; caps in-flight sub tasks in run(). */
    private final int maxNumConcurrentSubTasks;
    private final IndexingServiceClient indexingServiceClient;

    /** Set to true by run() and stopGracefully(); isRunning() reports false once it is set. */
    private volatile boolean subTaskScheduleAndMonitorStopped;
    /** Created inside run(); stays null until then, which is why the accessors below null-check it. */
    private volatile TaskMonitor<SubTaskType> taskMonitor;

    /**
     * Completion events queued by the callbacks registered in submitNewTask() and polled by run().
     * Uses the diamond operator instead of a raw type so the assignment is checked by the compiler.
     */
    private final BlockingQueue<TaskMonitor.SubTaskCompleteEvent<SubTaskType>> taskCompleteEvents = new LinkedBlockingDeque<>();
    /** Reports stored by collectReport(), keyed by the value of {@code report.getTaskId()}. */
    private final ConcurrentHashMap<String, SubTaskReportType> reportsMap = new ConcurrentHashMap<>();
    /** Counter handed out by getAndIncrementNextSpecId(); incremented without synchronization. */
    private int nextSpecId = 0;

    /* renamed from: org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexPhaseRunner$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner$2.class */
    /**
     * Compiler-generated helper for a {@code switch} over {@link TaskState}. It maps an enum ordinal to a
     * small case number: 1 for SUCCESS, 2 for FAILED, and 0 (the array default) for every other state.
     */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$druid$indexer$TaskState;

        static {
            // Fill a local table first, then publish it through the final field.
            final int[] caseNumbers = new int[TaskState.values().length];
            try {
                caseNumbers[TaskState.SUCCESS.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime: its slot simply keeps the default 0.
            }
            try {
                caseNumbers[TaskState.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Same as above.
            }
            $SwitchMap$org$apache$druid$indexer$TaskState = caseNumbers;
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner$CountingSubTaskSpecIterator.class */
    /**
     * Iterator decorator that forwards every call to a wrapped sub task spec iterator while keeping a
     * tally of how many specs were requested through {@link #next()}.
     */
    private class CountingSubTaskSpecIterator implements Iterator<SubTaskSpec<SubTaskType>> {
        private final Iterator<SubTaskSpec<SubTaskType>> delegate;
        /** Number of {@link #next()} calls that passed the exhaustion check. */
        private int count;

        private CountingSubTaskSpecIterator(Iterator<SubTaskSpec<SubTaskType>> it) {
            delegate = it;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return delegate.hasNext();
        }

        /**
         * Returns the next spec from the wrapped iterator.
         *
         * @throws NoSuchElementException if the wrapped iterator is exhausted
         */
        @Override // java.util.Iterator
        public SubTaskSpec<SubTaskType> next() {
            if (delegate.hasNext()) {
                // The counter is bumped before delegating, exactly as many times as specs are requested.
                count += 1;
                return delegate.next();
            }
            throw new NoSuchElementException();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the collaborators of this phase runner.
     *
     * @param taskToolbox               toolbox returned later by {@link #getToolbox()}
     * @param str                       value returned later by {@link #getTaskId()}
     * @param str2                      value returned later by {@link #getGroupId()}
     * @param parallelIndexTuningConfig tuning config; its max concurrent sub task count is read here
     * @param map                       value returned later by {@link #getContext()}
     * @param indexingServiceClient     client passed to the task monitor; must not be null
     * @throws NullPointerException if {@code indexingServiceClient} is null
     */
    public ParallelIndexPhaseRunner(TaskToolbox taskToolbox, String str, String str2, ParallelIndexTuningConfig parallelIndexTuningConfig, Map<String, Object> map, IndexingServiceClient indexingServiceClient) {
        toolbox = taskToolbox;
        taskId = str;
        groupId = str2;
        context = map;
        tuningConfig = parallelIndexTuningConfig;
        maxNumConcurrentSubTasks = parallelIndexTuningConfig.getMaxNumConcurrentSubTasks();
        this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
    }

    /**
     * Supplies the sub task specs that {@link #run()} submits. run() calls this once at its start and
     * returns SUCCESS immediately when the returned iterator has no elements.
     *
     * @return iterator over the specs to submit
     * @throws IOException declared for implementations; not raised by this class itself
     */
    abstract Iterator<SubTaskSpec<SubTaskType>> subTaskSpecIterator() throws IOException;

    /**
     * Provides the number that {@link #run()} passes as the third constructor argument of the
     * {@code TaskMonitor} it creates.
     *
     * @return estimated total number of sub tasks
     * @throws IOException declared for implementations; not raised by this class itself
     */
    abstract int estimateTotalNumSubTasks() throws IOException;

    /**
     * Submits the sub tasks produced by {@link #subTaskSpecIterator()} and waits for them to finish.
     *
     * <p>At most {@code maxNumConcurrentSubTasks} sub tasks are kept in flight. Each time a sub task
     * succeeds, another spec is submitted if one is left and a slot is free. The phase succeeds once no
     * specs remain, nothing is running, no completion event is pending, and the number of submitted specs
     * equals the number of succeeded tasks. The first FAILED completion event fails the whole phase.
     *
     * <p>The loop also ends when the current thread is interrupted or {@link #stopGracefully()} is called;
     * in that case the returned state is FAILED because no terminal state was reached.
     *
     * @return SUCCESS or FAILED
     * @throws Exception whatever the loop raises, including {@code ISE} for a missing last status, a
     *                   missing report, a succeeded-count mismatch or an unexpected sub task state
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public TaskState run() throws Exception {
        final CountingSubTaskSpecIterator specIterator = new CountingSubTaskSpecIterator(subTaskSpecIterator());
        if (!specIterator.hasNext()) {
            LOG.warn("There's no input split to process");
            return TaskState.SUCCESS;
        }

        final long taskStatusCheckPeriodMs = tuningConfig.getTaskStatusCheckPeriodMs();
        taskMonitor = new TaskMonitor<>(
            Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient"),
            tuningConfig.getMaxRetry(),
            estimateTotalNumSubTasks()
        );
        TaskState state = TaskState.RUNNING;
        taskMonitor.start(taskStatusCheckPeriodMs);

        try {
            LOG.info("Submitting initial tasks");
            while (isRunning() && specIterator.hasNext() && taskMonitor.getNumRunningTasks() < maxNumConcurrentSubTasks) {
                submitNewTask(taskMonitor, specIterator.next());
            }

            LOG.info("Waiting for subTasks to be completed");
            while (isRunning()) {
                // The timed poll lets isRunning() be re-evaluated periodically even when nothing completes.
                final TaskMonitor.SubTaskCompleteEvent<SubTaskType> event =
                    taskCompleteEvents.poll(taskStatusCheckPeriodMs, TimeUnit.MILLISECONDS);
                if (event == null) {
                    continue;
                }

                final TaskState lastState = event.getLastState();
                // Switch on the enum itself rather than on a synthetic ordinal lookup table.
                switch (lastState) {
                    case SUCCESS: {
                        final TaskStatusPlus completeStatus = event.getLastStatus();
                        if (completeStatus == null) {
                            throw new ISE("Last status of complete task is missing!");
                        }
                        // A succeeded sub task must already have delivered its report via collectReport().
                        if (!reportsMap.containsKey(completeStatus.getId())) {
                            throw new ISE("Missing reports from task[%s]!", completeStatus.getId());
                        }

                        if (specIterator.hasNext()) {
                            // More specs to run: submit one only if a slot is free, otherwise wait.
                            if (taskMonitor.getNumRunningTasks() < maxNumConcurrentSubTasks) {
                                submitNewTask(taskMonitor, specIterator.next());
                            }
                        } else if (taskMonitor.getNumRunningTasks() == 0 && taskCompleteEvents.isEmpty()) {
                            // Nothing left to submit, nothing running, nothing pending: the phase is over.
                            subTaskScheduleAndMonitorStopped = true;
                            if (specIterator.count != taskMonitor.getNumSucceededTasks()) {
                                final ParallelIndexingPhaseProgress progress = taskMonitor.getProgress();
                                throw new ISE(
                                    "Expected [%d] tasks to succeed, but we got [%d] succeeded tasks and [%d] failed tasks",
                                    specIterator.count,
                                    progress.getSucceeded(),
                                    progress.getFailed()
                                );
                            }
                            state = TaskState.SUCCESS;
                        }
                        break;
                    }
                    case FAILED: {
                        state = TaskState.FAILED;
                        subTaskScheduleAndMonitorStopped = true;
                        final TaskStatusPlus failedStatus = event.getLastStatus();
                        if (failedStatus != null) {
                            LOG.error("Failed because of the failed sub task[%s]", failedStatus.getId());
                        } else {
                            // No status is available, so log the input splits of the spec instead.
                            // NOTE(review): the cast assumes the spec is a SinglePhaseSubTaskSpec - confirm
                            // this holds for every subclass of this runner.
                            // NOTE(review): m38getIOConfig looks like a decompiler-generated method name -
                            // verify against the real IngestionSpec API.
                            final SinglePhaseSubTaskSpec failedSpec = (SinglePhaseSubTaskSpec) event.getSpec();
                            final InputRowParser parser = failedSpec.getIngestionSpec().getDataSchema().getParser();
                            LOG.error(
                                "Failed to run sub tasks for inputSplits[%s]",
                                getSplitsIfSplittable(
                                    failedSpec.getIngestionSpec().m38getIOConfig().getNonNullInputSource(parser),
                                    failedSpec.getIngestionSpec().m38getIOConfig().getNonNullInputFormat(parser == null ? null : parser.getParseSpec()),
                                    tuningConfig.getSplitHintSpec()
                                )
                            );
                        }
                        break;
                    }
                    default:
                        throw new ISE("spec[%s] is in an invalid state[%s]", event.getSpec().getId(), lastState);
                }
            }
        } finally {
            // Runs on both the normal and the exceptional path, so cleanup is written once.
            stopInternal();
            if (!state.isComplete()) {
                // The loop ended without reaching a terminal state (interrupted or stopped).
                state = TaskState.FAILED;
            }
        }
        return state;
    }

    /**
     * Tells whether the scheduling loop should keep going.
     *
     * @return true while the stop flag is unset and the current thread has not been interrupted
     */
    private boolean isRunning() {
        return !subTaskScheduleAndMonitorStopped && !Thread.currentThread().isInterrupted();
    }

    /**
     * Hands a spec to the given monitor and registers a callback that queues the outcome on
     * {@code taskCompleteEvents}, where {@link #run()} picks it up.
     *
     * @param taskMonitor monitor that receives the spec
     * @param subTaskSpec spec to submit
     */
    private void submitNewTask(TaskMonitor<SubTaskType> taskMonitor, final SubTaskSpec<SubTaskType> subTaskSpec) {
        LOG.info("Submit a new task for spec[%s] and inputSplit[%s]", subTaskSpec.getId(), subTaskSpec.getInputSplit());

        final FutureCallback<TaskMonitor.SubTaskCompleteEvent<SubTaskType>> enqueueOutcome =
            new FutureCallback<TaskMonitor.SubTaskCompleteEvent<SubTaskType>>() {
                @Override
                public void onSuccess(TaskMonitor.SubTaskCompleteEvent<SubTaskType> subTaskCompleteEvent) {
                    // The monitor produced a completion event: forward it unchanged.
                    taskCompleteEvents.offer(subTaskCompleteEvent);
                }

                @Override
                public void onFailure(Throwable th) {
                    // The future itself failed: log and queue a synthetic failure event for the spec.
                    LOG.error(th, "Error while running a task for subTaskSpec[%s]", subTaskSpec);
                    taskCompleteEvents.offer(TaskMonitor.SubTaskCompleteEvent.fail(subTaskSpec, th));
                }
            };
        Futures.addCallback(taskMonitor.submit(subTaskSpec), enqueueOutcome);
    }

    /**
     * Collects every split of a splittable input source into a list.
     *
     * @param inputSource   source to split
     * @param inputFormat   format forwarded to {@code createSplits}
     * @param splitHintSpec optional hint forwarded to {@code createSplits}
     * @return the splits produced by the source
     * @throws ISE         if {@code inputSource} is not a {@link SplittableInputSource}
     * @throws IOException declared because creating the splits may perform I/O
     */
    private static List<InputSplit> getSplitsIfSplittable(InputSource inputSource, InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException {
        if (!(inputSource instanceof SplittableInputSource)) {
            throw new ISE("inputSource[%s] is not splittable", inputSource.getClass().getSimpleName());
        }
        // The raw cast is kept on purpose: the element type of the source is not known here.
        final SplittableInputSource splittable = (SplittableInputSource) inputSource;
        return (List) splittable.createSplits(inputFormat, splitHintSpec).collect(Collectors.toList());
    }

    /**
     * Requests a stop: raises the stop flag so {@link #run()} leaves its loops, then releases resources.
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public void stopGracefully() {
        // The flag must be set before cleanup so the loop does not submit further tasks.
        subTaskScheduleAndMonitorStopped = true;
        stopInternal();
    }

    /**
     * Drops all pending completion events and stops the task monitor if one has been created.
     */
    private void stopInternal() {
        LOG.info("Cleaning up resources");
        taskCompleteEvents.clear();
        if (taskMonitor == null) {
            // run() never got as far as creating a monitor.
            return;
        }
        taskMonitor.stop();
    }

    /**
     * Stores a sub task report under the id returned by {@code getTaskId()}. A task may send the same
     * report more than once, but a report that differs from the stored one is rejected.
     *
     * @param subtaskreporttype report to store
     * @throws IllegalStateException if a different report is already stored for the same task id
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public void collectReport(SubTaskReportType subtaskreporttype) {
        final String reportingTaskId = subtaskreporttype.getTaskId();
        reportsMap.compute(reportingTaskId, (id, previous) -> {
            if (previous != null) {
                final boolean sameReport = previous.equals(subtaskreporttype);
                Preconditions.checkState(sameReport, "task[%s] sent two or more reports and previous report[%s] is different from the current one[%s]", new Object[]{id, previous, subtaskreporttype});
            }
            return subtaskreporttype;
        });
    }

    /**
     * Exposes the collected reports.
     *
     * @return the live internal map (not a copy), keyed by task id
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public Map<String, SubTaskReportType> getReports() {
        return reportsMap;
    }

    /**
     * Reports the progress of this phase.
     *
     * @return the monitor's progress, or the "not running" progress while no monitor exists
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public ParallelIndexingPhaseProgress getProgress() {
        if (taskMonitor == null) {
            return ParallelIndexingPhaseProgress.notRunning();
        }
        return taskMonitor.getProgress();
    }

    /**
     * Lists the ids of the running sub tasks.
     *
     * @return the monitor's running task ids, or an empty set while no monitor exists
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public Set<String> getRunningTaskIds() {
        if (taskMonitor == null) {
            return Collections.emptySet();
        }
        return taskMonitor.getRunningTaskIds();
    }

    /**
     * Returns every sub task spec known to the monitor, running or complete, with one entry per spec id.
     *
     * <p>The previous body iterated both lists with empty lambdas, so the map was never filled and the
     * method always returned an empty list. The specs are now stored by id. Complete specs are added
     * last, so a complete spec replaces a running one that has the same id.
     *
     * @return the de-duplicated specs, or an empty list while no monitor exists
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public List<SubTaskSpec<SubTaskType>> getSubTaskSpecs() {
        if (taskMonitor == null) {
            return Collections.emptyList();
        }
        final List<SubTaskSpec<SubTaskType>> runningSubTaskSpecs = taskMonitor.getRunningSubTaskSpecs();
        final List<SubTaskSpec<SubTaskType>> completeSubTaskSpecs = taskMonitor.getCompleteSubTaskSpecs();
        // Keyed by spec id so a spec reported in both lists appears only once in the result.
        final Map<String, SubTaskSpec<SubTaskType>> specsById =
            new HashMap<>(runningSubTaskSpecs.size() + completeSubTaskSpecs.size());
        runningSubTaskSpecs.forEach(spec -> specsById.put(spec.getId(), spec));
        completeSubTaskSpecs.forEach(spec -> specsById.put(spec.getId(), spec));
        return new ArrayList<>(specsById.values());
    }

    /**
     * Lists the specs of the running sub tasks.
     *
     * @return the monitor's running specs, or an empty list while no monitor exists
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public List<SubTaskSpec<SubTaskType>> getRunningSubTaskSpecs() {
        if (taskMonitor == null) {
            return Collections.emptyList();
        }
        return taskMonitor.getRunningSubTaskSpecs();
    }

    /**
     * Lists the specs of the complete sub tasks.
     *
     * @return the monitor's complete specs, or an empty list while no monitor exists
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    public List<SubTaskSpec<SubTaskType>> getCompleteSubTaskSpecs() {
        if (taskMonitor == null) {
            return Collections.emptyList();
        }
        return taskMonitor.getCompleteSubTaskSpecs();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Looks up a sub task spec by id, preferring a running entry over a completed one.
     *
     * <p>The local was previously typed {@code TaskMonitor<T>.MonitorEntry}, which names a type variable
     * {@code T} that is not declared in this class. It is now parameterized with {@code SubTaskType}.
     *
     * @param str sub task spec id
     * @return the matching spec, or null when no monitor exists or the id is unknown to it
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    @Nullable
    public SubTaskSpec<SubTaskType> getSubTaskSpec(String str) {
        if (taskMonitor == null) {
            return null;
        }
        final TaskMonitor<SubTaskType>.MonitorEntry runningEntry = taskMonitor.getRunningTaskMonitorEntry(str);
        final TaskHistory<SubTaskType> completeHistory = taskMonitor.getCompleteSubTaskSpecHistory(str);
        if (runningEntry != null) {
            return runningEntry.getSpec();
        }
        if (completeHistory != null) {
            return completeHistory.getSpec();
        }
        return null;
    }

    /**
     * Builds the status of a sub task spec, preferring a running entry over a completed history.
     *
     * <p>The local was previously typed {@code TaskMonitor<T>.MonitorEntry}, which names a type variable
     * {@code T} that is not declared in this class. It is now parameterized with {@code SubTaskType}.
     *
     * @param str sub task spec id
     * @return the status, or null when no monitor exists, the id is unknown, or its history is empty
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    @Nullable
    public ParallelIndexTaskRunner.SubTaskSpecStatus getSubTaskState(String str) {
        if (taskMonitor == null) {
            return null;
        }
        final TaskMonitor<SubTaskType>.MonitorEntry runningEntry = taskMonitor.getRunningTaskMonitorEntry(str);
        final TaskHistory<SubTaskType> completeHistory = taskMonitor.getCompleteSubTaskSpecHistory(str);
        if (runningEntry != null) {
            // NOTE(review): the cast assumes the spec is a SinglePhaseSubTaskSpec - confirm for all subclasses.
            return new ParallelIndexTaskRunner.SubTaskSpecStatus(
                (SinglePhaseSubTaskSpec) runningEntry.getSpec(),
                runningEntry.getRunningStatus(),
                runningEntry.getTaskHistory()
            );
        }
        if (completeHistory == null || completeHistory.isEmpty()) {
            return null;
        }
        // A completed spec has no running status, hence the null second argument.
        return new ParallelIndexTaskRunner.SubTaskSpecStatus(
            (SinglePhaseSubTaskSpec) completeHistory.getSpec(),
            null,
            completeHistory.getAttemptHistory()
        );
    }

    /**
     * Fetches the history of a completed sub task spec from the monitor.
     *
     * @param str sub task spec id
     * @return whatever the monitor returns for the id, or null while no monitor exists
     */
    @Override // org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner
    @Nullable
    public TaskHistory<SubTaskType> getCompleteSubTaskSpecAttemptHistory(String str) {
        return taskMonitor == null ? null : taskMonitor.getCompleteSubTaskSpecHistory(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the task id passed to the constructor */
    public String getTaskId() {
        return taskId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the group id passed to the constructor */
    public String getGroupId() {
        return groupId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the context map passed to the constructor (the same instance, not a copy) */
    public Map<String, Object> getContext() {
        return context;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the tuning config passed to the constructor */
    public ParallelIndexTuningConfig getTuningConfig() {
        return tuningConfig;
    }

    /** @return the toolbox passed to the constructor */
    @VisibleForTesting
    TaskToolbox getToolbox() {
        return toolbox;
    }

    /** @return the monitor created by {@link #run()}, or null if run() has not created one yet */
    @VisibleForTesting
    @Nullable
    TaskMonitor<SubTaskType> getTaskMonitor() {
        return taskMonitor;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hands out the next spec id and advances the counter by one.
     *
     * @return the counter value before the increment
     */
    @VisibleForTesting
    public int getAndIncrementNextSpecId() {
        // Post-increment yields the old value; the update is not synchronized.
        return nextSpecId++;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the indexing service client passed to the constructor; never null */
    @VisibleForTesting
    public IndexingServiceClient getIndexingServiceClient() {
        return indexingServiceClient;
    }
}
