package org.apache.hudi.async;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import org.apache.hudi.client.AbstractClusteringClient;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/async/AsyncClusteringService.class */
public abstract class AsyncClusteringService extends HoodieAsyncService {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);
    private final int maxConcurrentClustering;
    private transient AbstractClusteringClient clusteringClient;

    public AsyncClusteringService(AbstractHoodieWriteClient abstractHoodieWriteClient) {
        this(abstractHoodieWriteClient, false);
    }

    public AsyncClusteringService(AbstractHoodieWriteClient abstractHoodieWriteClient, boolean z) {
        super(z);
        this.clusteringClient = createClusteringClient(abstractHoodieWriteClient);
        this.maxConcurrentClustering = 1;
    }

    protected abstract AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient abstractHoodieWriteClient);

    @Override // org.apache.hudi.async.HoodieAsyncService
    protected Pair<CompletableFuture, ExecutorService> startService() {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(this.maxConcurrentClustering, runnable -> {
            Thread thread = new Thread(runnable, "async_clustering_thread");
            thread.setDaemon(isRunInDaemonMode());
            return thread;
        });
        return Pair.of(CompletableFuture.allOf((CompletableFuture[]) IntStream.range(0, this.maxConcurrentClustering).mapToObj(i -> {
            return CompletableFuture.supplyAsync(() -> {
                while (!isShutdownRequested()) {
                    try {
                        HoodieInstant fetchNextAsyncServiceInstant = fetchNextAsyncServiceInstant();
                        if (null != fetchNextAsyncServiceInstant) {
                            LOG.info("Starting clustering for instant " + fetchNextAsyncServiceInstant);
                            this.clusteringClient.cluster(fetchNextAsyncServiceInstant);
                            LOG.info("Finished clustering for instant " + fetchNextAsyncServiceInstant);
                        }
                    } catch (IOException e) {
                        LOG.error("Clustering executor failed", e);
                        throw new HoodieIOException(e.getMessage(), e);
                    } catch (InterruptedException e2) {
                        LOG.warn("Clustering executor got interrupted exception! Stopping", e2);
                    }
                }
                LOG.info("Clustering executor shutting down properly");
                return true;
            }, newFixedThreadPool);
        }).toArray(i2 -> {
            return new CompletableFuture[i2];
        })), newFixedThreadPool);
    }

    public synchronized void updateWriteClient(AbstractHoodieWriteClient abstractHoodieWriteClient) {
        this.clusteringClient.updateWriteClient(abstractHoodieWriteClient);
    }
}
