package org.apache.flink.cdc.runtime.operators.transform;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.transform.converter.PostTransformConverters;
import org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
import org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable;
import org.apache.flink.shaded.guava31.com.google.common.collect.Table;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.class */
public class PostTransformOperator extends AbstractStreamOperator<Event> implements OneInputStreamOperator<Event, Event>, Serializable {
    private static final long serialVersionUID = 1;
    private final String timezone;
    private final List<TransformRule> transformRules;
    private final Map<TableId, Boolean> hasAsteriskMap = new HashMap();
    private final Map<TableId, List<String>> projectedColumnsMap = new HashMap();
    private final Map<TableId, PostTransformChangeInfo> postTransformInfoMap = new ConcurrentHashMap();
    private final List<Tuple3<String, String, Map<String, String>>> udfFunctions;
    private transient List<PostTransformer> transformers;
    private transient List<UserDefinedFunctionDescriptor> udfDescriptors;
    private transient List<Object> udfFunctionInstances;
    private transient Table<TableId, PostTransformer, TransformProjectionProcessor> projectionProcessors;
    private transient Table<TableId, PostTransformer, TransformFilterProcessor> filterProcessors;

    public static PostTransformOperatorBuilder newBuilder() {
        return new PostTransformOperatorBuilder();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PostTransformOperator(List<TransformRule> list, String str, List<Tuple3<String, String, Map<String, String>>> list2) {
        this.timezone = str;
        this.transformRules = list;
        this.udfFunctions = list2;
    }

    public void open() throws Exception {
        super.open();
        this.projectionProcessors = HashBasedTable.create();
        this.filterProcessors = HashBasedTable.create();
        initializeUdf();
        this.transformers = createTransformers();
    }

    public void close() throws Exception {
        super.close();
        TransformExpressionCompiler.cleanUp();
        destroyUdf();
    }

    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        try {
            processElementInternal(streamRecord);
        } catch (Exception e) {
            ChangeEvent changeEvent = (Event) streamRecord.getValue();
            TableId tableId = null;
            Schema schema = null;
            Schema schema2 = null;
            if (changeEvent instanceof ChangeEvent) {
                tableId = changeEvent.tableId();
                PostTransformChangeInfo postTransformChangeInfo = this.postTransformInfoMap.get(tableId);
                if (postTransformChangeInfo != null) {
                    schema = postTransformChangeInfo.getPreTransformedSchema();
                    schema2 = postTransformChangeInfo.getPostTransformedSchema();
                }
            }
            throw new TransformException("post-transform", changeEvent, tableId, schema, schema2, e);
        }
    }

