package org.apache.flink.cdc.runtime.operators.sink;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.sink.exception.SinkWrapperException;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

@Internal
/* loaded from: input_file:org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.class */
public class DataSinkFunctionOperator extends StreamSink<Event> {
    private SchemaEvolutionClient schemaEvolutionClient;
    private final OperatorID schemaOperatorID;
    private final Set<TableId> processedTableIds;

    public DataSinkFunctionOperator(SinkFunction<Event> sinkFunction, OperatorID operatorID) {
        super(sinkFunction);
        this.schemaOperatorID = operatorID;
        this.processedTableIds = new HashSet();
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Object>> output) {
        super.setup(streamTask, streamConfig, output);
        this.schemaEvolutionClient = new SchemaEvolutionClient(streamTask.getEnvironment().getOperatorCoordinatorEventGateway(), this.schemaOperatorID);
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        this.schemaEvolutionClient.registerSubtask(getRuntimeContext().getIndexOfThisSubtask());
        super.initializeState(stateInitializationContext);
    }

    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        CreateTableEvent createTableEvent = (Event) streamRecord.getValue();
        try {
            if (createTableEvent instanceof FlushEvent) {
                handleFlushEvent((FlushEvent) createTableEvent);
                return;
            }
            if (createTableEvent instanceof CreateTableEvent) {
                this.processedTableIds.add(createTableEvent.tableId());
                super.processElement(streamRecord);
                return;
            }
            ChangeEvent changeEvent = (ChangeEvent) createTableEvent;
            if (!this.processedTableIds.contains(changeEvent.tableId())) {
                emitLatestSchema(changeEvent.tableId());
                this.processedTableIds.add(changeEvent.tableId());
            }
            this.processedTableIds.add(changeEvent.tableId());
            super.processElement(streamRecord);
        } catch (Exception e) {
            throw new SinkWrapperException(createTableEvent, e);
        }
    }

    private void handleFlushEvent(FlushEvent flushEvent) throws Exception {
        this.userFunction.finish();
        if (flushEvent.getSchemaChangeEventType() != SchemaChangeEventType.CREATE_TABLE) {
            flushEvent.getTableIds().stream().filter(tableId -> {
                return !this.processedTableIds.contains(tableId);
            }).forEach(tableId2 -> {
                LOG.info("Table {} has not been processed", tableId2);
                try {
                    emitLatestSchema(tableId2);
                    this.processedTableIds.add(tableId2);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
        this.schemaEvolutionClient.notifyFlushSuccess(getRuntimeContext().getIndexOfThisSubtask(), flushEvent.getSourceSubTaskId());
    }

    private void emitLatestSchema(TableId tableId) throws Exception {
        Optional<Schema> latestEvolvedSchema = this.schemaEvolutionClient.getLatestEvolvedSchema(tableId);
        if (!latestEvolvedSchema.isPresent()) {
            throw new RuntimeException("Could not find schema message from SchemaRegistry for " + tableId);
        }
        super.processElement(new StreamRecord(new CreateTableEvent(tableId, latestEvolvedSchema.get())));
        this.processedTableIds.add(tableId);
    }
}
