package org.apache.hudi.async;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import org.apache.hudi.client.BaseCompactor;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/async/AsyncCompactService.class */
/**
 * Async table service that runs compactions on a dedicated fixed-size thread pool.
 *
 * <p>Each worker repeatedly asks {@link #fetchNextAsyncServiceInstant()} for the next instant and
 * hands it to the {@link BaseCompactor} produced by {@link #createCompactor(BaseHoodieWriteClient)},
 * until {@link #isShutdownRequested()} reports {@code true} or a failure ends the loop.
 */
public abstract class AsyncCompactService extends HoodieAsyncTableService {
    /** Value stored under {@link EngineProperty#COMPACTION_POOL_NAME} before compaction starts. */
    public static final String COMPACT_POOL_NAME = "hoodiecompact";
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);
    /** Number of worker threads / compaction loops started by {@link #startService()}; fixed to 1. */
    private final int maxConcurrentCompaction;
    protected transient HoodieEngineContext context;
    private transient BaseCompactor compactor;

    /**
     * Creates the service, passing {@code false} as the boolean flag of the three-argument
     * constructor.
     *
     * @param hoodieEngineContext engine context on which the compaction pool name is set
     * @param baseHoodieWriteClient write client whose config is passed to the superclass and which
     *     is used to build the compactor
     */
    public AsyncCompactService(HoodieEngineContext hoodieEngineContext, BaseHoodieWriteClient baseHoodieWriteClient) {
        this(hoodieEngineContext, baseHoodieWriteClient, false);
    }

    /**
     * Creates the service with a single compaction worker.
     *
     * @param hoodieEngineContext engine context on which the compaction pool name is set
     * @param baseHoodieWriteClient write client whose config is passed to the superclass and which
     *     is used to build the compactor
     * @param z forwarded unchanged to the superclass constructor; presumably the daemon-mode flag
     *     later read back through {@code isRunInDaemonMode()} - TODO confirm against the superclass
     */
    public AsyncCompactService(HoodieEngineContext hoodieEngineContext, BaseHoodieWriteClient baseHoodieWriteClient, boolean z) {
        super(baseHoodieWriteClient.getConfig(), z);
        this.context = hoodieEngineContext;
        this.compactor = createCompactor(baseHoodieWriteClient);
        this.maxConcurrentCompaction = 1;
    }

    /**
     * Builds the compactor used by the worker loop.
     *
     * <p>NOTE: this is invoked from the constructor, i.e. before any subclass field initializers
     * or subclass constructor bodies have run, so implementations must rely only on the argument.
     *
     * @param baseHoodieWriteClient write client the compactor should operate with
     * @return the compactor that will execute each fetched instant
     */
    protected abstract BaseCompactor createCompactor(BaseHoodieWriteClient baseHoodieWriteClient);

    /**
     * Starts {@code maxConcurrentCompaction} compaction loops on a new fixed thread pool.
     *
     * @return a pair of (future completing once every loop has finished, the executor running them)
     */
    @Override // org.apache.hudi.async.HoodieAsyncService
    protected Pair<CompletableFuture, ExecutorService> startService() {
        ExecutorService executor = Executors.newFixedThreadPool(
                this.maxConcurrentCompaction,
                new CustomizedThreadFactory("async_compact_thread", isRunInDaemonMode()));
        CompletableFuture[] workers = IntStream.range(0, this.maxConcurrentCompaction)
                .mapToObj(i -> CompletableFuture.supplyAsync(this::runCompactionLoop, executor))
                .toArray(CompletableFuture[]::new);
        return Pair.of(CompletableFuture.allOf(workers), executor);
    }

    /**
     * Body of a single compaction worker: sets the compaction pool name on the engine context and
     * then compacts every fetched instant until shutdown is requested.
     *
     * @return {@code true} when the loop ends by shutdown request or by interruption
     * @throws HoodieIOException if the compactor fails with an {@link IOException}
     */
    private boolean runCompactionLoop() {
        try {
            LOG.info("Setting pool name for compaction to " + COMPACT_POOL_NAME);
            this.context.setProperty(EngineProperty.COMPACTION_POOL_NAME, COMPACT_POOL_NAME);
            while (!isShutdownRequested()) {
                HoodieInstant instant = fetchNextAsyncServiceInstant();
                // A null instant means there is nothing to compact on this pass; check again.
                if (instant != null) {
                    LOG.info("Starting Compaction for instant " + instant);
                    this.compactor.compact(instant);
                    LOG.info("Finished Compaction for instant " + instant);
                }
            }
            LOG.info("Compactor shutting down properly!!");
        } catch (IOException e) {
            this.hasError = true;
            LOG.error("Compactor executor failed due to IOException", e);
            throw new HoodieIOException(e.getMessage(), e);
        } catch (InterruptedException e) {
            this.hasError = true;
            LOG.warn("Compactor executor thread got interrupted exception. Stopping", e);
            // Restore the interrupt status that was cleared when the exception was thrown, so the
            // owning executor/caller can still observe the interruption. Done after logging so the
            // log call itself does not run on an interrupted thread.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // Only unchecked exceptions can reach this clause; record the failure and propagate.
            this.hasError = true;
            LOG.error("Compactor executor failed", e);
            throw e;
        }
        return true;
    }

    /**
     * Hook that always returns {@code false} in this class. It is not consulted by the worker loop
     * defined here; presumably it exists for subclasses or callers elsewhere - TODO confirm.
     *
     * @return {@code false}
     */
    protected boolean shouldStopCompactor() {
        return false;
    }

    /**
     * Forwards a new write client to the underlying compactor.
     *
     * @param baseHoodieWriteClient the write client the compactor should use from now on
     */
    public synchronized void updateWriteClient(BaseHoodieWriteClient baseHoodieWriteClient) {
        this.compactor.updateWriteClient(baseHoodieWriteClient);
    }
}
