package org.apache.hudi.client.embedded;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.util.NetworkUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.timeline.service.TimelineService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/client/embedded/EmbeddedTimelineService.class */
/**
 * Starts and manages a {@link TimelineService} inside the calling process.
 *
 * <p>When server reuse is enabled in the write config, running services are kept in a static
 * registry keyed by {@link TimelineServiceIdentifier}, and callers asking for an equal identifier
 * share one server. Each service tracks the base paths that use it and closes its server once the
 * last base path has been stopped.
 *
 * <p>The static registry and every service's base-path set are guarded by {@code SERVICE_LOCK}.
 */
public class EmbeddedTimelineService {
    /** Guards {@link #RUNNING_SERVICES} and the {@code basePaths} set of every service. */
    private static final Object SERVICE_LOCK = new Object();
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedTimelineService.class);
    /** Number of servers started by this class that have not been closed yet. */
    private static final AtomicInteger NUM_SERVERS_RUNNING = new AtomicInteger(0);
    /** Reusable services, keyed by the identifier they were created for. */
    private static final Map<TimelineServiceIdentifier, EmbeddedTimelineService> RUNNING_SERVICES = new HashMap<>();
    private static final Registry METRICS_REGISTRY = Registry.getRegistry("TimelineService");
    private static final String NUM_EMBEDDED_TIMELINE_SERVERS = "numEmbeddedTimelineServers";
    /** Port returned by {@code TimelineService#startService()}. */
    private int serverPort;
    private String hostAddr;
    private final HoodieEngineContext context;
    private final SerializableConfiguration hadoopConf;
    private final HoodieWriteConfig writeConfig;
    private TimelineService.Config serviceConfig;
    private final TimelineServiceIdentifier timelineServiceIdentifier;
    /** Base paths currently using this service; access only while holding {@code SERVICE_LOCK}. */
    private final Set<String> basePaths;
    private transient FileSystemViewManager viewManager;
    private transient TimelineService server;

    /**
     * Factory for the underlying {@link TimelineService}, so that callers in this package can
     * substitute their own construction logic for {@code TimelineService::new}.
     */
    @FunctionalInterface
    public interface TimelineServiceCreator {
        TimelineService create(HoodieEngineContext hoodieEngineContext, Configuration configuration, TimelineService.Config config, FileSystem fileSystem, FileSystemViewManager fileSystemViewManager) throws IOException;
    }

    /**
     * Key under which a reusable service is registered. Two write configs may share a server only
     * when host address, marker type, metadata-table flag and early-conflict-detection flag all match.
     */
    public static class TimelineServiceIdentifier {
        private final String hostAddr;
        private final MarkerType markerType;
        private final boolean isMetadataEnabled;
        private final boolean isEarlyConflictDetectionEnable;

        /**
         * @param str        host address the service is requested for; may be {@code null}
         * @param markerType marker type of the write config
         * @param z          whether the metadata table is enabled
         * @param z2         whether early conflict detection is enabled
         */
        public TimelineServiceIdentifier(String str, MarkerType markerType, boolean z, boolean z2) {
            this.hostAddr = str;
            this.markerType = markerType;
            this.isMetadataEnabled = z;
            this.isEarlyConflictDetectionEnable = z2;
        }

        /**
         * Compares all four fields, null-safely for the host address.
         *
         * <p>Every field takes part in {@link #hashCode()}, so every field must take part here as
         * well; otherwise two identifiers with a {@code null} host but different settings would be
         * "equal" with different hash codes, which breaks the contract hash-based maps rely on.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof TimelineServiceIdentifier)) {
                return false;
            }
            TimelineServiceIdentifier other = (TimelineServiceIdentifier) obj;
            return this.isMetadataEnabled == other.isMetadataEnabled
                    && this.isEarlyConflictDetectionEnable == other.isEarlyConflictDetectionEnable
                    && this.markerType == other.markerType
                    && Objects.equals(this.hostAddr, other.hostAddr);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.hostAddr, this.markerType, this.isMetadataEnabled, this.isEarlyConflictDetectionEnable);
        }
    }

    /**
     * Builds a service that is not started yet; {@link #startServer(TimelineServiceCreator)} must
     * be called before the service is handed out.
     */
    private EmbeddedTimelineService(HoodieEngineContext hoodieEngineContext, String str, HoodieWriteConfig hoodieWriteConfig, TimelineServiceIdentifier timelineServiceIdentifier) {
        setHostAddr(str);
        this.context = hoodieEngineContext;
        this.writeConfig = hoodieWriteConfig;
        this.timelineServiceIdentifier = timelineServiceIdentifier;
        this.basePaths = new HashSet<>();
        this.basePaths.add(hoodieWriteConfig.getBasePath());
        this.hadoopConf = hoodieEngineContext.getHadoopConf();
        this.viewManager = createViewManager();
    }

    /**
     * Returns a started service for the given write config, creating the underlying server with
     * {@code TimelineService::new}.
     *
     * @param hoodieEngineContext engine context handed to the server and the view manager
     * @param str                 host address to advertise; when {@code null} the local hostname is used
     * @param hoodieWriteConfig   write config the server is configured from
     * @return a started service, possibly shared with earlier callers when reuse is enabled
     * @throws IOException if creating or starting the server fails
     */
    public static EmbeddedTimelineService getOrStartEmbeddedTimelineService(HoodieEngineContext hoodieEngineContext, String str, HoodieWriteConfig hoodieWriteConfig) throws IOException {
        return getOrStartEmbeddedTimelineService(hoodieEngineContext, str, hoodieWriteConfig, TimelineService::new);
    }

    /**
     * Same as the three-argument overload, but with a caller-supplied server factory.
     *
     * <p>With reuse disabled a fresh, unregistered service is started on every call. With reuse
     * enabled, the registry is consulted under {@code SERVICE_LOCK}: an existing service with an
     * equal identifier gets the config's base path added and is returned; otherwise a new service
     * is started and registered.
     */
    static EmbeddedTimelineService getOrStartEmbeddedTimelineService(HoodieEngineContext hoodieEngineContext, String str, HoodieWriteConfig hoodieWriteConfig, TimelineServiceCreator timelineServiceCreator) throws IOException {
        TimelineServiceIdentifier identifier = getTimelineServiceIdentifier(str, hoodieWriteConfig);
        if (!hoodieWriteConfig.isEmbeddedTimelineServerReuseEnabled()) {
            return createAndStartService(hoodieEngineContext, str, hoodieWriteConfig, timelineServiceCreator, identifier);
        }
        synchronized (SERVICE_LOCK) {
            EmbeddedTimelineService running = RUNNING_SERVICES.get(identifier);
            if (running == null) {
                EmbeddedTimelineService started = createAndStartService(hoodieEngineContext, str, hoodieWriteConfig, timelineServiceCreator, identifier);
                RUNNING_SERVICES.put(identifier, started);
                return started;
            }
            running.addBasePath(hoodieWriteConfig.getBasePath());
            LOG.info("Reusing existing embedded timeline server with configuration: {}", running.serviceConfig);
            return running;
        }
    }

    /** Constructs a service, starts its server and bumps the running-servers metric. */
    private static EmbeddedTimelineService createAndStartService(HoodieEngineContext hoodieEngineContext, String hostAddress, HoodieWriteConfig hoodieWriteConfig, TimelineServiceCreator timelineServiceCreator, TimelineServiceIdentifier identifier) throws IOException {
        EmbeddedTimelineService service = new EmbeddedTimelineService(hoodieEngineContext, hostAddress, hoodieWriteConfig, identifier);
        service.startServer(timelineServiceCreator);
        // Counted only after a successful start, so a failed start never inflates the metric.
        METRICS_REGISTRY.set(NUM_EMBEDDED_TIMELINE_SERVERS, NUM_SERVERS_RUNNING.incrementAndGet());
        return service;
    }

    /**
     * Closes every registered (reusable) server and empties the registry.
     *
     * <p>Runs under {@code SERVICE_LOCK} because the registry is a plain {@code HashMap} that other
     * methods mutate under that lock. Services started with reuse disabled are not registered and
     * are therefore not affected.
     */
    public static void shutdownAllTimelineServers() {
        synchronized (SERVICE_LOCK) {
            for (EmbeddedTimelineService service : RUNNING_SERVICES.values()) {
                if (service.server == null) {
                    continue;
                }
                LOG.info("Closing Timeline server");
                service.server.close();
                METRICS_REGISTRY.set(NUM_EMBEDDED_TIMELINE_SERVERS, NUM_SERVERS_RUNNING.decrementAndGet());
                // Drop the handle so a later stopForBasePath() on this instance does not close the
                // server again and decrement the counter a second time.
                service.server = null;
                LOG.info("Closed Timeline server");
            }
            RUNNING_SERVICES.clear();
        }
    }

    /**
     * Creates the view manager backing the server. A client-specified REMOTE_ONLY or REMOTE_FIRST
     * storage type is replaced with MEMORY here; all other storage types are used as configured.
     */
    private FileSystemViewManager createViewManager() {
        FileSystemViewStorageConfig.Builder storageConfigBuilder = FileSystemViewStorageConfig.newBuilder()
                .fromProperties(this.writeConfig.getClientSpecifiedViewStorageConfig().getProps());
        FileSystemViewStorageType storageType = storageConfigBuilder.build().getStorageType();
        if (storageType.equals(FileSystemViewStorageType.REMOTE_ONLY) || storageType.equals(FileSystemViewStorageType.REMOTE_FIRST)) {
            storageConfigBuilder.withStorageType(FileSystemViewStorageType.MEMORY);
        }
        return FileSystemViewManager.createViewManagerWithTableMetadata(this.context, this.writeConfig.getMetadataConfig(), storageConfigBuilder.build(), this.writeConfig.getCommonConfig());
    }

    /**
     * Translates the write config into a {@code TimelineService.Config}, creates the server through
     * the given factory and starts it, recording the port it reports.
     *
     * @throws IOException if the factory or the file-system lookup fails
     */
    private void startServer(TimelineServiceCreator timelineServiceCreator) throws IOException {
        TimelineService.Config.Builder configBuilder = TimelineService.Config.builder()
                .serverPort(this.writeConfig.getEmbeddedTimelineServerPort())
                .numThreads(this.writeConfig.getEmbeddedTimelineServerThreads())
                .compress(this.writeConfig.getEmbeddedTimelineServerCompressOutput())
                .async(this.writeConfig.getEmbeddedTimelineServerUseAsync());
        // Marker endpoints are enabled only for timeline-server-based markers.
        if (this.writeConfig.getMarkersType() == MarkerType.TIMELINE_SERVER_BASED) {
            configBuilder.enableMarkerRequests(true)
                    .markerBatchNumThreads(this.writeConfig.getMarkersTimelineServerBasedBatchNumThreads())
                    .markerBatchIntervalMs(this.writeConfig.getMarkersTimelineServerBasedBatchIntervalMs())
                    .markerParallelism(this.writeConfig.getMarkersDeleteParallelism());
        }
        if (this.writeConfig.isEarlyConflictDetectionEnable()) {
            // Maximum allowable heartbeat interval = heartbeat interval * tolerable misses.
            configBuilder.earlyConflictDetectionEnable(true)
                    .earlyConflictDetectionStrategy(this.writeConfig.getEarlyConflictDetectionStrategyClassName())
                    .earlyConflictDetectionCheckCommitConflict(Boolean.valueOf(this.writeConfig.earlyConflictDetectionCheckCommitConflict()))
                    .asyncConflictDetectorInitialDelayMs(this.writeConfig.getAsyncConflictDetectorInitialDelayMs())
                    .asyncConflictDetectorPeriodMs(this.writeConfig.getAsyncConflictDetectorPeriodMs())
                    .earlyConflictDetectionMaxAllowableHeartbeatIntervalInMs(Long.valueOf(this.writeConfig.getHoodieClientHeartbeatIntervalInMs().longValue() * this.writeConfig.getHoodieClientHeartbeatTolerableMisses().intValue()));
        }
        this.serviceConfig = configBuilder.build();
        this.server = timelineServiceCreator.create(this.context, this.hadoopConf.newCopy(), this.serviceConfig, FSUtils.getFs(this.writeConfig.getBasePath(), this.hadoopConf.newCopy()), this.viewManager);
        this.serverPort = this.server.startService();
        LOG.info("Started embedded timeline server at {}:{}", this.hostAddr, this.serverPort);
    }

    /** Uses the supplied address when present, otherwise falls back to the local hostname. */
    private void setHostAddr(String hostAddress) {
        if (hostAddress != null) {
            LOG.info("Overriding hostIp to ({}) found in spark-conf. It was {}", hostAddress, this.hostAddr);
            this.hostAddr = hostAddress;
        } else {
            LOG.warn("Unable to find driver bind address from spark config");
            this.hostAddr = NetworkUtils.getHostname();
        }
    }

    /**
     * Builds the view-storage config a client needs to talk to this server: storage type
     * REMOTE_FIRST when the client config enables the backup view, REMOTE_ONLY otherwise, this
     * service's host and port, and the client-specified timeout and retry settings.
     */
    public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() {
        FileSystemViewStorageConfig clientConfig = this.writeConfig.getClientSpecifiedViewStorageConfig();
        FileSystemViewStorageType remoteType = clientConfig.shouldEnableBackupForRemoteFileSystemView()
                ? FileSystemViewStorageType.REMOTE_FIRST
                : FileSystemViewStorageType.REMOTE_ONLY;
        return FileSystemViewStorageConfig.newBuilder()
                .withStorageType(remoteType)
                .withRemoteServerHost(this.hostAddr)
                .withRemoteServerPort(Integer.valueOf(this.serverPort))
                .withRemoteTimelineClientTimeoutSecs(clientConfig.getRemoteTimelineClientTimeoutSecs())
                .withRemoteTimelineClientRetry(clientConfig.isRemoteTimelineClientRetryEnabled())
                .withRemoteTimelineClientMaxRetryNumbers(clientConfig.getRemoteTimelineClientMaxRetryNumbers())
                .withRemoteTimelineInitialRetryIntervalMs(clientConfig.getRemoteTimelineInitialRetryIntervalMs())
                .withRemoteTimelineClientMaxRetryIntervalMs(clientConfig.getRemoteTimelineClientMaxRetryIntervalMs())
                .withRemoteTimelineClientRetryExceptions(clientConfig.getRemoteTimelineClientRetryExceptions())
                .build();
    }

    /** Returns the view manager backing the server, or {@code null} once the server has been stopped. */
    public FileSystemViewManager getViewManager() {
        return this.viewManager;
    }

    /** Registers another base path as a user of this service; callers hold {@code SERVICE_LOCK}. */
    private void addBasePath(String basePath) {
        this.basePaths.add(basePath);
    }

    /**
     * Detaches one base path from this service and closes the server when it was the last one.
     *
     * @param str base path that no longer needs this service
     */
    public void stopForBasePath(String str) {
        // Decide under the lock whether this call removed the last base path, so that concurrent
        // callers cannot both observe an empty set afterwards and close the server twice.
        final boolean lastBasePathRemoved;
        synchronized (SERVICE_LOCK) {
            this.basePaths.remove(str);
            lastBasePathRemoved = this.basePaths.isEmpty();
            if (lastBasePathRemoved) {
                // Two-argument remove: only unregister when the registry entry is this instance.
                // A service started with reuse disabled is never registered and must not evict a
                // shared service that happens to have an equal identifier.
                RUNNING_SERVICES.remove(this.timelineServiceIdentifier, this);
            }
        }
        if (this.server != null) {
            this.server.unregisterBasePath(str);
        }
        if (!lastBasePathRemoved || this.server == null) {
            return;
        }
        LOG.info("Closing Timeline server");
        this.server.close();
        METRICS_REGISTRY.set(NUM_EMBEDDED_TIMELINE_SERVERS, NUM_SERVERS_RUNNING.decrementAndGet());
        this.server = null;
        this.viewManager = null;
        LOG.info("Closed Timeline server");
    }

    /** Derives the registry key from the requested host address and the write config. */
    private static TimelineServiceIdentifier getTimelineServiceIdentifier(String hostAddress, HoodieWriteConfig hoodieWriteConfig) {
        return new TimelineServiceIdentifier(hostAddress, hoodieWriteConfig.getMarkersType(), hoodieWriteConfig.isMetadataTableEnabled(), hoodieWriteConfig.isEarlyConflictDetectionEnable());
    }
}
