package com.github.dataprocessor;

import com.github.dataprocessor.provider.Page;
import com.github.dataprocessor.slice.DefaultSliceParser;
import com.github.dataprocessor.slice.DefaultSliceRecorder;
import com.github.dataprocessor.slice.Slice;
import com.github.dataprocessor.slice.SliceParser;
import com.github.dataprocessor.slice.SliceRecorder;
import com.github.dataprocessor.threadpool.DefaultThreadPoolFactory;
import com.github.dataprocessor.threadpool.ThreadPoolFactory;
import com.github.dataprocessor.util.RetryUtil;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/dataprocessor/DataProcessorTemplate.class */
/**
 * Template for processing a data set that has been partitioned into {@link Slice}s.
 *
 * <p>Subclasses supply three things: how to {@linkplain #generateSlices() generate the slices},
 * how to {@linkplain #getResources(Slice, Page) page through the data of one slice}, and how to
 * {@linkplain #createTask(List) turn a batch of records into a task}. This class launches the
 * slices (several at a time on a thread pool obtained from the {@link ThreadPoolFactory}), splits
 * each page into batches of at most {@code numPerBatch} records, runs the batch tasks through
 * {@link RetryUtil}, and reports every slice outcome to the {@link SliceRecorder}.
 *
 * <p>Only one of {@link #process()}, {@link #processErrorSlices()} and {@link #resumeProgress()}
 * may run at a time; configuration setters are rejected while a run is in progress. Both
 * violations raise {@link ConcurrentModificationException}.
 *
 * @param <T> type of the records being processed
 * @param <S> type of the slice boundary values
 */
public abstract class DataProcessorTemplate<T, S> implements DataProcessor {
    private static final Logger logger = LoggerFactory.getLogger(DataProcessorTemplate.class);
    private static final String THREAD_NAME = "processor";
    private static final int DEFAULT_SLICES_THREAD_NUM = 8;
    private static final int DEFAULT_NUM_PER_BATCH = 1000;

    /** {@link #state} value while no run is in progress. */
    private static final int STATE_IDLE = 0;
    /** {@link #state} value while a run is in progress. */
    private static final int STATE_RUNNING = 1;

    private static final String SLICES_THREAD_NUM_MESSAGE = "分片任务执行线程数必须大于0, slicesThreadNum:";
    private static final String NUM_PER_BATCH_MESSAGE = "每批次的数量必须大于0，numPerBatch:";

    /** Run guard; volatile so that the setters observe a run started on another thread. */
    private volatile int state;
    /** Number of records in the slices that completed successfully during the current run. */
    private final AtomicLong counter = new AtomicLong();
    /** Size of the slice-launcher pool, i.e. how many slices are processed concurrently. */
    private int slicesThreadNum;
    private SliceParser<S> sliceParser;
    private SliceRecorder<S> sliceRecorder;
    /** Maximum number of records handed to a single task. */
    private int numPerBatch;
    /** Pause, in milliseconds, between launching two consecutive slices. */
    private long launchInterval = 3000L;
    private ThreadPoolFactory threadPoolFactory;
    /** Retry count passed to {@link RetryUtil} for page fetches and batch tasks. */
    private int retryTime = 3;
    /** Nullable flag passed to {@link RetryUtil} when running batch tasks. */
    private boolean retryNullable = true;

    /**
     * Creates a fully configured processor.
     *
     * @param threadPoolFactory factory for the slice-launcher pool and the per-slice batch pools
     * @param sliceParser       parser associated with the slice type
     * @param sliceRecorder     recorder that persists all / completed / failed slices
     * @param numPerBatch       maximum number of records per task; must be positive
     * @param slicesThreadNum   number of slices processed concurrently; must be positive
     * @throws IllegalArgumentException if {@code numPerBatch} or {@code slicesThreadNum} is not positive
     */
    public DataProcessorTemplate(ThreadPoolFactory threadPoolFactory, SliceParser<S> sliceParser, SliceRecorder<S> sliceRecorder, int numPerBatch, int slicesThreadNum) {
        // Same checks as the setters: zero threads would divide by zero when sizing the batch
        // pool, and a non-positive batch size would make the batching loop never advance.
        requirePositive(numPerBatch, NUM_PER_BATCH_MESSAGE);
        requirePositive(slicesThreadNum, SLICES_THREAD_NUM_MESSAGE);
        this.threadPoolFactory = threadPoolFactory;
        this.sliceParser = sliceParser;
        this.sliceRecorder = sliceRecorder;
        this.numPerBatch = numPerBatch;
        this.slicesThreadNum = slicesThreadNum;
    }

