package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.operators.sort.StageRunner;
import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/operators/sort/SpillingThread.class */
public final class SpillingThread<E> extends ThreadBase<E> {
    private static final Logger LOG = LoggerFactory.getLogger(SpillingThread.class);
    private final MemoryManager memManager;
    private final IOManager ioManager;
    private final TypeSerializer<E> serializer;
    private final TypeComparator<E> comparator;
    private final List<MemorySegment> writeMemory;
    private final List<MemorySegment> mergeReadMemory;
    private final int maxFanIn;
    private final SpillChannelManager spillChannelManager;
    private final LargeRecordHandler<E> largeRecordHandler;
    private final SpillingBehaviour<E> spillingBehaviour;
    private volatile boolean spillingBehaviourOpened;
    private final int minNumWriteBuffers;
    private final int maxNumWriteBuffers;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/operators/sort/SpillingThread$SpillingBehaviour.class */
    public interface SpillingBehaviour<E> {
        default void open() {
        }

        default void close() {
        }

        void spillBuffer(CircularElement<E> circularElement, ChannelWriterOutputView channelWriterOutputView, LargeRecordHandler<E> largeRecordHandler) throws IOException;

        void mergeRecords(MergeIterator<E> mergeIterator, ChannelWriterOutputView channelWriterOutputView) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SpillingThread(@Nullable ExceptionHandler<IOException> exceptionHandler, StageRunner.StageMessageDispatcher<E> stageMessageDispatcher, MemoryManager memoryManager, IOManager iOManager, TypeSerializer<E> typeSerializer, TypeComparator<E> typeComparator, List<MemorySegment> list, List<MemorySegment> list2, int i, SpillChannelManager spillChannelManager, @Nullable LargeRecordHandler<E> largeRecordHandler, SpillingBehaviour<E> spillingBehaviour, int i2, int i3) {
        super(exceptionHandler, "SortMerger spilling thread", stageMessageDispatcher);
        this.spillingBehaviourOpened = false;
        this.memManager = (MemoryManager) Preconditions.checkNotNull(memoryManager);
        this.ioManager = (IOManager) Preconditions.checkNotNull(iOManager);
        this.serializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
        this.comparator = (TypeComparator) Preconditions.checkNotNull(typeComparator);
        this.mergeReadMemory = (List) Preconditions.checkNotNull(list);
        this.writeMemory = (List) Preconditions.checkNotNull(list2);
        this.maxFanIn = i;
        this.spillChannelManager = (SpillChannelManager) Preconditions.checkNotNull(spillChannelManager);
        this.largeRecordHandler = largeRecordHandler;
        this.spillingBehaviour = (SpillingBehaviour) Preconditions.checkNotNull(spillingBehaviour);
        this.minNumWriteBuffers = i2;
        this.maxNumWriteBuffers = i3;
    }

    @Override // org.apache.flink.runtime.operators.sort.ThreadBase
    public void go() throws IOException, InterruptedException {
        ArrayDeque arrayDeque = new ArrayDeque();
        boolean readCache = readCache(arrayDeque);
        if (isRunning()) {
            MutableObjectIterator<E> mutableObjectIterator = null;
            if (readCache && this.largeRecordHandler != null && this.largeRecordHandler.hasData()) {
                ArrayList arrayList = new ArrayList();
                while (true) {
                    CircularElement<E> poll = this.dispatcher.poll(StageRunner.SortStage.READ);
                    if (poll == null) {
                        break;
                    }
                    poll.getBuffer().dispose();
                    arrayList.addAll(poll.getMemory());
                }
                if (arrayList.isEmpty()) {
                    readCache = false;
                    LOG.debug("Going to disk-based merge because of large records.");
                } else {
                    LOG.debug("Sorting large records, to add them to in-memory merge.");
                    mutableObjectIterator = this.largeRecordHandler.finishWriteAndSortKeys(arrayList);
                }
            }
            if (readCache) {
                mergeInMemory(arrayDeque, mutableObjectIterator);
            } else {
                mergeOnDisk(startSpilling(arrayDeque));
            }
        }
    }

    @Override // org.apache.flink.runtime.operators.sort.ThreadBase, java.lang.AutoCloseable
    public void close() throws InterruptedException {
        super.close();
        if (this.spillingBehaviourOpened) {
            this.spillingBehaviour.close();
            this.spillingBehaviourOpened = false;
        }
    }

    private boolean readCache(Queue<CircularElement<E>> queue) throws InterruptedException {
        CircularElement<E> take;
        while (isRunning() && (take = this.dispatcher.take(StageRunner.SortStage.SPILL)) != CircularElement.SPILLING_MARKER) {
            if (take == CircularElement.EOF_MARKER) {
                return true;
            }
            queue.add(take);
        }
        return false;
    }

