package org.apache.hudi.common.util.queue;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/common/util/queue/DisruptorMessageQueue.class */
/**
 * {@link HoodieMessageQueue} implementation backed by an LMAX Disruptor ring buffer.
 *
 * <p>Producers call {@link #insertRecord(Object)}; each record is passed through the configured
 * transform function on the calling thread and the result is published into the ring buffer.
 * Consumption is push-based: a consumer is attached with {@link #setHandlers(HoodieConsumer)} and
 * is invoked by the disruptor's own thread, which is why {@link #readNextRecord()} is unsupported.
 *
 * <p>Expected lifecycle, as enforced by {@link #insertRecord(Object)}: {@link #setHandlers},
 * then {@link #start()}, then any number of inserts, then {@link #close()}.
 *
 * @param <I> type of the records handed to {@link #insertRecord(Object)}
 * @param <O> type of the transformed payload stored in the ring buffer and given to the consumer
 */
public class DisruptorMessageQueue<I, O> implements HoodieMessageQueue<I, O> {
    private static final Logger LOG = LogManager.getLogger(DisruptorMessageQueue.class);

    /** Upper bound, in seconds, on the disruptor shutdown wait used when closing from an interrupted thread. */
    private static final long TIMEOUT_WAITING_SECS = 10;

    private final Disruptor<DisruptorMessageQueue<I, O>.HoodieDisruptorEvent> queue;
    private final Function<I, O> transformFunction;
    private final RingBuffer<DisruptorMessageQueue<I, O>.HoodieDisruptorEvent> ringBuffer;

    // Both flags are written under the instance lock in start()/close() but read lock-free by
    // producer threads in insertRecord(); volatile makes those writes visible to the readers.
    private volatile boolean isShutdown = false;
    private volatile boolean isStarted = false;

    /**
     * Mutable slot stored in the ring buffer. Slots are created once by the disruptor's event
     * factory and reused; only the payload they carry changes.
     */
    public class HoodieDisruptorEvent {
        private O value;

        HoodieDisruptorEvent() {
        }

        /** Stores the payload carried by this slot ({@code null} clears it). */
        public void set(O o) {
            this.value = o;
        }

        /** Returns the payload currently carried by this slot; may be {@code null}. */
        public O get() {
            return this.value;
        }
    }

    /**
     * Builds the underlying disruptor. The queue accepts no records until {@link #start()} is called.
     *
     * @param i        ring buffer size handed to the {@link Disruptor} constructor
     * @param function transformation applied to every inserted record before it is published
     * @param str      wait-strategy identifier resolved through {@code WaitStrategyFactory.build}
     * @param i2       producer selector: values greater than 1 choose {@link ProducerType#MULTI},
     *                 anything else {@link ProducerType#SINGLE}
     * @param runnable passed through to the {@code CustomizedThreadFactory} that creates the
     *                 disruptor thread (presumably a pre-execution hook; confirm against that class)
     */
    public DisruptorMessageQueue(int i, Function<I, O> function, String str, int i2, Runnable runnable) {
        WaitStrategy waitStrategy = WaitStrategyFactory.build(str);
        ProducerType producerType = i2 > 1 ? ProducerType.MULTI : ProducerType.SINGLE;
        this.queue = new Disruptor<>(
            () -> new HoodieDisruptorEvent(),
            i,
            new CustomizedThreadFactory("disruptor", true, runnable),
            producerType,
            waitStrategy);
        this.ringBuffer = this.queue.getRingBuffer();
        this.transformFunction = function;
    }

    /**
     * Returns the number of ring buffer slots currently in use, computed as the buffer size minus
     * the remaining capacity.
     */
    @Override // org.apache.hudi.common.util.queue.HoodieMessageQueue
    public long size() {
        return this.ringBuffer.getBufferSize() - this.ringBuffer.remainingCapacity();
    }

