package org.apache.flink.cdc.runtime.operators.schema.regular;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.class */
public class BatchSchemaOperator extends AbstractStreamOperator<Event> implements OneInputStreamOperator<Event, Event>, Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(BatchSchemaOperator.class);
    private final String timezone;
    private final List<RouteRule> routingRules;
    private volatile transient Map<TableId, Schema> originalSchemaMap;
    private volatile transient Map<TableId, Schema> evolvedSchemaMap;
    private transient TableIdRouter router;
    private transient SchemaDerivator derivator;
    protected transient SchemaManager schemaManager;
    protected MetadataApplier metadataApplier;
    private boolean alreadyMergedCreateTableTables = false;

    public BatchSchemaOperator(List<RouteRule> list, MetadataApplier metadataApplier, String str) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.timezone = str;
        this.routingRules = list;
        this.metadataApplier = metadataApplier;
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Event>> output) {
        super.setup(streamTask, streamConfig, output);
    }

    public void open() throws Exception {
        super.open();
        this.originalSchemaMap = new HashMap();
        this.evolvedSchemaMap = new HashMap();
        this.router = new TableIdRouter(this.routingRules);
        this.derivator = new SchemaDerivator();
        this.schemaManager = new SchemaManager(SchemaChangeBehavior.IGNORE);
    }

    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        Event event = (Event) streamRecord.getValue();
        if (event instanceof CreateTableEvent) {
            handleCreateTableEvent((CreateTableEvent) event);
        } else {
            if (!(event instanceof DataChangeEvent)) {
                throw new RuntimeException("Unknown event type in Batch record: " + event);
            }
            if (!this.alreadyMergedCreateTableTables) {
                handleFirstDataChangeEvent();
                this.alreadyMergedCreateTableTables = true;
            }
            handleDataChangeEvent((DataChangeEvent) event);
        }
    }

    private void handleCreateTableEvent(CreateTableEvent createTableEvent) throws Exception {
        this.originalSchemaMap.put(createTableEvent.tableId(), createTableEvent.getSchema());
    }

    private void handleFirstDataChangeEvent() {
        SchemaDerivator.deduceMergedCreateTableEvent(this.router, (List) this.originalSchemaMap.entrySet().stream().map(entry -> {
            return new CreateTableEvent((TableId) entry.getKey(), (Schema) entry.getValue());
        }).collect(Collectors.toList())).forEach(createTableEvent -> {
            this.evolvedSchemaMap.put(createTableEvent.tableId(), createTableEvent.getSchema());
            applyAndUpdateEvolvedSchemaChange(createTableEvent);
            this.output.collect(new StreamRecord(createTableEvent));
        });
    }

    private void handleDataChangeEvent(DataChangeEvent dataChangeEvent) {
        TableId tableId = dataChangeEvent.tableId();
        Schema schema = this.originalSchemaMap.get(dataChangeEvent.tableId());
        for (TableId tableId2 : this.router.route(tableId)) {
            Schema schema2 = this.evolvedSchemaMap.get(tableId2);
            this.output.collect(new StreamRecord(this.derivator.coerceDataRecord(this.timezone, DataChangeEvent.route(dataChangeEvent, tableId2), schema, schema2).orElseThrow(() -> {
                return new IllegalStateException(String.format("Unable to coerce data record from %s (schema: %s) to %s (schema: %s)", tableId, schema, tableId2, schema2));
            })));
        }
    }

    private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) {
        this.metadataApplier.applySchemaChange(schemaChangeEvent);
        this.schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
        LOG.info("Successfully applied schema change event {} to external system.", schemaChangeEvent);
        return true;
    }
}