    /**
     * Creates a processor with the default parser, recorder, thread-pool factory and
     * slice concurrency ({@value #DEFAULT_SLICES_THREAD_NUM}).
     *
     * @param numPerBatch maximum number of records per task; must be positive
     */
    public DataProcessorTemplate(int numPerBatch) {
        this(new DefaultSliceParser(), numPerBatch, DEFAULT_SLICES_THREAD_NUM);
    }

    /**
     * Creates a processor with the default parser, recorder and thread-pool factory.
     *
     * @param numPerBatch     maximum number of records per task; must be positive
     * @param slicesThreadNum number of slices processed concurrently; must be positive
     */
    public DataProcessorTemplate(int numPerBatch, int slicesThreadNum) {
        this(new DefaultSliceParser(), numPerBatch, slicesThreadNum);
    }

    /**
     * Creates a processor with the default thread-pool factory and a default recorder built
     * around the given parser.
     *
     * @param defaultSliceParser parser shared by the processor and its default recorder
     * @param numPerBatch        maximum number of records per task; must be positive
     * @param slicesThreadNum    number of slices processed concurrently; must be positive
     */
    public DataProcessorTemplate(DefaultSliceParser<S> defaultSliceParser, int numPerBatch, int slicesThreadNum) {
        this(new DefaultThreadPoolFactory(), defaultSliceParser, new DefaultSliceRecorder(defaultSliceParser), numPerBatch, slicesThreadNum);
    }

    /**
     * Creates a processor with all defaults: batches of {@value #DEFAULT_NUM_PER_BATCH} records
     * and {@value #DEFAULT_SLICES_THREAD_NUM} concurrent slices.
     */
    public DataProcessorTemplate() {
        this(new DefaultSliceParser(), DEFAULT_NUM_PER_BATCH, DEFAULT_SLICES_THREAD_NUM);
    }

    /**
     * Creates a processor with default sizing whose default recorder is constructed with the
     * given string.
     *
     * @param str second constructor argument of {@link DefaultSliceRecorder}
     *            (NOTE(review): presumably the record name/location; confirm against the recorder)
     */
    public DataProcessorTemplate(String str) {
        // The delegated constructor already installs the default sizes and a default
        // thread-pool factory; only the parser/recorder pair has to be replaced.
        this(DEFAULT_NUM_PER_BATCH, DEFAULT_SLICES_THREAD_NUM);
        this.sliceParser = new DefaultSliceParser();
        this.sliceRecorder = new DefaultSliceRecorder(this.sliceParser, str);
    }

    /**
     * Produces the slices of a full run.
     *
     * @return the slices to process; a {@code null} or empty set means there is nothing to do
     */
    protected abstract Set<Slice<S>> generateSlices();

    /**
     * Fetches the next page of records of a slice.
     *
     * @param slice the slice being processed
     * @param page  the page returned by the previous call for this slice, or {@code null} on the first call
     * @return the next page; {@code null} marks the slice as failed, a page without data ends the
     *         slice, and fetching continues while {@code isHasNext()} is true
     * @throws Exception on any fetch failure; the call is retried through {@link RetryUtil}
     */
    protected abstract Page<T> getResources(Slice<S> slice, Page<T> page) throws Exception;

    /**
     * Creates the task that processes one batch of records.
     *
     * @param list the records of the batch (at most {@code numPerBatch} when run on the batch pool)
     * @return the task; when it runs on the batch pool, a {@code null} or {@code false} result
     *         marks the whole slice as failed
     */
    protected abstract Callable<?> createTask(List<T> list);

