package org.apache.paimon.flink.sink.index;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.ExternalBuffer;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor;
import org.apache.paimon.flink.utils.ProjectToRowDataFunction;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.SerializableFunction;

/* loaded from: input_file:org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.class */
/**
 * Stream operator that feeds incoming records into a {@link GlobalIndexAssigner} and forwards
 * whatever the assigner emits as {@code Tuple2<T, Integer>} records.
 *
 * <p>Each input element is tagged with a {@link KeyPartOrRow}:
 *
 * <ul>
 *   <li>{@code KEY_PART} elements are handed to {@link GlobalIndexAssigner#bootstrap}.
 *   <li>{@code ROW} elements are parked in a spillable {@link ExternalBuffer} while the
 *       bootstrap phase is still open, and are passed straight to {@link
 *       GlobalIndexAssigner#process} once it has ended.
 * </ul>
 *
 * <p>The bootstrap phase ends on the first call to {@link #prepareSnapshotPreBarrier(long)} or
 * {@link #endInput()}; at that point all buffered rows are replayed through the assigner and the
 * buffer is released.
 *
 * @param <T> the record type flowing through the operator
 */
public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple2<T, Integer>>
        implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T, Integer>>,
                BoundedOneInput {

    private static final long serialVersionUID = 1L;

    private final Table table;
    private final GlobalIndexAssigner<T> assigner;
    private final SerializableFunction<T, InternalRow> toRow;
    private final SerializableFunction<InternalRow, T> fromRow;

    /**
     * Holds {@code ROW} records received during the bootstrap phase. A {@code null} value means
     * the bootstrap phase is over (see {@link #endBootstrap()}).
     */
    private transient ExternalBuffer bootstrapBuffer;

    /**
     * Creates the operator.
     *
     * @param table table whose options and row type configure the bootstrap buffer
     * @param globalIndexAssigner assigner that receives bootstrap keys and rows
     * @param serializableFunction converts a record of type {@code T} to an {@link InternalRow}
     *     so it can be stored in the bootstrap buffer
     * @param serializableFunction2 converts a buffered {@link InternalRow} back to {@code T}
     */
    public GlobalIndexAssignerOperator(
            Table table,
            GlobalIndexAssigner<T> globalIndexAssigner,
            SerializableFunction<T, InternalRow> serializableFunction,
            SerializableFunction<InternalRow, T> serializableFunction2) {
        this.table = table;
        this.assigner = globalIndexAssigner;
        this.toRow = serializableFunction;
        this.fromRow = serializableFunction2;
    }

    /**
     * Opens the assigner on a randomly chosen spilling directory and allocates the bootstrap
     * buffer, sized from the table's {@code WRITE_BUFFER_SIZE} and {@code PAGE_SIZE} options.
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext)
            throws Exception {
        super.initializeState(stateInitializationContext);

        IOManager flinkIoManager = getContainingTask().getEnvironment().getIOManager();
        File[] spillDirs = flinkIoManager.getSpillingDirectories();
        File assignerDir = spillDirs[ThreadLocalRandom.current().nextInt(spillDirs.length)];
        assigner.open(
                assignerDir,
                getRuntimeContext().getNumberOfParallelSubtasks(),
                getRuntimeContext().getIndexOfThisSubtask(),
                this::collect);

        Options options = Options.fromMap(table.options());
        MemorySize writeBufferSize = options.get(CoreOptions.WRITE_BUFFER_SIZE);
        MemorySize pageSize = options.get(CoreOptions.PAGE_SIZE);
        // Fully qualified: the Flink IOManager of the same simple name is already imported.
        bootstrapBuffer =
                new ExternalBuffer(
                        org.apache.paimon.disk.IOManager.create(
                                flinkIoManager.getSpillingDirectoriesPaths()),
                        new HeapMemorySegmentPool(
                                writeBufferSize.getBytes(), (int) pageSize.getBytes()),
                        new InternalRowSerializer(table.rowType()));
    }

    /**
     * Dispatches one input element according to its {@link KeyPartOrRow} tag.
     *
     * @param streamRecord record carrying the tag in {@code f0} and the payload in {@code f1}
     */
    @Override
    public void processElement(StreamRecord<Tuple2<KeyPartOrRow, T>> streamRecord)
            throws Exception {
        Tuple2<KeyPartOrRow, T> element = streamRecord.getValue();
        T value = element.f1;
        switch (element.f0) {
            case KEY_PART:
                assigner.bootstrap(value);
                break;
            case ROW:
                if (bootstrapBuffer != null) {
                    // Bootstrap still in progress: defer the row until endBootstrap().
                    bootstrapBuffer.add(toRow.apply(value));
                } else {
                    assigner.process(value);
                }
                break;
            default:
                // Any other tag is ignored.
                break;
        }
    }

    /** Ends the bootstrap phase (if still open) before the checkpoint barrier is emitted. */
    @Override
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        endBootstrap();
    }

    /** Ends the bootstrap phase (if still open) when the bounded input is exhausted. */
    @Override
    public void endInput() throws Exception {
        endBootstrap();
    }

    /**
     * Replays every buffered row through the assigner, then resets and drops the buffer.
     * Subsequent calls are no-ops because {@link #bootstrapBuffer} is {@code null} afterwards.
     */
    private void endBootstrap() throws Exception {
        if (bootstrapBuffer == null) {
            return;
        }
        bootstrapBuffer.complete();
        // try-with-resources closes the iterator on both the normal and the failure path.
        try (ExternalBuffer.BufferIterator rows = bootstrapBuffer.newIterator()) {
            while (rows.advanceNext()) {
                assigner.process(fromRow.apply(rows.getRow()));
            }
        }
        bootstrapBuffer.reset();
        bootstrapBuffer = null;
    }

    /** Callback handed to the assigner: emits the value paired with the given integer. */
    private void collect(T t, int i) {
        output.collect(new StreamRecord<>(new Tuple2<>(t, i)));
    }

    /** Closes the underlying assigner. */
    @Override
    public void close() throws IOException {
        assigner.close();
    }

    /**
     * Creates an operator working on Flink {@link RowData}, using {@link FlinkRowWrapper} and
     * {@link FlinkRowData} to convert to and from {@link InternalRow}.
     *
     * @param table the target table
     * @return a new operator backed by {@link #createRowDataAssigner(Table)}
     */
    public static GlobalIndexAssignerOperator<RowData> forRowData(Table table) {
        return new GlobalIndexAssignerOperator<>(
                table, createRowDataAssigner(table), FlinkRowWrapper::new, FlinkRowData::new);
    }

    /**
     * Creates a {@link GlobalIndexAssigner} for Flink {@link RowData}.
     *
     * <p>The integer extractor passed to the assigner reads the last field of the bootstrap row
     * type computed by {@link IndexBootstrap#bootstrapType}.
     *
     * @param table the target table; must be an {@link AbstractFileStoreTable}
     * @return the configured assigner
     */
    public static GlobalIndexAssigner<RowData> createRowDataAssigner(Table table) {
        int lastFieldIndex =
                IndexBootstrap.bootstrapType(((AbstractFileStoreTable) table).schema())
                                .getFieldCount()
                        - 1;
        return new GlobalIndexAssigner<>(
                table,
                RowDataPartitionKeyExtractor::new,
                KeyPartPartitionKeyExtractor::new,
                rowData -> rowData.getInt(lastFieldIndex),
                new ProjectToRowDataFunction(table.rowType(), table.partitionKeys()),
                (rowData, rowKind) -> {
                    rowData.setRowKind(FlinkRowData.toFlinkRowKind(rowKind));
                    return rowData;
                });
    }

    // NOTE: the compiler-synthesized $deserializeLambda$ method is intentionally not declared
    // here; javac generates it automatically for the serializable lambdas and method references
    // used above.
}
