package org.apache.camel.component.nitrite;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
import org.dizitart.no2.event.ChangeInfo;
import org.dizitart.no2.event.ChangeListener;
import org.dizitart.no2.event.ChangedItem;

/* loaded from: input_file:org/apache/camel/component/nitrite/NitriteConsumer.class */
public class NitriteConsumer extends DefaultConsumer {
    private final NitriteEndpoint endpoint;
    private NitriteChangeListener changeListener;

    /* loaded from: input_file:org/apache/camel/component/nitrite/NitriteConsumer$NitriteChangeListener.class */
    private class NitriteChangeListener implements ChangeListener {
        private NitriteChangeListener() {
        }

        public void onChange(ChangeInfo changeInfo) {
            for (ChangedItem changedItem : changeInfo.getChangedItems()) {
                Exchange createExchange = NitriteConsumer.this.createExchange(false);
                Message message = createExchange.getMessage();
                message.setHeader(NitriteConstants.CHANGE_TIMESTAMP, Long.valueOf(changedItem.getChangeTimestamp()));
                message.setHeader(NitriteConstants.CHANGE_TYPE, changedItem.getChangeType());
                message.setBody(changedItem.getDocument());
                try {
                    try {
                        NitriteConsumer.this.getProcessor().process(createExchange);
                        if (createExchange.getException() != null) {
                            NitriteConsumer.this.getExceptionHandler().handleException("Error processing exchange", createExchange, createExchange.getException());
                        }
                        NitriteConsumer.this.releaseExchange(createExchange, false);
                    } catch (Exception e) {
                        createExchange.setException(e);
                        if (createExchange.getException() != null) {
                            NitriteConsumer.this.getExceptionHandler().handleException("Error processing exchange", createExchange, createExchange.getException());
                        }
                        NitriteConsumer.this.releaseExchange(createExchange, false);
                    }
                } catch (Throwable th) {
                    if (createExchange.getException() != null) {
                        NitriteConsumer.this.getExceptionHandler().handleException("Error processing exchange", createExchange, createExchange.getException());
                    }
                    NitriteConsumer.this.releaseExchange(createExchange, false);
                    throw th;
                }
            }
        }
    }

    public NitriteConsumer(NitriteEndpoint nitriteEndpoint, Processor processor) {
        super(nitriteEndpoint, processor);
        this.endpoint = nitriteEndpoint;
    }

    protected void doInit() throws Exception {
        super.doInit();
        this.changeListener = new NitriteChangeListener();
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.endpoint.getNitriteCollection().register(this.changeListener);
    }

    protected void doStop() throws Exception {
        if (this.changeListener != null) {
            this.endpoint.getNitriteCollection().deregister(this.changeListener);
        }
        super.doStop();
    }
}
