package org.apache.hudi.timeline.service.handlers;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.javalin.http.Context;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.conflict.detection.TimelineServerBasedDetectionStrategy;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.timeline.service.RequestHandler;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable;
import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture;
import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/timeline/service/handlers/MarkerHandler.class */
/**
 * Request handler for marker operations served by the timeline service.
 *
 * <p>Marker creation requests are queued per marker directory in a {@link MarkerDirState} and
 * handed to {@link MarkerCreationDispatchingRunnable}, which is scheduled at a fixed rate on a
 * single-threaded scheduler once the first creation request has been seen. When early conflict
 * detection is enabled in the timeline service config, a
 * {@link TimelineServerBasedDetectionStrategy} is consulted before a request is queued.
 */
public class MarkerHandler extends Handler {
    private static final Logger LOG = LoggerFactory.getLogger(MarkerHandler.class);

    /** Strategy class used when the configured one is not a {@link TimelineServerBasedDetectionStrategy}. */
    private static final String FALLBACK_DETECTION_STRATEGY_CLASS =
            "org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineServerBasedDetectionStrategy";

    private final Registry metricsRegistry;
    /** Single-threaded scheduler that periodically runs {@link #markerCreationDispatchingRunnable}. */
    private final ScheduledExecutorService dispatchingExecutorService;
    /** Fixed-size pool handed to the dispatching runnable. */
    private final ExecutorService batchingExecutorService;
    private final int parallelism;
    /** Marker directory -> state of that directory; entries are created lazily by {@link #getMarkerDirState}. */
    private final Map<String, MarkerDirState> markerDirStateMap = new ConcurrentHashMap<>();
    private final MarkerCreationDispatchingRunnable markerCreationDispatchingRunnable;
    /** Guards the one-time scheduling of the dispatching runnable. */
    private final Object firstCreationRequestSeenLock = new Object();
    /** Guards {@link #earlyConflictDetectionStrategy} initialisation and {@link #currentMarkerDir} updates. */
    private final Object earlyConflictDetectionLock = new Object();
    private transient HoodieEngineContext hoodieEngineContext;
    private ScheduledFuture<?> dispatchingThreadFuture;
    /**
     * Set to {@code true} (under {@link #firstCreationRequestSeenLock}) once the dispatcher has been
     * scheduled. Declared volatile because it is also read outside the lock as a fast-path check.
     */
    private volatile boolean firstCreationRequestSeen;
    /** Marker directory for which asynchronous conflict detection was last started; {@code null} initially. */
    private String currentMarkerDir;
    /** Lazily created on the first marker creation when early conflict detection is enabled. */
    private TimelineServerBasedDetectionStrategy earlyConflictDetectionStrategy;

    /**
     * Creates the handler and its executors. The dispatching runnable is not scheduled here; that
     * happens on the first marker creation request.
     *
     * @param configuration         Hadoop configuration, forwarded to {@link Handler}
     * @param config                timeline service config; supplies the batching thread count and
     *                              the marker parallelism
     * @param hoodieEngineContext   engine context passed on to each {@link MarkerDirState}
     * @param fileSystem            file system, forwarded to {@link Handler}
     * @param fileSystemViewManager view manager, forwarded to {@link Handler}
     * @param registry              metrics registry passed to marker directory states and to
     *                              response serialisation
     * @throws IOException declared by the constructor signature
     */
    public MarkerHandler(Configuration configuration, TimelineService.Config config, HoodieEngineContext hoodieEngineContext, FileSystem fileSystem, FileSystemViewManager fileSystemViewManager, Registry registry) throws IOException {
        super(configuration, config, fileSystem, fileSystemViewManager);
        LOG.debug("MarkerHandler FileSystem: {}", this.fileSystem.getScheme());
        LOG.debug("MarkerHandler batching params: batchNumThreads={} batchIntervalMs={}ms",
                config.markerBatchNumThreads, config.markerBatchIntervalMs);
        this.hoodieEngineContext = hoodieEngineContext;
        this.metricsRegistry = registry;
        this.parallelism = config.markerParallelism;
        this.dispatchingExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.batchingExecutorService = Executors.newFixedThreadPool(config.markerBatchNumThreads);
        this.markerCreationDispatchingRunnable =
                new MarkerCreationDispatchingRunnable(this.markerDirStateMap, this.batchingExecutorService);
    }

