package net.sf.filePiper.model;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import org.apache.log4j.Logger;

/* loaded from: input_file:net/sf/filePiper/model/ProcessorThread.class */
/**
 * Pipeline stage that runs a single {@link FileProcessor} on its own thread.
 *
 * <p>Another thread hands an input stream to this stage through
 * {@link #processInputStream(InputStream, InputFileInfo)} (or indirectly through
 * {@link #createOutputStream(InputFileInfo)}, which wires a pipe). The thread's
 * {@link #run()} loop picks the stream up, lets the processor consume it, then
 * releases it so the next input can be accepted. Only one input stream is held
 * at a time; hand-over is coordinated with {@code wait()}/{@code notifyAll()}
 * on this object's monitor.
 */
public class ProcessorThread extends Thread implements FileProcessorEnvironment, PipeComponent {
    private static final Logger log = Logger.getLogger(ProcessorThread.class);

    private final FileProcessor processor;
    private final Pipeline line;
    private final PipeComponent nextComponent;
    private final PipelineEnvironment mainReporting;

    /** Scratch buffer used to drain unread bytes from an input stream before closing it. */
    private final byte[] consumeBuffer = new byte[1024];

    /** Input currently handed to this stage, or {@code null} when idle. Written only while holding this monitor. */
    private InputStream is;

    /** Description of the input that {@link #is} belongs to. */
    private InputFileInfo inputInfo;

    /**
     * Local "keep running" flag, combined with {@link #shouldContinue()} in every wait condition.
     * Volatile because it is written by the feeding thread (in {@link #finished()}) and by this
     * thread, and read without the monitor in {@link #getCurrentPhase()} and the {@link #run()} loop.
     */
    private volatile boolean thisShouldContinue = true;

    /**
     * Creates the stage; the thread is named after the processor.
     *
     * @param fileProcessor       processor executed by this thread
     * @param pipeline            pipeline returned by {@link #getPipeline()}
     * @param pipeComponent       downstream component that receives this stage's output and the
     *                            finished signal
     * @param pipelineEnvironment global environment used for continue/abort checks and error reporting
     */
    public ProcessorThread(FileProcessor fileProcessor, Pipeline pipeline, PipeComponent pipeComponent, PipelineEnvironment pipelineEnvironment) {
        super(fileProcessor.getProcessorName());
        this.line = pipeline;
        this.processor = fileProcessor;
        this.nextComponent = pipeComponent;
        this.mainReporting = pipelineEnvironment;
    }

    /**
     * Thread body: starts the batch, processes every input stream handed over until told to stop,
     * ends the batch and forwards the finished signal to the next component. Failures in any step
     * are logged and reported to the pipeline environment.
     */
    @Override
    public void run() {
        try {
            this.processor.startBatch(this);
        } catch (IOException e) {
            log.error("Error in processor " + this.processor + " startBatch(..) method", e);
            this.mainReporting.finished(e);
            this.thisShouldContinue = false;
        }

        while (shouldContinue() && this.thisShouldContinue) {
            // Sleep until an input arrives or the stage is told to stop.
            synchronized (this) {
                while (shouldContinue() && this.thisShouldContinue && this.is == null) {
                    waitQuietly();
                }
            }
            if (this.is != null && shouldContinue()) {
                try {
                    this.processor.process(this.is, this.inputInfo, this);
                } catch (Exception e) {
                    log.error("Error in processor " + this.processor + " for input: " + this.inputInfo, e);
                    this.mainReporting.finished(e);
                    this.thisShouldContinue = false;
                } finally {
                    // Always free the slot (and wake the feeder), even if the processor threw.
                    releaseInputStream();
                }
            }
        }

        // Clear the flag under the monitor and notify: threads blocked in processInputStream()
        // or finished() include this flag in their wait condition and would otherwise never
        // be woken when the loop ends without a releaseInputStream() call.
        synchronized (this) {
            this.thisShouldContinue = false;
            notifyAll();
        }

        try {
            this.processor.endBatch(this);
        } catch (IOException e) {
            log.error("Error in processor " + this.processor + " batch end", e);
            this.mainReporting.finished(e);
        }
        try {
            this.nextComponent.finished();
        } catch (IOException e) {
            log.error("Error in processor " + this.nextComponent + " when finished signal forwarded from " + this, e);
            this.mainReporting.finished(e);
        }
        if (log.isDebugEnabled()) {
            log.debug("End of Thread " + this);
        }
    }

