/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.src.impl;

import java.io.IOException;
import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.impl.AdapterTestBase;
import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
import org.apache.flink.connector.file.src.impl.TestIntReader;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.FSDataInputStream;
import org.junit.Assert;
import org.junit.Test;

public class StreamFormatAdapterTest
extends AdapterTestBase<StreamFormat<Integer>> {
    @Override
    protected StreamFormat<Integer> createCheckpointedFormat() {
        return new CheckpointedIntFormat();
    }

    @Override
    protected StreamFormat<Integer> createNonCheckpointedFormat() {
        return new NonCheckpointedIntFormat();
    }

    @Override
    protected StreamFormat<Integer> createFormatFailingInInstantiation() {
        return new FailingInstantiationFormat();
    }

    @Override
    protected BulkFormat<Integer, FileSourceSplit> wrapWithAdapter(StreamFormat<Integer> format) {
        return new StreamFormatAdapter(format);
    }

    @Test
    public void testReadSmallBatchSize() throws IOException {
        this.simpleReadTest(1);
    }

    @Test
    public void testBatchSizeMatchesOneRecord() throws IOException {
        this.simpleReadTest(4);
    }

    @Test
    public void testBatchSizeIsRecordMultiple() throws IOException {
        this.simpleReadTest(20);
    }

    private void simpleReadTest(int batchSize) throws IOException {
        Configuration config = new Configuration();
        config.set(StreamFormat.FETCH_IO_SIZE, (Object)new MemorySize((long)batchSize));
        StreamFormatAdapter format = new StreamFormatAdapter((StreamFormat)new CheckpointedIntFormat());
        BulkFormat.Reader reader = format.createReader(config, new FileSourceSplit("test-id", testPath, 0L, 400L));
        ArrayList<Integer> result = new ArrayList<Integer>();
        StreamFormatAdapterTest.readNumbers((BulkFormat.Reader<Integer>)reader, result, 100);
        StreamFormatAdapterTest.verifyIntListResult(result);
    }

    private static final class FailingInstantiationFormat
    implements StreamFormat<Integer> {
        private FailingInstantiationFormat() {
        }

        public StreamFormat.Reader<Integer> createReader(Configuration config, FSDataInputStream stream, long fileLen, long splitEnd) throws IOException {
            throw new IOException("test exception");
        }

        public StreamFormat.Reader<Integer> restoreReader(Configuration config, FSDataInputStream stream, long restoredOffset, long fileLen, long splitEnd) throws IOException {
            throw new IOException("test exception");
        }

        public boolean isSplittable() {
            return false;
        }

        public TypeInformation<Integer> getProducedType() {
            return Types.INT;
        }
    }

    private static final class NonCheckpointedIntFormat
    extends SimpleStreamFormat<Integer> {
        private NonCheckpointedIntFormat() {
        }

        public StreamFormat.Reader<Integer> createReader(Configuration config, FSDataInputStream stream) throws IOException {
            return new TestIntReader(stream, Long.MAX_VALUE, false);
        }

        public TypeInformation<Integer> getProducedType() {
            return Types.INT;
        }
    }

    private static final class CheckpointedIntFormat
    implements StreamFormat<Integer> {
        private CheckpointedIntFormat() {
        }

        public StreamFormat.Reader<Integer> createReader(Configuration config, FSDataInputStream stream, long fileLen, long splitEnd) throws IOException {
            Assert.assertEquals((String)"invalid file length", (long)0L, (long)(fileLen % 4L));
            long currPos = stream.getPos();
            long start = currPos == 0L ? 0L : currPos + 4L - currPos % 4L;
            long end = splitEnd == fileLen ? fileLen : splitEnd + 4L - splitEnd % 4L;
            stream.seek(start);
            return new TestIntReader(stream, end, true);
        }

        public StreamFormat.Reader<Integer> restoreReader(Configuration config, FSDataInputStream stream, long restoredOffset, long fileLen, long splitEnd) throws IOException {
            Assert.assertEquals((String)"invalid file length", (long)0L, (long)(fileLen % 4L));
            long end = splitEnd == fileLen ? fileLen : splitEnd + 4L - splitEnd % 4L;
            stream.seek(restoredOffset);
            return new TestIntReader(stream, end, true);
        }

        public boolean isSplittable() {
            return true;
        }

        public TypeInformation<Integer> getProducedType() {
            return Types.INT;
        }
    }
}

