/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.disk;

import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.RandomAccessInputView;
import org.apache.paimon.data.SimpleCollectingOutputView;
import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.disk.BufferFileReaderInputView;
import org.apache.paimon.disk.BufferFileWriter;
import org.apache.paimon.disk.ChannelWithMeta;
import org.apache.paimon.disk.FileIOChannel;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An append-only row buffer that keeps rows in memory segments taken from a
 * {@link MemorySegmentPool} and spills the in-memory content to a file channel created by the
 * {@link IOManager} whenever a row no longer fits.
 *
 * <p>Usage is two-phased: rows are appended with {@link #add(InternalRow)}, then
 * {@link #complete()} is called, after which {@link #newIterator()} may be used to read the rows
 * back (spilled channels first, in spill order, then the in-memory remainder). {@link #reset()}
 * returns the buffer to the empty, writable state.
 */
public class ExternalBuffer {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalBuffer.class);

    private final IOManager ioManager;
    private final MemorySegmentPool pool;
    /** Serializer used to create the reusable row and to read spilled channels. */
    private final BinaryRowSerializer binaryRowSerializer;
    private final InMemoryBuffer inMemoryBuffer;
    /** Page size of {@link #pool}; every memory segment handled here has this size. */
    private final int segmentSize;
    /** Channels written by {@link #spill()}, in the order they were spilled. */
    private final List<ChannelWithMeta> spilledChannelIDs;
    /** Total number of rows added since construction or the last {@link #reset()}. */
    private int numRows;
    /** Set by {@link #complete()}; guards against adding after completion / reading before it. */
    private boolean addCompleted;

    /**
     * Creates an empty buffer.
     *
     * @param ioManager used to create spill channels and their writers/readers
     * @param pool source of memory segments for the in-memory part of the buffer
     * @param serializer serializer for the rows passed to {@link #add(InternalRow)}; it is
     *     duplicated before use
     */
    public ExternalBuffer(IOManager ioManager, MemorySegmentPool pool, AbstractRowDataSerializer<?> serializer) {
        this.ioManager = ioManager;
        this.pool = pool;
        this.binaryRowSerializer = serializer instanceof BinaryRowSerializer
                ? (BinaryRowSerializer) serializer.duplicate()
                : new BinaryRowSerializer(serializer.getArity());
        this.segmentSize = pool.pageSize();
        this.spilledChannelIDs = new ArrayList<>();
        this.numRows = 0;
        this.addCompleted = false;

        // The in-memory buffer is written with InternalRow instances, so the wildcard serializer
        // has to be viewed as a serializer of InternalRow. The cast cannot be checked at runtime.
        @SuppressWarnings("unchecked")
        AbstractRowDataSerializer<InternalRow> rowSerializer = (AbstractRowDataSerializer<InternalRow>) serializer;
        // Must come last: InMemoryBuffer's constructor reads this.pool and this.segmentSize.
        this.inMemoryBuffer = new InMemoryBuffer(rowSerializer);
    }

    /** Deletes all spilled channel files, clears the in-memory part and makes the buffer writable again. */
    public void reset() {
        this.clearChannels();
        this.inMemoryBuffer.reset();
        this.numRows = 0;
        this.addCompleted = false;
    }

    /**
     * Appends a row, spilling the current in-memory content to disk first if the row does not fit.
     *
     * @param row the row to append
     * @throws IOException if spilling fails, or if the row does not fit even into an empty
     *     in-memory buffer
     * @throws IllegalStateException (via {@code Preconditions.checkState}, presumably) if
     *     {@link #complete()} has already been called
     */
    public void add(InternalRow row) throws IOException {
        Preconditions.checkState(!this.addCompleted, "This buffer has add completed.");
        if (!this.inMemoryBuffer.write(row)) {
            // Nothing has been written yet, so spilling cannot free any more room for this row.
            if (this.inMemoryBuffer.getCurrentDataBufferOffset() == 0L) {
                this.throwTooBigException(row);
            }
            this.spill();
            if (!this.inMemoryBuffer.write(row)) {
                this.throwTooBigException(row);
            }
        }
        ++this.numRows;
    }

    /** Marks the end of the write phase; required before {@link #newIterator()}. */
    public void complete() {
        this.addCompleted = true;
    }

    /**
     * Creates an iterator over all rows added so far.
     *
     * @return a new, not yet advanced iterator; the caller should close it
     */
    public BufferIterator newIterator() {
        Preconditions.checkState(this.addCompleted, "This buffer has not add completed.");
        return new BufferIterator();
    }

    /** Always throws an {@link IOException} describing the size of the row that could not be stored. */
    private void throwTooBigException(InternalRow row) throws IOException {
        int rowSize = this.inMemoryBuffer.serializer.toBinaryRow(row).toBytes().length;
        throw new IOException("Record is too big, it can't be added to a empty InMemoryBuffer! Record size: " + rowSize + ", Buffer: " + this.memorySize());
    }

    /**
     * Writes the segments currently holding complete records to a new channel, records the
     * channel in {@link #spilledChannelIDs} and resets the in-memory buffer.
     *
     * @throws IOException if writing fails; the partially written channel is closed and deleted
     */
    private void spill() throws IOException {
        FileIOChannel.ID channel = this.ioManager.createChannel();
        BufferFileWriter writer = this.ioManager.createBufferFileWriter(channel);

        // Snapshot the layout once; it does not change until inMemoryBuffer.reset() below.
        int numRecordBuffers = this.inMemoryBuffer.getNumRecordBuffers();
        int numBytesInLastBuffer = this.inMemoryBuffer.getNumBytesInLastBuffer();
        List<MemorySegment> segments = this.inMemoryBuffer.getRecordBufferSegments();
        try {
            for (int i = 0; i < numRecordBuffers; ++i) {
                MemorySegment segment = segments.get(i);
                // Only the last segment may be partially filled.
                int bufferSize = i == numRecordBuffers - 1 ? numBytesInLastBuffer : segment.size();
                writer.writeBlock(Buffer.create(segment, bufferSize));
            }
            LOG.info("here spill the reset buffer data with {} records {} bytes", this.inMemoryBuffer.numRecords, writer.getSize());
            writer.close();
        } catch (IOException e) {
            writer.closeAndDelete();
            throw e;
        }
        this.spilledChannelIDs.add(new ChannelWithMeta(channel, numRecordBuffers, numBytesInLastBuffer));
        this.inMemoryBuffer.reset();
    }

    /**
     * @return the number of rows added since construction or the last {@link #reset()}
     */
    public int size() {
        return this.numRows;
    }

    /**
     * Bytes represented by the pool's currently free pages; only used for the error message.
     * Computed in {@code long} because free pages times page size can exceed the {@code int} range.
     */
    private long memorySize() {
        return (long) this.pool.freePages() * this.segmentSize;
    }

    /** Best-effort deletion of all spilled channel files, then forgets the channels. */
    private void clearChannels() {
        for (ChannelWithMeta meta : this.spilledChannelIDs) {
            File f = new File(meta.getChannel().getPath());
            // Deletion stays best-effort, but a failure is no longer silent.
            if (f.exists() && !f.delete()) {
                LOG.warn("Failed to delete spilled channel file {}", f);
            }
        }
        this.spilledChannelIDs.clear();
    }

    /** Exposes the live list of spilled channels for tests. */
    @VisibleForTesting
    List<ChannelWithMeta> getSpillChannels() {
        return this.spilledChannelIDs;
    }

    /** Reads rows back from the in-memory segments; end of data is signalled by {@code null}. */
    private static class InMemoryBufferIterator
            implements MutableObjectIterator<BinaryRow>, Closeable {
        private final RandomAccessInputView recordBuffer;
        private final AbstractRowDataSerializer<InternalRow> serializer;

        private InMemoryBufferIterator(RandomAccessInputView recordBuffer, AbstractRowDataSerializer<InternalRow> serializer) {
            this.recordBuffer = recordBuffer;
            this.serializer = serializer;
        }

        /**
         * @param reuse row instance handed to the serializer for reuse
         * @return the next row, or {@code null} once the input view reports end of data
         */
        @Override
        public BinaryRow next(BinaryRow reuse) throws IOException {
            try {
                return (BinaryRow) this.serializer.mapFromPages(reuse, this.recordBuffer);
            } catch (EOFException e) {
                // The view signals exhaustion with EOFException; translate it to "no more rows".
                return null;
            }
        }

        /** Not supported; only the reuse variant is implemented. */
        @Override
        public BinaryRow next() throws IOException {
            throw new UnsupportedOperationException("Not support!");
        }

        /** Nothing to release; the segments are owned by the enclosing in-memory buffer. */
        @Override
        public void close() {
        }
    }

    /** The in-memory part of the buffer: rows serialized into segments taken from the pool. */
    private class InMemoryBuffer {
        private final AbstractRowDataSerializer<InternalRow> serializer;
        private final ArrayList<MemorySegment> recordBufferSegments;
        private final SimpleCollectingOutputView recordCollector;
        /** Collector offset after the last successfully written row. */
        private long currentDataBufferOffset;
        /** Collector position inside its current segment after the last successfully written row. */
        private int numBytesInLastBuffer;
        private int numRecords = 0;

        private InMemoryBuffer(AbstractRowDataSerializer<InternalRow> serializer) {
            // Work on a private copy of the serializer rather than sharing the caller's instance.
            this.serializer = (AbstractRowDataSerializer<InternalRow>) serializer.duplicate();
            this.recordBufferSegments = new ArrayList<>();
            this.recordCollector = new SimpleCollectingOutputView(this.recordBufferSegments, ExternalBuffer.this.pool, ExternalBuffer.this.segmentSize);
        }

        /** Returns all segments to the pool and restarts the collector from an empty state. */
        private void reset() {
            this.currentDataBufferOffset = 0L;
            // Clear the byte count as well: newIterator() passes it to the input view, and a
            // stale value from the previous content would be used if no row is written after the
            // reset (presumably it is the readable limit of the last segment - confirm against
            // RandomAccessInputView).
            this.numBytesInLastBuffer = 0;
            this.numRecords = 0;
            this.returnToSegmentPool();
            this.recordCollector.reset();
        }

        private void returnToSegmentPool() {
            ExternalBuffer.this.pool.returnAll(this.recordBufferSegments);
            this.recordBufferSegments.clear();
        }

        /**
         * Serializes a row into the collector.
         *
         * @return {@code true} if the row was written; {@code false} if the collector raised
         *     {@link EOFException}, in which case the recorded offsets still describe the state
         *     after the last successful write
         */
        public boolean write(InternalRow row) throws IOException {
            try {
                this.serializer.serializeToPages(row, this.recordCollector);
                this.currentDataBufferOffset = this.recordCollector.getCurrentOffset();
                this.numBytesInLastBuffer = this.recordCollector.getCurrentPositionInSegment();
                ++this.numRecords;
                return true;
            } catch (EOFException e) {
                return false;
            }
        }

        private ArrayList<MemorySegment> getRecordBufferSegments() {
            return this.recordBufferSegments;
        }

        private long getCurrentDataBufferOffset() {
            return this.currentDataBufferOffset;
        }

        /** Number of segments needed to hold {@code currentDataBufferOffset} bytes (rounded up). */
        private int getNumRecordBuffers() {
            long pageSize = ExternalBuffer.this.segmentSize;
            int result = (int) (this.currentDataBufferOffset / pageSize);
            if (this.currentDataBufferOffset % pageSize != 0L) {
                ++result;
            }
            return result;
        }

        private int getNumBytesInLastBuffer() {
            return this.numBytesInLastBuffer;
        }

        private InMemoryBufferIterator newIterator() {
            RandomAccessInputView recordBuffer = new RandomAccessInputView(this.recordBufferSegments, ExternalBuffer.this.segmentSize, this.numBytesInLastBuffer);
            return new InMemoryBufferIterator(recordBuffer, this.serializer);
        }
    }

    /**
     * Iterates over every row of the buffer: each spilled channel in spill order, followed by the
     * rows still held in memory. Call {@link #advanceNext()} and then {@link #getRow()}.
     */
    public class BufferIterator implements Closeable {
        private MutableObjectIterator<BinaryRow> currentIterator;
        private final BinaryRow reuse;
        /**
         * Index of the spilled channel currently being read; {@code -1} before the first one and
         * {@link Integer#MAX_VALUE} once the in-memory iterator has been opened.
         */
        private int currentChannelID;
        private BinaryRow row;
        private boolean closed;
        private BufferFileReaderInputView channelReader;

        private BufferIterator() {
            this.reuse = ExternalBuffer.this.binaryRowSerializer.createInstance();
            this.currentChannelID = -1;
            this.closed = false;
        }

        private void checkValidity() {
            if (this.closed) {
                throw new RuntimeException("This iterator is closed!");
            }
        }

        /** Closes the currently open channel reader, if any. Further calls are no-ops. */
        @Override
        public void close() {
            if (this.closed) {
                return;
            }
            try {
                this.closeCurrentFileReader();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            this.closed = true;
        }

        /**
         * Moves to the next row.
         *
         * @return {@code true} if a row is available through {@link #getRow()}, {@code false}
         *     when all sources are exhausted
         * @throws RuntimeException if the iterator is closed or reading fails with an
         *     {@link IOException} (wrapped as the cause)
         */
        public boolean advanceNext() {
            this.checkValidity();
            try {
                while (true) {
                    if (this.currentIterator != null) {
                        this.row = this.currentIterator.next(this.reuse);
                        if (this.row != null) {
                            return true;
                        }
                    }
                    // Current source is missing or exhausted: open the next one, if there is any.
                    if (!this.nextIterator()) {
                        return false;
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Opens the next source: the next spilled channel if one remains, otherwise the in-memory
         * buffer.
         *
         * @return {@code false} if the in-memory buffer had already been opened, i.e. nothing is left
         */
        private boolean nextIterator() throws IOException {
            if (this.currentChannelID == Integer.MAX_VALUE) {
                return false;
            }
            if (this.currentChannelID < ExternalBuffer.this.spilledChannelIDs.size() - 1) {
                this.nextSpilledIterator();
            } else {
                this.newMemoryIterator();
            }
            return true;
        }

        /**
         * @return the row positioned by the last {@link #advanceNext()}; {@code null} before the
         *     first advance or after the current source returned no row
         */
        public BinaryRow getRow() {
            return this.row;
        }

        private void closeCurrentFileReader() throws IOException {
            if (this.channelReader != null) {
                this.channelReader.close();
                this.channelReader = null;
            }
        }

        private void nextSpilledIterator() throws IOException {
            ChannelWithMeta channel = ExternalBuffer.this.spilledChannelIDs.get(this.currentChannelID + 1);
            ++this.currentChannelID;
            // Release the previous channel's reader before opening the next one.
            this.closeCurrentFileReader();
            this.channelReader = new BufferFileReaderInputView(channel.getChannel(), ExternalBuffer.this.ioManager, ExternalBuffer.this.segmentSize);
            this.currentIterator = this.channelReader.createBinaryRowIterator(ExternalBuffer.this.binaryRowSerializer);
        }

        private void newMemoryIterator() {
            // MAX_VALUE marks "in-memory source opened"; nextIterator() stops on it.
            this.currentChannelID = Integer.MAX_VALUE;
            this.currentIterator = ExternalBuffer.this.inMemoryBuffer.newIterator();
        }
    }
}