    private void processElementInternal(StreamRecord<Event> streamRecord) {
        ChangeEvent changeEvent = (Event) streamRecord.getValue();
        if (changeEvent == null) {
            return;
        }
        if (!(changeEvent instanceof ChangeEvent)) {
            throw new UnsupportedOperationException("Unexpected stream record event: " + changeEvent);
        }
        TableId tableId = changeEvent.tableId();
        List<PostTransformer> effectiveTransformers = getEffectiveTransformers(tableId);
        if (effectiveTransformers.isEmpty()) {
            this.output.collect(streamRecord);
            return;
        }
        if (changeEvent instanceof CreateTableEvent) {
            Optional<U> map = processCreateTableEvent((CreateTableEvent) changeEvent, effectiveTransformers).map((v1) -> {
                return new StreamRecord(v1);
            });
            Output output = this.output;
            output.getClass();
            map.ifPresent((v1) -> {
                r1.collect(v1);
            });
            invalidateCache(tableId);
            return;
        }
        if (changeEvent instanceof SchemaChangeEvent) {
            Optional<U> map2 = processSchemaChangeEvent((SchemaChangeEvent) changeEvent, effectiveTransformers).map((v1) -> {
                return new StreamRecord(v1);
            });
            Output output2 = this.output;
            output2.getClass();
            map2.ifPresent((v1) -> {
                r1.collect(v1);
            });
            invalidateCache(tableId);
            return;
        }
        if (!(changeEvent instanceof DataChangeEvent)) {
            throw new UnsupportedOperationException("Unexpected stream record event: " + changeEvent);
        }
        Optional<U> map3 = processDataChangeEvent((DataChangeEvent) changeEvent, effectiveTransformers).map((v1) -> {
            return new StreamRecord(v1);
        });
        Output output3 = this.output;
        output3.getClass();
        map3.ifPresent((v1) -> {
            r1.collect(v1);
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Optional<Event> processCreateTableEvent(CreateTableEvent createTableEvent, List<PostTransformer> list) {
        TableId tableId = createTableEvent.tableId();
        Schema schema = createTableEvent.getSchema();
        Schema ensurePkNonNull = SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas((List) list.stream().map(postTransformer -> {
            return transformSchema(schema, postTransformer);
        }).collect(Collectors.toList())));
        this.postTransformInfoMap.put(tableId, PostTransformChangeInfo.of(tableId, schema, ensurePkNonNull));
        this.hasAsteriskMap.put(tableId, Boolean.valueOf(list.stream().map((v0) -> {
            return v0.getProjection();
        }).flatMap(this::optionalToStream).map((v0) -> {
            return v0.getProjection();
        }).anyMatch(TransformParser::hasAsterisk)));
        Map<TableId, List<String>> map = this.projectedColumnsMap;
        Stream stream = schema.getColumnNames().stream();
        List columnNames = ensurePkNonNull.getColumnNames();
        columnNames.getClass();
        map.put(tableId, stream.filter((v1) -> {
            return r3.contains(v1);
        }).collect(Collectors.toList()));
        return Optional.of(new CreateTableEvent(tableId, ensurePkNonNull));
    }

    private Optional<Event> processSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent, List<PostTransformer> list) {
        TableId tableId = schemaChangeEvent.tableId();
        PostTransformChangeInfo postTransformChangeInfo = (PostTransformChangeInfo) Preconditions.checkNotNull(this.postTransformInfoMap.get(tableId));
        Schema applySchemaChangeEvent = SchemaUtils.applySchemaChangeEvent(postTransformChangeInfo.getPreTransformedSchema(), schemaChangeEvent);
        this.postTransformInfoMap.put(tableId, PostTransformChangeInfo.of(tableId, applySchemaChangeEvent, SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas((List) list.stream().map(postTransformer -> {
            return transformSchema(applySchemaChangeEvent, postTransformer);
        }).collect(Collectors.toList())))));
        List columnNames = postTransformChangeInfo.getPostTransformedSchema().getColumnNames();
        if (this.hasAsteriskMap.getOrDefault(tableId, true).booleanValue()) {
            Optional transformSchemaChangeEvent = SchemaUtils.transformSchemaChangeEvent(true, columnNames, schemaChangeEvent);
            Class<Event> cls = Event.class;
            Event.class.getClass();
            return transformSchemaChangeEvent.map((v1) -> {
                return r1.cast(v1);
            });
        }
        Optional transformSchemaChangeEvent2 = SchemaUtils.transformSchemaChangeEvent(false, this.projectedColumnsMap.get(tableId), schemaChangeEvent);
        Class<Event> cls2 = Event.class;
        Event.class.getClass();
        return transformSchemaChangeEvent2.map((v1) -> {
            return r1.cast(v1);
        });
    }

