package org.nuxeo.lib.stream.computation.log;

import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.Computation;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
import org.nuxeo.lib.stream.computation.internals.WatermarkMonotonicInterval;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;

/* loaded from: input_file:org/nuxeo/lib/stream/computation/log/ComputationRunner.class */
/**
 * Runs a single {@link Computation} in a loop on the thread that calls {@link #run()}.
 * <p>
 * Each loop iteration fires the timers that are due, reads at most one record from the input
 * tailer, and performs a checkpoint (send output records, commit offsets) whenever the
 * computation context asks for one. The loop ends on {@link #stop()}, on thread interruption,
 * when the context requires termination, or, once {@link #drain()} has been requested, when no
 * activity has been seen for {@link #STARVING_TIMEOUT_MS}.
 * <p>
 * {@code stop} and {@code drain} are volatile flags so that they can be raised by a caller
 * other than the running thread; all other mutable state is only touched by the methods below
 * without synchronization.
 */
public class ComputationRunner implements Runnable, RebalanceListener {
    /** Upper bound for the timeout of a single read on the input tailer. */
    public static final Duration READ_TIMEOUT = Duration.ofMillis(25);

    /** While draining, maximum time in milliseconds without a read (or a timer for a source) before exiting. */
    protected static final long STARVING_TIMEOUT_MS = 1000;

    /** Pause in milliseconds taken when an iteration processed neither a timer nor a record. */
    protected static final long INACTIVITY_BREAK_MS = 100;

    private static final Log log = LogFactory.getLog(ComputationRunner.class);

    protected final LogManager logManager;

    protected final ComputationMetadataMapping metadata;

    /** Input tailer, {@code null} when the computation declares no input stream. */
    protected final LogTailer<Record> tailer;

    protected final Supplier<Computation> supplier;

    protected final Codec<Record> inputCodec;

    protected final Codec<Record> outputCodec;

    /** Released once the input assignment is known, see {@link #waitForAssignments(Duration)}. */
    protected final CountDownLatch assignmentLatch = new CountDownLatch(1);

    protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval();

    protected ComputationContextImpl context;

    protected volatile boolean stop;

    protected volatile boolean drain;

    /** Created by {@link #run()} from the supplier; {@code null} before the runner is started. */
    protected Computation computation;

    /** Number of loop iterations performed so far. */
    protected long counter;

    /** Number of records read from the input tailer. */
    protected long inRecords;

    /** Value of {@link #inRecords} at the last successful checkpoint. */
    protected long inCheckpointRecords;

    /** Number of records appended to the output streams. */
    protected long outRecords;

    /** Wall-clock time in milliseconds of the last iteration that fired a timer, 0 if none yet. */
    protected long lastTimerExecution;

    /** Original name of the running thread, used as prefix by {@link #setThreadName(String)}. */
    protected String threadName;

    /** Wall-clock time in milliseconds of the last record read or partition assignment. */
    protected long lastReadTime = System.currentTimeMillis();

