/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.index;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.index.IndexBootstrap;
import org.apache.paimon.flink.sink.index.KeyPartOrRow;
import org.apache.paimon.utils.SerializableFunction;

public class IndexBootstrapOperator<T>
extends AbstractStreamOperator<Tuple2<KeyPartOrRow, T>>
implements OneInputStreamOperator<T, Tuple2<KeyPartOrRow, T>> {
    private static final long serialVersionUID = 1L;
    private final IndexBootstrap bootstrap;
    private final SerializableFunction<InternalRow, T> converter;

    public IndexBootstrapOperator(IndexBootstrap bootstrap, SerializableFunction<InternalRow, T> converter) {
        this.bootstrap = bootstrap;
        this.converter = converter;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.bootstrap.bootstrap(this.getRuntimeContext().getNumberOfParallelSubtasks(), this.getRuntimeContext().getIndexOfThisSubtask(), this::collect);
    }

    public void processElement(StreamRecord<T> streamRecord) throws Exception {
        this.output.collect((Object)new StreamRecord((Object)new Tuple2((Object)KeyPartOrRow.ROW, streamRecord.getValue())));
    }

    private void collect(InternalRow row) {
        this.output.collect((Object)new StreamRecord((Object)new Tuple2((Object)KeyPartOrRow.KEY_PART, this.converter.apply(row))));
    }
}

