package org.apache.flink.runtime.rest.handler.legacy.metrics;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.class */
public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(MetricFetcherImpl.class);
    private final GatewayRetriever<T> retriever;
    private final MetricQueryServiceRetriever queryServiceRetriever;
    private final Executor executor;
    private final Time timeout;
    private final long updateInterval;

    @GuardedBy("this")
    private long lastUpdateTime;
    private final MetricStore metrics = new MetricStore();
    private final MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer();

    @GuardedBy("this")
    private CompletableFuture<Void> fetchMetricsFuture = FutureUtils.completedVoidFuture();

    public MetricFetcherImpl(GatewayRetriever<T> gatewayRetriever, MetricQueryServiceRetriever metricQueryServiceRetriever, Executor executor, Time time, long j) {
        this.retriever = (GatewayRetriever) Preconditions.checkNotNull(gatewayRetriever);
        this.queryServiceRetriever = (MetricQueryServiceRetriever) Preconditions.checkNotNull(metricQueryServiceRetriever);
        this.executor = (Executor) Preconditions.checkNotNull(executor);
        this.timeout = (Time) Preconditions.checkNotNull(time);
        Preconditions.checkArgument(j > 0, "The update interval must be larger than 0.");
        this.updateInterval = j;
    }

    @Override // org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher
    public MetricStore getMetricStore() {
        return this.metrics;
    }

    @Override // org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher
    public void update() {
        synchronized (this) {
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastUpdateTime > this.updateInterval && this.fetchMetricsFuture.isDone()) {
                this.lastUpdateTime = currentTimeMillis;
                this.fetchMetricsFuture = fetchMetrics();
            }
        }
    }

    @Override // org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher
    public long getLastUpdateTime() {
        long j;
        synchronized (this) {
            j = this.lastUpdateTime;
        }
        return j;
    }

    private CompletableFuture<Void> fetchMetrics() {
        LOG.debug("Start fetching metrics.");
        try {
            Optional<T> now = this.retriever.getNow();
            if (!now.isPresent()) {
                return FutureUtils.completedVoidFuture();
            }
            T t = now.get();
            t.requestMultipleJobDetails(this.timeout).whenCompleteAsync((multipleJobsDetails, th) -> {
                if (th != null) {
                    LOG.debug("Fetching of JobDetails failed.", th);
                    return;
                }
                ArrayList arrayList = new ArrayList(multipleJobsDetails.getJobs().size());
                Iterator<JobDetails> it = multipleJobsDetails.getJobs().iterator();
                while (it.hasNext()) {
                    arrayList.add(it.next().getJobId().toString());
                }
                this.metrics.retainJobs(arrayList);
                this.metrics.updateCurrentExecutionAttempts(multipleJobsDetails.getJobs());
            }, this.executor);
            ArrayList arrayList = new ArrayList();
            CompletableFuture<Void> queryJmMetricsFuture = queryJmMetricsFuture(t);
            arrayList.add(queryJmMetricsFuture);
            queryJmMetricsFuture.whenCompleteAsync((r4, th2) -> {
                if (th2 != null) {
                    LOG.debug("Failed to fetch the leader's metrics.", th2);
                }
            }, this.executor);
            CompletableFuture<Void> queryTmMetricsFuture = queryTmMetricsFuture(t);
            arrayList.add(queryTmMetricsFuture);
            queryTmMetricsFuture.whenCompleteAsync((r42, th3) -> {
                if (th3 != null) {
                    LOG.debug("Failed to fetch the TaskManager's metrics.", th3);
                }
            }, this.executor);
            return FutureUtils.waitForAll(arrayList);
        } catch (Exception e) {
            LOG.debug("Exception while fetching metrics.", e);
            return FutureUtils.completedExceptionally(e);
        }
    }

    private CompletableFuture<Void> queryJmMetricsFuture(T t) {
        return t.requestMetricQueryServiceAddresses(this.timeout).thenComposeAsync(collection -> {
            ArrayList arrayList = new ArrayList();
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                arrayList.add(retrieveAndQueryMetrics((String) it.next()));
            }
            return FutureUtils.waitForAll(arrayList);
        }, this.executor);
    }

    private CompletableFuture<Void> queryTmMetricsFuture(T t) {
        return t.requestTaskManagerMetricQueryServiceAddresses(this.timeout).thenComposeAsync(collection -> {
            ArrayList arrayList = new ArrayList();
            this.metrics.retainTaskManagers((List) collection.stream().map(tuple2 -> {
                arrayList.add(this.queryServiceRetriever.retrieveService((String) tuple2.f1).thenComposeAsync(this::queryMetrics, this.executor));
                return ((ResourceID) tuple2.f0).getResourceIdString();
            }).collect(Collectors.toList()));
            return FutureUtils.waitForAll(arrayList);
        }, this.executor);
    }

    private CompletableFuture<Void> retrieveAndQueryMetrics(String str) {
        LOG.debug("Retrieve metric query service gateway for {}", str);
        return this.queryServiceRetriever.retrieveService(str).thenComposeAsync(this::queryMetrics, this.executor);
    }

    private CompletableFuture<Void> queryMetrics(MetricQueryServiceGateway metricQueryServiceGateway) {
        LOG.debug("Query metrics for {}.", metricQueryServiceGateway.getAddress());
        return metricQueryServiceGateway.queryMetrics(this.timeout).thenComposeAsync(metricSerializationResult -> {
            this.metrics.addAll(this.deserializer.deserialize(metricSerializationResult));
            return FutureUtils.completedVoidFuture();
        }, this.executor);
    }

    @Nonnull
    public static <T extends RestfulGateway> MetricFetcherImpl<T> fromConfiguration(Configuration configuration, MetricQueryServiceRetriever metricQueryServiceRetriever, GatewayRetriever<T> gatewayRetriever, ExecutorService executorService) {
        return new MetricFetcherImpl<>(gatewayRetriever, metricQueryServiceRetriever, executorService, Time.fromDuration((Duration) configuration.get(WebOptions.TIMEOUT)), ((Duration) configuration.get(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL)).toMillis());
    }
}
