/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.serialization;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Random;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType;
import org.apache.flink.runtime.io.network.util.DeserializationUtils;
import org.apache.flink.testutils.serialization.types.IntType;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class SpanningRecordSerializationTest
extends TestLogger {
    private static final Random RANDOM = new Random(42L);
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void testIntRecordsSpanningMultipleSegments() throws Exception {
        boolean segmentSize = true;
        int numValues = 10;
        this.testSerializationRoundTrip((Iterable<SerializationTestType>)Util.randomRecords((int)10, (SerializationTestTypeFactory)SerializationTestTypeFactory.INT), 1);
    }

    @Test
    public void testIntRecordsWithAlignedBuffers() throws Exception {
        int segmentSize = 64;
        int numValues = 64;
        this.testSerializationRoundTrip((Iterable<SerializationTestType>)Util.randomRecords((int)64, (SerializationTestTypeFactory)SerializationTestTypeFactory.INT), 64);
    }

    @Test
    public void testIntRecordsWithUnalignedBuffers() throws Exception {
        int segmentSize = 31;
        int numValues = 248;
        this.testSerializationRoundTrip((Iterable<SerializationTestType>)Util.randomRecords((int)248, (SerializationTestTypeFactory)SerializationTestTypeFactory.INT), 31);
    }

    @Test
    public void testRandomRecords() throws Exception {
        int segmentSize = 127;
        int numValues = 10000;
        this.testSerializationRoundTrip((Iterable<SerializationTestType>)Util.randomRecords((int)10000), 127);
    }

    @Test
    public void testHandleMixedLargeRecords() throws Exception {
        int numValues = 99;
        int segmentSize = 32768;
        ArrayList<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>(50);
        LargeObjectType genLarge = new LargeObjectType();
        Random rnd = new Random();
        for (int i = 0; i < 99; ++i) {
            if (i % 2 == 0) {
                originalRecords.add((SerializationTestType)new IntType(42));
                continue;
            }
            originalRecords.add(genLarge.getRandom(rnd));
        }
        this.testSerializationRoundTrip(originalRecords, 32768);
    }

    private void testSerializationRoundTrip(Iterable<SerializationTestType> records, int segmentSize) throws Exception {
        SpillingAdaptiveSpanningRecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer(new String[]{this.tempFolder.getRoot().getAbsolutePath()});
        SpanningRecordSerializationTest.testSerializationRoundTrip(records, segmentSize, (RecordDeserializer<SerializationTestType>)deserializer);
    }

    private static void testSerializationRoundTrip(Iterable<SerializationTestType> records, int segmentSize, RecordDeserializer<SerializationTestType> deserializer) throws Exception {
        DataOutputSerializer serializer = new DataOutputSerializer(128);
        ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<SerializationTestType>();
        BufferAndSerializerResult serializationResult = SpanningRecordSerializationTest.setNextBufferForSerializer(serializer.wrapAsByteBuffer(), segmentSize);
        int numRecords = 0;
        for (SerializationTestType record : records) {
            serializedRecords.add(record);
            ++numRecords;
            serializer.clear();
            ByteBuffer serializedRecord = RecordWriter.serializeRecord((DataOutputSerializer)serializer, (IOReadableWritable)record);
            serializationResult.getBufferBuilder().appendAndCommit(serializedRecord);
            if (serializationResult.getBufferBuilder().isFull()) {
                deserializer.setNextBuffer(serializationResult.buildBuffer());
                numRecords -= DeserializationUtils.deserializeRecords(serializedRecords, deserializer);
                while ((serializationResult = SpanningRecordSerializationTest.setNextBufferForSerializer(serializedRecord, segmentSize)).isFullBuffer()) {
                    deserializer.setNextBuffer(serializationResult.buildBuffer());
                }
            }
            Assert.assertFalse((boolean)serializedRecord.hasRemaining());
        }
        deserializer.setNextBuffer(serializationResult.buildBuffer());
        while (!serializedRecords.isEmpty()) {
            SerializationTestType expected = (SerializationTestType)serializedRecords.poll();
            SerializationTestType actual = (SerializationTestType)expected.getClass().newInstance();
            RecordDeserializer.DeserializationResult result = deserializer.getNextRecord((IOReadableWritable)actual);
            Assert.assertTrue((boolean)result.isFullRecord());
            Assert.assertEquals((Object)expected, (Object)actual);
            --numRecords;
        }
        Assert.assertEquals((long)0L, (long)numRecords);
    }

    @Test
    public void testSmallRecordUnconsumedBuffer() throws Exception {
        SpillingAdaptiveSpanningRecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer(new String[]{this.tempFolder.getRoot().getAbsolutePath()});
        this.testUnconsumedBuffer((RecordDeserializer<SerializationTestType>)deserializer, Util.randomRecord((SerializationTestTypeFactory)SerializationTestTypeFactory.INT), 1024, new byte[0]);
    }

    @Test
    public void testSpanningRecordUnconsumedBuffer() throws Exception {
        SpillingAdaptiveSpanningRecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer(new String[]{this.tempFolder.getRoot().getAbsolutePath()});
        this.testUnconsumedBuffer((RecordDeserializer<SerializationTestType>)deserializer, Util.randomRecord((SerializationTestTypeFactory)SerializationTestTypeFactory.INT), 1, new byte[0]);
    }

    @Test
    public void testLargeSpanningRecordUnconsumedBuffer() throws Exception {
        SpillingAdaptiveSpanningRecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer(new String[]{this.tempFolder.getRoot().getAbsolutePath()});
        this.testUnconsumedBuffer((RecordDeserializer<SerializationTestType>)deserializer, Util.randomRecord((SerializationTestTypeFactory)SerializationTestTypeFactory.BYTE_ARRAY), 1, new byte[0]);
    }

    @Test
    public void testLargeSpanningRecordUnconsumedBufferWithLeftOverBytes() throws Exception {
        SpillingAdaptiveSpanningRecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer(new String[]{this.tempFolder.getRoot().getAbsolutePath()});
        this.testUnconsumedBuffer((RecordDeserializer<SerializationTestType>)deserializer, Util.randomRecord((SerializationTestTypeFactory)SerializationTestTypeFactory.BYTE_ARRAY), 1, 42, 43, 44);
        deserializer.clear();
        this.testUnconsumedBuffer((RecordDeserializer<SerializationTestType>)deserializer, Util.randomRecord((SerializationTestTypeFactory)SerializationTestTypeFactory.BYTE_ARRAY), 1, 42, 43, 44);
    }

    public void testUnconsumedBuffer(RecordDeserializer<SerializationTestType> deserializer, SerializationTestType record, int segmentSize, byte ... leftOverBytes) throws Exception {
        try (ByteArrayOutputStream unconsumedBytes = new ByteArrayOutputStream();){
            DataOutputSerializer serializer = new DataOutputSerializer(128);
            ByteBuffer serializedRecord = RecordWriter.serializeRecord((DataOutputSerializer)serializer, (IOReadableWritable)record);
            BufferAndSerializerResult serializationResult = SpanningRecordSerializationTest.setNextBufferForSerializer(serializedRecord, segmentSize);
            serializationResult.getBufferBuilder().appendAndCommit(serializedRecord);
            if (serializationResult.getBufferBuilder().isFull()) {
                Buffer buffer = serializationResult.buildBuffer();
                SpanningRecordSerializationTest.writeBuffer(buffer.readOnlySlice().getNioBufferReadable(), unconsumedBytes);
                deserializer.setNextBuffer(buffer);
                SpanningRecordSerializationTest.assertUnconsumedBuffer(unconsumedBytes, (CloseableIterator<Buffer>)deserializer.getUnconsumedBuffer());
                deserializer.getNextRecord((IOReadableWritable)record.getClass().newInstance());
                while ((serializationResult = SpanningRecordSerializationTest.setNextBufferForSerializer(serializedRecord, segmentSize)).isFullBuffer()) {
                    buffer = serializationResult.buildBuffer();
                    if (serializationResult.isFullRecord()) {
                        buffer = SpanningRecordSerializationTest.appendLeftOverBytes(buffer, leftOverBytes);
                    }
                    SpanningRecordSerializationTest.writeBuffer(buffer.readOnlySlice().getNioBufferReadable(), unconsumedBytes);
                    deserializer.setNextBuffer(buffer);
                    SpanningRecordSerializationTest.assertUnconsumedBuffer(unconsumedBytes, (CloseableIterator<Buffer>)deserializer.getUnconsumedBuffer());
                    deserializer.getNextRecord((IOReadableWritable)record.getClass().newInstance());
                }
            }
        }
    }

    /*
     * Exception decompiling
     */
    private static Buffer appendLeftOverBytes(Buffer buffer, byte[] leftOverBytes) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private static void assertUnconsumedBuffer(ByteArrayOutputStream expected, CloseableIterator<Buffer> actual) throws Exception {
        if (!actual.hasNext()) {
            Assert.assertEquals((long)expected.size(), (long)0L);
        }
        ByteBuffer expectedByteBuffer = ByteBuffer.wrap(expected.toByteArray());
        ByteBuffer actualByteBuffer = ((Buffer)actual.next()).getNioBufferReadable();
        Assert.assertEquals((Object)expectedByteBuffer, (Object)actualByteBuffer);
        actual.close();
    }

    private static void writeBuffer(ByteBuffer buffer, OutputStream stream) throws IOException {
        WritableByteChannel channel = Channels.newChannel(stream);
        channel.write(buffer);
    }

    private static BufferAndSerializerResult setNextBufferForSerializer(ByteBuffer serializedRecord, int segmentSize) throws IOException {
        int startingOffset = segmentSize > 2 ? RANDOM.nextInt(segmentSize / 2) : 0;
        BufferBuilder bufferBuilder = BufferBuilderTestUtils.createFilledBufferBuilder(segmentSize + startingOffset, startingOffset);
        BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
        bufferConsumer.build().recycleBuffer();
        bufferBuilder.close();
        bufferBuilder.appendAndCommit(serializedRecord);
        return new BufferAndSerializerResult(bufferBuilder, bufferConsumer, bufferBuilder.isFull(), !serializedRecord.hasRemaining());
    }

    private static class BufferAndSerializerResult {
        private final BufferBuilder bufferBuilder;
        private final BufferConsumer bufferConsumer;
        private final boolean isFullBuffer;
        private final boolean isFullRecord;

        public BufferAndSerializerResult(BufferBuilder bufferBuilder, BufferConsumer bufferConsumer, boolean isFullBuffer, boolean isFullRecord) {
            this.bufferBuilder = bufferBuilder;
            this.bufferConsumer = bufferConsumer;
            this.isFullBuffer = isFullBuffer;
            this.isFullRecord = isFullRecord;
        }

        public BufferBuilder getBufferBuilder() {
            return this.bufferBuilder;
        }

        public Buffer buildBuffer() {
            return BufferBuilderTestUtils.buildSingleBuffer(this.bufferConsumer);
        }

        public boolean isFullBuffer() {
            return this.isFullBuffer;
        }

        public boolean isFullRecord() {
            return this.isFullRecord;
        }
    }
}

