/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.src.impl;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public abstract class AdapterTestBase<FormatT> {
    @ClassRule
    public static final TemporaryFolder TMP_DIR = new TemporaryFolder();
    protected static final int NUM_NUMBERS = 100;
    protected static final long FILE_LEN = 400L;
    protected static Path testPath;

    @BeforeClass
    public static void writeTestFile() throws IOException {
        File testFile = new File(TMP_DIR.getRoot(), "testFile");
        testPath = Path.fromLocalFile((File)testFile);
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(testFile));){
            for (int i = 0; i < 100; ++i) {
                out.writeInt(i);
            }
        }
    }

    protected abstract FormatT createCheckpointedFormat();

    protected abstract FormatT createNonCheckpointedFormat();

    protected abstract FormatT createFormatFailingInInstantiation();

    protected abstract BulkFormat<Integer, FileSourceSplit> wrapWithAdapter(FormatT var1);

    @Test
    public void testRecoverCheckpointedFormatOneSplit() throws IOException {
        this.testReading(this.createCheckpointedFormat(), 1, 5, 44);
    }

    @Test
    public void testRecoverCheckpointedFormatMultipleSplits() throws IOException {
        this.testReading(this.createCheckpointedFormat(), 3, 11, 33, 56);
    }

    @Test
    public void testRecoverNonCheckpointedFormatOneSplit() throws IOException {
        this.testReading(this.createNonCheckpointedFormat(), 1, 5, 44);
    }

    private void testReading(FormatT format, int numSplits, int ... recoverAfterRecords) throws IOException {
        int[] boundaries = Arrays.copyOf(recoverAfterRecords, recoverAfterRecords.length + 1);
        boundaries[boundaries.length - 1] = 100;
        Configuration config = new Configuration();
        config.set(StreamFormat.FETCH_IO_SIZE, (Object)new MemorySize(10L));
        BulkFormat<Integer, FileSourceSplit> adapter = this.wrapWithAdapter(format);
        Queue<FileSourceSplit> splits = AdapterTestBase.buildSplits(numSplits);
        ArrayList<Integer> result = new ArrayList<Integer>();
        FileSourceSplit currentSplit = null;
        BulkFormat.Reader currentReader = null;
        for (int nextRecordToRecover : boundaries) {
            FileSourceSplit toRecoverFrom;
            currentSplit = toRecoverFrom = AdapterTestBase.readNumbers(currentReader, currentSplit, adapter, splits, config, result, nextRecordToRecover - result.size());
            currentReader = toRecoverFrom == null ? null : adapter.restoreReader(config, toRecoverFrom);
        }
        AdapterTestBase.verifyIntListResult(result);
    }

    @Test
    public void testClosesStreamIfReaderCreationFails() throws Exception {
        Path testPath = new Path("testFs:///testpath-1");
        CloseTestingInputStream in = new CloseTestingInputStream();
        TestingFileSystem testFs = TestingFileSystem.createForFileStatus("testFs", TestingFileSystem.TestFileStatus.forFileWithStream(testPath, 1024L, in));
        testFs.register();
        BulkFormat<Integer, FileSourceSplit> adapter = this.wrapWithAdapter(this.createFormatFailingInInstantiation());
        try {
            adapter.createReader(new Configuration(), new FileSourceSplit("id", testPath, 0L, 1024L));
        }
        catch (IOException iOException) {
            // empty catch block
        }
        Assert.assertTrue((boolean)in.closed);
        testFs.unregister();
    }

    @Test
    public void testClosesStreamIfReaderRestoreFails() throws Exception {
        Path testPath = new Path("testFs:///testpath-1");
        CloseTestingInputStream in = new CloseTestingInputStream();
        TestingFileSystem testFs = TestingFileSystem.createForFileStatus("testFs", TestingFileSystem.TestFileStatus.forFileWithStream(testPath, 1024L, in));
        testFs.register();
        BulkFormat<Integer, FileSourceSplit> adapter = this.wrapWithAdapter(this.createFormatFailingInInstantiation());
        FileSourceSplit split = new FileSourceSplit("id", testPath, 0L, 1024L, new String[0], new CheckpointedPosition(0L, 5L));
        try {
            adapter.restoreReader(new Configuration(), split);
        }
        catch (IOException iOException) {
            // empty catch block
        }
        Assert.assertTrue((boolean)in.closed);
        testFs.unregister();
    }

    protected static void verifyIntListResult(List<Integer> result) {
        Assert.assertEquals((String)"wrong result size", (long)100L, (long)result.size());
        int nextExpected = 0;
        for (int next : result) {
            if (next == nextExpected++) continue;
            Assert.fail((String)("Wrong result: " + result));
        }
    }

    protected static void readNumbers(BulkFormat.Reader<Integer> reader, List<Integer> result, int num) throws IOException {
        AdapterTestBase.readNumbers(reader, null, null, null, null, result, num);
    }

    protected static FileSourceSplit readNumbers(BulkFormat.Reader<Integer> currentReader, FileSourceSplit currentSplit, BulkFormat<Integer, FileSourceSplit> format, Queue<FileSourceSplit> moreSplits, Configuration config, List<Integer> result, int num) throws IOException {
        long offset = Long.MIN_VALUE;
        long skip = Long.MIN_VALUE;
        while (num > 0) {
            BulkFormat.RecordIterator nextBatch;
            if (currentReader == null) {
                currentSplit = moreSplits.poll();
                Assert.assertNotNull((Object)currentSplit);
                currentReader = format.createReader(config, currentSplit);
            }
            while (num > 0 && (nextBatch = currentReader.readBatch()) != null) {
                RecordAndPosition next;
                while (num > 0 && (next = nextBatch.next()) != null) {
                    --num;
                    result.add((Integer)next.getRecord());
                    offset = next.getOffset();
                    skip = next.getRecordSkipCount();
                }
            }
            currentReader.close();
            currentReader = null;
        }
        return currentSplit != null ? currentSplit.updateWithCheckpointedPosition(new CheckpointedPosition(offset, skip)) : null;
    }

    static Queue<FileSourceSplit> buildSplits(int numSplits) {
        ArrayDeque<FileSourceSplit> splits = new ArrayDeque<FileSourceSplit>();
        long rangeForSplit = 400L / (long)numSplits;
        for (int i = 0; i < numSplits - 1; ++i) {
            splits.add(new FileSourceSplit("ID-" + i, testPath, (long)i * rangeForSplit, rangeForSplit));
        }
        long startOfLast = (long)(numSplits - 1) * rangeForSplit;
        splits.add(new FileSourceSplit("ID-" + (numSplits - 1), testPath, startOfLast, 400L - startOfLast));
        return splits;
    }

    private static class CloseTestingInputStream
    extends FSDataInputStream {
        boolean closed;

        private CloseTestingInputStream() {
        }

        public void seek(long desired) throws IOException {
        }

        public long getPos() throws IOException {
            return 0L;
        }

        public int read() throws IOException {
            throw new UnsupportedOperationException();
        }

        public void close() throws IOException {
            this.closed = true;
        }
    }
}