    /**
     * Cancels the periodic dispatching task, if it was ever scheduled, and initiates shutdown of
     * both executors.
     */
    public void stop() {
        if (dispatchingThreadFuture != null) {
            dispatchingThreadFuture.cancel(true);
        }
        dispatchingExecutorService.shutdown();
        batchingExecutorService.shutdown();
    }

    /**
     * Returns all markers reported by the state of the given marker directory.
     *
     * @param markerDir marker directory; its state is created if not yet tracked
     * @return the markers reported by {@link MarkerDirState#getAllMarkers()}
     */
    public Set<String> getAllMarkers(String markerDir) {
        return getMarkerDirState(markerDir).getAllMarkers();
    }

    /**
     * Returns the marker names of the creation requests that are still pending for a directory.
     *
     * @param markerDir marker directory
     * @return names of pending markers, or an empty set when the directory is not tracked
     */
    public Set<String> getPendingMarkersToProcess(String markerDir) {
        if (!markerDirStateMap.containsKey(markerDir)) {
            return Collections.emptySet();
        }
        return getMarkerDirState(markerDir).getPendingMarkerCreationRequests(false).stream()
                .map(MarkerCreationFuture::getMarkerName)
                .collect(Collectors.toSet());
    }

    /**
     * Returns all markers of a directory except those whose name ends with
     * {@code IOType.APPEND.name()}.
     *
     * @param markerDir marker directory
     * @return the filtered marker set
     */
    public Set<String> getCreateAndMergeMarkers(String markerDir) {
        final String appendSuffix = IOType.APPEND.name();
        return getAllMarkers(markerDir).stream()
                .filter(marker -> !marker.endsWith(appendSuffix))
                .collect(Collectors.toSet());
    }

    /**
     * Reports whether the marker directory exists, as answered by its {@link MarkerDirState}.
     *
     * @param markerDir marker directory
     * @return result of {@link MarkerDirState#exists()}
     */
    public boolean doesMarkerDirExist(String markerDir) {
        return getMarkerDirState(markerDir).exists();
    }

