/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.pc;

import java.util.concurrent.BlockingQueue;
import org.apache.atlas.pc.StatusReporter;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.commons.lang3.RandomUtils;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Tests for {@link StatusReporter}.
 *
 * <p>{@link #statusReporting()} feeds the reporter from a {@link WorkItemManager} whose workers
 * are {@link IntegerConsumer} instances; {@link #reportWithTimeout()} drives the reporter
 * directly, without any workers.
 */
public class StatusReporterTest {
    /**
     * Creates the work item manager used by {@link #statusReporting()}.
     *
     * @param cb         builder that creates the consumers
     * @param numWorkers worker count handed to the manager's constructor
     * @return a manager constructed with the name {@code "IntegerConsumer"}, the value {@code 5},
     *         {@code numWorkers} and {@code true}
     */
    private WorkItemManager<Integer, WorkItemConsumer> getWorkItemManger(IntegerConsumerBuilder cb, int numWorkers) {
        // NOTE(review): the raw WorkItemBuilder cast is kept because the constructor's declared
        // parameter type is not visible from this file; drop it if the constructor accepts cb as-is.
        return new WorkItemManager<>((WorkItemBuilder) cb, "IntegerConsumer", 5, numWorkers, true);
    }

    /**
     * Produces {@code maxItems} consecutive integers, registering each one with the reporter as
     * produced (item and index are the same value) and marking every result the manager has
     * published so far as processed.
     *
     * <p>After the manager is drained and the remaining results are collected, {@code ack()} is
     * expected to return the last index, and both reporter counts are expected to be zero.
     *
     * @throws InterruptedException declared for the manager calls made by this test
     */
    @Test
    public void statusReporting() throws InterruptedException {
        final int maxItems = 50;
        final int numWorkers = 5;

        IntegerConsumerBuilder cb = new IntegerConsumerBuilder();
        WorkItemManager<Integer, WorkItemConsumer> wi = getWorkItemManger(cb, numWorkers);
        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>();

        for (int i = 0; i < maxItems; i++) {
            wi.produce(i);
            statusReporter.produced(i, i);
            // Pick up whatever the workers have finished so far; anything still in flight
            // is collected after drain() below.
            extractResults(wi, statusReporter);
        }

        wi.drain();
        extractResults(wi, statusReporter);

        // Items were produced as 0..maxItems-1, so the last index is maxItems - 1.
        Assert.assertEquals(statusReporter.ack().intValue(), maxItems - 1);

        wi.shutdown();
        Assert.assertEquals(statusReporter.getProducedCount(), 0);
        Assert.assertEquals(statusReporter.getProcessedCount(), 0);
    }

    /**
     * Empties the manager's result queue without blocking and reports every {@link Integer}
     * found there as processed. Results of any other type are discarded.
     *
     * @param wi             manager whose results are polled
     * @param statusReporter reporter that is told about each processed item
     */
    private void extractResults(WorkItemManager<Integer, WorkItemConsumer> wi, StatusReporter<Integer, Integer> statusReporter) {
        Object result;
        // poll() returns null once the queue is empty, which ends the loop.
        while ((result = wi.getResults().poll()) != null) {
            if (result instanceof Integer) {
                statusReporter.processed((Integer) result);
            }
        }
    }

    /**
     * Registers two items, marks only the second one as processed, and checks that
     * {@code ack()} returns {@code null} right away but returns the second item's index
     * ({@code 200}) once the test has slept for 3000 ms.
     *
     * @throws InterruptedException if the sleep between the two {@code ack()} calls is interrupted
     */
    @Test
    public void reportWithTimeout() throws InterruptedException {
        // NOTE(review): 2000L is presumably the reporter's timeout in milliseconds -- confirm
        // against StatusReporter. The sleep below is deliberately longer than that value.
        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(2000L);
        statusReporter.produced(1, 100);
        statusReporter.produced(2, 200);

        // Item 1 is intentionally never marked as processed.
        statusReporter.processed(2);
        Integer ack = statusReporter.ack();
        Assert.assertNull(ack);

        Thread.sleep(3000L);
        ack = statusReporter.ack();
        Assert.assertNotNull(ack);
        Assert.assertEquals(ack, Integer.valueOf(200));
    }

    /**
     * Builder that creates one {@link IntegerConsumer} per supplied queue.
     */
    private static class IntegerConsumerBuilder
    implements WorkItemBuilder<IntegerConsumer, Integer> {
        /**
         * @param queue queue the new consumer is constructed with
         * @return a new consumer for {@code queue}
         */
        public IntegerConsumer build(BlockingQueue<Integer> queue) {
            return new IntegerConsumer(queue);
        }
    }

    /**
     * Consumer that sleeps briefly for each item and then commits, publishing the item it
     * just handled as its result.
     */
    private static class IntegerConsumer
    extends WorkItemConsumer<Integer> {
        /** Item most recently passed to {@link #processItem(Integer)}. */
        private Integer current;

        /**
         * @param queue queue passed to the superclass constructor
         */
        public IntegerConsumer(BlockingQueue<Integer> queue) {
            super(queue);
        }

        /** Publishes the most recently processed item as a result. */
        protected void doCommit() {
            addResult(current);
        }

        /**
         * Remembers {@code item}, sleeps for a short randomized interval, then commits.
         *
         * <p>If the sleep is interrupted the item is not committed, and the thread's
         * interrupt status is restored so the caller can react to it.
         *
         * @param item item to process
         */
        protected void processItem(Integer item) {
            try {
                current = item;
                // 20 ms plus a small random extra, so workers do not finish in lock-step.
                Thread.sleep(20 + RandomUtils.nextInt(5, 7));
                super.commit();
            }
            catch (InterruptedException e) {
                // No logger is available in this file, so the trace still goes to stderr.
                e.printStackTrace();
                // Catching InterruptedException clears the flag; set it again rather than
                // silently swallowing the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }
}