    private void mergeOnDisk(List<ChannelWithBlockCount> list) throws IOException {
        List<MemorySegment> list2;
        List<MemorySegment> arrayList;
        MutableObjectIterator<E> mutableObjectIterator = null;
        if (this.largeRecordHandler == null || !this.largeRecordHandler.hasData()) {
            list2 = this.mergeReadMemory;
        } else {
            if (list.isEmpty()) {
                arrayList = this.mergeReadMemory;
                list2 = Collections.emptyList();
            } else {
                int min = Math.min(this.maxFanIn, list.size());
                int max = min * Math.max(this.minNumWriteBuffers, Math.min(this.maxNumWriteBuffers, (this.mergeReadMemory.size() / 2) / min));
                list2 = new ArrayList(max);
                for (int i = 0; i < max; i++) {
                    list2.add(this.mergeReadMemory.get(i));
                }
                arrayList = new ArrayList();
                for (int i2 = max; i2 < this.mergeReadMemory.size(); i2++) {
                    arrayList.add(this.mergeReadMemory.get(i2));
                }
            }
            LOG.debug("Sorting keys for large records.");
            mutableObjectIterator = this.largeRecordHandler.finishWriteAndSortKeys(arrayList);
        }
        while (isRunning() && list.size() > this.maxFanIn) {
            list = mergeChannelList(list, list2, this.writeMemory);
        }
        this.memManager.release(this.writeMemory);
        this.writeMemory.clear();
        if (!list.isEmpty()) {
            LOG.debug("Beginning final merge.");
            ArrayList arrayList2 = new ArrayList(list.size());
            getSegmentsForReaders(arrayList2, list2, list.size());
            this.dispatcher.sendResult(getMergingIterator(list, arrayList2, new ArrayList(list.size()), mutableObjectIterator));
        } else if (mutableObjectIterator == null) {
            this.dispatcher.sendResult(EmptyMutableObjectIterator.get());
        } else {
            this.dispatcher.sendResult(mutableObjectIterator);
        }
        LOG.debug("Spilling and merging thread done.");
    }