    /**
     * Runs a full processing pass over {@link #generateSlices()}.
     *
     * @return {@code true} if every slice succeeded (possibly after one automatic retry round)
     * @throws ConcurrentModificationException if another run is already in progress
     */
    @Override // com.github.dataprocessor.DataProcessor
    public boolean process() {
        runState();
        try {
            return launchSlices(generateSlices());
        } finally {
            this.state = STATE_IDLE;
        }
    }

    /**
     * Clears the recorder, processes the given slices and retries the failed ones once.
     *
     * @param slices slices to process
     * @return {@code true} if no slice is left failed after the retry round
     */
    private boolean launchSlices(Set<Slice<S>> slices) {
        long startMillis = System.currentTimeMillis();
        this.sliceRecorder.clearRecord();
        this.counter.set(0L);
        Set<Slice<S>> failedSlices = launchBySliceTasks(slices);
        if (!failedSlices.isEmpty()) {
            logger.info("有{}个分片失败了，尝试重新处理失败的分片: {}", failedSlices.size(), failedSlices);
            failedSlices = launchBySliceTasks(failedSlices);
        }
        long elapsedMillis = System.currentTimeMillis() - startMillis;
        if (failedSlices.isEmpty()) {
            logger.info("数据处理任务全部完成，总量:{}, 共耗时:{}", this.counter.get(), elapsedMillis);
            return true;
        }
        logger.warn("数据处理任务执行结束但有执行失败的分片{}个，总量:{}, 共耗时:{}，", failedSlices.size(), this.counter.get(), elapsedMillis);
        logger.warn("出错的分片: {}", failedSlices);
        return false;
    }

    /**
     * Re-processes the slices the recorder has marked as failed and not yet completed.
     *
     * @return {@code true} if there was nothing to retry or every retried slice succeeded
     * @throws ConcurrentModificationException if another run is already in progress
     */
    @Override // com.github.dataprocessor.DataProcessor
    public boolean processErrorSlices() {
        long startMillis = System.currentTimeMillis();
        runState();
        try {
            return retryRecordedErrorSlices(startMillis);
        } finally {
            this.state = STATE_IDLE;
        }
    }

    /**
     * Body of {@link #processErrorSlices()} without the run guard, so that
     * {@link #resumeProgress()} can reuse it while it already owns the running state.
     *
     * @param startMillis timestamp used for the elapsed-time log output
     * @return {@code true} if there was nothing to retry or every retried slice succeeded
     */
    private boolean retryRecordedErrorSlices(long startMillis) {
        logger.info("开始处理失败的分片");
        this.counter.set(0L);
        Set<Slice<S>> errorSlices = this.sliceRecorder.getErrorSlices();
        if (errorSlices == null || errorSlices.isEmpty()) {
            logger.info("没有获取到需要重新处理的错误分片");
            return true;
        }
        // Remember the size before filtering: it is what the "already completed" message reports.
        int recordedErrorCount = errorSlices.size();
        Set<Slice<S>> completedSlices = this.sliceRecorder.getCompletedSlices();
        if (completedSlices != null) {
            errorSlices.removeAll(completedSlices);
        }
        if (errorSlices.isEmpty()) {
            logger.info("有获取到{}个失败的分片，但之前已全部处理完成", recordedErrorCount);
            return true;
        }
        logger.info("共获取到 {} 个处理失败的分片，现在开始处理", errorSlices.size());
        Set<Slice<S>> failedAgain = launchBySliceTasks(errorSlices);
        long elapsedMillis = System.currentTimeMillis() - startMillis;
        if (failedAgain.isEmpty()) {
            logger.info("失败的分片重新处理完毕，总量: {}, 耗时: {}", errorSlices.size(), elapsedMillis);
            return true;
        }
        logger.info("失败的分片重新处理完毕但有再次失败的分片{}个，总错误分片数量: {}, 耗时: {}", failedAgain.size(), errorSlices.size(), elapsedMillis);
        logger.warn("再次出错的分片: {}", failedAgain);
        return false;
    }

