/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.iterative.concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Exercises a {@link BlockingBackChannel} from two threads: an "iteration head" that reads from
 * the back channel and an "iteration tail" that writes to it.
 *
 * <p>Both threads append a description of each step to a shared action log. The test then checks
 * that, for every iteration, the steps were recorded in the order
 * head-sends / tail-receives / tail-writes / head-reads.
 */
public class BlockingBackChannelTest {
    /** Number of supersteps both worker threads run through. */
    private static final int NUM_ITERATIONS = 3;

    /** Number of entries each iteration is expected to add to the action log. */
    private static final int ACTIONS_PER_ITERATION = 4;

    /** Token handed from the head to the tail through the data channel. */
    private static final Integer INPUT_COMPLETELY_PROCESSED_MESSAGE = 1;

    /**
     * Runs head and tail concurrently for {@link #NUM_ITERATIONS} iterations and verifies the
     * recorded order of their actions.
     *
     * @throws InterruptedException if the test thread is interrupted while joining the workers
     */
    @Test
    public void multiThreaded() throws InterruptedException {
        BlockingQueue<Integer> dataChannel = new ArrayBlockingQueue<Integer>(1);
        List<String> actionLog = new ArrayList<String>();
        SerializedUpdateBuffer buffer = Mockito.mock(SerializedUpdateBuffer.class);
        BlockingBackChannel channel = new BlockingBackChannel(buffer);

        Thread head = new Thread(new IterationHead(channel, dataChannel, actionLog));
        Thread tail = new Thread(new IterationTail(channel, dataChannel, actionLog));
        tail.start();
        head.start();
        head.join();
        tail.join();

        Assert.assertEquals(ACTIONS_PER_ITERATION * NUM_ITERATIONS, actionLog.size());
        for (int iteration = 0; iteration < NUM_ITERATIONS; ++iteration) {
            // Index of the first log entry belonging to this iteration.
            int base = iteration * ACTIONS_PER_ITERATION;
            Assert.assertEquals("head sends data", actionLog.get(base));
            Assert.assertEquals("tail receives data", actionLog.get(base + 1));
            Assert.assertEquals("tail writes in iteration " + iteration, actionLog.get(base + 2));
            Assert.assertEquals("head reads in iteration " + iteration, actionLog.get(base + 3));
        }
    }

    /**
     * Worker playing the tail role: per iteration it obtains the write end of the back channel,
     * waits for a message on the data channel, writes to the back channel and signals the end of
     * the superstep.
     */
    static class IterationTail
    implements Runnable {
        private final BlockingBackChannel backChannel;
        private final BlockingQueue<Integer> dataChannel;
        private final Random random;
        private final List<String> actionLog;

        /**
         * @param backChannel channel the tail writes to
         * @param dataChannel queue on which the head announces new input
         * @param actionLog log shared with the head and the test thread
         */
        IterationTail(BlockingBackChannel backChannel, BlockingQueue<Integer> dataChannel, List<String> actionLog) {
            this.backChannel = backChannel;
            this.dataChannel = dataChannel;
            this.actionLog = actionLog;
            this.random = new Random();
        }

        /**
         * Runs {@link #NUM_ITERATIONS} iterations; any checked failure is rethrown wrapped in a
         * {@link RuntimeException}.
         */
        @Override
        public void run() {
            try {
                for (int n = 0; n < NUM_ITERATIONS; ++n) {
                    DataOutputView writeEnd = this.backChannel.getWriteEnd();
                    this.readInputFromDataChannel();
                    // Random short pause to vary the interleaving with the head thread.
                    Thread.sleep(this.random.nextInt(10));
                    DataInputView inputView = Mockito.mock(DataInputView.class);
                    this.actionLog.add("tail writes in iteration " + n);
                    writeEnd.write(inputView, 1);
                    this.backChannel.notifyOfEndOfSuperstep();
                }
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to whoever inspects this thread afterwards.
                    Thread.currentThread().interrupt();
                }
                throw new RuntimeException(e);
            }
        }

        /**
         * Blocks until the head has put a message on the data channel, then logs the receipt.
         *
         * @throws InterruptedException if interrupted while waiting on the queue
         */
        void readInputFromDataChannel() throws InterruptedException {
            this.dataChannel.take();
            this.actionLog.add("tail receives data");
        }
    }

    /**
     * Worker playing the head role: it sends an initial message on the data channel, then per
     * iteration obtains the read end of the back channel and, except after the last iteration,
     * sends the next message.
     */
    static class IterationHead
    implements Runnable {
        private final BlockingBackChannel backChannel;
        private final BlockingQueue<Integer> dataChannel;
        private final Random random;
        private final List<String> actionLog;

        /**
         * @param backChannel channel the head reads from
         * @param dataChannel queue used to announce new input to the tail
         * @param actionLog log shared with the tail and the test thread
         */
        IterationHead(BlockingBackChannel backChannel, BlockingQueue<Integer> dataChannel, List<String> actionLog) {
            this.backChannel = backChannel;
            this.dataChannel = dataChannel;
            this.actionLog = actionLog;
            this.random = new Random();
        }

        /**
         * Runs {@link #NUM_ITERATIONS} iterations; an interrupt is rethrown wrapped in a
         * {@link RuntimeException} after the interrupt flag has been restored.
         */
        @Override
        public void run() {
            this.processInputAndSendMessageThroughDataChannel();
            for (int n = 0; n < NUM_ITERATIONS; ++n) {
                try {
                    this.backChannel.getReadEndAfterSuperstepEnded();
                    this.actionLog.add("head reads in iteration " + n);
                    // Random pause to vary the interleaving with the tail thread.
                    Thread.sleep(this.random.nextInt(100));
                    // No further input is sent once the final iteration has been read.
                    if (n < NUM_ITERATIONS - 1) {
                        this.processInputAndSendMessageThroughDataChannel();
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }

        /**
         * Logs the send and places the message on the data channel.
         *
         * @throws IllegalStateException if the data channel does not accept the message, which
         *     would otherwise leave the tail waiting for input that never arrives
         */
        void processInputAndSendMessageThroughDataChannel() {
            this.actionLog.add("head sends data");
            if (!this.dataChannel.offer(INPUT_COMPLETELY_PROCESSED_MESSAGE)) {
                throw new IllegalStateException("data channel rejected message: previous message was not consumed");
            }
        }
    }
}

