/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.node.etl.extract.extractor;

import com.alibaba.otter.node.etl.common.datasource.DataSourceService;
import com.alibaba.otter.node.etl.extract.exceptions.ExtractException;
import com.alibaba.otter.node.etl.extract.extractor.AbstractExtractor;
import com.alibaba.otter.shared.common.model.config.ConfigHelper;
import com.alibaba.otter.shared.common.model.config.data.DataMedia;
import com.alibaba.otter.shared.common.model.config.data.DataMediaPair;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.common.utils.extension.ExtensionFactory;
import com.alibaba.otter.shared.common.utils.thread.ExecutorTemplate;
import com.alibaba.otter.shared.common.utils.thread.ExecutorTemplateGetter;
import com.alibaba.otter.shared.etl.extend.processor.EventProcessor;
import com.alibaba.otter.shared.etl.extend.processor.support.DataSourceFetcher;
import com.alibaba.otter.shared.etl.extend.processor.support.DataSourceFetcherAware;
import com.alibaba.otter.shared.etl.model.DbBatch;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.RowBatch;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.sql.DataSource;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

/**
 * Extractor that runs the {@link EventProcessor} extensions configured on a pipeline's
 * {@link DataMediaPair}s against every {@link EventData} of a {@link DbBatch}, and removes
 * from the batch each record for which a processor returned {@code false}.
 *
 * <p>Processors implementing {@link DataSourceFetcherAware} are handed a
 * {@link DataSourceFetcher} and executed through an {@link ExecutorTemplate}; all other
 * processors are executed inline on the calling thread.
 *
 * <p>The three collaborators are injected through the setters and must be set before
 * {@link #extract(DbBatch)} is called.
 */
public class ProcessorExtractor extends AbstractExtractor<DbBatch> {

    /** MDC key under which each submitted task publishes the id of the pipeline it works for. */
    private static final String MDC_PIPELINE_KEY = "otter";

    private ExtensionFactory extensionFactory;
    private DataSourceService dataSourceService;
    private ExecutorTemplateGetter executorTemplateGetter;

    /**
     * Applies the configured event processors to the row batch contained in {@code param}.
     *
     * <p>For each record, the data-media pairs registered for the record's table id are looked
     * up; pairs without a filter are skipped. Records rejected by a processor are collected and
     * removed from the batch's data list once all submitted tasks have finished.
     *
     * @param param the batch whose {@link RowBatch} is filtered in place
     * @throws ExtractException if no data-media pair list can be found for a record's table id
     */
    @Override
    public void extract(DbBatch param) throws ExtractException {
        ExecutorTemplate executorTemplate = null;
        try {
            RowBatch rowBatch = param.getRowBatch();
            final Pipeline pipeline = this.getPipeline(rowBatch.getIdentity().getPipelineId());
            List<EventData> eventDatas = rowBatch.getDatas();
            // Written both by the calling thread and by submitted tasks, hence synchronized.
            final Set<EventData> removeDatas = Collections.synchronizedSet(new HashSet<EventData>());

            executorTemplate = this.executorTemplateGetter.get();
            executorTemplate.start();
            // Re-apply the pool size configured for this pipeline.
            executorTemplate.adjustPoolSize(pipeline.getParameters().getExtractPoolSize().intValue());

            for (final EventData eventData : eventDatas) {
                List<DataMediaPair> dataMediaPairs =
                        ConfigHelper.findDataMediaPairByMediaId(pipeline, eventData.getTableId());
                if (dataMediaPairs == null) {
                    throw new ExtractException("ERROR ## the dataMediaId = " + eventData.getTableId() + " dataMediaPair is null,please check");
                }

                for (DataMediaPair dataMediaPair : dataMediaPairs) {
                    if (!dataMediaPair.isExistFilter()) {
                        continue;
                    }

                    final EventProcessor eventProcessor = (EventProcessor) this.extensionFactory.getExtension(
                            EventProcessor.class, dataMediaPair.getFilterData());

                    if (eventProcessor instanceof DataSourceFetcherAware) {
                        ((DataSourceFetcherAware) eventProcessor).setDataSourceFetcher(newDataSourceFetcher(pipeline));
                        executorTemplate.submit(new Runnable() {

                            @Override
                            public void run() {
                                // NOTE(review): the MDC entry is never removed by this task; it stays
                                // on the executing thread until something overwrites it.
                                MDC.put(MDC_PIPELINE_KEY, String.valueOf(pipeline.getId()));
                                if (!eventProcessor.process(eventData)) {
                                    removeDatas.add(eventData);
                                }
                            }
                        });
                    } else if (!eventProcessor.process(eventData)) {
                        // Rejected inline: no need to consult the remaining pairs for this record.
                        removeDatas.add(eventData);
                        break;
                    }
                }
            }

            // Block until every submitted task has completed before touching the batch.
            executorTemplate.waitForResult();

            if (!CollectionUtils.isEmpty(removeDatas)) {
                eventDatas.removeAll(removeDatas);
            }
        } finally {
            // Single release point for both the normal and the exceptional path.
            if (executorTemplate != null) {
                this.executorTemplateGetter.release(executorTemplate);
            }
        }
    }

    /**
     * Creates the fetcher handed to {@link DataSourceFetcherAware} processors. It resolves a
     * table id to its {@link DataMedia} within {@code pipeline} and asks the
     * {@link DataSourceService} for the data source of that media's source.
     */
    private DataSourceFetcher newDataSourceFetcher(final Pipeline pipeline) {
        return new DataSourceFetcher() {

            @Override
            public DataSource fetch(Long tableId) {
                DataMedia dataMedia = ConfigHelper.findDataMedia(pipeline, tableId);
                return (DataSource) ProcessorExtractor.this.dataSourceService.getDataSource(
                        pipeline.getId(), dataMedia.getSource());
            }
        };
    }

    /** Sets the factory used to look up {@link EventProcessor} extensions. */
    public void setExtensionFactory(ExtensionFactory extensionFactory) {
        this.extensionFactory = extensionFactory;
    }

    /** Sets the service used to obtain data sources for {@link DataSourceFetcherAware} processors. */
    public void setDataSourceService(DataSourceService dataSourceService) {
        this.dataSourceService = dataSourceService;
    }

    /** Sets the provider from which an {@link ExecutorTemplate} is obtained and to which it is released. */
    public void setExecutorTemplateGetter(ExecutorTemplateGetter executorTemplateGetter) {
        this.executorTemplateGetter = executorTemplateGetter;
    }
}

