package org.apache.hudi.common.util.queue;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.class */
public class BoundedInMemoryExecutor<I, O, E> {
    private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class);
    private static final long TERMINATE_WAITING_TIME_SECS = 60;
    private final ExecutorService producerExecutorService;
    private final ExecutorService consumerExecutorService;
    private final BoundedInMemoryQueue<I, O> queue;
    private final List<BoundedInMemoryQueueProducer<I>> producers;
    private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer;
    private final Runnable preExecuteRunnable;

    public BoundedInMemoryExecutor(long j, Iterator<I> it2, BoundedInMemoryQueueConsumer<O, E> boundedInMemoryQueueConsumer, Function<I, O> function, Runnable runnable) {
        this(j, new IteratorBasedQueueProducer(it2), Option.of(boundedInMemoryQueueConsumer), function, runnable);
    }

    public BoundedInMemoryExecutor(long j, BoundedInMemoryQueueProducer<I> boundedInMemoryQueueProducer, Option<BoundedInMemoryQueueConsumer<O, E>> option, Function<I, O> function) {
        this(j, boundedInMemoryQueueProducer, option, function, Functions.noop());
    }

    public BoundedInMemoryExecutor(long j, BoundedInMemoryQueueProducer<I> boundedInMemoryQueueProducer, Option<BoundedInMemoryQueueConsumer<O, E>> option, Function<I, O> function, Runnable runnable) {
        this(j, Collections.singletonList(boundedInMemoryQueueProducer), option, function, new DefaultSizeEstimator(), runnable);
    }

    public BoundedInMemoryExecutor(long j, List<BoundedInMemoryQueueProducer<I>> list, Option<BoundedInMemoryQueueConsumer<O, E>> option, Function<I, O> function, SizeEstimator<O> sizeEstimator, Runnable runnable) {
        this.producers = list;
        this.consumer = option;
        this.preExecuteRunnable = runnable;
        this.producerExecutorService = Executors.newFixedThreadPool(list.size(), new CustomizedThreadFactory("producer"));
        this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("consumer"));
        this.queue = new BoundedInMemoryQueue<>(j, function, sizeEstimator);
    }

    public ExecutorCompletionService<Boolean> startProducers() {
        CountDownLatch countDownLatch = new CountDownLatch(this.producers.size());
        ExecutorCompletionService<Boolean> executorCompletionService = new ExecutorCompletionService<>(this.producerExecutorService);
        this.producers.stream().map(boundedInMemoryQueueProducer -> {
            return executorCompletionService.submit(() -> {
                try {
                    try {
                        this.preExecuteRunnable.run();
                        boundedInMemoryQueueProducer.produce(this.queue);
                        synchronized (countDownLatch) {
                            countDownLatch.countDown();
                            if (countDownLatch.getCount() == 0) {
                                this.queue.close();
                            }
                        }
                        return true;
                    } finally {
                    }
                } catch (Throwable th) {
                    synchronized (countDownLatch) {
                        countDownLatch.countDown();
                        if (countDownLatch.getCount() == 0) {
                            this.queue.close();
                        }
                        throw th;
                    }
                }
            });
        }).collect(Collectors.toList());
        return executorCompletionService;
    }

    private Future<E> startConsumer() {
        return (Future) this.consumer.map(boundedInMemoryQueueConsumer -> {
            return this.consumerExecutorService.submit(() -> {
                LOG.info("starting consumer thread");
                this.preExecuteRunnable.run();
                try {
                    Object consume = boundedInMemoryQueueConsumer.consume(this.queue);
                    LOG.info("Queue Consumption is done; notifying producer threads");
                    return consume;
                } catch (Exception e) {
                    LOG.error("error consuming records", e);
                    this.queue.markAsFailed(e);
                    throw e;
                }
            });
        }).orElse(CompletableFuture.completedFuture(null));
    }

    public E execute() {
        try {
            startProducers();
            return startConsumer().get();
        } catch (InterruptedException e) {
            shutdownNow();
            Thread.currentThread().interrupt();
            throw new HoodieException(e);
        } catch (Exception e2) {
            throw new HoodieException(e2);
        }
    }

    public boolean isRemaining() {
        return this.queue.iterator().hasNext();
    }

    public void shutdownNow() {
        this.producerExecutorService.shutdownNow();
        this.consumerExecutorService.shutdownNow();
        this.queue.close();
    }

    public boolean awaitTermination() {
        boolean interrupted = Thread.interrupted();
        boolean z = false;
        boolean z2 = false;
        try {
            z = this.producerExecutorService.awaitTermination(60L, TimeUnit.SECONDS);
            z2 = this.consumerExecutorService.awaitTermination(60L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return z && z2;
    }

    public BoundedInMemoryQueue<I, O> getQueue() {
        return this.queue;
    }
}