    /**
     * Blocks in {@code wait()} once; an interruption is logged and otherwise ignored.
     * Must be called while holding this object's monitor. The interrupt flag is deliberately
     * not restored: callers loop on their condition and would spin if it stayed set.
     */
    private void waitQuietly() {
        try {
            wait();
        } catch (InterruptedException e) {
            log.warn("Wait interrupted by exception", e);
        }
    }

    /**
     * Drains and closes the current input stream (if any), marks this stage idle and wakes every
     * thread waiting on this monitor. A failure while draining or closing is logged as a warning
     * and does not prevent the slot from being freed.
     */
    private synchronized void releaseInputStream() {
        if (this.is != null) {
            try {
                // Consume whatever the processor left unread before closing.
                while (this.is.read(this.consumeBuffer) > 0) {
                    // discard
                }
                this.is.close();
            } catch (Exception e) {
                log.warn("Failed to close input steam in processor " + this.processor + " for input: " + this.inputInfo, e);
            }
            this.is = null;
            if (log.isDebugEnabled()) {
                log.debug("    <<< " + this + " release inputStream for input: " + this.inputInfo);
            }
        }
        notifyAll();
    }

    /**
     * @return the simple class name followed by the processor name and the processor's identity hash
     */
    @Override
    public String toString() {
        return getClass().getSimpleName() + "[" + this.processor.getProcessorName() + "@" + System.identityHashCode(this.processor) + "]";
    }

    /**
     * Hands an input stream to this stage. Blocks while a previous stream is still held; if the
     * stage or the pipeline has been told to stop, the stream is not accepted and the call
     * returns without storing it.
     *
     * @param inputStream   stream the processor should read
     * @param inputFileInfo description of that input
     */
    @Override
    public synchronized void processInputStream(InputStream inputStream, InputFileInfo inputFileInfo) {
        while (shouldContinue() && this.thisShouldContinue && this.is != null) {
            waitQuietly();
        }
        if (shouldContinue() && this.thisShouldContinue) {
            if (log.isDebugEnabled()) {
                log.debug("    >>> " + this + " starts for input " + inputFileInfo);
            }
            this.is = inputStream;
            this.inputInfo = inputFileInfo;
            notifyAll();
        }
    }

    /**
     * Creates a pipe whose read end is handed to this stage via
     * {@link #processInputStream(InputStream, InputFileInfo)}.
     *
     * @param inputFileInfo description of the data that will be written
     * @return the write end of the pipe
     * @throws IOException if the pipe cannot be connected
     */
    @Override
    public OutputStream createOutputStream(InputFileInfo inputFileInfo) throws IOException {
        PipedInputStream pipedInputStream = new PipedInputStream();
        PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream);
        processInputStream(pipedInputStream, inputFileInfo);
        return pipedOutputStream;
    }

    /**
     * Signals that no more input will arrive. Waits until the stream currently held (if any) has
     * been released, then clears the local continue flag and wakes the processing loop.
     *
     * @throws IOException declared by {@link PipeComponent}; not thrown by this implementation
     */
    @Override
    public synchronized void finished() throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("<<Finished>> signal in " + this);
        }
        while (shouldContinue() && this.thisShouldContinue && this.is != null) {
            waitQuietly();
        }
        if (log.isDebugEnabled()) {
            log.debug("||| last input stream done in " + this);
        }
        this.thisShouldContinue = false;
        notifyAll();
    }

    /**
     * @param inputFileInfo description of the data the processor is about to write
     * @return an output stream obtained from the next component
     * @throws IOException if the next component fails to create the stream
     */
    @Override
    public OutputStream getOutputStream(InputFileInfo inputFileInfo) throws IOException {
        return this.nextComponent.createOutputStream(inputFileInfo);
    }

    /**
     * @return whether the pipeline environment says processing should continue
     */
    @Override
    public boolean shouldContinue() {
        return this.mainReporting.shouldContinue();
    }

    /**
     * @return the pipeline passed to the constructor
     */
    @Override
    public Pipeline getPipeline() {
        return this.line;
    }

    /**
     * Derives the phase of this stage. Checks are made in priority order: aborted, errored
     * (both from the pipeline environment), done (local flag cleared), running, otherwise none.
     *
     * @return the current execution phase
     */
    @Override
    public ExecutionPhase getCurrentPhase() {
        if (this.mainReporting.isAborted()) {
            return ExecutionPhase.ABORTED;
        }
        if (this.mainReporting.isErrored()) {
            return ExecutionPhase.ERRORED;
        }
        if (!this.thisShouldContinue) {
            return ExecutionPhase.DONE;
        }
        if (this.mainReporting.isRunning()) {
            return ExecutionPhase.RUNNING;
        }
        return ExecutionPhase.NONE;
    }
}
