package io.confluent.ksql.rest.server;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.ServiceManager;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.rest.entity.HostStoreLags;
import io.confluent.ksql.rest.entity.KsqlHostInfoEntity;
import io.confluent.ksql.rest.entity.LagInfoEntity;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.entity.QueryStateStoreId;
import io.confluent.ksql.rest.entity.StateStoreLags;
import io.confluent.ksql.rest.server.HeartbeatAgent;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.HostStatus;
import io.confluent.ksql.util.KsqlHostInfo;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.net.URI;
import java.net.URL;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.streams.LagInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/LagReportingAgent.class */
public final class LagReportingAgent implements HeartbeatAgent.HostStatusListener {
    private static final int SERVICE_TIMEOUT_SEC = 2;
    private static final int NUM_THREADS_EXECUTOR = 1;
    private static final int SEND_LAG_DELAY_MS = 100;
    private static final Logger LOG = LoggerFactory.getLogger(LagReportingAgent.class);
    private final KsqlEngine engine;
    private final ScheduledExecutorService scheduledExecutorService;
    private final ServiceContext serviceContext;
    private final LagReportingConfig config;
    private final ServiceManager serviceManager;
    private final Clock clock;
    private final Map<KsqlHostInfo, HostStoreLags> receivedLagInfo;
    private final AtomicReference<Set<KsqlHostInfo>> aliveHostsRef;
    private URL localUrl;

    /* loaded from: input_file:io/confluent/ksql/rest/server/LagReportingAgent$Builder.class */
    public static class Builder {
        private long nestedLagSendIntervalMs = 500;
        private Clock nestedClock = Clock.systemUTC();

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder lagSendIntervalMs(long j) {
            this.nestedLagSendIntervalMs = j;
            return this;
        }

        Builder clock(Clock clock) {
            this.nestedClock = clock;
            return this;
        }

