package com.ververica.cdc.runtime.operators.schema;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.FlushEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:com/ververica/cdc/runtime/operators/schema/SchemaOperator.class */
public class SchemaOperator extends AbstractStreamOperator<Event> implements OneInputStreamOperator<Event, Event> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class);
    private transient TaskOperatorEventGateway toCoordinator;

    public SchemaOperator() {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Event>> output) {
        super.setup(streamTask, streamConfig, output);
        this.toCoordinator = streamTask.getEnvironment().getOperatorCoordinatorEventGateway();
    }

    public void processElement(StreamRecord<Event> streamRecord) {
        SchemaChangeEvent schemaChangeEvent = (Event) streamRecord.getValue();
        if (!(schemaChangeEvent instanceof SchemaChangeEvent)) {
            this.output.collect(streamRecord);
            return;
        }
        TableId tableId = schemaChangeEvent.tableId();
        LOG.info("Table {} received SchemaChangeEvent and start to be blocked.", tableId.toString());
        handleSchemaChangeEvent(tableId, schemaChangeEvent);
    }

    private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
        if (requestSchemaChange(tableId, schemaChangeEvent).isShouldSendFlushEvent()) {
            LOG.info("Sending the FlushEvent for table {} in subtask {}.", tableId, Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
            this.output.collect(new StreamRecord(new FlushEvent(tableId)));
            this.output.collect(new StreamRecord(schemaChangeEvent));
            requestReleaseUpstream();
        }
    }

    private SchemaChangeResponse requestSchemaChange(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
        return (SchemaChangeResponse) sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
    }

    private ReleaseUpstreamResponse requestReleaseUpstream() {
        return (ReleaseUpstreamResponse) sendRequestToCoordinator(new ReleaseUpstreamRequest());
    }

    private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse> RESPONSE sendRequestToCoordinator(REQUEST request) {
        try {
            return (RESPONSE) CoordinationResponseUtils.unwrap((CoordinationResponse) this.toCoordinator.sendRequestToCoordinator(getOperatorID(), new SerializedValue(request)).get());
        } catch (Exception e) {
            throw new IllegalStateException("Failed to send request to coordinator: " + request.toString(), e);
        }
    }
}
