/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.index;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.AbstractInnerTableScan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

/**
 * Reads the primary-key and partition-key columns of a {@link Table} and hands each record,
 * extended with the bucket of the split it was read from, to a caller-supplied consumer.
 */
public class IndexBootstrap implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Name of the extra bucket column appended by {@link #bootstrapType(TableSchema)}. */
    public static final String BUCKET_FIELD = "_BUCKET";

    private final Table table;

    /**
     * Creates a bootstrap helper for the given table.
     *
     * @param table the table whose key and partition columns will be scanned
     */
    public IndexBootstrap(Table table) {
        this.table = table;
    }

    /**
     * Scans the table and forwards every record of the selected buckets to {@code collector}.
     *
     * <p>Only the primary-key columns followed by the partition-key columns are projected. Each
     * emitted row is that projection joined with a single-field row holding the bucket number of
     * the split the record came from. Only buckets for which
     * {@code bucket % numAssigners == assignId} holds are planned.
     *
     * <p>NOTE(review): one {@link JoinedRow} instance is reused for all records of a split, so the
     * collector presumably must not keep a reference to a received row without copying it —
     * confirm against callers.
     *
     * @param numAssigners divisor applied to each bucket number by the bucket filter
     * @param assignId remainder a bucket number must yield to be read by this call
     * @param collector receives every joined (projected fields + bucket) row
     * @throws IOException if reading a split or closing its reader fails
     */
    public void bootstrap(int numAssigners, int assignId, Consumer<InternalRow> collector)
            throws IOException {
        RowType rowType = this.table.rowType();
        List<String> fieldNames = rowType.getFieldNames();
        List<String> keyPartFields =
                Stream.concat(
                                this.table.primaryKeys().stream(),
                                this.table.partitionKeys().stream())
                        .collect(Collectors.toList());
        // Positions of the key/partition fields inside the full table row type.
        int[] projection = keyPartFields.stream().mapToInt(fieldNames::indexOf).toArray();

        // Override the scan mode on a copy of the table so the scan always uses LATEST.
        ReadBuilder readBuilder =
                this.table
                        .copy(
                                Collections.singletonMap(
                                        CoreOptions.SCAN_MODE.key(),
                                        CoreOptions.StartupMode.LATEST.toString()))
                        .newReadBuilder()
                        .withProjection(projection);

        AbstractInnerTableScan tableScan = (AbstractInnerTableScan) readBuilder.newScan();
        TableScan.Plan plan =
                tableScan
                        .withBucketFilter(candidate -> candidate % numAssigners == assignId)
                        .plan();

        for (Split split : plan.splits()) {
            // try-with-resources closes the reader on every path and lets a read failure
            // propagate to the caller (a close failure is attached as a suppressed exception)
            // instead of being discarded.
            try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(split)) {
                int bucket = ((DataSplit) split).bucket();
                GenericRow bucketRow = GenericRow.of(bucket);
                JoinedRow joinedRow = new JoinedRow();
                reader.transform(row -> joinedRow.replace(row, bucketRow))
                        .forEachRemaining(collector);
            }
        }
    }

    /**
     * Builds the row type of the rows emitted by {@link #bootstrap(int, int, Consumer)}.
     *
     * <p>The type consists of the schema's primary-key fields followed by its partition-key
     * fields, as returned by {@code schema.projectedLogicalRowType}, plus a trailing non-null
     * {@code INT} field named {@link #BUCKET_FIELD}. The id of that extra field is one greater
     * than the highest field id among the projected fields.
     *
     * @param schema the table schema providing primary keys, partition keys and field types
     * @return the row type of bootstrap records
     */
    public static RowType bootstrapType(TableSchema schema) {
        List<String> primaryKeys = schema.primaryKeys();
        List<String> partitionKeys = schema.partitionKeys();
        List<String> keyPartFields =
                Stream.concat(primaryKeys.stream(), partitionKeys.stream())
                        .collect(Collectors.toList());
        // Copy into a mutable list so the bucket field can be appended.
        List<DataField> bootstrapFields =
                new ArrayList<>(schema.projectedLogicalRowType(keyPartFields).getFields());
        bootstrapFields.add(
                new DataField(
                        RowType.currentHighestFieldId(bootstrapFields) + 1,
                        BUCKET_FIELD,
                        DataTypes.INT().notNull()));
        return new RowType(bootstrapFields);
    }
}

