/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.neo4j.test.OtherThreadRule;
import org.neo4j.test.RandomRule;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.ForkedProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.SimpleStageControl;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.staging.Step;

/**
 * Tests for {@link ForkedProcessorStep}.
 *
 * <p>Each test subclasses the step anonymously and overrides {@code forkedProcess} to observe
 * which forked processor ids are handed a given batch. Assertions made inside
 * {@code forkedProcess} run on the step's own threads, so every test finishes with
 * {@code control.assertHealthy()} (presumably the place where such failures are surfaced to the
 * test thread; confirm against {@link SimpleStageControl}).
 */
public class ForkedProcessorStepTest {
    /** Helper thread that keeps changing the processor count during the stress test. */
    @Rule
    public final OtherThreadRule<Void> t2 = new OtherThreadRule<>();
    /** Helper thread that keeps unparking the step's threads during the stress test. */
    @Rule
    public final OtherThreadRule<Void> t3 = new OtherThreadRule<>();
    @Rule
    public final RandomRule random = new RandomRule();

    /**
     * With a single processor, the one received batch must reach {@code forkedProcess} with
     * {@code id == 0} and {@code processors == 1}, and it must be the very same object that was
     * sent in.
     */
    @Test
    public void shouldProcessBatchBySingleThread() throws Exception {
        SimpleStageControl control = new SimpleStageControl();
        final AtomicReference<Object> processed = new AtomicReference<>();
        final CountDownLatch latch = new CountDownLatch(1);
        try (ForkedProcessorStep<Object> step = new ForkedProcessorStep<Object>(control, "Test", Configuration.DEFAULT, 1) {
            protected void forkedProcess(int id, int processors, Object batch) {
                try {
                    Assert.assertEquals(0, id);
                    Assert.assertEquals(1, processors);
                    processed.set(batch);
                } finally {
                    // Always release the waiting test thread, even when an assertion above failed.
                    latch.countDown();
                }
            }
        }) {
            control.steps(new Step<?>[] {step});
            step.start(1);

            Object expectedBatch = new Object();
            step.receive(0L, expectedBatch);

            latch.await();
            Assert.assertSame(expectedBatch, processed.get());
            control.assertHealthy();
        }
    }

    /**
     * With ten processors, {@code forkedProcess} must be invoked ten times for the single received
     * batch (the latch is counted down once per invocation), each time with the same batch
     * instance.
     */
    @Test
    public void shouldProcessBatchByMultipleThreads() throws Exception {
        SimpleStageControl control = new SimpleStageControl();
        int threadCount = 10;
        final CountDownLatch latch = new CountDownLatch(threadCount);
        final Object expectedBatch = new Object();
        try (ForkedProcessorStep<Object> step = new ForkedProcessorStep<Object>(control, "Test", Configuration.DEFAULT, threadCount) {
            protected void forkedProcess(int id, int processors, Object batch) {
                try {
                    Assert.assertSame(expectedBatch, batch);
                } finally {
                    // Always release the waiting test thread, even when the assertion above failed.
                    latch.countDown();
                }
            }
        }) {
            control.steps(new Step<?>[] {step});
            step.processors(threadCount);
            step.start(1);

            step.receive(0L, expectedBatch);

            latch.await();
            control.assertHealthy();
        }
    }

    /**
     * Feeds the step with batches for roughly one second while one helper thread keeps changing
     * the processor count and another keeps unparking the step's threads. For every batch, each
     * forked processor id below {@code forkedProcessors.size()} must have been seen, and no id may
     * be seen twice.
     */
    @Test
    public void shouldNotMissABeatUnderStress() throws Exception {
        SimpleStageControl control = new SimpleStageControl();
        final int maxProcessorCount = 10;
        try (ForkedProcessorStep<Object> step = new ForkedProcessorStep<Object>(control, "Stress", Configuration.DEFAULT, maxProcessorCount) {
            // seen[id] is set once forked processor `id` has handled the current batch.
            private final boolean[] seen = new boolean[maxProcessorCount];

            protected void forkedProcess(int id, int processors, Object batch) {
                if (seen[id]) {
                    // The same id was handed the same batch twice.
                    Assert.fail(Arrays.toString(seen) + " id:" + id + " processors:" + processors);
                }
                seen[id] = true;
            }

            protected void process(Object batch, BatchSender sender) throws Throwable {
                super.process(batch, sender);
                // After the batch has been processed, every forked processor must have seen it.
                for (int i = 0; i < forkedProcessors.size(); ++i) {
                    Assert.assertTrue(seen[i]);
                }
                // Reset the bookkeeping for the next batch.
                Arrays.fill(seen, false);
            }
        }) {
            step.start(1);
            control.steps(new Step<?>[] {step});

            // Every 10 ms, ask for a new processor count of random.nextInt(maxProcessorCount) + 1.
            t2.execute(ignore -> {
                while (!step.isCompleted()) {
                    Thread.sleep(10L);
                    step.processors(random.nextInt(maxProcessorCount) + 1);
                }
                return null;
            });

            // Continuously unpark every thread whose name contains "Stress-" (presumably the
            // step's own threads, named after the step) so that parked threads keep being woken
            // at arbitrary points.
            t3.execute(ignore -> {
                while (!step.isCompleted()) {
                    for (Thread thread : Thread.getAllStackTraces().keySet()) {
                        if (thread.getName().contains("Stress-")) {
                            LockSupport.unpark(thread);
                        }
                    }
                }
                return null;
            });

            // Keep a constant load on the step for about one second.
            long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(1L);
            long count = 0L;
            while (System.currentTimeMillis() < endTime) {
                step.receive(count, new Object());
                ++count;
            }

            step.endOfUpstream();
            while (!step.isCompleted()) {
                Thread.sleep(10L);
            }
            control.assertHealthy();
        }
    }
}

