/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.sort;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.data.AbstractPagedOutputView;
import org.apache.paimon.disk.ChannelReaderInputView;
import org.apache.paimon.disk.ChannelWithMeta;
import org.apache.paimon.disk.ChannelWriterOutputView;
import org.apache.paimon.disk.FileChannelUtil;
import org.apache.paimon.disk.FileIOChannel;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.sort.BinaryMergeIterator;
import org.apache.paimon.sort.SpillChannelManager;
import org.apache.paimon.utils.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class that merges sorted spill channels into fewer, larger sorted channels.
 *
 * <p>Subclasses supply the entry-specific pieces: how entries are read from a channel, how they
 * are compared, which reusable entry instances to hand to the merge iterator, and how the merged
 * stream is written to the output channel.
 *
 * <p>{@link #close()} only raises a flag; {@link #mergeChannelList(List)} checks that flag between
 * merge steps and stops starting new ones once it is set.
 *
 * @param <Entry> type of the records being merged
 */
public abstract class AbstractBinaryExternalMerger<Entry>
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractBinaryExternalMerger.class);
    /** Set by {@link #close()}; volatile because it is polled inside the merge loop. */
    private volatile boolean closed;
    /** Upper bound on the number of channels combined by a single merge step. */
    private final int maxFanIn;
    private final SpillChannelManager channelManager;
    private final BlockCompressionFactory compressionCodecFactory;
    private final int compressionBlockSize;
    protected final int pageSize;
    protected final IOManager ioManager;

    /**
     * Stores the collaborators and sizing parameters used by the merge operations.
     *
     * @param ioManager used to create the channels that receive merged output
     * @param pageSize page size made available to subclasses
     * @param maxFanIn maximum number of channels merged in one step; {@link
     *     #mergeChannelList(List)} requires this to be at least 2
     * @param channelManager registry that is told about created and removed channels
     * @param compressionCodecFactory compression factory passed to the channel views
     * @param compressionBlockSize compression block size passed to the channel views
     */
    public AbstractBinaryExternalMerger(IOManager ioManager, int pageSize, int maxFanIn, SpillChannelManager channelManager, BlockCompressionFactory compressionCodecFactory, int compressionBlockSize) {
        this.ioManager = ioManager;
        this.pageSize = pageSize;
        this.maxFanIn = maxFanIn;
        this.channelManager = channelManager;
        this.compressionCodecFactory = compressionCodecFactory;
        this.compressionBlockSize = compressionBlockSize;
    }

    /** Marks this merger as closed so that {@link #mergeChannelList(List)} starts no further merge steps. */
    @Override
    public void close() {
        this.closed = true;
    }

    /**
     * Opens an input view on every given channel and returns an iterator that merges them.
     *
     * @param channelIDs the channels to read
     * @param openChannels list handed to {@code FileChannelUtil.createInputView} for every channel
     *     that is opened
     * @return a merge iterator over all given channels, ordered by {@link #mergeComparator()}
     * @throws IOException if a view or the merge iterator cannot be created
     */
    public BinaryMergeIterator<Entry> getMergingIterator(List<ChannelWithMeta> channelIDs, List<FileIOChannel> openChannels) throws IOException {
        LOG.debug("Performing merge of {} sorted streams.", channelIDs.size());
        List<MutableObjectIterator<Entry>> iterators = new ArrayList<>(channelIDs.size() + 1);
        for (ChannelWithMeta channel : channelIDs) {
            ChannelReaderInputView view = FileChannelUtil.createInputView(this.ioManager, channel, openChannels, this.compressionCodecFactory, this.compressionBlockSize);
            iterators.add(this.channelReaderInputViewIterator(view));
        }
        return new BinaryMergeIterator<>(iterators, this.mergeReusedEntries(channelIDs.size()), this.mergeComparator());
    }

    /**
     * Performs one merge round over the given channels.
     *
     * <p>The round reduces the channel count to the largest power of {@code maxFanIn} that is
     * strictly smaller than the input size. A leading part of the input is passed through
     * untouched and the remainder is merged in groups. The sizes are computed with integer
     * arithmetic: a floating-point logarithm can round an exact power the wrong way, which would
     * make the round a no-op and return the input unchanged.
     *
     * <p>If this merger is closed while the round is running, the loop stops early and the
     * returned list contains only the passed-through channels and the merges finished so far.
     *
     * @param channelIDs the sorted channels to reduce
     * @return a new list with the passed-through and the merged channels; a copy of the input if
     *     it holds fewer than two channels
     * @throws IllegalArgumentException if there are at least two channels and {@code maxFanIn} is
     *     smaller than 2
     * @throws IOException if a merge step fails
     */
    public List<ChannelWithMeta> mergeChannelList(List<ChannelWithMeta> channelIDs) throws IOException {
        int numStart = channelIDs.size();
        if (numStart <= 1) {
            // Nothing to merge; the sizing below is only defined for two or more channels.
            return new ArrayList<>(channelIDs);
        }
        if (this.maxFanIn < 2) {
            throw new IllegalArgumentException("maxFanIn must be at least 2 to merge channels, but was " + this.maxFanIn);
        }

        int numEnd = largestPowerBelow(this.maxFanIn, numStart);
        // Every merge of k channels removes k - 1 channels, at most maxFanIn - 1.
        int numMerges = ceilDiv(numStart - numEnd, this.maxFanIn - 1);
        int numNotMerged = numEnd - numMerges;
        int numToMerge = numStart - numNotMerged;

        List<ChannelWithMeta> mergedChannelIDs = new ArrayList<>(numEnd);
        mergedChannelIDs.addAll(channelIDs.subList(0, numNotMerged));

        int channelsToMergePerStep = ceilDiv(numToMerge, numMerges);
        List<ChannelWithMeta> channelsToMergeThisStep = new ArrayList<>(channelsToMergePerStep);
        int channelNum = numNotMerged;
        while (!this.closed && channelNum < numStart) {
            channelsToMergeThisStep.clear();
            for (int i = 0; i < channelsToMergePerStep && channelNum < numStart; ++i, ++channelNum) {
                channelsToMergeThisStep.add(channelIDs.get(channelNum));
            }
            mergedChannelIDs.add(this.mergeChannels(channelsToMergeThisStep));
        }
        return mergedChannelIDs;
    }

    /** Returns the largest power of {@code base} (starting at 1) that is strictly below {@code limit}. */
    private static int largestPowerBelow(int base, int limit) {
        // long arithmetic so that power * base cannot overflow before the comparison.
        long power = 1L;
        while (power * base < limit) {
            power *= base;
        }
        return (int) power;
    }

    /** Returns {@code dividend / divisor} rounded up, for a non-negative dividend and a positive divisor. */
    private static int ceilDiv(int dividend, int divisor) {
        return (int) (((long) dividend + divisor - 1) / divisor);
    }

    /**
     * Merges the given channels into one newly created channel, then closes and deletes the
     * inputs.
     *
     * @param channelIDs the channels to merge
     * @return the merged channel with its block count and the byte count of its last block
     * @throws IOException if reading the inputs or writing the merged output fails; the partially
     *     written output channel is deleted before the exception is rethrown
     */
    private ChannelWithMeta mergeChannels(List<ChannelWithMeta> channelIDs) throws IOException {
        List<FileIOChannel> openChannels = new ArrayList<>(channelIDs.size());
        BinaryMergeIterator<Entry> mergeIterator = this.getMergingIterator(channelIDs, openChannels);

        FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
        this.channelManager.addChannel(mergedChannelID);

        ChannelWriterOutputView output = null;
        int numBytesInLastBlock;
        int numBlocksWritten;
        try {
            output = FileChannelUtil.createOutputView(this.ioManager, mergedChannelID, this.compressionCodecFactory, this.compressionBlockSize);
            this.writeMergingOutput(mergeIterator, output);
            numBytesInLastBlock = output.close();
            numBlocksWritten = output.getBlockCount();
        }
        catch (IOException e) {
            if (output != null) {
                discardFailedOutput(output, e);
            }
            throw e;
        }

        // The inputs are fully consumed: unregister them and release their files.
        for (FileIOChannel channel : openChannels) {
            FileIOChannel.ID inputChannelID = channel.getChannelID();
            this.channelManager.removeChannel(inputChannelID);
            try {
                channel.closeAndDelete();
            }
            catch (Throwable t) {
                // Best effort: a failed cleanup must not fail a merge that already succeeded.
                LOG.warn("Failed to close and delete merged input channel {}.", inputChannelID, t);
            }
        }
        return new ChannelWithMeta(mergedChannelID, numBlocksWritten, numBytesInLastBlock);
    }

    /**
     * Closes and deletes the output of a failed merge. Secondary failures are attached to
     * {@code failure} as suppressed exceptions so they neither replace it nor prevent the
     * deletion from being attempted.
     */
    private static void discardFailedOutput(ChannelWriterOutputView output, IOException failure) {
        try {
            output.close();
        }
        catch (Exception closeFailure) {
            failure.addSuppressed(closeFailure);
        }
        try {
            output.getChannel().deleteChannel();
        }
        catch (RuntimeException deleteFailure) {
            failure.addSuppressed(deleteFailure);
        }
    }

    /**
     * Wraps a channel input view in an iterator over its entries.
     *
     * @param var1 the input view of one channel that takes part in a merge
     * @return an iterator over the entries read from that view
     */
    protected abstract MutableObjectIterator<Entry> channelReaderInputViewIterator(ChannelReaderInputView var1);

    /**
     * Supplies the comparator handed to the merge iterator.
     *
     * @return the comparator for entries
     */
    protected abstract Comparator<Entry> mergeComparator();

    /**
     * Supplies the entry instances handed to the merge iterator.
     *
     * @param var1 the number of channels being merged
     * @return the entries passed to the merge iterator; presumably one reusable instance per
     *     channel (confirm against {@code BinaryMergeIterator})
     */
    protected abstract List<Entry> mergeReusedEntries(int var1);

    /**
     * Writes the merged entries to the output channel.
     *
     * @param var1 iterator that yields the merged entries
     * @param var2 output view of the newly created merged channel
     * @throws IOException if reading or writing fails
     */
    protected abstract void writeMergingOutput(MutableObjectIterator<Entry> var1, AbstractPagedOutputView var2) throws IOException;
}

