/*
 * Decompiled with CFR 0.152.
 */
package io.openlineage.spark.agent;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.spark.executor.OutputMetrics;
import org.apache.spark.executor.TaskMetrics;

public class JobMetricsHolder {
    private final Map<Integer, Set<Integer>> jobStages = new ConcurrentHashMap<Integer, Set<Integer>>();
    private final Map<Integer, TaskMetrics> stageMetrics = new ConcurrentHashMap<Integer, TaskMetrics>();

    JobMetricsHolder() {
    }

    public void addJobStages(int jobId, Set<Integer> stages) {
        if (stages != null) {
            this.jobStages.put(jobId, stages);
        }
    }

    public void addMetrics(int stage, TaskMetrics taskMetrics) {
        if (taskMetrics != null) {
            this.stageMetrics.put(stage, taskMetrics);
        }
    }

    public Map<Metric, Number> pollMetrics(int jobId) {
        return Optional.ofNullable(this.jobStages.remove(jobId)).map(stages -> stages.stream().map(this.stageMetrics::remove).filter(Objects::nonNull).collect(Collectors.toList())).filter(l -> !l.isEmpty()).map(this::mapOutputMetrics).orElse(Collections.emptyMap());
    }

    public void cleanUp(int jobId) {
        Set<Integer> stages = this.jobStages.remove(jobId);
        stages = stages == null ? Collections.emptySet() : stages;
        stages.forEach(s -> this.jobStages.remove(s));
    }

    private Map<Metric, Number> mapOutputMetrics(List<TaskMetrics> jobMetrics) {
        HashMap<Metric, Number> result = new HashMap<Metric, Number>();
        for (TaskMetrics taskMetric : jobMetrics) {
            OutputMetrics outputMetrics = taskMetric.outputMetrics();
            if (!Objects.nonNull(outputMetrics)) continue;
            result.merge(Metric.WRITE_BYTES, outputMetrics.bytesWritten(), (m, b) -> m.longValue() + b.longValue());
            result.merge(Metric.WRITE_RECORDS, outputMetrics.recordsWritten(), (m, b) -> m.longValue() + b.longValue());
        }
        return result;
    }

    public static JobMetricsHolder getInstance() {
        return SingletonHolder.instance;
    }

    private static class SingletonHolder {
        public static final JobMetricsHolder instance = new JobMetricsHolder();

        private SingletonHolder() {
        }
    }

    public static enum Metric {
        WRITE_BYTES,
        WRITE_RECORDS;

    }
}

