/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.flink.core.memory.HeapMemorySegment;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReaderImpl;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link ChannelStateReaderImpl}.
 *
 * <p>Most tests use a reader built by {@link #getReader(InputChannelInfo, byte[])}, whose
 * serializer skips the header and reports the whole payload as a single record. Only
 * {@link #testWithOffsets()} uses the unmodified {@link ChannelStateSerializerImpl} on both the
 * write and the read side.
 */
public class ChannelStateReaderImplTest {
    private static final InputChannelInfo CHANNEL = new InputChannelInfo(1, 2);
    private static final byte[] DATA = generateData(10);

    /** Reader over {@link #DATA} for {@link #CHANNEL}; re-created before every test. */
    private ChannelStateReaderImpl reader;

    @Before
    public void init() {
        reader = getReader(CHANNEL, DATA);
    }

    @After
    public void tearDown() throws Exception {
        reader.close();
    }

    /**
     * Reads {@link #DATA} with every buffer size from 1 up to (but excluding) twice the data
     * length, using a fresh reader per size.
     */
    @Test
    public void testDifferentBufferSizes() throws Exception {
        for (int bufferSize = 1; bufferSize < 2 * DATA.length; ++bufferSize) {
            try (ChannelStateReaderImpl freshReader = getReader(CHANNEL, DATA)) {
                readAndVerify(bufferSize, CHANNEL, DATA, freshReader);
            }
        }
    }

    /**
     * Reads several channel states that live at different offsets of one shared underlying
     * handle and checks that each channel gets exactly its own bytes.
     */
    @Test
    public void testWithOffsets() throws Exception {
        Map<InputChannelStateHandle, byte[]> handlesAndBytes = generateHandlesWithBytes(10, 20);
        // The reader is closed here as in the other tests; previously it was never closed.
        try (ChannelStateReaderImpl offsetReader =
                new ChannelStateReaderImpl(taskStateSnapshot(handlesAndBytes.keySet()), new ChannelStateSerializerImpl())) {
            for (Map.Entry<InputChannelStateHandle, byte[]> e : handlesAndBytes.entrySet()) {
                readAndVerify(42, e.getKey().getInfo(), e.getValue(), offsetReader);
            }
        }
    }

    /**
     * A channel whose state was fully consumed must not be readable a second time.
     *
     * <p>The first read is performed outside the guarded region so that a failure of the first
     * read fails the test instead of being mistaken for the expected exception.
     */
    @Test
    public void testReadOnlyOnce() throws IOException {
        NetworkBuffer first = getBuffer(DATA.length);
        NetworkBuffer second = getBuffer(DATA.length);
        try {
            reader.readInputData(CHANNEL, first);
            try {
                reader.readInputData(CHANNEL, second);
                // Assert.fail throws an Error, so the catch below does not swallow it.
                Assert.fail("second read of an already consumed channel should have thrown");
            } catch (Exception expected) {
                // expected: state of a channel can be read only once
            }
        } finally {
            first.release();
            second.release();
        }
    }

    /** Reading from a closed reader is expected to throw {@link IllegalStateException}. */
    @Test(expected = IllegalStateException.class)
    public void testReadClosed() throws Exception {
        reader.close();
        NetworkBuffer buffer = getBuffer(DATA.length);
        try {
            reader.readInputData(CHANNEL, buffer);
        } finally {
            buffer.release();
        }
    }

    /** A channel without any recorded state must report {@code NO_MORE_DATA}. */
    @Test
    public void testReadUnknownChannelState() throws IOException {
        InputChannelInfo unknownChannel = new InputChannelInfo(CHANNEL.getGateIdx() + 1, CHANNEL.getInputChannelIdx() + 1);
        NetworkBuffer buffer = getBuffer(DATA.length);
        try {
            Assert.assertEquals(ChannelStateReader.ReadResult.NO_MORE_DATA, reader.readInputData(unknownChannel, buffer));
        } finally {
            buffer.release();
        }
    }

    /**
     * Wraps the given input channel handles into a snapshot containing a single operator whose
     * other state collections are all empty.
     */
    private TaskStateSnapshot taskStateSnapshot(Collection<InputChannelStateHandle> inputChannelStateHandles) {
        return new TaskStateSnapshot(Collections.singletonMap(
                new OperatorID(),
                new OperatorSubtaskState(
                        StateObjectCollection.empty(),
                        StateObjectCollection.empty(),
                        StateObjectCollection.empty(),
                        StateObjectCollection.empty(),
                        new StateObjectCollection<>(inputChannelStateHandles),
                        StateObjectCollection.empty())));
    }

    /**
     * Returns {@code len} random bytes. The generator is unseeded, so the content differs
     * between runs.
     */
    static byte[] generateData(int len) {
        byte[] bytes = new byte[len];
        new Random().nextBytes(bytes);
        return bytes;
    }

    /** Drains all currently readable bytes of {@code buffer} into a new array. */
    private byte[] toBytes(NetworkBuffer buffer) {
        byte[] buf = new byte[buffer.readableBytes()];
        buffer.readBytes(buf);
        return buf;
    }

    /**
     * Creates a reader for one channel whose state is the raw {@code data} at offset 0.
     *
     * <p>The serializer is stubbed: {@code readHeader} does nothing and {@code readLength}
     * always returns {@code data.length}, so {@code data} needs neither a header nor a length
     * prefix.
     */
    private ChannelStateReaderImpl getReader(InputChannelInfo channel, final byte[] data) {
        InputChannelStateHandle handle =
                new InputChannelStateHandle(channel, new ByteStreamStateHandle("", data), Collections.singletonList(0L));
        return new ChannelStateReaderImpl(taskStateSnapshot(Collections.singletonList(handle)), new ChannelStateSerializerImpl() {

            @Override
            public void readHeader(InputStream stream) {
            }

            @Override
            public int readLength(InputStream stream) {
                return data.length;
            }
        });
    }

    /**
     * Reads the state of {@code channelInfo} in chunks of {@code bufferSize} bytes and verifies
     * for every chunk the returned {@code ReadResult}, the number of readable bytes and the
     * bytes themselves against {@code data}.
     *
     * @param bufferSize size of the buffer read into; must be positive
     * @param channelInfo channel to read
     * @param data expected complete content of the channel state
     * @param reader reader under test
     */
    private void readAndVerify(int bufferSize, InputChannelInfo channelInfo, byte[] data, ChannelStateReader reader) throws IOException {
        int dataSize = data.length;
        // Ceiling of dataSize / bufferSize: one extra read for a trailing partial chunk.
        int iterations = dataSize / bufferSize + (dataSize % bufferSize == 0 ? 0 : 1);
        NetworkBuffer buffer = getBuffer(bufferSize);
        try {
            for (int i = 0; i < iterations; ++i) {
                String hint = String.format("dataSize=%d, bufferSize=%d, iteration=%d/%d", dataSize, bufferSize, i + 1, iterations);
                boolean isLast = i == iterations - 1;
                ChannelStateReader.ReadResult expectedResult =
                        isLast ? ChannelStateReader.ReadResult.NO_MORE_DATA : ChannelStateReader.ReadResult.HAS_MORE_DATA;
                int expectedBytes = isLast ? dataSize - bufferSize * i : bufferSize;

                Assert.assertEquals(hint, expectedResult, reader.readInputData(channelInfo, buffer));
                Assert.assertEquals(hint, expectedBytes, buffer.readableBytes());
                Assert.assertArrayEquals(hint, Arrays.copyOfRange(data, i * bufferSize, Math.min(dataSize, (i + 1) * bufferSize)), toBytes(buffer));
                // Rewind both indices so the same buffer can take the next chunk.
                buffer.resetReaderIndex();
                buffer.resetWriterIndex();
            }
        } finally {
            buffer.release();
        }
    }

    /** Allocates an unpooled heap-backed buffer of {@code len} bytes. */
    private NetworkBuffer getBuffer(int len) {
        return new NetworkBuffer(HeapMemorySegment.FACTORY.allocateUnpooledSegment(len, null), FreeingBufferRecycler.INSTANCE);
    }

    /**
     * Serializes a header followed by {@code numHandles} chunks of {@code handleDataSize} random
     * bytes into one byte array and returns one handle per chunk.
     *
     * <p>All handles share the same underlying {@link ByteStreamStateHandle}. Each handle has a
     * single offset: the stream position at which its chunk starts. That offset is also used as
     * both the gate index and the channel index of the handle's {@link InputChannelInfo}.
     *
     * @return map from each handle to the bytes that were written for it
     */
    private Map<InputChannelStateHandle, byte[]> generateHandlesWithBytes(int numHandles, int handleDataSize) throws IOException {
        Map<Integer, byte[]> offsetsAndBytes = new HashMap<>();
        ByteArrayOutputStream baos = new ByteArrayOutputStream(100);
        DataOutputStream out = new DataOutputStream(baos);
        ChannelStateSerializerImpl serializer = new ChannelStateSerializerImpl();
        serializer.writeHeader(out);
        for (int i = 0; i < numHandles; ++i) {
            // Record the position before writing: that is where this chunk starts.
            offsetsAndBytes.put(baos.size(), writeSomeBytes(handleDataSize, out, serializer));
        }
        ByteStreamStateHandle sharedUnderlyingHandle = new ByteStreamStateHandle("", baos.toByteArray());
        return offsetsAndBytes.entrySet().stream().collect(Collectors.toMap(
                e -> {
                    int offset = e.getKey();
                    return new InputChannelStateHandle(
                            new InputChannelInfo(offset, offset),
                            sharedUnderlyingHandle,
                            Collections.singletonList((long) offset));
                },
                Map.Entry::getValue));
    }

    /**
     * Writes {@code bytesCount} random bytes to {@code out} through {@code serializer} and
     * returns the bytes that were written. The temporary buffer is always released.
     */
    private byte[] writeSomeBytes(int bytesCount, DataOutputStream out, ChannelStateSerializer serializer) throws IOException {
        byte[] bytes = generateData(bytesCount);
        NetworkBuffer buf = getBuffer(bytesCount);
        try {
            buf.writeBytes(bytes);
            serializer.writeData(out, new Buffer[]{buf});
            return bytes;
        } finally {
            buf.release();
        }
    }
}

