package org.apache.hudi.timeline.service.handlers;

import io.javalin.http.Context;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable;
import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture;
import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/timeline/service/handlers/MarkerHandler.class */
public class MarkerHandler extends Handler {
    private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
    private final Registry metricsRegistry;
    private final ScheduledExecutorService dispatchingExecutorService;
    private final ExecutorService batchingExecutorService;
    private final int parallelism;
    private final Map<String, MarkerDirState> markerDirStateMap;
    private final MarkerCreationDispatchingRunnable markerCreationDispatchingRunnable;
    private final Object firstCreationRequestSeenLock;
    private transient HoodieEngineContext hoodieEngineContext;
    private ScheduledFuture<?> dispatchingThreadFuture;
    private boolean firstCreationRequestSeen;

    public MarkerHandler(Configuration configuration, TimelineService.Config config, HoodieEngineContext hoodieEngineContext, FileSystem fileSystem, FileSystemViewManager fileSystemViewManager, Registry registry) throws IOException {
        super(configuration, config, fileSystem, fileSystemViewManager);
        this.markerDirStateMap = new ConcurrentHashMap();
        this.firstCreationRequestSeenLock = new Object();
        LOG.debug("MarkerHandler FileSystem: " + this.fileSystem.getScheme());
        LOG.debug("MarkerHandler batching params: batchNumThreads=" + config.markerBatchNumThreads + " batchIntervalMs=" + config.markerBatchIntervalMs + "ms");
        this.hoodieEngineContext = hoodieEngineContext;
        this.metricsRegistry = registry;
        this.parallelism = config.markerParallelism;
        this.dispatchingExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.batchingExecutorService = Executors.newFixedThreadPool(config.markerBatchNumThreads);
        this.markerCreationDispatchingRunnable = new MarkerCreationDispatchingRunnable(this.markerDirStateMap, this.batchingExecutorService);
        this.firstCreationRequestSeen = false;
    }

    public void stop() {
        if (this.dispatchingThreadFuture != null) {
            this.dispatchingThreadFuture.cancel(true);
        }
        this.dispatchingExecutorService.shutdown();
        this.batchingExecutorService.shutdown();
    }

    public Set<String> getAllMarkers(String str) {
        return getMarkerDirState(str).getAllMarkers();
    }

    public Set<String> getCreateAndMergeMarkers(String str) {
        return (Set) getAllMarkers(str).stream().filter(str2 -> {
            return !str2.endsWith(IOType.APPEND.name());
        }).collect(Collectors.toSet());
    }

    public boolean doesMarkerDirExist(String str) {
        return getMarkerDirState(str).exists();
    }

    public CompletableFuture<String> createMarker(Context context, String str, String str2) {
        LOG.info("Request: create marker " + str + " " + str2);
        MarkerCreationFuture markerCreationFuture = new MarkerCreationFuture(context, str, str2);
        getMarkerDirState(str).addMarkerCreationFuture(markerCreationFuture);
        if (!this.firstCreationRequestSeen) {
            synchronized (this.firstCreationRequestSeenLock) {
                if (!this.firstCreationRequestSeen) {
                    this.dispatchingThreadFuture = this.dispatchingExecutorService.scheduleAtFixedRate(this.markerCreationDispatchingRunnable, this.timelineServiceConfig.markerBatchIntervalMs, this.timelineServiceConfig.markerBatchIntervalMs, TimeUnit.MILLISECONDS);
                    this.firstCreationRequestSeen = true;
                }
            }
        }
        return markerCreationFuture;
    }

    public Boolean deleteMarkers(String str) {
        boolean deleteAllMarkers = getMarkerDirState(str).deleteAllMarkers();
        this.markerDirStateMap.remove(str);
        return Boolean.valueOf(deleteAllMarkers);
    }

    private MarkerDirState getMarkerDirState(String str) {
        MarkerDirState markerDirState = this.markerDirStateMap.get(str);
        if (markerDirState == null) {
            synchronized (this.markerDirStateMap) {
                if (this.markerDirStateMap.get(str) == null) {
                    markerDirState = new MarkerDirState(str, this.timelineServiceConfig.markerBatchNumThreads, this.fileSystem, this.metricsRegistry, this.hoodieEngineContext, this.parallelism);
                    this.markerDirStateMap.put(str, markerDirState);
                } else {
                    markerDirState = this.markerDirStateMap.get(str);
                }
            }
        }
        return markerDirState;
    }
}