    /**
     * Resumes a previous, unfinished run: processes the recorded slices that are not recorded
     * as completed, then retries whatever is recorded as failed.
     *
     * @return the result of the final error-slice retry
     * @throws IllegalStateException           if the recorder has no slice record, no completed
     *                                         slices, or the two sets share no element
     * @throws ConcurrentModificationException if another run is already in progress
     */
    @Override // com.github.dataprocessor.DataProcessor
    public boolean resumeProgress() {
        runState();
        try {
            Set<Slice<S>> remainingSlices = this.sliceRecorder.getAllSlices();
            if (remainingSlices == null || remainingSlices.isEmpty()) {
                logger.warn("没有读取到上次执行的分片记录，无法恢复上次的未完成任务，请重新进行全量处理");
                throw new IllegalStateException("没有读取到上次执行的分片记录，无法恢复上次的未完成任务，请重新进行全量处理");
            }
            Set<Slice<S>> completedSlices = this.sliceRecorder.getCompletedSlices();
            if (completedSlices == null || completedSlices.isEmpty()) {
                logger.warn("没有读取已完成的分片，无法恢复上次的未完成任务，请重新进行全量处理");
                throw new IllegalStateException("没有读取已完成的分片，无法恢复上次的未完成任务，请重新进行全量处理");
            }
            // removeAll returning false means nothing matched, i.e. the two records are unrelated.
            if (!remainingSlices.removeAll(completedSlices)) {
                logger.warn("上次记录的时间分片与已完成的分片无法取差集，请确认用于分片的类型是否实现了equals和hashCode方法，且上次的记录没有被篡改");
                throw new IllegalStateException("上次记录的时间分片与已完成的分片无法取差集，请确认用于分片的类型是否实现了equals和hashCode方法，且上次的记录没有被篡改");
            }
            logger.info("开始恢复上次未完成的任务");
            launchSlices(remainingSlices);
            logger.info("恢复上次未完成的任务结束");
            logger.info("尝试处理错误数据，若没有错误数据需要处理，则为全部成功");
            // Must not go through the public processErrorSlices(): this run still owns the
            // running state, so its guard would reject the call.
            return retryRecordedErrorSlices(System.currentTimeMillis());
        } finally {
            this.state = STATE_IDLE;
        }
    }

    /**
     * Records the given slices and processes them, concurrently when there is more than one.
     *
     * @param slices slices to process; {@code null} elements are skipped when more than one slice is given
     * @return the slices that failed, in the order the failures were registered; empty if none
     */
    private Set<Slice<S>> launchBySliceTasks(Set<Slice<S>> slices) {
        if (slices == null || slices.isEmpty()) {
            logger.warn("没有需要执行的分片");
            return Collections.emptySet();
        }
        // Failures are registered from the launcher pool threads, so the set must be synchronized.
        Set<Slice<S>> failedSlices = Collections.synchronizedSet(new LinkedHashSet<Slice<S>>());
        this.sliceRecorder.saveAllSlices(slices);
        logger.info("分片任务开始启动，同时开始分片数:{}, 共有{}个分片需要处理", this.slicesThreadNum, slices.size());
        if (slices.size() == 1) {
            // A single slice runs on the calling thread; no launcher pool is needed.
            launchSlice(failedSlices, slices.iterator().next());
            return snapshotOf(failedSlices);
        }
        ExecutorService launcherPool = this.threadPoolFactory.createThreadPool(this.slicesThreadNum, THREAD_NAME + "-sliceLauncher");
        try {
            for (Slice<S> slice : slices) {
                if (slice != null) {
                    launcherPool.execute(() -> launchSlice(failedSlices, slice));
                    // Stagger the slice starts.
                    Thread.sleep(this.launchInterval);
                }
            }
            logger.info("分片任务启动完成，等待执行");
            launcherPool.shutdown();
            launcherPool.awaitTermination(7L, TimeUnit.DAYS);
            logger.info("分片任务执行完成，总量: {}", this.counter.get());
        } catch (InterruptedException e) {
            logger.error("分片任务启动发生异常", e);
            Thread.currentThread().interrupt();
        } finally {
            // No-op after a normal shutdown; releases the pool when launching was interrupted.
            launcherPool.shutdown();
        }
        return snapshotOf(failedSlices);
    }

