package org.apache.flink.cdc.runtime.operators.transform;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

/* loaded from: input_file:org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.class */
/**
 * Stream operator that prepares events before the main transform stage.
 *
 * <p>For every table it keeps the source schema together with a "pre-transformed" schema
 * ({@link PreTransformChangeInfo}), rewrites {@link CreateTableEvent}s and other
 * {@link SchemaChangeEvent}s accordingly, and hands each {@link DataChangeEvent} to the
 * {@link PreTransformProcessor} cached for its table.
 *
 * <p>When {@code shouldStoreSchemasInState} is set, the per-table schema information is written to
 * and restored from a union list state named {@code originalSchemaState}.
 */
public class PreTransformOperator extends AbstractStreamOperator<Event>
        implements OneInputStreamOperator<Event, Event>, Serializable {
    private static final long serialVersionUID = 1;

    /** Rules supplied at construction time; turned into {@link #transforms} in {@link #setup}. */
    private final List<TransformRule> transformRules;

    /** UDF definitions as tuples; turned into {@link #udfDescriptors} in {@link #setup}. */
    private final List<Tuple3<String, String, Map<String, String>>> udfFunctions;

    private final boolean shouldStoreSchemasInState;
    private transient ListState<byte[]> state;
    private transient List<PreTransformer> transforms;
    private transient List<UserDefinedFunctionDescriptor> udfDescriptors;

    /** Per table: whether the schema-change rewrite should treat the projection as "all columns". */
    private transient Map<TableId, Boolean> hasAsteriskMap;

    /** Per table: latest source schema and pre-transformed schema. This is what gets snapshotted. */
    private final Map<TableId, PreTransformChangeInfo> preTransformChangeInfoMap =
            new ConcurrentHashMap<>();

    /** Tables for which a {@link CreateTableEvent} has already been emitted downstream. */
    private final Set<TableId> alreadySentCreateTableEvents = new HashSet<>();

    private transient Map<TableId, PreTransformProcessor> preTransformProcessorMap =
            new ConcurrentHashMap<>();

    private final List<Tuple2<Selectors, SchemaMetadataTransform>> schemaMetadataTransformers =
            new ArrayList<>();

    /** Returns a new builder for this operator. */
    public static PreTransformOperatorBuilder newBuilder() {
        return new PreTransformOperatorBuilder();
    }

    /**
     * Creates the operator; it always chains ({@link ChainingStrategy#ALWAYS}).
     *
     * @param transformRules transform rules to apply
     * @param udfFunctions UDF definitions; the three tuple fields are passed positionally to
     *     {@link UserDefinedFunctionDescriptor}
     * @param shouldStoreSchemasInState whether schemas are persisted in operator state
     */
    public PreTransformOperator(
            List<TransformRule> transformRules,
            List<Tuple3<String, String, Map<String, String>>> udfFunctions,
            boolean shouldStoreSchemasInState) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.transformRules = transformRules;
        this.udfFunctions = udfFunctions;
        this.shouldStoreSchemasInState = shouldStoreSchemasInState;
    }

    /**
     * Builds the runtime (transient) structures from the serialized rule and UDF definitions:
     * one {@link PreTransformer} and one schema-metadata transformer per rule, plus empty caches.
     */
    @Override
    public void setup(
            StreamTask<?, ?> streamTask,
            StreamConfig streamConfig,
            Output<StreamRecord<Event>> output) {
        super.setup(streamTask, streamConfig, output);
        this.udfDescriptors =
                this.udfFunctions.stream()
                        .map(udf -> new UserDefinedFunctionDescriptor(udf.f0, udf.f1, udf.f2))
                        .collect(Collectors.toList());
        this.transforms = new ArrayList<>();
        for (TransformRule rule : this.transformRules) {
            Selectors selectors =
                    new Selectors.SelectorsBuilder()
                            .includeTables(rule.getTableInclusions())
                            .build();
            this.transforms.add(
                    new PreTransformer(
                            selectors,
                            TransformProjection.of(rule.getProjection()).orElse(null),
                            TransformFilter.of(rule.getFilter(), this.udfDescriptors)
                                    .orElse(null)));
            this.schemaMetadataTransformers.add(
                    new Tuple2<>(
                            selectors,
                            new SchemaMetadataTransform(
                                    rule.getPrimaryKey(),
                                    rule.getPartitionKey(),
                                    rule.getTableOption())));
        }
        this.preTransformProcessorMap = new ConcurrentHashMap<>();
        this.hasAsteriskMap = new ConcurrentHashMap<>();
    }

    /**
     * Registers the union list state and, on restore, reloads every stored
     * {@link PreTransformChangeInfo} and recomputes the asterisk flag for its table.
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext)
            throws Exception {
        super.initializeState(stateInitializationContext);
        if (!this.shouldStoreSchemasInState) {
            return;
        }
        this.state =
                stateInitializationContext
                        .getOperatorStateStore()
                        .getUnionListState(
                                new ListStateDescriptor<>("originalSchemaState", byte[].class));
        if (!stateInitializationContext.isRestored()) {
            return;
        }
        for (byte[] serializedInfo : this.state.get()) {
            // NOTE(review): `m18deserialize` looks like a decompiler-generated alias of
            // `deserialize`; kept as-is, confirm against PreTransformChangeInfo.SERIALIZER.
            PreTransformChangeInfo restored =
                    PreTransformChangeInfo.SERIALIZER.m18deserialize(
                            PreTransformChangeInfo.SERIALIZER.getVersion(), serializedInfo);
            this.preTransformChangeInfoMap.put(restored.getTableId(), restored);
            // hasAsteriskMap is transient, so it has to be rebuilt from the restored schemas.
            cacheTransformRuleInfo(
                    new CreateTableEvent(
                            restored.getTableId(), restored.getPreTransformedSchema()));
        }
    }

    /** Serializes every known {@link PreTransformChangeInfo} into the list state. */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        if (!this.shouldStoreSchemasInState) {
            return;
        }
        List<byte[]> serialized = new ArrayList<>(this.preTransformChangeInfoMap.size());
        for (PreTransformChangeInfo info : this.preTransformChangeInfoMap.values()) {
            try {
                serialized.add(PreTransformChangeInfo.SERIALIZER.serialize(info));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        this.state.update(serialized);
    }

    @Override
    public void finish() throws Exception {
        super.finish();
        clearOperator();
    }

    @Override
    public void close() throws Exception {
        super.close();
        clearOperator();
        this.state = null;
    }

    /**
     * Processes one record. Any failure is rethrown as a {@link TransformException} that carries
     * the event and, when available, the table id and both cached schemas.
     */
    @Override
    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        Event event = streamRecord.getValue();
        try {
            processEvent(event);
        } catch (Exception e) {
            TableId tableId = null;
            Schema sourceSchema = null;
            Schema preTransformedSchema = null;
            if (event instanceof ChangeEvent) {
                tableId = ((ChangeEvent) event).tableId();
                PreTransformChangeInfo info = this.preTransformChangeInfoMap.get(tableId);
                if (info != null) {
                    sourceSchema = info.getSourceSchema();
                    preTransformedSchema = info.getPreTransformedSchema();
                }
            }
            throw new TransformException(
                    "pre-transform", event, tableId, sourceSchema, preTransformedSchema, e);
        }
    }

    /** Dispatches on the event type; events of any other type are dropped silently. */
    private void processEvent(Event event) {
        if (event instanceof CreateTableEvent) {
            CreateTableEvent createTableEvent = (CreateTableEvent) event;
            // A processor already cached for the table means this create event was handled before.
            if (this.preTransformProcessorMap.containsKey(createTableEvent.tableId())) {
                return;
            }
            this.output.collect(new StreamRecord<>(cacheCreateTable(createTableEvent)));
            this.alreadySentCreateTableEvents.add(createTableEvent.tableId());
        } else if (event instanceof DropTableEvent) {
            this.preTransformProcessorMap.remove(((DropTableEvent) event).tableId());
            this.output.collect(new StreamRecord<>(event));
        } else if (event instanceof TruncateTableEvent) {
            this.output.collect(new StreamRecord<>(event));
        } else if (event instanceof SchemaChangeEvent) {
            lazilyEmitCreateTableEvent(event);
            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
            this.preTransformProcessorMap.remove(schemaChangeEvent.tableId());
            cacheChangeSchema(schemaChangeEvent)
                    .ifPresent(rewritten -> this.output.collect(new StreamRecord<>(rewritten)));
        } else if (event instanceof DataChangeEvent) {
            lazilyEmitCreateTableEvent(event);
            this.output.collect(
                    new StreamRecord<>(processDataChangeEvent((DataChangeEvent) event)));
        }
    }

    /**
     * Emits a {@link CreateTableEvent} built from the cached pre-transformed schema if none has
     * been sent for the event's table yet.
     *
     * @throws IllegalStateException if no schema information is cached for the table
     */
    private void lazilyEmitCreateTableEvent(Event event) {
        TableId tableId = ((ChangeEvent) event).tableId();
        if (this.alreadySentCreateTableEvents.contains(tableId)) {
            return;
        }
        PreTransformChangeInfo info = requireChangeInfo(tableId);
        this.output.collect(
                new StreamRecord<>(
                        new CreateTableEvent(info.getTableId(), info.getPreTransformedSchema())));
        this.alreadySentCreateTableEvents.add(tableId);
    }

    /**
     * Looks up the cached schema information for a table.
     *
     * @throws IllegalStateException if nothing is cached, instead of a message-less
     *     NullPointerException further down
     */
    private PreTransformChangeInfo requireChangeInfo(TableId tableId) {
        PreTransformChangeInfo info = this.preTransformChangeInfoMap.get(tableId);
        if (info == null) {
            throw new IllegalStateException(
                    "No schema information is cached for table "
                            + tableId
                            + ". A CreateTableEvent was neither received nor restored from state.");
        }
        return info;
    }

    /** Transforms a create-table event and records the source and pre-transformed schemas. */
    private SchemaChangeEvent cacheCreateTable(CreateTableEvent createTableEvent) {
        TableId tableId = createTableEvent.tableId();
        Schema sourceSchema = createTableEvent.getSchema();
        CreateTableEvent transformed = transformCreateTableEvent(createTableEvent);
        this.preTransformChangeInfoMap.put(
                tableId, PreTransformChangeInfo.of(tableId, sourceSchema, transformed.getSchema()));
        return transformed;
    }

    /**
     * Applies a schema change to the cached source schema, derives the change to forward
     * downstream (if any), and refreshes both the processor cache and the schema cache.
     *
     * @return the rewritten schema change event, or empty when nothing should be forwarded
     * @throws IllegalStateException if no schema information is cached for the table
     */
    private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent schemaChangeEvent) {
        TableId tableId = schemaChangeEvent.tableId();
        PreTransformChangeInfo info = requireChangeInfo(tableId);
        Schema oldSourceSchema = info.getSourceSchema();
        Schema oldPreTransformedSchema = info.getPreTransformedSchema();

        Schema newSourceSchema =
                SchemaUtils.applySchemaChangeEvent(oldSourceSchema, schemaChangeEvent);

        // With an asterisk the rewrite is based on the source columns, otherwise on the columns
        // that survived pre-transformation. Tables without a recorded flag default to asterisk.
        boolean hasAsterisk = this.hasAsteriskMap.getOrDefault(tableId, true);
        Schema referenceSchema = hasAsterisk ? oldSourceSchema : oldPreTransformedSchema;
        Optional<SchemaChangeEvent> rewritten =
                SchemaUtils.transformSchemaChangeEvent(
                        hasAsterisk, referenceSchema.getColumnNames(), schemaChangeEvent);

        Schema newPreTransformedSchema = oldPreTransformedSchema;
        if (rewritten.isPresent()) {
            newPreTransformedSchema =
                    SchemaUtils.applySchemaChangeEvent(oldPreTransformedSchema, rewritten.get());
        }
        cachePreTransformProcessor(tableId, newSourceSchema);
        this.preTransformChangeInfoMap.put(
                tableId,
                PreTransformChangeInfo.of(tableId, newSourceSchema, newPreTransformedSchema));
        return rewritten;
    }

    /**
     * Records whether the table is treated as "projection contains an asterisk": true when no rule
     * matches the table, otherwise true iff any matching rule's projection has an asterisk.
     */
    private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
        TableId tableId = createTableEvent.tableId();
        boolean anyRuleMatches = false;
        boolean anyAsterisk = false;
        for (PreTransformer transformer : this.transforms) {
            if (!transformer.getSelectors().isMatch(tableId)) {
                continue;
            }
            anyRuleMatches = true;
            String projection =
                    transformer
                            .getProjection()
                            .map(TransformProjection::getProjection)
                            .orElse(null);
            if (TransformParser.hasAsterisk(projection)) {
                anyAsterisk = true;
                break;
            }
        }
        this.hasAsteriskMap.put(tableId, !anyRuleMatches || anyAsterisk);
    }

    /**
     * Applies every matching schema-metadata transform (primary keys, partition keys, options) in
     * rule order, refreshes the processor cache, then lets the cached processor rewrite the event.
     */
    private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {
        TableId tableId = createTableEvent.tableId();
        CreateTableEvent current = createTableEvent;
        for (Tuple2<Selectors, SchemaMetadataTransform> entry : this.schemaMetadataTransformers) {
            if (entry.f0.isMatch(tableId)) {
                current =
                        new CreateTableEvent(
                                tableId, transformSchemaMetaData(current.getSchema(), entry.f1));
            }
        }
        cachePreTransformProcessor(tableId, current.getSchema());
        PreTransformProcessor processor = this.preTransformProcessorMap.get(tableId);
        return processor != null ? processor.preTransformCreateTableEvent(current) : current;
    }

    /**
     * Rebuilds the processor for a table from all matching rules. The referenced columns of all
     * matching rules accumulate in one ordered set; if no rule matches, every column is kept.
     */
    private void cachePreTransformProcessor(TableId tableId, Schema schema) {
        LinkedHashSet<Column> referencedColumns = new LinkedHashSet<>();
        boolean matched = false;
        for (PreTransformer transformer : this.transforms) {
            if (transformer.getSelectors().isMatch(tableId)) {
                processProjectionTransform(tableId, schema, referencedColumns, transformer);
                matched = true;
            }
        }
        if (!matched) {
            processProjectionTransform(tableId, schema, referencedColumns, null);
        }
    }

    /**
     * Adds the columns needed by one rule to {@code linkedHashSet} and (re)caches the processor
     * for the table, using a schema restricted to the columns accumulated so far.
     *
     * <p>All columns are kept when {@code preTransformer} is null, when it carries no projection,
     * or when its projection contains an asterisk. Otherwise only the columns reported by
     * {@link TransformParser#generateReferencedColumns} for the projection and filter are added.
     *
     * @param tableId table being processed
     * @param schema schema the columns are taken from
     * @param linkedHashSet accumulator of referenced columns; mutated by this call
     * @param preTransformer the matching rule, or null if no rule matched the table
     */
    public void processProjectionTransform(
            TableId tableId,
            Schema schema,
            LinkedHashSet<Column> linkedHashSet,
            @Nullable PreTransformer preTransformer) {
        // An absent projection used to blow up in Optional.get(); it now keeps every column,
        // exactly like a null transformer.
        TransformProjection projection =
                preTransformer == null ? null : preTransformer.getProjection().orElse(null);
        if (projection == null || TransformParser.hasAsterisk(projection.getProjection())) {
            linkedHashSet.addAll(schema.getColumns());
            this.hasAsteriskMap.put(tableId, true);
        } else {
            TransformFilter filter = preTransformer.getFilter().orElse(null);
            linkedHashSet.addAll(
                    TransformParser.generateReferencedColumns(
                            projection.getProjection(),
                            filter != null ? filter.getExpression() : null,
                            schema.getColumns()));
            // Never downgrade a table that another matching rule already marked as asterisk.
            this.hasAsteriskMap.putIfAbsent(tableId, false);
        }
        this.preTransformProcessorMap.put(
                tableId,
                new PreTransformProcessor(
                        PreTransformChangeInfo.of(
                                tableId, schema, schema.copy(new ArrayList<>(linkedHashSet)))));
    }

    /**
     * Copies the schema's columns and overrides primary keys, partition keys and options with the
     * values from {@code schemaMetadataTransform} wherever that transform provides a non-empty one.
     */
    private Schema transformSchemaMetaData(
            Schema schema, SchemaMetadataTransform schemaMetadataTransform) {
        Schema.Builder builder = Schema.newBuilder().setColumns(schema.getColumns());
        if (schemaMetadataTransform.getPrimaryKeys().isEmpty()) {
            builder.primaryKey(schema.primaryKeys());
        } else {
            builder.primaryKey(schemaMetadataTransform.getPrimaryKeys());
        }
        if (schemaMetadataTransform.getPartitionKeys().isEmpty()) {
            builder.partitionKey(schema.partitionKeys());
        } else {
            builder.partitionKey(schemaMetadataTransform.getPartitionKeys());
        }
        if (schemaMetadataTransform.getOptions().isEmpty()) {
            builder.options(schema.options());
        } else {
            builder.options(schemaMetadataTransform.getOptions());
        }
        return builder.build();
    }

    /**
     * Runs the before/after rows of a data change event through the table's cached processor.
     * When no transform rules are configured the event is returned untouched.
     *
     * @throws IllegalArgumentException if rules exist but no processor is cached for the table
     */
    private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent) {
        if (this.transforms.isEmpty()) {
            return dataChangeEvent;
        }
        TableId tableId = dataChangeEvent.tableId();
        PreTransformProcessor processor = this.preTransformProcessorMap.get(tableId);
        Preconditions.checkArgument(processor != null, "Transform operator receives a data change event from table %s without a full schema view. This might happen if source with distributed tables doesn't emit CreateTableEvent first after fail-over. This is likely a bug, please consider filing an issue.", new Object[]{tableId});
        // Both rows are read from the incoming event before either projection is applied.
        BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
        BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
        DataChangeEvent result = dataChangeEvent;
        if (before != null) {
            result = DataChangeEvent.projectBefore(result, processor.processFillDataField(before));
        }
        if (after != null) {
            result = DataChangeEvent.projectAfter(result, processor.processFillDataField(after));
        }
        return result;
    }

    /** Drops the rule list and the processor cache; called from both finish() and close(). */
    private void clearOperator() {
        this.transforms = null;
        this.preTransformProcessorMap = null;
    }
}
