package com.ververica.cdc.composer.flink;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.factories.DataSinkFactory;
import com.ververica.cdc.common.factories.FactoryHelper;
import com.ververica.cdc.common.pipeline.PipelineOptions;
import com.ververica.cdc.common.pipeline.SchemaChangeBehavior;
import com.ververica.cdc.common.sink.DataSink;
import com.ververica.cdc.composer.PipelineComposer;
import com.ververica.cdc.composer.PipelineExecution;
import com.ververica.cdc.composer.definition.PipelineDef;
import com.ververica.cdc.composer.definition.SinkDef;
import com.ververica.cdc.composer.flink.coordination.OperatorIDGenerator;
import com.ververica.cdc.composer.flink.translator.DataSinkTranslator;
import com.ververica.cdc.composer.flink.translator.DataSourceTranslator;
import com.ververica.cdc.composer.flink.translator.PartitioningTranslator;
import com.ververica.cdc.composer.flink.translator.RouteTranslator;
import com.ververica.cdc.composer.flink.translator.SchemaOperatorTranslator;
import com.ververica.cdc.composer.utils.FactoryDiscoveryUtils;
import com.ververica.cdc.runtime.serializer.event.EventSerializer;
import java.net.URI;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

@Internal
/* loaded from: input_file:com/ververica/cdc/composer/flink/FlinkPipelineComposer.class */
public class FlinkPipelineComposer implements PipelineComposer {
    private final StreamExecutionEnvironment env;
    private final boolean isBlocking;

    public static FlinkPipelineComposer ofRemoteCluster(Configuration configuration, List<Path> list) {
        Configuration configuration2 = new Configuration();
        configuration2.set(DeploymentOptions.TARGET, "remote");
        configuration2.addAll(configuration);
        StreamExecutionEnvironment streamExecutionEnvironment = new StreamExecutionEnvironment(configuration2);
        list.forEach(path -> {
            try {
                FlinkEnvironmentUtils.addJar(streamExecutionEnvironment, path.toUri().toURL());
            } catch (Exception e) {
                throw new RuntimeException(String.format("Unable to convert JAR path \"%s\" to URL when adding JAR to Flink environment", path), e);
            }
        });
        return new FlinkPipelineComposer(streamExecutionEnvironment, false);
    }

    public static FlinkPipelineComposer ofMiniCluster() {
        return new FlinkPipelineComposer(StreamExecutionEnvironment.getExecutionEnvironment(), true);
    }

    private FlinkPipelineComposer(StreamExecutionEnvironment streamExecutionEnvironment, boolean z) {
        this.env = streamExecutionEnvironment;
        this.isBlocking = z;
    }

    @Override // com.ververica.cdc.composer.PipelineComposer
    public PipelineExecution compose(PipelineDef pipelineDef) {
        int intValue = ((Integer) pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM)).intValue();
        this.env.getConfig().setParallelism(intValue);
        DataStream<Event> translate = new RouteTranslator().translate(new DataSourceTranslator().translate(pipelineDef.getSource(), this.env, pipelineDef.getConfig()), pipelineDef.getRoute());
        DataSink createDataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());
        SchemaOperatorTranslator schemaOperatorTranslator = new SchemaOperatorTranslator((SchemaChangeBehavior) pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR), (String) pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID));
        DataStream<Event> translate2 = schemaOperatorTranslator.translate(translate, intValue, createDataSink.getMetadataApplier());
        OperatorIDGenerator operatorIDGenerator = new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
        new DataSinkTranslator().translate(pipelineDef.getSink(), new PartitioningTranslator().translate(translate2, intValue, intValue, operatorIDGenerator.generate()), createDataSink, operatorIDGenerator.generate());
        addFrameworkJars();
        return new FlinkPipelineExecution(this.env, (String) pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), this.isBlocking);
    }

    private DataSink createDataSink(SinkDef sinkDef, com.ververica.cdc.common.configuration.Configuration configuration) {
        DataSinkFactory factoryByIdentifier = FactoryDiscoveryUtils.getFactoryByIdentifier(sinkDef.getType(), DataSinkFactory.class);
        FactoryDiscoveryUtils.getJarPathByIdentifier(sinkDef.getType(), DataSinkFactory.class).ifPresent(url -> {
            FlinkEnvironmentUtils.addJar(this.env, url);
        });
        return factoryByIdentifier.createDataSink(new FactoryHelper.DefaultContext(sinkDef.getConfig(), configuration, Thread.currentThread().getContextClassLoader()));
    }

    private void addFrameworkJars() {
        try {
            HashSet hashSet = new HashSet();
            Optional<URL> containingJar = getContainingJar(Event.class);
            if (containingJar.isPresent()) {
                hashSet.add(containingJar.get().toURI());
            }
            Optional<URL> containingJar2 = getContainingJar(EventSerializer.class);
            if (containingJar2.isPresent()) {
                hashSet.add(containingJar2.get().toURI());
            }
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                FlinkEnvironmentUtils.addJar(this.env, ((URI) it.next()).toURL());
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to search and add Flink CDC framework JARs", e);
        }
    }

    private Optional<URL> getContainingJar(Class<?> cls) throws Exception {
        URL location = cls.getProtectionDomain().getCodeSource().getLocation();
        return Files.isDirectory(Paths.get(location.toURI()), new LinkOption[0]) ? Optional.empty() : Optional.of(location);
    }
}