    private Optional<Event> processDataChangeEvent(DataChangeEvent dataChangeEvent, List<PostTransformer> list) {
        TableId tableId = dataChangeEvent.tableId();
        PostTransformChangeInfo postTransformChangeInfo = (PostTransformChangeInfo) Preconditions.checkNotNull(this.postTransformInfoMap.get(tableId));
        TransformContext transformContext = new TransformContext();
        transformContext.epochTime = System.currentTimeMillis();
        transformContext.meta = dataChangeEvent.meta();
        String opTypeString = dataChangeEvent.opTypeString(false);
        String opTypeString2 = dataChangeEvent.opTypeString(true);
        for (PostTransformer postTransformer : list) {
            TransformProjectionProcessor projectionProcessor = getProjectionProcessor(tableId, postTransformer);
            TransformFilterProcessor filterProcessor = getFilterProcessor(tableId, postTransformer);
            RecordData recordData = null;
            RecordData recordData2 = null;
            boolean z = true;
            if (dataChangeEvent.before() != null) {
                transformContext.opType = opTypeString;
                Tuple2<BinaryRecordData, Boolean> transformRecord = transformRecord(dataChangeEvent.before(), postTransformChangeInfo, projectionProcessor, filterProcessor, transformContext);
                recordData = (RecordData) transformRecord.f0;
                z = ((Boolean) transformRecord.f1).booleanValue();
            }
            if (dataChangeEvent.after() != null) {
                transformContext.opType = opTypeString2;
                Tuple2<BinaryRecordData, Boolean> transformRecord2 = transformRecord(dataChangeEvent.after(), postTransformChangeInfo, projectionProcessor, filterProcessor, transformContext);
                recordData2 = (RecordData) transformRecord2.f0;
                z = ((Boolean) transformRecord2.f1).booleanValue();
            }
            if (z) {
                DataChangeEvent projectRecords = DataChangeEvent.projectRecords(dataChangeEvent, recordData, recordData2);
                if (!postTransformer.getPostTransformConverter().isPresent()) {
                    return Optional.of(projectRecords);
                }
                Optional<DataChangeEvent> convert = postTransformer.getPostTransformConverter().get().convert(projectRecords);
                Class<Event> cls = Event.class;
                Event.class.getClass();
                return convert.map((v1) -> {
                    return r1.cast(v1);
                });
            }
        }
        return Optional.empty();
    }

    private Schema transformSchema(Schema schema, PostTransformer postTransformer) {
        return schema.copy((List) TransformParser.generateProjectionColumns((String) postTransformer.getProjection().map((v0) -> {
            return v0.getProjection();
        }).orElse(null), schema.getColumns(), this.udfDescriptors, postTransformer.getSupportedMetadataColumns()).stream().map((v0) -> {
            return v0.getColumn();
        }).collect(Collectors.toList()));
    }

    private Tuple2<BinaryRecordData, Boolean> transformRecord(RecordData recordData, PostTransformChangeInfo postTransformChangeInfo, @Nullable TransformProjectionProcessor transformProjectionProcessor, @Nullable TransformFilterProcessor transformFilterProcessor, TransformContext transformContext) {
        RecordData.FieldGetter[] preTransformedFieldGetters = postTransformChangeInfo.getPreTransformedFieldGetters();
        Schema preTransformedSchema = postTransformChangeInfo.getPreTransformedSchema();
        Schema postTransformedSchema = postTransformChangeInfo.getPostTransformedSchema();
        BinaryRecordDataGenerator postTransformedRecordDataGenerator = postTransformChangeInfo.getPostTransformedRecordDataGenerator();
        Object[] objArr = new Object[preTransformedFieldGetters.length];
        for (int i = 0; i < preTransformedFieldGetters.length; i++) {
            objArr[i] = DataTypeConverter.convertToOriginal(preTransformedFieldGetters[i].getFieldOrNull(recordData), (DataType) preTransformedSchema.getColumnDataTypes().get(i));
        }
        Object[] project = transformProjectionProcessor != null ? transformProjectionProcessor.project(objArr, transformContext) : objArr;
        boolean z = transformFilterProcessor == null || transformFilterProcessor.test(objArr, project, transformContext);
        Object[] objArr2 = new Object[postTransformedSchema.getColumnCount()];
        for (int i2 = 0; i2 < project.length; i2++) {
            objArr2[i2] = DataTypeConverter.convert(project[i2], (DataType) postTransformedSchema.getColumnDataTypes().get(i2));
        }
        return Tuple2.of(postTransformedRecordDataGenerator.generate(objArr2), Boolean.valueOf(z));
    }

    private List<PostTransformer> getEffectiveTransformers(TableId tableId) {
        return (List) this.transformers.stream().filter(postTransformer -> {
            return postTransformer.getSelectors().isMatch(tableId);
        }).collect(Collectors.toList());
    }

