package org.irenical.dumpy.impl.stream;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.irenical.dumpy.DumpyThreadFactory;
import org.irenical.dumpy.api.IExtractor;
import org.irenical.dumpy.api.IJob;
import org.irenical.dumpy.api.ILoader;
import org.irenical.dumpy.api.IStream;
import org.irenical.dumpy.api.IStreamProcessor;
import org.irenical.dumpy.impl.ExecutorTerminator;
import org.irenical.dumpy.impl.LoaderResponseHandler;
import org.irenical.dumpy.impl.db.DumpyDB;
import org.irenical.dumpy.impl.model.DumpyBlockingQueue;
import org.irenical.dumpy.impl.model.PaginatedResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/irenical/dumpy/impl/stream/ErrorStreamProcessor.class */
public class ErrorStreamProcessor implements IStreamProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ErrorStreamProcessor.class);
    private final DumpyDB dumpyDB;
    private boolean isRunning = false;
    private final ExecutorService loaderResponseExecutor = Executors.newCachedThreadPool(new DumpyThreadFactory());

    public ErrorStreamProcessor(DumpyDB dumpyDB) {
        this.dumpyDB = dumpyDB;
    }

    public <ERROR extends Exception> void start() throws Exception {
        this.isRunning = true;
    }

    public void stop() throws Exception {
        this.isRunning = false;
        ExecutorTerminator.terminate(10L, Long.MAX_VALUE, this.loaderResponseExecutor);
    }

    public <ERROR extends Exception> boolean isRunning() throws Exception {
        return this.isRunning && this.dumpyDB.isRunning() && !this.loaderResponseExecutor.isTerminated();
    }

    @Override // org.irenical.dumpy.api.IStreamProcessor
    public <TYPE, ERROR extends Exception> void process(IJob iJob, IStream<TYPE, ERROR> iStream) throws Exception {
        LOGGER.debug("[ processor( " + iStream.getCode() + " ) ] stream start");
        IExtractor<TYPE, ERROR> extractor = iStream.getExtractor();
        ILoader<TYPE> loader = iStream.getLoader();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new DumpyBlockingQueue(1000), new DumpyThreadFactory());
        String str = null;
        boolean z = true;
        while (isRunning() && z) {
            try {
                PaginatedResponse<String> paginatedResponse = this.dumpyDB.get(iJob.getCode(), iStream.getCode(), str);
                List<String> values = paginatedResponse.getValues();
                if (values != null && !values.isEmpty()) {
                    String str2 = null;
                    boolean z2 = true;
                    while (isRunning() && z2) {
                        IExtractor.Response<TYPE> response = extractor.get(paginatedResponse.values, str2);
                        List<IExtractor.Entity<TYPE>> values2 = response.getValues();
                        if (isRunning() && values2 != null && !values2.isEmpty()) {
                            this.loaderResponseExecutor.execute(new LoaderResponseHandler(this.dumpyDB, iJob, iStream, threadPoolExecutor.submit(() -> {
                                return loader.load(values2);
                            }), new LinkedList(values2)));
                        }
                        str2 = response.getCursor();
                        z2 = response.hasNext();
                    }
                }
                str = paginatedResponse.cursor;
                z = paginatedResponse.hasNext;
                if (values == null || values.isEmpty()) {
                    Thread.sleep(1000L);
                }
            } finally {
                ExecutorTerminator.terminate(10L, Long.MAX_VALUE, threadPoolExecutor);
                LOGGER.debug("[ processor( " + iStream.getCode() + " ) ] stream done");
            }
        }
    }
}