    /**
     * Creates a runner and its input tailer.
     * <p>
     * Without input stream there is no tailer. When the log manager supports subscribe, the
     * tailer subscribes to the input streams with this runner as rebalance listener and the
     * assignment latch is released by {@link #onPartitionsAssigned(Collection)}. Otherwise a
     * tailer is created on the given partitions and the latch is released immediately.
     *
     * @param supplier provides the computation instance when the runner starts
     * @param computationMetadataMapping metadata of the computation (name, input and output streams)
     * @param list partitions to tail when the log manager does not support subscribe
     * @param logManager manager used to create the tailer and the output appenders
     * @param codec codec used to read input records
     * @param codec2 codec used to write output records
     */
    public ComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping computationMetadataMapping, List<LogPartition> list, LogManager logManager, Codec<Record> codec, Codec<Record> codec2) {
        this.supplier = supplier;
        this.metadata = computationMetadataMapping;
        this.logManager = logManager;
        this.context = new ComputationContextImpl(computationMetadataMapping);
        this.inputCodec = codec;
        this.outputCodec = codec2;
        if (computationMetadataMapping.inputStreams().isEmpty()) {
            this.tailer = null;
            this.assignmentLatch.countDown();
        } else if (logManager.supportSubscribe()) {
            this.tailer = logManager.subscribe(computationMetadataMapping.name(), computationMetadataMapping.inputStreams(), this, codec);
        } else {
            this.tailer = logManager.createTailer(computationMetadataMapping.name(), list, codec);
            this.assignmentLatch.countDown();
        }
    }

    /**
     * Asks the processing loop to exit and forwards the stop signal to the computation if it
     * has already been created.
     */
    public void stop() {
        log.debug(metadata.name() + ": Receives Stop signal");
        stop = true;
        if (computation != null) {
            computation.signalStop();
        }
    }

    /**
     * Asks the processing loop to exit once it has been starving for more than
     * {@link #STARVING_TIMEOUT_MS}.
     */
    public void drain() {
        log.debug(metadata.name() + ": Receives Drain signal");
        drain = true;
    }

    /**
     * Waits until the input assignment is known.
     *
     * @param duration maximum time to wait
     * @return {@code true} if the assignment happened in time, {@code false} on timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean waitForAssignments(Duration duration) throws InterruptedException {
        if (assignmentLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
            return true;
        }
        log.warn(metadata.name() + ": Timeout waiting for assignment");
        return false;
    }

    /**
     * Creates and initializes the computation, runs the processing loop, then destroys the
     * computation and closes the tailer exactly once, whatever the outcome of the loop.
     * <p>
     * An {@link InterruptedException} ends the loop quietly and the interrupt flag is restored
     * after the cleanup. Any other exception is rethrown after being logged, unless the thread
     * is already flagged as interrupted in which case it is only logged.
     */
    @Override
    public void run() {
        threadName = Thread.currentThread().getName();
        boolean interrupted = false;
        computation = supplier.get();
        log.debug(metadata.name() + ": Init");
        computation.init(context);
        log.debug(metadata.name() + ": Start");
        try {
            processLoop();
        } catch (InterruptedException e) {
            // The interrupt flag is restored in the finally block, once the cleanup is done
            interrupted = true;
            String message = metadata.name() + ": Interrupted";
            if (log.isTraceEnabled()) {
                log.debug(message, e);
            } else {
                log.debug(message);
            }
        } catch (Exception e) {
            if (Thread.currentThread().isInterrupted()) {
                // The interrupt flag is already set, the failure is a side effect of the interruption
                log.info(metadata.name() + ": Interrupted", e);
            } else {
                log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e);
                throw e;
            }
        } finally {
            try {
                computation.destroy();
            } finally {
                try {
                    // Close the tailer even when destroy() fails so the input is not leaked
                    closeTailer();
                    log.debug(metadata.name() + ": Exited");
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /** Closes the input tailer if there is one and it is not already closed. */
    protected void closeTailer() {
        if (tailer != null && !tailer.closed()) {
            tailer.close();
        }
    }

    /**
     * Processes timers and records until {@link #continueLoop()} returns {@code false}, sleeping
     * {@link #INACTIVITY_BREAK_MS} after an iteration without any activity.
     *
     * @throws InterruptedException if the thread is interrupted while reading or sleeping
     */
    protected void processLoop() throws InterruptedException {
        while (continueLoop()) {
            // Both steps run on every iteration, a fired timer must not skip the record read
            boolean activity = processTimer();
            activity |= processRecord();
            counter++;
            if (!activity) {
                Thread.sleep(INACTIVITY_BREAK_MS);
            }
        }
    }

    /**
     * Tells whether the processing loop should perform another iteration.
     *
     * @return {@code false} when stopped or interrupted, or when draining and starving: for a
     *         computation with inputs no record was read for more than
     *         {@link #STARVING_TIMEOUT_MS}, for a source at least one timer has fired and the
     *         last one is older than {@link #STARVING_TIMEOUT_MS}
     */
    protected boolean continueLoop() {
        if (stop || Thread.currentThread().isInterrupted()) {
            return false;
        }
        if (!drain) {
            return true;
        }
        long now = System.currentTimeMillis();
        if (metadata.inputStreams().isEmpty()) {
            // A source has no input, starvation is measured on the timer activity
            long sinceLastTimer = now - lastTimerExecution;
            if (lastTimerExecution > 0 && sinceLastTimer > STARVING_TIMEOUT_MS) {
                log.info(metadata.name() + ": End of source drain, last timer " + sinceLastTimer + " ms ago");
                return false;
            }
            return true;
        }
        long sinceLastRead = now - lastReadTime;
        if (sinceLastRead > STARVING_TIMEOUT_MS) {
            log.info(metadata.name() + ": End of drain no more input after " + sinceLastRead + " ms, " + inRecords + " records read, " + counter + " reads attempt");
            return false;
        }
        return true;
    }

    /**
     * Fires the timers of the context that are due, ordered by their timestamp, then checkpoints
     * if required.
     * <p>
     * Nothing is fired when there is an input tailer without any assignment.
     *
     * @return {@code true} if at least one timer has been fired
     */
    protected boolean processTimer() {
        Map<String, Long> timers = context.getTimers();
        if (timers.isEmpty()) {
            return false;
        }
        if (tailer != null && tailer.assignments().isEmpty()) {
            return false;
        }
        long now = System.currentTimeMillis();
        // Copy the due timers, ordered by timestamp, so that the context can be updated while firing
        LinkedHashMap<String, Long> dueTimers = timers.entrySet()
                .stream()
                .filter(entry -> entry.getValue() <= now)
                .sorted(Map.Entry.comparingByValue())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (first, second) -> first, LinkedHashMap::new));
        if (dueTimers.isEmpty()) {
            return false;
        }
        for (Map.Entry<String, Long> timer : dueTimers.entrySet()) {
            context.removeTimer(timer.getKey());
            computation.processTimer(context, timer.getKey(), timer.getValue());
        }
        checkSourceLowWatermark();
        lastTimerExecution = now;
        setThreadName("timer");
        checkpointIfNecessary();
        if (context.requireTerminate()) {
            stop = true;
        }
        return true;
    }

    /**
     * Reads at most one record from the input tailer and hands it to the computation, then
     * checkpoints if required.
     *
     * @return {@code true} if a record has been processed or if the context requires
     *         termination, {@code false} if there is no tailer or nothing to read
     * @throws InterruptedException if the thread is interrupted while reading
     */
    protected boolean processRecord() throws InterruptedException {
        if (context.requireTerminate()) {
            stop = true;
            return true;
        }
        if (tailer == null) {
            return false;
        }
        LogRecord<Record> logRecord = null;
        try {
            logRecord = tailer.read(getTimeoutDuration());
        } catch (RebalanceException ignored) {
            // A rebalance during the read is not a failure: report no record for this iteration,
            // the next iteration reads again
            log.debug(metadata.name() + ": Rebalance during read");
        }
        if (logRecord == null) {
            return false;
        }
        Record message = logRecord.message();
        lastReadTime = System.currentTimeMillis();
        inRecords++;
        lowWatermark.mark(message.getWatermark());
        String inputStream = metadata.reverseMap(logRecord.offset().partition().name());
        computation.processRecord(context, inputStream, message);
        checkRecordFlags(message);
        checkSourceLowWatermark();
        setThreadName("record");
        checkpointIfNecessary();
        return true;
    }

    /**
     * Returns the timeout for the next read: the time elapsed since the last read, capped by
     * {@link #READ_TIMEOUT} and never negative even if the wall clock went backwards.
     *
     * @return the read timeout
     */
    protected Duration getTimeoutDuration() {
        long sinceLastRead = Math.max(0L, System.currentTimeMillis() - lastReadTime);
        return Duration.ofMillis(Math.min(READ_TIMEOUT.toMillis(), sinceLastRead));
    }

    /**
     * Moves a positive source low watermark set on the context into the runner low watermark
     * and resets it on the context.
     */
    protected void checkSourceLowWatermark() {
        long sourceLowWatermark = context.getSourceLowWatermark();
        if (sourceLowWatermark > 0) {
            lowWatermark.mark(Watermark.ofValue(sourceLowWatermark));
            context.setSourceLowWatermark(0L);
        }
    }

    /**
     * Reacts to the flags of a record: a poison pill asks for a checkpoint and stops the loop,
     * a commit flag asks for a checkpoint.
     *
     * @param record the record that has just been processed
     */
    protected void checkRecordFlags(Record record) {
        if (record.getFlags().contains(Record.Flag.POISON_PILL)) {
            log.info(metadata.name() + ": Receive POISON PILL");
            context.askForCheckpoint();
            stop = true;
        } else if (record.getFlags().contains(Record.Flag.COMMIT)) {
            context.askForCheckpoint();
        }
    }

    /**
     * Performs a checkpoint when the context requires one. A failing checkpoint is logged as an
     * error and the failure is propagated to the caller.
     */
    protected void checkpointIfNecessary() {
        if (!context.requireCheckpoint()) {
            return;
        }
        boolean completed = false;
        try {
            checkpoint();
            completed = true;
            inCheckpointRecords = inRecords;
        } finally {
            if (!completed) {
                log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates.");
            }
        }
    }

    /**
     * Sends the pending output records, saves timers, state and input offsets, checkpoints the
     * low watermark and clears the checkpoint request on the context.
     */
    protected void checkpoint() {
        sendRecords();
        saveTimers();
        saveState();
        saveOffsets();
        lowWatermark.checkpoint();
        context.removeCheckpointFlag();
        log.debug(metadata.name() + ": checkpoint");
        setThreadName("checkpoint");
    }

    /** Hook called during a checkpoint, does nothing in this class. */
    protected void saveTimers() {
    }

    /** Hook called during a checkpoint, does nothing in this class. */
    protected void saveState() {
    }

    /** Commits the position of the input tailer, if any. */
    protected void saveOffsets() {
        if (tailer != null) {
            tailer.commit();
        }
    }

    /**
     * Appends the records accumulated in the context to their output stream and clears them
     * from the context. A record without watermark (0) gets the current low watermark.
     */
    protected void sendRecords() {
        for (String str : metadata.outputStreams()) {
            LogAppender<Record> appender = logManager.getAppender(str, outputCodec);
            for (Record record : context.getRecords(str)) {
                if (record.getWatermark() == 0) {
                    record.setWatermark(lowWatermark.getLow().getValue());
                }
                appender.append(record.getKey(), record);
                outRecords++;
            }
            context.getRecords(str).clear();
        }
    }

    /**
     * @return the low bound of the runner watermark interval
     */
    public Watermark getLowWatermark() {
        return lowWatermark.getLow();
    }

    /**
     * Renames the current thread to the original thread name followed by the runner counters,
     * so that the progress is visible in a thread dump.
     *
     * @param str optional suffix naming the last action, ignored when {@code null}
     */
    protected void setThreadName(String str) {
        String name = threadName + ",in:" + inRecords + ",inCheckpoint:" + inCheckpointRecords + ",out:" + outRecords + ",lastRead:" + lastReadTime + ",lastTimer:" + lastTimerExecution + ",wm:" + lowWatermark.getLow().getValue() + ",loop:" + counter;
        if (str != null) {
            name = name + "," + str;
        }
        Thread.currentThread().setName(name);
    }

    /**
     * Only records the event in the thread name.
     *
     * @param collection the revoked partitions, unused
     */
    @Override
    public void onPartitionsRevoked(Collection<LogPartition> collection) {
        setThreadName("rebalance revoked");
    }

    /**
     * Restarts the computation on a fresh context after a new assignment, resets the activity
     * timestamps and releases the assignment latch.
     *
     * @param collection the assigned partitions, unused
     */
    @Override
    public void onPartitionsAssigned(Collection<LogPartition> collection) {
        // Refreshed before renaming the thread because the thread name embeds lastReadTime
        lastReadTime = System.currentTimeMillis();
        setThreadName("rebalance assigned");
        context = new ComputationContextImpl(metadata);
        log.debug(metadata.name() + ": Init");
        computation.init(context);
        // Refreshed again so that the time spent in init() does not count as starvation
        lastReadTime = System.currentTimeMillis();
        lastTimerExecution = 0L;
        assignmentLatch.countDown();
    }
}
