/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.index;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.ExternalBuffer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor;
import org.apache.paimon.flink.sink.index.GlobalIndexAssigner;
import org.apache.paimon.flink.sink.index.IndexBootstrap;
import org.apache.paimon.flink.sink.index.KeyPartOrRow;
import org.apache.paimon.flink.sink.index.KeyPartPartitionKeyExtractor;
import org.apache.paimon.flink.utils.ProjectToRowDataFunction;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SerializableFunction;

/**
 * Stream operator that feeds {@code (KeyPartOrRow, T)} tuples into a {@link GlobalIndexAssigner}
 * and emits {@code (T, bucket)} tuples through the callback handed to the assigner.
 *
 * <p>{@link KeyPartOrRow#KEY_PART} elements are passed to {@link GlobalIndexAssigner#bootstrap}.
 * {@link KeyPartOrRow#ROW} elements are parked in an {@link ExternalBuffer} until the bootstrap
 * phase ends (first pre-barrier snapshot hook or end of input); at that point the buffered rows
 * are replayed through {@link GlobalIndexAssigner#process} and later rows are processed directly.
 *
 * @param <T> record type handled by the assigner
 */
public class GlobalIndexAssignerOperator<T>
extends AbstractStreamOperator<Tuple2<T, Integer>>
implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T, Integer>>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;

    private final Table table;
    private final GlobalIndexAssigner<T> assigner;
    /** Converts an incoming record into the row form stored in the bootstrap buffer. */
    private final SerializableFunction<T, InternalRow> toRow;
    /** Converts a buffered row back into the record type expected by the assigner. */
    private final SerializableFunction<InternalRow, T> fromRow;

    /** Holds ROW records while bootstrapping; {@code null} once the bootstrap phase has ended. */
    private transient ExternalBuffer bootstrapBuffer;

    /**
     * Creates the operator.
     *
     * @param table table whose options and row type configure the bootstrap buffer
     * @param assigner assigner that receives bootstrap key parts and rows
     * @param toRow conversion used when a record is written to the bootstrap buffer
     * @param fromRow conversion used when a buffered row is replayed to the assigner
     */
    public GlobalIndexAssignerOperator(Table table, GlobalIndexAssigner<T> assigner, SerializableFunction<T, InternalRow> toRow, SerializableFunction<InternalRow, T> fromRow) {
        this.table = table;
        this.assigner = assigner;
        this.toRow = toRow;
        this.fromRow = fromRow;
    }

    /**
     * Opens the assigner on a randomly chosen spilling directory of the task's IO manager and
     * creates the bootstrap buffer, sized from the table's {@link CoreOptions#WRITE_BUFFER_SIZE}
     * and {@link CoreOptions#PAGE_SIZE} options.
     *
     * @param context state initialization context, forwarded to the super implementation
     * @throws Exception if the super implementation or the assigner fails
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        org.apache.flink.runtime.io.disk.iomanager.IOManager ioManager = getContainingTask().getEnvironment().getIOManager();
        File[] tmpDirs = ioManager.getSpillingDirectories();
        // Pick one spilling directory at random for this subtask's assigner.
        File tmpDir = tmpDirs[ThreadLocalRandom.current().nextInt(tmpDirs.length)];
        assigner.open(tmpDir, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask(), this::collect);

        Options options = Options.fromMap(table.options());
        long bufferSize = options.get(CoreOptions.WRITE_BUFFER_SIZE).getBytes();
        long pageSize = options.get(CoreOptions.PAGE_SIZE).getBytes();
        bootstrapBuffer = new ExternalBuffer(IOManager.create(ioManager.getSpillingDirectoriesPaths()), new HeapMemorySegmentPool(bufferSize, (int) pageSize), new InternalRowSerializer(table.rowType()));
    }

    /**
     * Dispatches one element according to its {@link KeyPartOrRow} tag.
     *
     * @param streamRecord record carrying the tag in {@code f0} and the payload in {@code f1}
     * @throws Exception if the assigner or the bootstrap buffer fails
     */
    @Override
    public void processElement(StreamRecord<Tuple2<KeyPartOrRow, T>> streamRecord) throws Exception {
        Tuple2<KeyPartOrRow, T> tuple2 = streamRecord.getValue();
        T value = tuple2.f1;
        switch (tuple2.f0) {
            case KEY_PART:
                assigner.bootstrap(value);
                break;
            case ROW:
                if (bootstrapBuffer != null) {
                    // Still bootstrapping: park the row until endBootstrap() replays it.
                    bootstrapBuffer.add(toRow.apply(value));
                } else {
                    assigner.process(value);
                }
                break;
            default:
                // Any other tag is ignored.
                break;
        }
    }

    /**
     * Ends the bootstrap phase (if still active) before the checkpoint barrier is emitted.
     *
     * @param checkpointId id of the checkpoint about to be taken (unused)
     * @throws Exception if replaying the buffered rows fails
     */
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        endBootstrap();
    }

    /**
     * Ends the bootstrap phase (if still active) when the input is exhausted.
     *
     * @throws Exception if replaying the buffered rows fails
     */
    @Override
    public void endInput() throws Exception {
        endBootstrap();
    }

    /**
     * Replays every buffered row through the assigner, then resets and drops the buffer so that
     * later rows bypass it. Does nothing when the bootstrap phase has already ended.
     */
    private void endBootstrap() throws Exception {
        if (bootstrapBuffer == null) {
            return;
        }
        bootstrapBuffer.complete();
        try (ExternalBuffer.BufferIterator iterator = bootstrapBuffer.newIterator()) {
            while (iterator.advanceNext()) {
                assigner.process(fromRow.apply(iterator.getRow()));
            }
        }
        bootstrapBuffer.reset();
        bootstrapBuffer = null;
    }

    /** Emits one assigned record together with its bucket to the downstream output. */
    private void collect(T value, int bucket) {
        output.collect(new StreamRecord<>(new Tuple2<>(value, bucket)));
    }

    /**
     * Releases the bootstrap buffer if the bootstrap phase never finished, then closes the
     * assigner. The assigner is closed even if resetting the buffer fails.
     *
     * @throws IOException if closing the assigner fails
     */
    @Override
    public void close() throws IOException {
        try {
            ExternalBuffer pending = bootstrapBuffer;
            if (pending != null) {
                // The operator is closing before endBootstrap() ran; reset the buffer the same
                // way endBootstrap() does so its contents are not left behind.
                bootstrapBuffer = null;
                pending.reset();
            }
        } finally {
            assigner.close();
        }
    }

    /**
     * Creates an operator for Flink {@link RowData} records.
     *
     * @param table target table
     * @return operator wired with the {@link RowData} assigner and row conversions
     */
    public static GlobalIndexAssignerOperator<RowData> forRowData(Table table) {
        return new GlobalIndexAssignerOperator<RowData>(table, GlobalIndexAssignerOperator.createRowDataAssigner(table), FlinkRowWrapper::new, FlinkRowData::new);
    }

    /**
     * Creates a {@link GlobalIndexAssigner} for Flink {@link RowData} records.
     *
     * <p>The bucket of a bootstrap record is read from the last field of the bootstrap row type.
     *
     * @param t target table; it is cast to {@link AbstractFileStoreTable} to obtain its schema
     * @return the configured assigner
     */
    public static GlobalIndexAssigner<RowData> createRowDataAssigner(Table t) {
        RowType bootstrapType = IndexBootstrap.bootstrapType(((AbstractFileStoreTable) t).schema());
        int bucketIndex = bootstrapType.getFieldCount() - 1;
        return new GlobalIndexAssigner<RowData>(t, RowDataPartitionKeyExtractor::new, KeyPartPartitionKeyExtractor::new, row -> row.getInt(bucketIndex), new ProjectToRowDataFunction(t.rowType(), t.partitionKeys()), (rowData, rowKind) -> {
            rowData.setRowKind(FlinkRowData.toFlinkRowKind(rowKind));
            return rowData;
        });
    }
}

