/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

/**
 * A {@link ProcessorStep} whose batches can be worked on by several threads at the same time.
 *
 * <p>When more than one forked processor is configured, {@link #process(Object, BatchSender)}
 * publishes the batch, wakes every {@link ForkedProcessor} and waits until all of them have called
 * {@link #forkedProcess(int, int, Object)} on that same batch. Only then is the batch sent
 * downstream. With a single processor the batch is processed directly on the calling thread.
 *
 * <p>The number of forked processors is changed via {@link #processors(int)}; the new value takes
 * effect at the start of the next {@code process} call.
 *
 * @param <T> type of batch this step processes
 */
public abstract class ForkedProcessorStep<T>
extends ProcessorStep<T> {
    // NOTE(review): not used in this class; presumably lets subclasses single out processor id 0 — confirm.
    protected static final int MAIN = 0;
    /** Number of forked processors that have not yet finished the batch currently being processed. */
    private final AtomicInteger doneSignal = new AtomicInteger();
    /** Upper bound enforced by {@link #processors(int)}. */
    private final int maxForkedProcessors;
    /** Live forked processors; only modified by {@link #applyProcessorCount()}. */
    protected final List<ForkedProcessor> forkedProcessors = new ArrayList<ForkedProcessor>();
    /** Batch handed to the forked processors; written before {@link #globalTicket} is bumped. */
    private T currentBatch;
    /** Incremented once per batch dispatched to the forked processors. */
    private volatile long globalTicket;
    /** Desired number of forked processors, applied at the start of the next {@code process} call. */
    private volatile int processorCount = 1;
    /**
     * Failure reported by a forked processor for the current batch, or {@code null}.
     * Not volatile: it is written before {@code doneSignal.decrementAndGet()} and read after
     * {@code doneSignal.get()}.
     */
    private Throwable error;
    /** Thread to unpark when a forked processor has finished its share of the current batch. */
    private Thread submitterThread;

    /**
     * @param control passed through to {@link ProcessorStep}
     * @param name passed through to {@link ProcessorStep}
     * @param config passed through to {@link ProcessorStep}; also supplies the processor limit when
     * {@code maxProcessors} is {@code 0}
     * @param maxProcessors maximum number of forked processors, or {@code 0} to use
     * {@code config.maxNumberOfProcessors()}
     */
    protected ForkedProcessorStep(StageControl control, String name, Configuration config, int maxProcessors) {
        super(control, name, config, 1, new StatsProvider[0]);
        this.maxForkedProcessors = maxProcessors == 0 ? config.maxNumberOfProcessors() : maxProcessors;
        this.applyProcessorCount();
    }

    /**
     * Processes one batch with all forked processors and then sends it downstream, if there is a
     * downstream.
     *
     * @param batch batch to process
     * @param sender used to send the batch downstream once processing has completed
     * @throws Throwable whatever {@link #forkedProcess(int, int, Object)} threw, either directly
     * (single processor) or as reported by a forked processor
     */
    @Override
    protected void process(T batch, BatchSender sender) throws Throwable {
        this.applyProcessorCount();
        int processorCount = this.forkedProcessors.size();
        if (processorCount == 1) {
            // No point in handing over to another thread; do the work right here.
            this.forkedProcess(0, 1, batch);
        } else {
            // Publish the state the forked processors will read. These plain writes precede the
            // volatile write to globalTicket, which the processors read before touching them.
            this.currentBatch = batch;
            this.submitterThread = Thread.currentThread();
            this.doneSignal.set(processorCount);
            // globalTicket is only ever written here, so the non-atomic increment has a single writer.
            ++this.globalTicket;
            this.notifyProcessors();
            // park() may return spuriously, hence the loop on the actual condition.
            while (this.doneSignal.get() > 0) {
                LockSupport.park();
            }
            Throwable failure = this.error;
            if (failure != null) {
                // Clear before rethrowing so that a later, successful batch doesn't fail on this stale error.
                this.error = null;
                throw failure;
            }
        }
        if (this.downstream != null) {
            sender.send(batch);
        }
    }

    /** Wakes every forked processor so that it can check for a new ticket. */
    private void notifyProcessors() {
        for (int i = 0; i < this.forkedProcessors.size(); ++i) {
            LockSupport.unpark(this.forkedProcessors.get(i));
        }
    }

    /**
     * Closes the parent step and halts all forked processor threads. The threads are halted even
     * if closing the parent fails, so that they are not left parked forever.
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            for (ForkedProcessor forkedProcessor : this.forkedProcessors) {
                forkedProcessor.halt();
            }
        }
    }

    /**
     * Processes one processor's share of a batch. When several forked processors are active this is
     * called by all of them for the same batch instance.
     *
     * @param id id of the calling processor, from {@code 0} up to {@code processors - 1}
     * @param processors total number of processors working on the batch
     * @param batch batch to process
     */
    protected abstract void forkedProcess(int id, int processors, T batch);

    /**
     * Starts or halts forked processors until their number matches {@link #processorCount}.
     * New processors get the next free id; surplus processors are removed from the end.
     */
    private void applyProcessorCount() {
        int processorCount = this.processorCount;
        while (processorCount != this.forkedProcessors.size()) {
            if (this.forkedProcessors.size() < processorCount) {
                this.forkedProcessors.add(new ForkedProcessor(this.forkedProcessors.size()));
                continue;
            }
            this.forkedProcessors.remove(this.forkedProcessors.size() - 1).halt();
        }
    }

    /**
     * Adjusts the desired number of forked processors by {@code delta}, keeping the result at
     * least {@code 1} and at most the configured maximum.
     *
     * @param delta number of processors to add (positive) or remove (negative)
     * @return the desired number of processors after the change
     */
    @Override
    public int processors(int delta) {
        int processors = this.processorCount;
        if ((processors += delta) < 1) {
            processors = 1;
        }
        if (processors > this.maxForkedProcessors) {
            processors = this.maxForkedProcessors;
        }
        this.processorCount = processors;
        return this.processorCount;
    }

    /**
     * Thread that parks until a new ticket is issued, then processes its share of the current
     * batch and signals the submitter.
     */
    class ForkedProcessor
    extends Thread {
        private final int id;
        private volatile boolean halted;
        /** Ticket of the last batch this processor handled (or the ticket at creation time). */
        private long localTicket;

        /**
         * Creates and immediately starts the processor thread.
         *
         * @param id id passed to {@link ForkedProcessorStep#forkedProcess(int, int, Object)}
         */
        ForkedProcessor(int id) {
            super(ForkedProcessorStep.this.name() + "-" + id);
            this.id = id;
            // Start in sync with the step so that only batches issued from now on are picked up.
            this.localTicket = ForkedProcessorStep.this.globalTicket;
            this.start();
        }

        @Override
        public void run() {
            while (!this.halted) {
                LockSupport.park();
                // Woken by halt(), spuriously, or without a new ticket: nothing to process.
                if (this.halted || this.localTicket + 1L != ForkedProcessorStep.this.globalTicket) {
                    continue;
                }
                try {
                    ForkedProcessorStep.this.forkedProcess(this.id, ForkedProcessorStep.this.forkedProcessors.size(), ForkedProcessorStep.this.currentBatch);
                }
                catch (Throwable t) {
                    // Handed over to the submitter, which rethrows it from process().
                    ForkedProcessorStep.this.error = t;
                }
                finally {
                    // Always signal completion, even on failure, so the submitter never waits forever.
                    ++this.localTicket;
                    ForkedProcessorStep.this.doneSignal.decrementAndGet();
                    LockSupport.unpark(ForkedProcessorStep.this.submitterThread);
                }
            }
        }

        /** Asks this processor to stop and wakes it up so that it notices the request. */
        void halt() {
            this.halted = true;
            LockSupport.unpark(this);
        }
    }
}