    /**
     * Handles a marker creation request.
     *
     * <p>With early conflict detection enabled, the detection strategy is initialised/started as
     * needed and then asked to detect conflicts. If it signals a
     * {@link HoodieEarlyConflictDetectionException}, the request is answered immediately via
     * {@link #finishCreateMarkerFuture} and is not queued. Any other failure of the detection step
     * is logged and the request is queued as usual.
     *
     * @param context    HTTP context of the request
     * @param markerDir  marker directory the marker belongs to
     * @param markerName name of the marker to create
     * @param basePath   path used to look up the file system view and passed to the strategy
     * @return future carrying the response for this request
     */
    public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName, String basePath) {
        if (timelineServiceConfig.earlyConflictDetectionEnable) {
            try {
                ensureEarlyConflictDetectionStarted(markerDir, markerName, basePath);
                earlyConflictDetectionStrategy.detectAndResolveConflictIfNecessary();
            } catch (HoodieEarlyConflictDetectionException conflict) {
                // Must be caught before the generic handler below; otherwise this clause is
                // unreachable and a detected conflict would be treated as a detection failure.
                LOG.warn("Detected the write conflict due to a concurrent writer, failing the marker creation as the early conflict detection is enabled", conflict);
                return finishCreateMarkerFuture(context, markerDir, markerName);
            } catch (Exception e) {
                // Detection itself failed: log with the cause and fall through to normal queuing.
                LOG.warn("Failed to execute early conflict detection." + e.getMessage(), e);
                return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName);
            }
        }
        return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName);
    }

    /**
     * Lazily instantiates the detection strategy and (re)starts asynchronous detection whenever the
     * marker directory differs (ignoring case) from the one detection was last started for.
     */
    private void ensureEarlyConflictDetectionStarted(String markerDir, String markerName, String basePath) {
        synchronized (earlyConflictDetectionLock) {
            if (earlyConflictDetectionStrategy == null) {
                String strategyClass = timelineServiceConfig.earlyConflictDetectionStrategy;
                if (!ReflectionUtils.isSubClass(strategyClass, TimelineServerBasedDetectionStrategy.class)) {
                    LOG.warn("Cannot use {} for timeline-server-based markers.", strategyClass);
                    strategyClass = FALLBACK_DETECTION_STRATEGY_CLASS;
                    LOG.warn("Falling back to {}", strategyClass);
                }
                earlyConflictDetectionStrategy = (TimelineServerBasedDetectionStrategy) ReflectionUtils.loadClass(
                        strategyClass,
                        new Object[]{basePath, markerDir, markerName, timelineServiceConfig.checkCommitConflict});
            }
            if (!markerDir.equalsIgnoreCase(currentMarkerDir)) {
                currentMarkerDir = markerDir;
                // Only completed instants with one of these actions are handed to the detector.
                final Set<String> actions = CollectionUtils.createSet("commit", "deltacommit", "replacecommit");
                earlyConflictDetectionStrategy.startAsyncDetection(
                        timelineServiceConfig.asyncConflictDetectorInitialDelayMs,
                        timelineServiceConfig.asyncConflictDetectorPeriodMs,
                        markerDir,
                        basePath,
                        timelineServiceConfig.maxAllowableHeartbeatIntervalInMs,
                        fileSystem,
                        this,
                        new HashSet<>(viewManager.getFileSystemView(basePath)
                                .getTimeline()
                                .filterCompletedInstants()
                                .filter(instant -> actions.contains(instant.getAction()))
                                .getInstants()));
            }
        }
    }

    /**
     * Queues a marker creation request on the state of its marker directory and, on the very first
     * request, schedules the dispatching runnable at a fixed rate of {@code markerBatchIntervalMs}.
     *
     * @return the future that was queued
     */
    private MarkerCreationFuture addMarkerCreationRequestForAsyncProcessing(Context context, String markerDir, String markerName) {
        LOG.info("Request: create marker: {}", markerName);
        MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName);
        getMarkerDirState(markerDir).addMarkerCreationFuture(future);
        if (!firstCreationRequestSeen) {
            synchronized (firstCreationRequestSeenLock) {
                // Re-check under the lock so the runnable is scheduled exactly once.
                if (!firstCreationRequestSeen) {
                    dispatchingThreadFuture = dispatchingExecutorService.scheduleAtFixedRate(
                            markerCreationDispatchingRunnable,
                            timelineServiceConfig.markerBatchIntervalMs,
                            timelineServiceConfig.markerBatchIntervalMs,
                            TimeUnit.MILLISECONDS);
                    firstCreationRequestSeen = true;
                }
            }
        }
        return future;
    }

    /**
     * Builds a future for the request and completes it right away with the JSON-encoded value of
     * {@link MarkerCreationFuture#isSuccessful()} of that freshly created future, without queuing it.
     *
     * @throws HoodieException if the result cannot be JSON encoded
     */
    private CompletableFuture<String> finishCreateMarkerFuture(Context context, String markerDir, String markerName) {
        MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName);
        try {
            String response = RequestHandler.jsonifyResult(
                    future.getContext(), future.isSuccessful(), metricsRegistry, new ObjectMapper(), LOG);
            future.complete(response);
            return future;
        } catch (JsonProcessingException e) {
            throw new HoodieException("Failed to JSON encode the value", e);
        }
    }

    /**
     * Deletes all markers of a directory and stops tracking the directory.
     *
     * @param markerDir marker directory
     * @return result of {@link MarkerDirState#deleteAllMarkers()}
     */
    public Boolean deleteMarkers(String markerDir) {
        boolean deleted = getMarkerDirState(markerDir).deleteAllMarkers();
        markerDirStateMap.remove(markerDir);
        return deleted;
    }

    /**
     * Returns the state tracked for a marker directory, creating and registering it on first use.
     * Creation is serialised on the map's monitor so only one state is built per directory.
     */
    private MarkerDirState getMarkerDirState(String markerDir) {
        MarkerDirState state = markerDirStateMap.get(markerDir);
        if (state != null) {
            return state;
        }
        synchronized (markerDirStateMap) {
            state = markerDirStateMap.get(markerDir);
            if (state == null) {
                // The strategy is only passed along when detection is enabled and already initialised.
                Option<TimelineServerBasedDetectionStrategy> strategy =
                        timelineServiceConfig.earlyConflictDetectionEnable && earlyConflictDetectionStrategy != null
                                ? Option.of(earlyConflictDetectionStrategy)
                                : Option.empty();
                state = new MarkerDirState(markerDir, timelineServiceConfig.markerBatchNumThreads, strategy,
                        fileSystem, metricsRegistry, hoodieEngineContext, parallelism);
                markerDirStateMap.put(markerDir, state);
            }
            return state;
        }
    }
}