    private void mergeInMemory(Queue<CircularElement<E>> queue, MutableObjectIterator<E> mutableObjectIterator) throws IOException {
        LOG.debug("Initiating in memory merge.");
        ArrayList arrayList = new ArrayList(queue.size() + 1);
        Iterator<CircularElement<E>> it = queue.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getBuffer().getIterator());
        }
        if (mutableObjectIterator != null) {
            arrayList.add(mutableObjectIterator);
        }
        LOG.debug("Releasing unused sort-buffer memory.");
        disposeSortBuffers(true);
        if (arrayList.isEmpty()) {
            this.dispatcher.sendResult(EmptyMutableObjectIterator.get());
        } else if (arrayList.size() == 1) {
            this.dispatcher.sendResult((MutableObjectIterator) arrayList.get(0));
        } else {
            this.dispatcher.sendResult(new MergeIterator(arrayList, this.comparator));
        }
    }

    private List<ChannelWithBlockCount> startSpilling(Queue<CircularElement<E>> queue) throws IOException, InterruptedException {
        FileIOChannel.Enumerator createChannelEnumerator = this.ioManager.createChannelEnumerator();
        ArrayList arrayList = new ArrayList();
        openSpillingBehaviour();
        while (isRunning()) {
            CircularElement<E> take = queue.isEmpty() ? this.dispatcher.take(StageRunner.SortStage.SPILL) : queue.poll();
            if (!isRunning()) {
                return Collections.emptyList();
            }
            if (take == CircularElement.EOF_MARKER) {
                break;
            }
            FileIOChannel.ID next = createChannelEnumerator.next();
            this.spillChannelManager.registerChannelToBeRemovedAtShutdown(next);
            BlockChannelWriter<MemorySegment> createBlockChannelWriter = this.ioManager.createBlockChannelWriter(next);
            this.spillChannelManager.registerOpenChannelToBeRemovedAtShutdown(createBlockChannelWriter);
            ChannelWriterOutputView channelWriterOutputView = new ChannelWriterOutputView(createBlockChannelWriter, this.writeMemory, this.memManager.getPageSize());
            LOG.debug("Spilling buffer " + take.getId() + ScopeFormat.SCOPE_SEPARATOR);
            this.spillingBehaviour.spillBuffer(take, channelWriterOutputView, this.largeRecordHandler);
            LOG.debug("Spilled buffer " + take.getId() + ScopeFormat.SCOPE_SEPARATOR);
            channelWriterOutputView.close();
            this.spillChannelManager.unregisterOpenChannelToBeRemovedAtShutdown(createBlockChannelWriter);
            if (channelWriterOutputView.getBytesWritten() > 0) {
                arrayList.add(new ChannelWithBlockCount(next, channelWriterOutputView.getBlockCount()));
            }
            take.getBuffer().reset();
            this.dispatcher.send(StageRunner.SortStage.READ, take);
        }
        LOG.debug("Spilling done.");
        LOG.debug("Releasing sort-buffer memory.");
        disposeSortBuffers(false);
        return arrayList;
    }

    private void openSpillingBehaviour() {
        if (this.spillingBehaviourOpened) {
            return;
        }
        this.spillingBehaviour.open();
        this.spillingBehaviourOpened = true;
    }

    private void disposeSortBuffers(boolean z) {
        while (true) {
            CircularElement<E> poll = this.dispatcher.poll(StageRunner.SortStage.READ);
            if (poll == null) {
                return;
            }
            poll.getBuffer().dispose();
            if (z) {
                this.memManager.release(poll.getMemory());
            }
        }
    }

    private MergeIterator<E> getMergingIterator(List<ChannelWithBlockCount> list, List<List<MemorySegment>> list2, List<FileIOChannel> list3, MutableObjectIterator<E> mutableObjectIterator) throws IOException {
        LOG.debug("Performing merge of {} sorted streams.", Integer.valueOf(list.size()));
        ArrayList arrayList = new ArrayList(list.size() + 1);
        for (int i = 0; i < list.size(); i++) {
            ChannelWithBlockCount channelWithBlockCount = list.get(i);
            List<MemorySegment> list4 = list2.get(i);
            BlockChannelReader<MemorySegment> createBlockChannelReader = this.ioManager.createBlockChannelReader(channelWithBlockCount.getChannel());
            list3.add(createBlockChannelReader);
            this.spillChannelManager.registerOpenChannelToBeRemovedAtShutdown(createBlockChannelReader);
            this.spillChannelManager.unregisterChannelToBeRemovedAtShutdown(channelWithBlockCount.getChannel());
            arrayList.add(new ChannelReaderInputViewIterator(new ChannelReaderInputView(createBlockChannelReader, list4, channelWithBlockCount.getBlockCount(), false), null, this.serializer));
        }
        if (mutableObjectIterator != null) {
            arrayList.add(mutableObjectIterator);
        }
        return new MergeIterator<>(arrayList, this.comparator);
    }

    private List<ChannelWithBlockCount> mergeChannelList(List<ChannelWithBlockCount> list, List<MemorySegment> list2, List<MemorySegment> list3) throws IOException {
        double ceil = Math.ceil(Math.log(list.size()) / Math.log(this.maxFanIn)) - 1.0d;
        int size = list.size();
        int pow = (int) Math.pow(this.maxFanIn, ceil);
        int ceil2 = (int) Math.ceil((size - pow) / (this.maxFanIn - 1));
        int i = pow - ceil2;
        int i2 = size - i;
        ArrayList arrayList = new ArrayList(pow);
        arrayList.addAll(list.subList(0, i));
        int ceil3 = (int) Math.ceil(i2 / ceil2);
        ArrayList arrayList2 = new ArrayList(ceil3);
        getSegmentsForReaders(arrayList2, list2, ceil3);
        ArrayList arrayList3 = new ArrayList(ceil3);
        int i3 = i;
        while (isRunning() && i3 < list.size()) {
            arrayList3.clear();
            int i4 = 0;
            while (i4 < ceil3 && i3 < list.size()) {
                arrayList3.add(list.get(i3));
                i4++;
                i3++;
            }
            arrayList.add(mergeChannels(arrayList3, arrayList2, list3));
        }
        return arrayList;
    }

    private ChannelWithBlockCount mergeChannels(List<ChannelWithBlockCount> list, List<List<MemorySegment>> list2, List<MemorySegment> list3) throws IOException {
        ArrayList arrayList = new ArrayList(list.size());
        MergeIterator<E> mergingIterator = getMergingIterator(list, list2, arrayList, null);
        FileIOChannel.ID createChannel = this.ioManager.createChannel();
        this.spillChannelManager.registerChannelToBeRemovedAtShutdown(createChannel);
        BlockChannelWriter<MemorySegment> createBlockChannelWriter = this.ioManager.createBlockChannelWriter(createChannel);
        this.spillChannelManager.registerOpenChannelToBeRemovedAtShutdown(createBlockChannelWriter);
        ChannelWriterOutputView channelWriterOutputView = new ChannelWriterOutputView(createBlockChannelWriter, list3, this.memManager.getPageSize());
        openSpillingBehaviour();
        this.spillingBehaviour.mergeRecords(mergingIterator, channelWriterOutputView);
        channelWriterOutputView.close();
        int blockCount = channelWriterOutputView.getBlockCount();
        this.spillChannelManager.unregisterOpenChannelToBeRemovedAtShutdown(createBlockChannelWriter);
        for (FileIOChannel fileIOChannel : arrayList) {
            fileIOChannel.closeAndDelete();
            this.spillChannelManager.unregisterOpenChannelToBeRemovedAtShutdown(fileIOChannel);
        }
        return new ChannelWithBlockCount(createChannel, blockCount);
    }

    private void getSegmentsForReaders(List<List<MemorySegment>> list, List<MemorySegment> list2, int i) {
        int size = list2.size();
        int i2 = size / i;
        int i3 = size % i;
        Iterator<MemorySegment> it = list2.iterator();
        for (int i4 = 0; i4 < i3; i4++) {
            ArrayList arrayList = new ArrayList(i2 + 1);
            list.add(arrayList);
            for (int i5 = i2; i5 >= 0; i5--) {
                arrayList.add(it.next());
            }
        }
        for (int i6 = i3; i6 < i; i6++) {
            ArrayList arrayList2 = new ArrayList(i2);
            list.add(arrayList2);
            for (int i7 = i2; i7 > 0; i7--) {
                arrayList2.add(it.next());
            }
        }
    }
}
