package org.openbase.bco.ontology.lib.manager.buffer;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import javafx.util.Pair;
import org.openbase.bco.ontology.lib.commun.rsb.RsbCommunication;
import org.openbase.bco.ontology.lib.commun.web.SparqlUpdateWeb;
import org.openbase.bco.ontology.lib.system.config.OntConfig;
import org.openbase.jps.exception.JPServiceException;
import org.openbase.jul.exception.CouldNotPerformException;
import org.openbase.jul.exception.CouldNotProcessException;
import org.openbase.jul.exception.printer.ExceptionPrinter;
import org.openbase.jul.exception.printer.LogLevel;
import org.openbase.jul.extension.rsb.iface.RSBInformer;
import org.openbase.jul.schedule.GlobalScheduledExecutorService;
import org.openbase.jul.schedule.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rst.domotic.ontology.OntologyChangeType;

/* loaded from: input_file:org/openbase/bco/ontology/lib/manager/buffer/TransactionBufferImpl.class */
public class TransactionBufferImpl implements TransactionBuffer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionBufferImpl.class);
    private final Queue<Pair<String, Boolean>> queue = new ConcurrentLinkedQueue();
    private final OntologyChangeType.OntologyChange.Category category = OntologyChangeType.OntologyChange.Category.UNKNOWN;
    private final Stopwatch stopwatch = new Stopwatch();
    private Future future;

    @Override // org.openbase.bco.ontology.lib.manager.buffer.TransactionBuffer
    public void createAndStartQueue(RSBInformer<OntologyChangeType.OntologyChange> rSBInformer) throws CouldNotPerformException {
        try {
            this.future = GlobalScheduledExecutorService.scheduleWithFixedDelay(() -> {
                while (!this.queue.isEmpty()) {
                    Pair<String, Boolean> peek = this.queue.peek();
                    String str = (String) peek.getKey();
                    try {
                        if (((Boolean) peek.getValue()).booleanValue() ? SparqlUpdateWeb.sparqlUpdateToAllDataBases(str, OntConfig.ServerServiceForm.UPDATE) : SparqlUpdateWeb.sparqlUpdateToMainOntology(str, OntConfig.ServerServiceForm.UPDATE)) {
                            this.queue.poll();
                        } else {
                            this.stopwatch.waitForStart(5000L);
                        }
                    } catch (InterruptedException | JPServiceException e) {
                        this.future.cancel(true);
                        ExceptionPrinter.printHistory(e, LOGGER, LogLevel.ERROR);
                    } catch (CouldNotPerformException e2) {
                        this.queue.poll();
                        ExceptionPrinter.printHistory("Dropped broken queue entry. Server could not perform, cause of client error... wrong update? Queue entry is: " + str, e2, LOGGER, LogLevel.ERROR);
                    }
                    if (this.queue.isEmpty() && rSBInformer != null) {
                        RsbCommunication.startNotification(rSBInformer, OntologyChangeType.OntologyChange.newBuilder().addCategory(this.category).build());
                    }
                }
            }, 0L, 1L, TimeUnit.SECONDS);
        } catch (IllegalArgumentException | RejectedExecutionException | CouldNotPerformException e) {
            throw new CouldNotProcessException("Could not process transactionBuffer thread!", e);
        }
    }

    @Override // org.openbase.bco.ontology.lib.manager.buffer.TransactionBuffer
    public void insertData(Pair<String, Boolean> pair) throws CouldNotProcessException {
        if (!this.queue.offer(pair)) {
            throw new CouldNotProcessException("Could not add element to queue!");
        }
    }
}