    /**
     * Transforms the record on the calling thread and publishes the result into the ring buffer.
     *
     * @param i record to transform and enqueue
     * @throws HoodieException if the queue has already been closed or has not been started yet
     */
    @Override // org.apache.hudi.common.util.queue.HoodieMessageQueue
    public void insertRecord(I i) throws Exception {
        // close() also resets isStarted, so the shutdown check has to come first; otherwise an
        // insert after close() would be reported as "not started yet".
        if (this.isShutdown) {
            throw new HoodieException("Can't insert into the queue after it had already been closed");
        }
        if (!this.isStarted) {
            throw new HoodieException("Can't insert into the queue since the queue is not started yet");
        }
        O transformed = this.transformFunction.apply(i);
        this.ringBuffer.publishEvent((event, sequence) -> event.set(transformed));
    }

    /**
     * Not supported: records are pushed to the consumer registered via
     * {@link #setHandlers(HoodieConsumer)} instead of being pulled from the queue.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.hudi.common.util.queue.HoodieMessageQueue
    public Option<O> readNextRecord() {
        throw new UnsupportedOperationException("Should not call readNextRecord here. And let DisruptorMessageHandler to handle consuming logic");
    }

    /** No-op: this implementation does not record failures. */
    @Override // org.apache.hudi.common.util.queue.HoodieMessageQueue
    public void markAsFailed(Throwable th) {
    }

    /** Returns {@code true} when the remaining capacity equals the buffer size, i.e. no slot is in use. */
    @Override // org.apache.hudi.common.util.queue.HoodieMessageQueue
    public boolean isEmpty() {
        return ((long) this.ringBuffer.getBufferSize()) == this.ringBuffer.remainingCapacity();
    }

    /** No-op in this implementation. */
    @Override // org.apache.hudi.common.util.queue.HoodieMessageQueue
    public void seal() {
    }

    /**
     * Shuts the disruptor down. Only the first call has any effect.
     *
     * <p>If the calling thread's interrupt flag is set, the shutdown wait is bounded by
     * {@link #TIMEOUT_WAITING_SECS} seconds and a {@link HoodieException} is always thrown
     * afterwards (wrapping the timeout if the bounded wait expired). Otherwise the unbounded
     * {@code Disruptor.shutdown()} is used.
     *
     * @throws HoodieException if the calling thread was interrupted, or the bounded shutdown timed out
     */
    @Override // org.apache.hudi.common.util.queue.HoodieMessageQueue, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        synchronized (this) {
            if (this.isShutdown) {
                return;
            }
            this.isShutdown = true;
            this.isStarted = false;

            if (!Thread.currentThread().isInterrupted()) {
                this.queue.shutdown();
                return;
            }

            LOG.error("Disruptor Queue has been interrupted! Shutdown now.");
            try {
                this.queue.shutdown(TIMEOUT_WAITING_SECS, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                // Pass the throwable to the logger so the stack trace is not lost.
                LOG.error("Disruptor queue shutdown timeout", e);
                throw new HoodieException(e);
            }
            throw new HoodieException("Disruptor Queue has been interrupted! Shutdown now.");
        }
    }

    /**
     * Registers the consumer that receives every published payload. The disruptor requires
     * handlers to be registered before {@link #start()}.
     *
     * <p>Exceptions thrown by the consumer are logged and do not stop event processing.
     *
     * @param hoodieConsumer consumer invoked once per published event
     */
    public void setHandlers(HoodieConsumer<O, ?> hoodieConsumer) {
        EventHandler<HoodieDisruptorEvent> handler = (event, sequence, endOfBatch) -> {
            try {
                hoodieConsumer.consume(event.get());
            } catch (Exception e) {
                LOG.error("Failed consuming records", e);
            } finally {
                // Release the payload so the ring buffer does not keep consumed records reachable
                // until the slot is overwritten. Assumes this is the only handler reading the
                // event (NOTE(review): confirm setHandlers is invoked once per queue).
                event.set(null);
            }
        };
        this.queue.handleEventsWith(handler);
    }

    /** Starts the disruptor so that records can be inserted. Calls after the first successful one are ignored. */
    public void start() {
        synchronized (this) {
            if (this.isStarted) {
                return;
            }
            this.queue.start();
            this.isStarted = true;
        }
    }
}
