/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sorter;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.NormalizedKeyComputer;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.sort.BinaryInMemorySortBuffer;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.MutableObjectIterator;

public class SortOperator
extends TableStreamOperator<InternalRow>
implements OneInputStreamOperator<InternalRow, InternalRow>,
BoundedOneInput {
    private final RowType keyRowType;
    private final RowType valueRowType;
    private final long maxMemory;
    private final int pageSize;
    private final int arity;
    private transient BinaryExternalSortBuffer buffer;

    public SortOperator(RowType keyType, RowType valueRowType, long maxMemory, int pageSize) {
        this.keyRowType = keyType;
        this.valueRowType = valueRowType;
        this.maxMemory = maxMemory;
        this.pageSize = pageSize;
        this.arity = keyType.getFieldCount() + valueRowType.getFieldCount();
    }

    public void open() throws Exception {
        super.open();
        List<DataField> keyFields = this.keyRowType.getFields();
        List<DataField> dataFields = this.valueRowType.getFields();
        ArrayList<DataField> fields = new ArrayList<DataField>();
        fields.addAll(keyFields);
        fields.addAll(dataFields);
        RowType rowType = new RowType(fields);
        InternalRowSerializer serializer = InternalSerializers.create(rowType);
        NormalizedKeyComputer normalizedKeyComputer = CodeGenUtils.newNormalizedKeyComputer(rowType.getFieldTypes(), "MemTableKeyComputer");
        RecordComparator keyComparator = CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), "MemTableComparator");
        HeapMemorySegmentPool memoryPool = new HeapMemorySegmentPool(this.maxMemory, this.pageSize);
        BinaryInMemorySortBuffer inMemorySortBuffer = BinaryInMemorySortBuffer.createBuffer(normalizedKeyComputer, serializer, keyComparator, memoryPool);
        Configuration jobConfig = this.getContainingTask().getJobConfiguration();
        this.buffer = new BinaryExternalSortBuffer(new BinaryRowSerializer(serializer.getArity()), keyComparator, memoryPool.pageSize(), inMemorySortBuffer, new IOManagerImpl(IOManagerImpl.splitPaths((String)jobConfig.get(CoreOptions.TMP_DIRS))), jobConfig.getInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES));
    }

    public void endInput() throws Exception {
        if (this.buffer.size() > 0) {
            MutableObjectIterator<BinaryRow> iterator = this.buffer.sortedIterator();
            BinaryRow binaryRow = new BinaryRow(this.arity);
            while ((binaryRow = iterator.next(binaryRow)) != null) {
                this.output.collect((Object)new StreamRecord((Object)binaryRow));
            }
        }
    }

    public void processElement(StreamRecord<InternalRow> element) throws Exception {
        this.buffer.write((InternalRow)element.getValue());
    }
}

