/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.node.etl.load.loader.db;

import com.alibaba.otter.node.common.config.ConfigClientService;
import com.alibaba.otter.node.etl.load.exception.LoadException;
import com.alibaba.otter.node.etl.load.loader.LoadStatsTracker;
import com.alibaba.otter.node.etl.load.loader.db.FileloadDumper;
import com.alibaba.otter.node.etl.load.loader.db.context.FileLoadContext;
import com.alibaba.otter.node.etl.load.loader.weight.WeightBuckets;
import com.alibaba.otter.node.etl.load.loader.weight.WeightController;
import com.alibaba.otter.shared.common.model.config.ConfigHelper;
import com.alibaba.otter.shared.common.model.config.channel.Channel;
import com.alibaba.otter.shared.common.model.config.data.DataMediaPair;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.common.utils.NioUtils;
import com.alibaba.otter.shared.common.utils.thread.NamedThreadFactory;
import com.alibaba.otter.shared.etl.model.FileBatch;
import com.alibaba.otter.shared.etl.model.FileData;
import com.alibaba.otter.shared.etl.model.Identity;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/**
 * Loads the files of a {@link FileBatch} from a staging directory to their target paths.
 *
 * <p>Files are grouped into buckets by the push weight of their data-media pair and the buckets
 * are processed one after another, coordinated through a {@link WeightController}. Within one
 * bucket the files are copied (or deleted) in parallel on an internal thread pool; in dry-run
 * mode nothing is copied and the files are only recorded.
 *
 * <p>The thread pool is created in {@link #afterPropertiesSet()} and shut down in
 * {@link #destroy()}.
 */
public class FileLoadAction implements InitializingBean, DisposableBean {

    private static final Logger logger = LoggerFactory.getLogger(FileLoadAction.class);

    private static final String WORKER_NAME = "FileLoadAction";
    private static final String WORKER_NAME_FORMAT = "pipelineId = %s , pipelineName = %s , FileLoadAction";
    private static final int DEFAULT_POOL_SIZE = 5;

    /** MDC key set while the load result is dumped to the log. */
    private static final String MDC_LOAD_KEY = "load";
    /** MDC key set while a worker processes a file. */
    private static final String MDC_PIPELINE_KEY = "otter";
    /** Message suffix used when a file carries a namespace (non-local file). */
    private static final String NOT_SUPPORTED_SUFFIX = " is not support!";

    /** Number of attempts per file; also passed to the {@link NioUtils} copy/delete calls. */
    private int retry = 5;
    private ConfigClientService configClientService;
    private LoadStatsTracker loadStatsTracker;
    /** When true the load result is always dumped to the log (dry runs are dumped regardless). */
    private boolean dump = true;
    private int poolSize = DEFAULT_POOL_SIZE;
    private ExecutorService executor;

