package org.apache.flink.cdc.runtime.operators.schema.regular;

import java.io.Serializable;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream operator that sits in front of the sink side of a pipeline and keeps table schemas in
 * sync with the schema coordinator.
 *
 * <p>For every {@link SchemaChangeEvent} it updates its local copy of the upstream schema, emits a
 * {@link FlushEvent} downstream, asks the coordinator to apply the change, and finally forwards
 * the schema change events the coordinator reports as applied. For every {@link DataChangeEvent}
 * it routes the record to each target table and coerces it from the upstream schema to the
 * evolved schema cached for that target.
 */
@Internal
public class SchemaOperator extends AbstractStreamOperator<Event> implements OneInputStreamOperator<Event, Event>, Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class);

    /** Time zone identifier handed to {@link SchemaDerivator#coerceDataRecord} when coercing records. */
    private final String timezone;

    /** Maximum time to wait for a coordinator response before the request is considered failed. */
    private final Duration rpcTimeout;

    /** Configured schema change behavior; within this class it is only passed to the metrics. */
    private final SchemaChangeBehavior schemaChangeBehavior;

    /** Route rules used to build the {@link TableIdRouter} in {@link #open()}. */
    private final List<RouteRule> routingRules;

    // Runtime-only state below is transient and (re)created in setup() / open().
    private transient int subTaskId;
    private transient TaskOperatorEventGateway toCoordinator;
    private transient SchemaOperatorMetrics schemaOperatorMetrics;

    /** Upstream table id -> schema as seen by this subtask after applying schema change events. */
    private transient volatile Map<TableId, Schema> originalSchemaMap;

    /** Routed (sink) table id -> schema most recently reported by the coordinator. */
    private transient volatile Map<TableId, Schema> evolvedSchemaMap;

    private transient TableIdRouter router;
    private transient SchemaDerivator derivator;

    /** Creates an operator with the default RPC timeout, {@code EVOLVE} behavior and "UTC" time zone. */
    @VisibleForTesting
    public SchemaOperator(List<RouteRule> routingRules) {
        this(routingRules, PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
    }

    /** Creates an operator with {@code EVOLVE} behavior and "UTC" time zone. */
    @VisibleForTesting
    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeout) {
        this(routingRules, rpcTimeout, SchemaChangeBehavior.EVOLVE);
    }

    /** Creates an operator with the "UTC" time zone. */
    @VisibleForTesting
    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeout, SchemaChangeBehavior schemaChangeBehavior) {
        this(routingRules, rpcTimeout, schemaChangeBehavior, "UTC");
    }

    /**
     * Creates a fully configured operator.
     *
     * @param routingRules rules used to map upstream table ids to target table ids
     * @param rpcTimeout maximum time to wait for each coordinator response
     * @param schemaChangeBehavior schema change behavior reported to the metrics
     * @param timezone time zone identifier used when coercing data records
     */
    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeout, SchemaChangeBehavior schemaChangeBehavior, String timezone) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.rpcTimeout = rpcTimeout;
        this.schemaChangeBehavior = schemaChangeBehavior;
        this.timezone = timezone;
        this.routingRules = routingRules;
    }

    /** Performs the standard operator setup and captures the gateway used to reach the coordinator. */
    @Override
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Event>> output) {
        super.setup(streamTask, streamConfig, output);
        this.toCoordinator = streamTask.getEnvironment().getOperatorCoordinatorEventGateway();
    }

    /** Initializes metrics, the subtask index, the empty schema caches, the router and the derivator. */
    @Override
    public void open() throws Exception {
        super.open();
        this.schemaOperatorMetrics = new SchemaOperatorMetrics(getRuntimeContext().getMetricGroup(), this.schemaChangeBehavior);
        this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
        this.originalSchemaMap = new HashMap<>();
        this.evolvedSchemaMap = new HashMap<>();
        this.router = new TableIdRouter(this.routingRules);
        this.derivator = new SchemaDerivator();
    }

    /**
     * Dispatches an incoming record to the schema change or data change handler.
     *
     * @throws RuntimeException if the event is neither a {@link SchemaChangeEvent} nor a
     *     {@link DataChangeEvent}
     */
    @Override
    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        Event event = streamRecord.getValue();
        if (event instanceof SchemaChangeEvent) {
            handleSchemaChangeEvent((SchemaChangeEvent) event);
        } else if (event instanceof DataChangeEvent) {
            handleDataChangeEvent((DataChangeEvent) event);
        } else {
            throw new RuntimeException("Unknown event type in Stream record: " + event);
        }
    }

    /**
     * Applies a schema change locally, coordinates it with the schema coordinator and forwards the
     * resulting schema change events downstream.
     */
    private void handleSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) throws Exception {
        TableId tableId = schemaChangeEvent.tableId();

        // Update the locally tracked upstream schema before talking to the coordinator.
        this.originalSchemaMap.compute(
                tableId,
                (ignoredKey, currentSchema) -> SchemaUtils.applySchemaChangeEvent(currentSchema, schemaChangeEvent));
        // Count exactly one received schema change event.
        this.schemaOperatorMetrics.increaseSchemaChangeEvents(1L);

        // The FlushEvent goes out before the (blocking) coordinator request.
        // NOTE(review): presumably so downstream operators can flush pending data while the
        // request is in flight - confirm against the coordinator implementation.
        List<TableId> sinkTables = this.router.route(tableId);
        LOG.info("{}> Sending the FlushEvent.", this.subTaskId);
        this.output.collect(new StreamRecord<>(new FlushEvent(this.subTaskId, sinkTables, schemaChangeEvent.getType())));

        LOG.info("{}> Going to request schema change...", this.subTaskId);
        SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
        LOG.info("{}> Finished schema change events: {}", this.subTaskId, response.getAppliedSchemaChangeEvents());
        LOG.info("{}> Refreshed evolved schemas: {}", this.subTaskId, response.getEvolvedSchemas());

        List<SchemaChangeEvent> appliedEvents = response.getAppliedSchemaChangeEvents();

        // Refresh the local cache of evolved schemas with what the coordinator returned.
        this.evolvedSchemaMap.putAll(response.getEvolvedSchemas());

        // Forward the schema change events that the coordinator reported as applied.
        for (SchemaChangeEvent appliedEvent : appliedEvents) {
            this.output.collect(new StreamRecord<>(appliedEvent));
        }
        this.schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(appliedEvents.size());
    }

    /**
     * Routes a data change event to every target table and emits one coerced record per target.
     *
     * @throws IllegalStateException if the derivator returns no coerced record for a target
     */
    private void handleDataChangeEvent(DataChangeEvent dataChangeEvent) {
        TableId sourceTableId = dataChangeEvent.tableId();
        Schema originalSchema = this.originalSchemaMap.get(sourceTableId);

        for (TableId sinkTableId : this.router.route(sourceTableId)) {
            Schema evolvedSchema = this.evolvedSchemaMap.get(sinkTableId);
            Event coercedRecord = this.derivator
                    .coerceDataRecord(this.timezone, DataChangeEvent.route(dataChangeEvent, sinkTableId), originalSchema, evolvedSchema)
                    .orElseThrow(() -> new IllegalStateException(String.format(
                            "Unable to coerce data record from %s (schema: %s) to %s (schema: %s)",
                            sourceTableId, originalSchema, sinkTableId, evolvedSchema)));
            this.output.collect(new StreamRecord<>(coercedRecord));
        }
    }

    /** Sends a {@link SchemaChangeRequest} for the given table and event and returns the response. */
    private SchemaChangeResponse requestSchemaChange(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
        return (SchemaChangeResponse) sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent, this.subTaskId));
    }

    /**
     * Sends a request to the coordinator and blocks until a response arrives or
     * {@code rpcTimeout} elapses.
     *
     * @param request the request to serialize and send
     * @return the unwrapped coordinator response
     * @throws IllegalStateException if serialization, sending, waiting or unwrapping fails; the
     *     original exception is attached as the cause
     */
    @SuppressWarnings("unchecked")
    private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse> RESPONSE sendRequestToCoordinator(REQUEST request) {
        try {
            CoordinationResponse wrappedResponse = this.toCoordinator
                    .sendRequestToCoordinator(getOperatorID(), new SerializedValue<>(request))
                    .get(this.rpcTimeout.toMillis(), TimeUnit.MILLISECONDS);
            return (RESPONSE) CoordinationResponseUtils.unwrap(wrappedResponse);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("Failed to send request to coordinator: " + request.toString(), e);
        }
    }

    /** Seeds both schema caches with the same schema for a table; intended for tests. */
    @VisibleForTesting
    public void registerInitialSchema(TableId tableId, Schema schema) {
        this.originalSchemaMap.put(tableId, schema);
        this.evolvedSchemaMap.put(tableId, schema);
    }
}