    /**
     * Copies the shared failure set so that callers get a plain set that is no longer written
     * to by pool threads.
     */
    private Set<Slice<S>> snapshotOf(Set<Slice<S>> failedSlices) {
        synchronized (failedSlices) {
            return new LinkedHashSet<>(failedSlices);
        }
    }

    /**
     * Processes one slice and records its outcome.
     *
     * @param failedSlices collector the slice is added to when it fails
     * @param slice        the slice to process
     */
    private void launchSlice(Set<Slice<S>> failedSlices, Slice<S> slice) {
        boolean succeeded = false;
        boolean interrupted = false;
        try {
            succeeded = processBySlice(slice);
        } catch (InterruptedException e) {
            interrupted = true;
            logger.error("处理批次发生异常, 分片: " + slice, e);
        }
        if (succeeded) {
            logger.info("分片任务 {} 完成, 当前处理总数: {}", slice, this.counter.get());
            this.sliceRecorder.saveCompletedSlice(slice);
        } else {
            logger.info("当前时间分片处理失败: {}", slice);
            this.sliceRecorder.saveErrorSlice(slice);
            failedSlices.add(slice);
        }
        if (interrupted) {
            // Restore the flag only after the outcome has been recorded, so the recorder call
            // above is not affected by a pending interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Pages through one slice and processes every page, either inline or on a per-slice pool.
     *
     * @param slice the slice to process
     * @return {@code true} if every page was fetched and every batch task succeeded
     * @throws InterruptedException if interrupted while waiting for the batch pool to terminate
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private boolean processBySlice(Slice<S> slice) throws InterruptedException {
        long startMillis = System.currentTimeMillis();
        long processedCount = 0;
        List<Future> pendingBatches = new LinkedList<>();
        ExecutorService batchPool = null;
        try {
            Page<T> previousPage = null;
            Page<T> page;
            do {
                logger.info("从来源获取需要处理的资源开始");
                Page<T> cursor = previousPage;
                try {
                    page = (Page<T>) RetryUtil.retryCall(() -> {
                        return getResources(slice, cursor);
                    }, this.retryTime, false);
                    if (page == null) {
                        logger.info("分页获取到null值，认为本分片处理失败:" + slice);
                        return false;
                    }
                    List<T> data = page.getData();
                    if (data == null || data.isEmpty()) {
                        logger.info("查无数据，认为本批次数据已全部获取完成");
                        break;
                    }
                    logger.info("从来源获取需要处理的资源结束，数据量: " + data.size());
                    if (useSingleThread(processedCount, page.isHasNext(), data.size())) {
                        logger.debug("使用单线线程执行分批导入任务, count:{}, hasNext:{}, srcSize:{}", processedCount, page.isHasNext(), data.size());
                        // NOTE(review): the task result is ignored here, while the pooled path
                        // below treats a null/false result as a failure; confirm this is intended.
                        RetryUtil.retryCall(createTask(data), this.retryTime, this.retryNullable);
                    } else {
                        if (batchPool == null) {
                            int batchThreadNum = desiredThreadNum() / this.slicesThreadNum;
                            logger.info("创建分批处理线程池,线程数量: " + batchThreadNum);
                            batchPool = this.threadPoolFactory.createThreadPool(batchThreadNum, THREAD_NAME + "-" + slice.getBegin() + "-" + slice.getEnd());
                        }
                        pendingBatches.addAll(execTask(batchPool, data));
                    }
                    previousPage = page;
                    processedCount += data.size();
                } catch (Exception e) {
                    logger.error("分片任务执行有异常，本分片处理失败: " + slice, e);
                    return false;
                }
            } while (page.isHasNext());
            logger.info("本批次 {} 任务同步启动，等待执行", slice);
            if (batchPool != null) {
                batchPool.shutdown();
                batchPool.awaitTermination(1L, TimeUnit.HOURS);
            }
            for (Future pending : pendingBatches) {
                try {
                    Object result = pending.get();
                    if (result == null || Boolean.FALSE.equals(result)) {
                        return false;
                    }
                } catch (Exception e) {
                    logger.error("分片任务执行有异常，本分片处理失败: " + slice, e);
                    return false;
                }
            }
            logger.info("批次 {} 处理完成，共处理 {} 条数据，耗时: {}", slice, processedCount, System.currentTimeMillis() - startMillis);
            this.counter.addAndGet(processedCount);
            return true;
        } finally {
            if (batchPool != null) {
                // Release the pool on the failure paths as well; already submitted tasks still run.
                batchPool.shutdown();
            }
        }
    }

    /**
     * Splits a page into batches of at most {@code numPerBatch} records and submits one
     * retrying task per batch.
     *
     * @param executorService pool the tasks are submitted to
     * @param list            records of the page
     * @return one future per submitted batch, in batch order
     */
    @SuppressWarnings("rawtypes")
    private List<Future> execTask(ExecutorService executorService, List<T> list) {
        int total = list.size();
        if (total <= this.numPerBatch) {
            return Collections.singletonList(submitRetryTask(executorService, createTask(list)));
        }
        List<Future> futures = new LinkedList<>();
        int from = 0;
        while (from < total) {
            // Bound by the remaining size rather than from + numPerBatch, which could overflow.
            int to = from + Math.min(this.numPerBatch, total - from);
            futures.add(submitRetryTask(executorService, createTask(list.subList(from, to))));
            from = to;
        }
        return futures;
    }

    /**
     * Submits a task wrapped in {@link RetryUtil}.
     *
     * @param executorService pool to submit to
     * @param callable        the task to run
     * @return the future of the submitted task
     */
    @SuppressWarnings("rawtypes")
    private Future submitRetryTask(ExecutorService executorService, Callable<?> callable) {
        return executorService.submit(() -> {
            return RetryUtil.retryCall(callable, this.retryTime, this.retryNullable);
        });
    }

    /**
     * Marks the processor as running.
     *
     * <p>Synchronized so that the check and the state change happen atomically when two runs
     * are started at the same time.
     *
     * @throws ConcurrentModificationException if a run is already in progress
     */
    private synchronized void runState() {
        ensureState();
        this.state = STATE_RUNNING;
    }

    /**
     * Decides whether a page is processed on the calling thread instead of a batch pool.
     *
     * @param processedCount records of this slice handled so far
     * @param hasNext        whether the page reports further pages
     * @param pageSize       number of records in the page
     * @return {@code true} if the per-slice pool would have at most one thread, or if this is
     *         the first and only page and it fits into a single batch
     */
    private boolean useSingleThread(long processedCount, boolean hasNext, int pageSize) {
        if (desiredThreadNum() / this.slicesThreadNum <= 1) {
            return true;
        }
        return processedCount == 0 && !hasNext && pageSize <= this.numPerBatch;
    }

    /**
     * @return the overall batch-thread budget, {@code 2 * availableProcessors + 1}; it is
     *         divided by {@code slicesThreadNum} to size each per-slice pool
     */
    private int desiredThreadNum() {
        return (Runtime.getRuntime().availableProcessors() * 2) + 1;
    }

    /**
     * @param slicesThreadNum number of slices processed concurrently; must be positive
     * @throws IllegalArgumentException        if the value is not positive
     * @throws ConcurrentModificationException if a run is in progress
     */
    public void setSlicesThreadNum(int slicesThreadNum) {
        requirePositive(slicesThreadNum, SLICES_THREAD_NUM_MESSAGE);
        ensureState();
        this.slicesThreadNum = slicesThreadNum;
    }

    /**
     * @param sliceRecorder recorder that persists the slice outcomes
     * @throws ConcurrentModificationException if a run is in progress
     */
    public void setSliceRecorder(SliceRecorder<S> sliceRecorder) {
        ensureState();
        this.sliceRecorder = sliceRecorder;
    }

    /**
     * @param numPerBatch maximum number of records per task; must be positive
     * @throws IllegalArgumentException        if the value is not positive
     * @throws ConcurrentModificationException if a run is in progress
     */
    public void setNumPerBatch(int numPerBatch) {
        requirePositive(numPerBatch, NUM_PER_BATCH_MESSAGE);
        ensureState();
        this.numPerBatch = numPerBatch;
    }

    /**
     * @throws IllegalArgumentException with {@code message + value} if {@code value <= 0}
     */
    private void requirePositive(int value, String message) {
        if (value <= 0) {
            throw new IllegalArgumentException(message + value);
        }
    }

    /**
     * @throws IllegalArgumentException with {@code message + value} if {@code value < 0}
     */
    private void requireNotNegative(long value, String message) {
        if (value < 0) {
            throw new IllegalArgumentException(message + value);
        }
    }

    /**
     * @param sliceParser parser associated with the slice type
     * @throws ConcurrentModificationException if a run is in progress
     */
    public void setSliceParser(SliceParser<S> sliceParser) {
        ensureState();
        this.sliceParser = sliceParser;
    }

    /**
     * @param launchInterval pause in milliseconds between launching two slices; must not be negative
     * @throws IllegalArgumentException        if the value is negative
     * @throws ConcurrentModificationException if a run is in progress
     */
    public void setLaunchInterval(long launchInterval) {
        requireNotNegative(launchInterval, "启动间隔不能为负数，launchInterval:");
        ensureState();
        this.launchInterval = launchInterval;
    }

    /**
     * @param threadPoolFactory factory for the launcher and batch pools; must not be {@code null}
     * @throws NullPointerException            if the factory is {@code null}
     * @throws ConcurrentModificationException if a run is in progress
     */
    public void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory) {
        if (threadPoolFactory == null) {
            throw new NullPointerException("线程池工厂不能为空");
        }
        ensureState();
        this.threadPoolFactory = threadPoolFactory;
    }

    /**
     * @param retryTime retry count handed to {@link RetryUtil}; must not be negative
     * @throws IllegalArgumentException        if the value is negative
     * @throws ConcurrentModificationException if a run is in progress
     */
    public void setRetryTime(int retryTime) {
        requireNotNegative(retryTime, "重试次数不能为负数: retryTime:");
        ensureState();
        this.retryTime = retryTime;
    }

    /**
     * @param retryNullable nullable flag handed to {@link RetryUtil} when running batch tasks
     * @throws ConcurrentModificationException if a run is in progress
     */
    public void setRetryNullable(boolean retryNullable) {
        ensureState();
        this.retryNullable = retryNullable;
    }

    /**
     * @throws ConcurrentModificationException if a run is in progress
     */
    private void ensureState() {
        if (this.state != STATE_IDLE) {
            throw new ConcurrentModificationException("当前有任务正在执行");
        }
    }

    /** @return the pause in milliseconds between launching two slices */
    public long getLaunchInterval() {
        return this.launchInterval;
    }

    /** @return the maximum number of records per task */
    public int getNumPerBatch() {
        return this.numPerBatch;
    }

    /** @return the recorder that persists the slice outcomes */
    public SliceRecorder<S> getSliceRecorder() {
        return this.sliceRecorder;
    }

    /** @return the number of slices processed concurrently */
    public int getSlicesThreadNum() {
        return this.slicesThreadNum;
    }

    /** @return the parser associated with the slice type */
    public SliceParser<S> getSliceParser() {
        return this.sliceParser;
    }

    /** @return the factory used for the launcher and batch pools */
    public ThreadPoolFactory getThreadPoolFactory() {
        return this.threadPoolFactory;
    }

    /** @return the retry count handed to {@link RetryUtil} */
    public int getRetryTime() {
        return this.retryTime;
    }

    /** @return the nullable flag handed to {@link RetryUtil} when running batch tasks */
    public boolean isRetryNullable() {
        return this.retryNullable;
    }
}