    /**
     * Loads every file of the batch, bucket by bucket in the order returned by
     * {@code WeightBuckets.weights()}.
     *
     * <p>{@code rootDir} is always deleted once loading was attempted, whether it succeeded or
     * not. If the calling thread is interrupted while waiting on the controller, the interrupt
     * flag is restored and the (partially filled) context is returned without an exception.
     *
     * @param fileBatch  the files to load together with the identity of the batch
     * @param rootDir    staging directory holding the files to load; must exist
     * @param controller coordinates the order in which weights are processed
     * @return the context describing processed and failed files
     * @throws LoadException if {@code rootDir} does not exist or any step of the load fails
     */
    public FileLoadContext load(FileBatch fileBatch, File rootDir, WeightController controller) {
        if (!rootDir.exists()) {
            throw new LoadException(rootDir.getPath() + " is not exist");
        }

        Identity identity = fileBatch.getIdentity();
        FileLoadContext context = buildContext(identity);
        context.setPrepareDatas(fileBatch.getFiles());
        boolean isDryRun = context.getPipeline().getParameters().isDryRun();
        boolean shouldDump = dump || isDryRun;
        try {
            WeightBuckets<FileData> buckets = buildWeightBuckets(identity, fileBatch.getFiles());
            List<Long> weights = buckets.weights();
            controller.start(weights);
            for (Long weight : weights) {
                controller.await(weight.intValue());
                logger.debug("##start load for weight:{}\n", weight);

                List<FileData> items = buckets.getItems(weight);
                // Use the flag captured above so the branch taken always matches the dump decision.
                if (isDryRun) {
                    dryRun(context, items, rootDir);
                } else {
                    moveFiles(context, items, rootDir);
                }

                controller.single(weight.intValue());
                logger.debug("##end load for weight:{}\n", weight);
            }
            if (shouldDump) {
                dumpContext("successed", identity, context);
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller; the context is still returned below.
            Thread.currentThread().interrupt();
            if (shouldDump) {
                dumpContext("error", identity, context);
            }
        } catch (Exception e) {
            if (shouldDump) {
                dumpContext("error", identity, context);
            }
            throw new LoadException(e);
        } finally {
            // Second argument is presumably a retry count - confirm against NioUtils.
            NioUtils.delete(rootDir, 3);
        }
        return context;
    }

    /** Writes the dump of the context to the log with the pipeline id placed in the MDC. */
    private void dumpContext(String status, Identity identity, FileLoadContext context) {
        MDC.put(MDC_LOAD_KEY, String.valueOf(identity.getPipelineId()));
        try {
            logger.info(FileloadDumper.dumpContext(status, context));
        } finally {
            // Never leave the key behind, even if building or logging the dump fails.
            MDC.remove(MDC_LOAD_KEY);
        }
    }

    /**
     * Resizes the worker pool when the pipeline's configured file-load pool size differs from the
     * size currently remembered in {@code poolSize}.
     */
    private void adjustPoolSize(FileLoadContext context) {
        int newPoolSize = context.getPipeline().getParameters().getFileLoadPoolSize();
        if (newPoolSize == poolSize) {
            return;
        }

        poolSize = newPoolSize;
        if (executor instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
            // ThreadPoolExecutor (JDK 9+) rejects a core size above the current maximum and a
            // maximum below the current core size, so the order depends on the resize direction.
            if (newPoolSize > pool.getMaximumPoolSize()) {
                pool.setMaximumPoolSize(newPoolSize);
                pool.setCorePoolSize(newPoolSize);
            } else {
                pool.setCorePoolSize(newPoolSize);
                pool.setMaximumPoolSize(newPoolSize);
            }
        }
    }

    /** Creates a context carrying the identity and the channel/pipeline configuration it refers to. */
    private FileLoadContext buildContext(Identity identity) {
        Channel channel = configClientService.findChannel(Long.valueOf(identity.getChannelId()));
        Pipeline pipeline = getPipeline(identity);

        FileLoadContext context = new FileLoadContext();
        context.setIdentity(identity);
        context.setChannel(channel);
        context.setPipeline(pipeline);
        return context;
    }

    /** Groups the files by the push weight of the data-media pair each file belongs to. */
    private WeightBuckets<FileData> buildWeightBuckets(Identity identity, List<FileData> datas) {
        // The pipeline is the same for every file of the batch; look it up once.
        Pipeline pipeline = getPipeline(identity);
        WeightBuckets<FileData> buckets = new WeightBuckets<FileData>();
        for (FileData data : datas) {
            DataMediaPair pair = ConfigHelper.findDataMediaPair(pipeline, data.getPairId());
            buckets.addItem(pair.getPushWeight(), data);
        }
        return buckets;
    }

    private Pipeline getPipeline(Identity identity) {
        return configClientService.findPipeline(Long.valueOf(identity.getPipelineId()));
    }

    /** Returns true when the file has no namespace, which this class treats as a local file. */
    private static boolean isLocal(FileData fileData) {
        return StringUtils.isBlank(fileData.getNameSpace());
    }

    /** Resolves where the given file is expected to be found below the staging directory. */
    private static File resolveSourceFile(File rootDir, FileData fileData, boolean local) {
        String path = fileData.getPath();
        String entryName;
        if (local) {
            entryName = FilenameUtils.getPath(path) + FilenameUtils.getName(path);
        } else {
            entryName = fileData.getNameSpace() + File.separator + path;
        }
        return new File(rootDir, entryName);
    }

    /** Copies size/mtime of the staged file onto the file data, marks it processed and counts it. */
    private void recordLoaded(FileLoadContext context, FileData fileData, File sourceFile) {
        fileData.setSize(sourceFile.length());
        fileData.setLastModifiedTime(sourceFile.lastModified());
        context.getProcessedDatas().add(fileData);

        LoadStatsTracker.LoadCounter counter = loadStatsTracker.getStat(context.getIdentity())
            .getStat(fileData.getPairId());
        counter.getFileCount().incrementAndGet();
        counter.getFileSize().addAndGet(fileData.getSize());
    }

    /**
     * Walks the files exactly like a real load would, but without touching any target file:
     * staged files are recorded as processed, deletes are recorded as processed, everything else
     * is recorded as failed.
     *
     * @throws LoadException if a staged file or a delete event carries a namespace
     */
    private void dryRun(FileLoadContext context, List<FileData> fileDatas, File rootDir) {
        for (FileData fileData : fileDatas) {
            boolean local = isLocal(fileData);
            File sourceFile = resolveSourceFile(rootDir, fileData, local);
            if (sourceFile.exists() && !sourceFile.isDirectory()) {
                if (!local) {
                    throw new LoadException(fileData + NOT_SUPPORTED_SUFFIX);
                }
                recordLoaded(context, fileData, sourceFile);
            } else if (fileData.getEventType().isDelete()) {
                if (!local) {
                    throw new LoadException(fileData + NOT_SUPPORTED_SUFFIX);
                }
                context.getProcessedDatas().add(fileData);
            } else {
                context.getFailedDatas().add(fileData);
            }
        }
    }

    /**
     * Loads the given files in parallel and waits until all of them are done.
     *
     * <p>On the first failure the still pending tasks are cancelled and the failure is thrown.
     *
     * @throws LoadException if any file fails to load or the wait is interrupted
     */
    private void moveFiles(FileLoadContext context, List<FileData> fileDatas, File rootDir) {
        adjustPoolSize(context);
        ExecutorCompletionService<Exception> completionService = new ExecutorCompletionService<Exception>(executor);
        List<Future<Exception>> results = new ArrayList<Future<Exception>>(fileDatas.size());

        for (FileData fileData : fileDatas) {
            Future<Exception> future = completionService.submit(new FileLoadWorker(context, rootDir, fileData));
            results.add(future);

            // Fail fast: the pool uses CallerRunsPolicy, so a task may already have run right here
            // on the submitting thread. No point in submitting more work after a failure.
            if (!future.isDone()) {
                continue;
            }
            Exception failure = outcomeOf(future);
            if (failure != null) {
                cancelPending(results);
                throw asLoadException(failure);
            }
        }

        Exception failure = null;
        int remaining = results.size();
        // Stop at the first failure so that a later success cannot overwrite it.
        while (remaining-- > 0 && failure == null) {
            try {
                failure = completionService.take().get();
            } catch (InterruptedException e) {
                // Restore the flag: the interrupt is about to be hidden inside a LoadException.
                Thread.currentThread().interrupt();
                failure = e;
            } catch (Exception e) {
                failure = e;
            }
        }

        if (failure != null) {
            cancelPending(results);
            throw asLoadException(failure);
        }
    }

    /** Returns the exception a finished task returned or threw, or null if it succeeded. */
    private static Exception outcomeOf(Future<Exception> future) {
        try {
            return future.get();
        } catch (Exception e) {
            return e;
        }
    }

    /** Cancels (with interruption) every task that has not finished yet. */
    private static void cancelPending(List<Future<Exception>> futures) {
        for (Future<Exception> future : futures) {
            if (!future.isDone() && !future.isCancelled()) {
                future.cancel(true);
            }
        }
    }

    /** Returns the exception itself if it already is a LoadException, otherwise wraps it. */
    private static LoadException asLoadException(Exception exception) {
        if (exception instanceof LoadException) {
            return (LoadException) exception;
        }
        return new LoadException(exception);
    }

    /**
     * Loads a single file: copies the staged file to its target path, or deletes the target for a
     * delete event without a staged file. Anything else is recorded as failed.
     *
     * @throws LoadException if the file carries a namespace or the target is missing after the copy
     * @throws IOException   declared for the underlying copy
     */
    private void doMove(FileLoadContext context, File rootDir, FileData fileData) throws IOException {
        boolean local = isLocal(fileData);
        File sourceFile = resolveSourceFile(rootDir, fileData, local);

        if (sourceFile.exists() && !sourceFile.isDirectory()) {
            if (!local) {
                throw new LoadException(fileData + NOT_SUPPORTED_SUFFIX);
            }
            File targetFile = new File(fileData.getPath());
            NioUtils.copy(sourceFile, targetFile, retry);
            if (!targetFile.exists()) {
                throw new LoadException(String.format("copy/rename [%s] to [%s] failed by unknow reason",
                    sourceFile.getPath(), targetFile.getPath()));
            }
            recordLoaded(context, fileData, sourceFile);
            return;
        }

        if (fileData.getEventType().isDelete()) {
            if (!local) {
                throw new LoadException(fileData + NOT_SUPPORTED_SUFFIX);
            }
            File targetFile = new File(fileData.getPath());
            if (NioUtils.delete(targetFile, retry)) {
                context.getProcessedDatas().add(fileData);
            } else {
                context.getFailedDatas().add(fileData);
            }
            return;
        }

        context.getFailedDatas().add(fileData);
    }

    /**
     * Creates the worker pool: a fixed number of threads, a bounded queue of four times the pool
     * size, and caller-runs as the saturation policy.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        executor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(poolSize * 4), (ThreadFactory) new NamedThreadFactory(WORKER_NAME),
            new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Shuts the worker pool down immediately; a no-op if the pool was never created. */
    @Override
    public void destroy() throws Exception {
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    public void setConfigClientService(ConfigClientService configClientService) {
        this.configClientService = configClientService;
    }

    public void setLoadStatsTracker(LoadStatsTracker loadStatsTracker) {
        this.loadStatsTracker = loadStatsTracker;
    }

    public void setRetry(int retry) {
        this.retry = retry;
    }

    public void setDump(boolean dump) {
        this.dump = dump;
    }

    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    /**
     * Loads one file, retrying up to {@code retry} times with a short pause between attempts.
     *
     * <p>Returns null on success and throws a {@link LoadException} once all attempts failed.
     */
    private class FileLoadWorker implements Callable<Exception> {

        private final FileLoadContext context;
        private final File rootDir;
        private final FileData fileData;

        public FileLoadWorker(FileLoadContext context, File rootDir, FileData fileData) {
            this.context = context;
            this.rootDir = rootDir;
            this.fileData = fileData;
        }

        @Override
        public Exception call() throws Exception {
            // With CallerRunsPolicy this may run on the submitting thread, so whatever is changed
            // on the thread here (name, MDC entry) is put back when the task ends.
            Thread current = Thread.currentThread();
            String originalName = current.getName();
            String originalMdcValue = MDC.get(MDC_PIPELINE_KEY);

            current.setName(String.format(FileLoadAction.WORKER_NAME_FORMAT, context.getPipeline().getId(),
                context.getPipeline().getName()));
            try {
                MDC.put(MDC_PIPELINE_KEY, String.valueOf(context.getPipeline().getId()));
                if (fileData == null) {
                    return null;
                }

                Exception lastFailure = null;
                for (int attempt = 1; attempt <= retry; attempt++) {
                    try {
                        doMove(context, rootDir, fileData);
                        return null;
                    } catch (Exception e) {
                        lastFailure = e;
                        // Pause before the next attempt, but not after the last one.
                        if (attempt < retry) {
                            Thread.sleep(50L);
                        }
                    }
                }
                throw new LoadException(String.format("FileLoadWorker is error! createFile failed[%s]",
                    fileData.getPath()), lastFailure);
            } finally {
                if (originalMdcValue == null) {
                    MDC.remove(MDC_PIPELINE_KEY);
                } else {
                    MDC.put(MDC_PIPELINE_KEY, originalMdcValue);
                }
                current.setName(originalName);
            }
        }
    }
}