        public LagReportingAgent build(KsqlEngine ksqlEngine, ServiceContext serviceContext) {
            return new LagReportingAgent(ksqlEngine, Executors.newScheduledThreadPool(LagReportingAgent.NUM_THREADS_EXECUTOR), serviceContext, new LagReportingConfig(this.nestedLagSendIntervalMs), this.nestedClock);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/LagReportingAgent$LagReportingConfig.class */
    public static class LagReportingConfig {
        private final long lagSendIntervalMs;

        LagReportingConfig(long j) {
            this.lagSendIntervalMs = j;
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/LagReportingAgent$SendLagService.class */
    class SendLagService extends AbstractScheduledService {
        SendLagService() {
        }

        protected void runOneIteration() {
            List persistentQueries = LagReportingAgent.this.engine.getPersistentQueries();
            if (persistentQueries.isEmpty()) {
                return;
            }
            LagReportingMessage createLagReportingMessage = createLagReportingMessage((Map) persistentQueries.stream().map(persistentQueryMetadata -> {
                return Pair.of(persistentQueryMetadata, persistentQueryMetadata.getAllLocalStorePartitionLags());
            }).map(pair -> {
                return (Map) ((Map) pair.getRight()).entrySet().stream().collect(Collectors.toMap(entry -> {
                    return QueryStateStoreId.of(((PersistentQueryMetadata) pair.getLeft()).getQueryApplicationId(), (String) entry.getKey());
                }, (v0) -> {
                    return v0.getValue();
                }));
            }).flatMap(map -> {
                return map.entrySet().stream();
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            })));
            for (KsqlHostInfo ksqlHostInfo : (Set) LagReportingAgent.this.aliveHostsRef.get()) {
                try {
                    URI buildRemoteUri = ServerUtil.buildRemoteUri(LagReportingAgent.this.localUrl, ksqlHostInfo.host(), ksqlHostInfo.port());
                    LagReportingAgent.LOG.debug("Sending lag to host {} at {}", ksqlHostInfo.host(), Long.valueOf(LagReportingAgent.this.clock.millis()));
                    LagReportingAgent.this.serviceContext.getKsqlClient().makeAsyncLagReportRequest(buildRemoteUri, createLagReportingMessage);
                } catch (Throwable th) {
                    LagReportingAgent.LOG.error("Request to server: " + ksqlHostInfo.host() + ":" + ksqlHostInfo.port() + " failed with exception: " + th.getMessage(), th);
                }
            }
        }

        private LagReportingMessage createLagReportingMessage(Map<QueryStateStoreId, Map<Integer, LagInfo>> map) {
            ImmutableMap.Builder builder = ImmutableMap.builder();
            for (Map.Entry<QueryStateStoreId, Map<Integer, LagInfo>> entry : map.entrySet()) {
                builder.put(entry.getKey(), new StateStoreLags((ImmutableMap) entry.getValue().entrySet().stream().map(entry2 -> {
                    LagInfo lagInfo = (LagInfo) entry2.getValue();
                    return Pair.of(entry2.getKey(), new LagInfoEntity(lagInfo.currentOffsetPosition(), lagInfo.endOffsetPosition(), lagInfo.offsetLag()));
                }).collect(ImmutableMap.toImmutableMap((v0) -> {
                    return v0.getLeft();
                }, (v0) -> {
                    return v0.getRight();
                }))));
            }
            return new LagReportingMessage(new KsqlHostInfoEntity(LagReportingAgent.this.localUrl.getHost(), LagReportingAgent.this.localUrl.getPort()), new HostStoreLags(builder.build(), LagReportingAgent.this.clock.millis()));
        }

        protected AbstractScheduledService.Scheduler scheduler() {
            return AbstractScheduledService.Scheduler.newFixedRateSchedule(100L, LagReportingAgent.this.config.lagSendIntervalMs, TimeUnit.MILLISECONDS);
        }

        protected ScheduledExecutorService executor() {
            return LagReportingAgent.this.scheduledExecutorService;
        }
    }

    public static Builder builder() {
        return new Builder();
    }

    private LagReportingAgent(KsqlEngine ksqlEngine, ScheduledExecutorService scheduledExecutorService, ServiceContext serviceContext, LagReportingConfig lagReportingConfig, Clock clock) {
        this.engine = (KsqlEngine) Objects.requireNonNull(ksqlEngine, "engine");
        this.scheduledExecutorService = scheduledExecutorService;
        this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
        this.config = (LagReportingConfig) Objects.requireNonNull(lagReportingConfig, "configuration parameters");
        this.clock = clock;
        this.serviceManager = new ServiceManager(Arrays.asList(new SendLagService()));
        this.receivedLagInfo = new ConcurrentHashMap();
        this.aliveHostsRef = new AtomicReference<>(Collections.emptySet());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setLocalAddress(String str) {
        try {
            this.localUrl = new URL(str);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to convert remote host info to URL. remoteInfo: " + str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startAgent() {
        try {
            this.serviceManager.startAsync().awaitHealthy(2L, TimeUnit.SECONDS);
        } catch (IllegalStateException | TimeoutException e) {
            LOG.error("Failed to start heartbeat services with exception " + e.getMessage(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopAgent() {
        try {
            this.serviceManager.stopAsync().awaitStopped(2L, TimeUnit.SECONDS);
        } catch (IllegalStateException | TimeoutException e) {
            LOG.error("Failed to stop heartbeat services with exception " + e.getMessage(), e);
        } finally {
            this.scheduledExecutorService.shutdownNow();
        }
    }

    public void receiveHostLag(LagReportingMessage lagReportingMessage) {
        HostStoreLags hostStoreLags = lagReportingMessage.getHostStoreLags();
        long updateTimeMs = hostStoreLags.getUpdateTimeMs();
        KsqlHostInfoEntity ksqlHost = lagReportingMessage.getKsqlHost();
        KsqlHostInfo ksqlHost2 = ksqlHost.toKsqlHost();
        LOG.debug("Receive lag at: {} from host: {} lag: {} ", new Object[]{Long.valueOf(updateTimeMs), ksqlHost, hostStoreLags.getStateStoreLags()});
        this.receivedLagInfo.compute(ksqlHost2, (ksqlHostInfo, hostStoreLags2) -> {
            return (hostStoreLags2 == null || hostStoreLags2.getUpdateTimeMs() <= updateTimeMs) ? hostStoreLags : hostStoreLags2;
        });
    }

    public Optional<LagInfoEntity> getLagInfoForHost(KsqlHostInfo ksqlHostInfo, QueryStateStoreId queryStateStoreId, int i) {
        return getLagPerHost(ksqlHostInfo).flatMap(hostStoreLags -> {
            return hostStoreLags.getStateStoreLags(queryStateStoreId);
        }).flatMap(stateStoreLags -> {
            return stateStoreLags.getLagByPartition(i);
        });
    }

    public ImmutableMap<KsqlHostInfoEntity, HostStoreLags> getAllLags() {
        return (ImmutableMap) this.receivedLagInfo.entrySet().stream().collect(ImmutableMap.toImmutableMap(entry -> {
            return new KsqlHostInfoEntity(((KsqlHostInfo) entry.getKey()).host(), ((KsqlHostInfo) entry.getKey()).port());
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    public Optional<HostStoreLags> getLagPerHost(KsqlHostInfo ksqlHostInfo) {
        return Optional.ofNullable(this.receivedLagInfo.get(ksqlHostInfo));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.confluent.ksql.rest.server.HeartbeatAgent.HostStatusListener
    public void onHostStatusUpdated(Map<KsqlHostInfo, HostStatus> map) {
        this.aliveHostsRef.set(map.entrySet().stream().filter(entry -> {
            return ((HostStatus) entry.getValue()).isHostAlive();
        }).map((v0) -> {
            return v0.getKey();
        }).collect(ImmutableSet.toImmutableSet()));
    }
}