    private TransformProjectionProcessor getProjectionProcessor(TableId tableId, PostTransformer postTransformer) {
        if (!this.projectionProcessors.contains(tableId, postTransformer)) {
            this.projectionProcessors.put(tableId, postTransformer, new TransformProjectionProcessor(this.postTransformInfoMap.get(tableId), (String) postTransformer.getProjection().map((v0) -> {
                return v0.getProjection();
            }).orElse(null), this.timezone, this.udfDescriptors, this.udfFunctionInstances, postTransformer.getSupportedMetadataColumns()));
        }
        return (TransformProjectionProcessor) this.projectionProcessors.get(tableId, postTransformer);
    }

    private TransformFilterProcessor getFilterProcessor(TableId tableId, PostTransformer postTransformer) {
        if (!this.filterProcessors.contains(tableId, postTransformer)) {
            if (postTransformer.getFilter().isPresent()) {
                this.filterProcessors.put(tableId, postTransformer, TransformFilterProcessor.of(this.postTransformInfoMap.get(tableId), postTransformer.getFilter().orElse(null), this.timezone, this.udfDescriptors, this.udfFunctionInstances, postTransformer.getSupportedMetadataColumns()));
            } else {
                this.filterProcessors.put(tableId, postTransformer, TransformFilterProcessor.ofNoOp());
            }
        }
        return (TransformFilterProcessor) this.filterProcessors.get(tableId, postTransformer);
    }

    private void invalidateCache(TableId tableId) {
        this.projectionProcessors.row(tableId).clear();
        this.filterProcessors.row(tableId).clear();
    }

    private List<PostTransformer> createTransformers() {
        ArrayList arrayList = new ArrayList();
        for (TransformRule transformRule : this.transformRules) {
            String projection = transformRule.getProjection();
            String filter = transformRule.getFilter();
            arrayList.add(new PostTransformer(new Selectors.SelectorsBuilder().includeTables(transformRule.getTableInclusions()).build(), TransformProjection.of(projection).orElse(null), TransformFilter.of(filter, this.udfDescriptors).orElse(null), PostTransformConverters.of(transformRule.getPostTransformConverter()).orElse(null), transformRule.getSupportedMetadataColumns()));
        }
        return arrayList;
    }

    private void initializeUdf() {
        this.udfDescriptors = (List) this.udfFunctions.stream().map(UserDefinedFunctionDescriptor::new).collect(Collectors.toList());
        this.udfFunctionInstances = new ArrayList();
        for (UserDefinedFunctionDescriptor userDefinedFunctionDescriptor : this.udfDescriptors) {
            try {
                Object newInstance = Class.forName(userDefinedFunctionDescriptor.getClasspath()).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                this.udfFunctionInstances.add(newInstance);
                if (userDefinedFunctionDescriptor.isCdcPipelineUdf()) {
                    newInstance.getClass().getMethod("open", UserDefinedFunctionContext.class).invoke(newInstance, () -> {
                        return Configuration.fromMap(userDefinedFunctionDescriptor.getParameters());
                    });
                }
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException("Failed to instantiate UDF function " + userDefinedFunctionDescriptor, e);
            }
        }
    }

    private void destroyUdf() {
        if (this.udfDescriptors == null || this.udfFunctionInstances == null) {
            return;
        }
        for (int i = 0; i < this.udfDescriptors.size(); i++) {
            UserDefinedFunctionDescriptor userDefinedFunctionDescriptor = this.udfDescriptors.get(i);
            try {
                if (userDefinedFunctionDescriptor.isCdcPipelineUdf()) {
                    Object obj = this.udfFunctionInstances.get(i);
                    obj.getClass().getMethod("close", new Class[0]).invoke(obj, new Object[0]);
                }
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException("Failed to destroy UDF " + userDefinedFunctionDescriptor, e);
            }
        }
        this.udfDescriptors.clear();
        this.udfFunctionInstances.clear();
    }

    private <T> Stream<T> optionalToStream(Optional<T> optional) {
        return (Stream) optional.map(Stream::of).orElseGet(Stream::empty);
    }
}
