/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.types.DataField;

public class CdcMultiTableParsingProcessFunction<T>
extends ProcessFunction<T, Void> {
    private final EventParser.Factory<T> parserFactory;
    private transient EventParser<T> parser;
    private transient Map<String, OutputTag<List<DataField>>> updatedDataFieldsOutputTags;
    private transient Map<String, OutputTag<CdcRecord>> recordOutputTags;

    public CdcMultiTableParsingProcessFunction(EventParser.Factory<T> parserFactory) {
        this.parserFactory = parserFactory;
    }

    public void open(Configuration parameters) throws Exception {
        this.parser = this.parserFactory.create();
        this.updatedDataFieldsOutputTags = new HashMap<String, OutputTag<List<DataField>>>();
        this.recordOutputTags = new HashMap<String, OutputTag<CdcRecord>>();
    }

    public void processElement(T raw, ProcessFunction.Context context, Collector<Void> collector) throws Exception {
        this.parser.setRawEvent(raw);
        String tableName = this.parser.parseTableName();
        List<DataField> schemaChange = this.parser.parseSchemaChange();
        if (schemaChange.size() > 0) {
            context.output(this.getUpdatedDataFieldsOutputTag(tableName), schemaChange);
        }
        this.parser.parseRecords().forEach(record -> context.output(this.getRecordOutputTag(tableName), record));
    }

    private OutputTag<List<DataField>> getUpdatedDataFieldsOutputTag(String tableName) {
        return this.updatedDataFieldsOutputTags.computeIfAbsent(tableName, CdcMultiTableParsingProcessFunction::createUpdatedDataFieldsOutputTag);
    }

    public static OutputTag<List<DataField>> createUpdatedDataFieldsOutputTag(String tableName) {
        return new OutputTag("new-data-field-list-" + tableName, (TypeInformation)new ListTypeInfo(DataField.class));
    }

    private OutputTag<CdcRecord> getRecordOutputTag(String tableName) {
        return this.recordOutputTags.computeIfAbsent(tableName, CdcMultiTableParsingProcessFunction::createRecordOutputTag);
    }

    public static OutputTag<CdcRecord> createRecordOutputTag(String tableName) {
        return new OutputTag("record-" + tableName, TypeInformation.of(CdcRecord.class));
    }
}

