package com.arcadedb.database.async;

import com.arcadedb.ContextConfiguration;
import com.arcadedb.GlobalConfiguration;
import com.arcadedb.database.BasicDatabase;
import com.arcadedb.database.Database;
import com.arcadedb.database.DatabaseContext;
import com.arcadedb.database.DatabaseInternal;
import com.arcadedb.database.DocumentCallback;
import com.arcadedb.database.MutableDocument;
import com.arcadedb.database.RID;
import com.arcadedb.database.Record;
import com.arcadedb.engine.Bucket;
import com.arcadedb.engine.ErrorRecordCallback;
import com.arcadedb.engine.WALFile;
import com.arcadedb.exception.DatabaseOperationException;
import com.arcadedb.graph.Vertex;
import com.arcadedb.index.IndexCursor;
import com.arcadedb.index.IndexInternal;
import com.arcadedb.log.LogManager;
import com.arcadedb.schema.DocumentType;
import com.arcadedb.schema.EdgeType;
import com.conversantmedia.util.concurrent.PushPullBlockingQueue;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;

/* loaded from: input_file:com/arcadedb/database/async/DatabaseAsyncExecutorImpl.class */
public class DatabaseAsyncExecutorImpl implements DatabaseAsyncExecutor {
    // Database the asynchronous tasks operate on; handed to every worker thread.
    private final DatabaseInternal database;
    // Configuration passed to the DatabaseAsyncCommand tasks built by query()/command().
    private final ContextConfiguration configuration;
    // Worker threads, each owning its own queue; set to null by kill() and shutdownThreads().
    private AsyncThread[] executorThreads;
    // A worker commits and reopens its transaction every time its executed-task count is a multiple of this value.
    private int commitEvery;
    // Sentinel task: a worker that polls this exact instance leaves its processing loop (compared by identity).
    public static final DatabaseAsyncTask FORCE_EXIT = new DatabaseAsyncTask() { // from class: com.arcadedb.database.async.DatabaseAsyncExecutorImpl.1
        @Override // com.arcadedb.database.async.DatabaseAsyncTask
        public void execute(AsyncThread asyncThread, DatabaseInternal databaseInternal) {
            // Intentionally empty: the sentinel carries no work.
        }

        public String toString() {
            return "FORCE_EXIT";
        }
    };
    // Executor-wide callback invoked by onOk(); may be null.
    private OkCallback onOkCallback;
    // Executor-wide callback invoked by onError(Throwable); may be null.
    private ErrorCallback onErrorCallback;
    // Source of randomness for getRandomSlot().
    private final Random random = new Random();
    // Number of worker threads; updated by createThreads().
    private int parallelLevel = 1;
    // Queue fill percentage from which scheduleTask() starts sleeping; 0 disables it.
    // Also overwritten from ASYNC_BACK_PRESSURE each time an AsyncThread is constructed.
    private int backPressurePercentage = 0;
    // Applied to each worker's transaction when the worker starts.
    private boolean transactionUseWAL = true;
    // WAL flush mode applied to the database when a worker starts.
    private WALFile.FLUSH_TYPE transactionSync = WALFile.FLUSH_TYPE.NO;
    // Timeout in milliseconds of each blocking offer() attempt in scheduleTask() before checking for a stalled queue.
    private long checkForStalledQueuesMaxDelay = 5000;
    // Round-robin counter used to pick the slot of transactions scheduled without an explicit slot.
    private final AtomicLong transactionCounter = new AtomicLong();
    // Round-robin counter used to pick the slot of queries and commands.
    private final AtomicLong commandRoundRobinIndex = new AtomicLong();
    // Number of tasks successfully enqueued by scheduleTask().
    private AtomicLong counterScheduledTasks = new AtomicLong();

    /* loaded from: input_file:com/arcadedb/database/async/DatabaseAsyncExecutorImpl$AsyncTaskFactory.class */
    /** Factory producing {@link DatabaseAsyncAbstractCallbackTask} instances. */
    public interface AsyncTaskFactory {
        /** Creates a task; nothing in this file shows whether instances may be shared or reused. */
        DatabaseAsyncAbstractCallbackTask create();
    }

    /* loaded from: input_file:com/arcadedb/database/async/DatabaseAsyncExecutorImpl$AsyncThread.class */
    public class AsyncThread extends Thread {
        public final BlockingQueue<DatabaseAsyncTask> queue;
        public final DatabaseInternal database;
        public volatile boolean shutdown;
        public volatile boolean forceShutdown;
        public AtomicBoolean executingTask;
        public long count;

        private AsyncThread(DatabaseInternal databaseInternal, int i) {
            super("AsyncExecutor-" + databaseInternal.getName() + "-" + i);
            this.shutdown = false;
            this.forceShutdown = false;
            this.executingTask = new AtomicBoolean(false);
            this.count = 0L;
            this.database = databaseInternal;
            int valueAsInteger = databaseInternal.getConfiguration().getValueAsInteger(GlobalConfiguration.ASYNC_OPERATIONS_QUEUE_SIZE) / DatabaseAsyncExecutorImpl.this.parallelLevel;
            valueAsInteger = valueAsInteger < 1 ? 1 : valueAsInteger;
            String valueAsString = databaseInternal.getConfiguration().getValueAsString(GlobalConfiguration.ASYNC_OPERATIONS_QUEUE_IMPL);
            if ("fast".equalsIgnoreCase(valueAsString)) {
                this.queue = new PushPullBlockingQueue(valueAsInteger);
            } else if ("standard".equalsIgnoreCase(valueAsString)) {
                this.queue = new ArrayBlockingQueue(valueAsInteger);
            } else {
                LogManager.instance().log(this, Level.WARNING, "Error on async operation queue implementation setting: %s is not supported", valueAsString);
                this.queue = new ArrayBlockingQueue(valueAsInteger);
            }
            DatabaseAsyncExecutorImpl.this.backPressurePercentage = databaseInternal.getConfiguration().getValueAsInteger(GlobalConfiguration.ASYNC_BACK_PRESSURE);
        }

        public boolean isShutdown() {
            return this.shutdown;
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            DatabaseAsyncTask poll;
            DatabaseContext.INSTANCE.init(this.database);
            DatabaseContext.INSTANCE.getContext(this.database.getDatabasePath()).asyncMode = true;
            this.database.getTransaction().setUseWAL(DatabaseAsyncExecutorImpl.this.transactionUseWAL);
            this.database.setWALFlush(DatabaseAsyncExecutorImpl.this.transactionSync);
            this.database.getTransaction().begin(Database.TRANSACTION_ISOLATION_LEVEL.READ_COMMITTED);
            while (!this.forceShutdown) {
                try {
                    poll = this.queue.poll(500L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.queue.clear();
                } catch (Throwable th) {
                    LogManager.instance().log((Object) this, Level.SEVERE, "Error on executing asynchronous operation (asyncThread=%s)", th, (Object) getName());
                }
                if (poll != null) {
                    this.executingTask.set(true);
                    try {
                        try {
                            LogManager.instance().log(this, Level.FINE, "Received async message %s (threadId=%d)", poll, Long.valueOf(Thread.currentThread().getId()));
                        } catch (Throwable th2) {
                            try {
                                poll.completed();
                                this.executingTask.set(false);
                                throw th2;
                            } catch (Throwable th3) {
                                throw th3;
                            }
                        }
                    } finally {
                        try {
                        } catch (Throwable th4) {
                        }
                    }
                    if (poll == DatabaseAsyncExecutorImpl.FORCE_EXIT) {
                        try {
                            poll.completed();
                            this.executingTask.set(false);
                            break;
                        } finally {
                        }
                    }
                    if (poll.requiresActiveTx() && !this.database.isTransactionActive()) {
                        this.database.begin();
                    }
                    poll.execute(this, this.database);
                    this.count++;
                    if (this.database.isTransactionActive() && this.count % DatabaseAsyncExecutorImpl.this.commitEvery == 0) {
                        this.database.commit();
                        this.database.begin();
                    }
                    try {
                        poll.completed();
                        this.executingTask.set(false);
                    } finally {
                    }
                } else if (this.shutdown) {
                    break;
                }
            }
            try {
                if (this.database.isOpen() && this.database.isTransactionActive()) {
                    this.database.commit();
                }
                onOk();
            } catch (Exception e2) {
                onError(e2);
            }
        }

        public void onError(Throwable th) {
            DatabaseAsyncExecutorImpl.this.onError(th);
        }

        public void onOk() {
            DatabaseAsyncExecutorImpl.this.onOk();
        }

        public boolean isExecutingTask() {
            return this.executingTask.get();
        }
    }

    /* loaded from: input_file:com/arcadedb/database/async/DatabaseAsyncExecutorImpl$DBAsyncStats.class */
    /** Plain holder for the counters returned by {@code getStats()}. */
    public static class DBAsyncStats {
        // Sum of the sizes of all worker queues at the time the stats were collected.
        public long queueSize;
        // Total number of tasks successfully enqueued so far.
        public long scheduledTasks;
    }

    public DatabaseAsyncExecutorImpl(DatabaseInternal databaseInternal, ContextConfiguration contextConfiguration) {
        this.database = databaseInternal;
        this.configuration = contextConfiguration;
        this.commitEvery = databaseInternal.getConfiguration().getValueAsInteger(GlobalConfiguration.ASYNC_TX_BATCH_SIZE);
        createThreads(databaseInternal.getConfiguration().getValueAsInteger(GlobalConfiguration.ASYNC_WORKER_THREADS));
    }

    /**
     * Collects a snapshot of the executor counters.
     *
     * @return a new stats object holding the summed size of all worker queues (0 when there are no
     *         workers) and the number of tasks scheduled so far
     */
    public DBAsyncStats getStats() {
        final DBAsyncStats stats = new DBAsyncStats();
        long queued = 0L;
        if (this.executorThreads != null) {
            for (AsyncThread worker : this.executorThreads) {
                queued += worker.queue.size();
            }
        }
        stats.queueSize = queued;
        stats.scheduledTasks = this.counterScheduledTasks.get();
        return stats;
    }

    /**
     * Sets whether worker transactions use the WAL. The workers read the flag only when they start,
     * so the current workers are shut down and a new set is created.
     */
    @Override
    public void setTransactionUseWAL(boolean z) {
        this.transactionUseWAL = z;
        final int currentLevel = this.parallelLevel;
        createThreads(currentLevel);
    }

    /** @return the WAL flag that workers apply to their transaction when they start */
    @Override
    public boolean isTransactionUseWAL() {
        return transactionUseWAL;
    }

    /** @return the WAL flush mode that workers apply to the database when they start */
    @Override
    public WALFile.FLUSH_TYPE getTransactionSync() {
        return transactionSync;
    }

    /**
     * Sets the WAL flush mode. The workers read the mode only when they start, so the current
     * workers are shut down and a new set is created.
     */
    @Override
    public void setTransactionSync(WALFile.FLUSH_TYPE flush_type) {
        this.transactionSync = flush_type;
        final int currentLevel = this.parallelLevel;
        createThreads(currentLevel);
    }

    /** @return the timeout, in milliseconds, of each blocking enqueue attempt made by {@code scheduleTask} */
    public long getCheckForStalledQueuesMaxDelay() {
        return checkForStalledQueuesMaxDelay;
    }

    /**
     * @param j timeout, in milliseconds, of each blocking enqueue attempt made by {@code scheduleTask}
     *          before it checks whether the target queue is stalled
     */
    public void setCheckForStalledQueuesMaxDelay(long j) {
        checkForStalledQueuesMaxDelay = j;
    }

    /** Registers the executor-wide success callback, replacing any previous one; {@code null} removes it. */
    @Override
    public void onOk(OkCallback okCallback) {
        onOkCallback = okCallback;
    }

    /** Registers the executor-wide error callback, replacing any previous one; {@code null} removes it. */
    @Override
    public void onError(ErrorCallback errorCallback) {
        onErrorCallback = errorCallback;
    }

    /**
     * Schedules the compaction of the index on the least loaded worker, without blocking if that
     * worker's queue is full. Nothing is scheduled when {@code scheduleCompaction()} returns false.
     */
    public void compact(IndexInternal indexInternal) {
        if (!indexInternal.scheduleCompaction()) {
            return;
        }
        final int slot = getBestSlot();
        final DatabaseAsyncIndexCompaction compaction = new DatabaseAsyncIndexCompaction(indexInternal);
        scheduleTask(slot, compaction, false, this.backPressurePercentage);
    }

    /**
     * Picks the least loaded worker, where the load is the queue size plus one if a task is running.
     *
     * @return the index of the first idle worker, otherwise the index of the first worker with the
     *         smallest load; -1 only if there are no workers
     */
    private int getBestSlot() {
        int bestSlot = -1;
        int lowestLoad = 0;
        for (int slot = 0; slot < this.executorThreads.length; slot++) {
            final AsyncThread worker = this.executorThreads[slot];
            int load = worker.queue.size();
            if (worker.isExecutingTask()) {
                load++;
            }
            if (load == 0) {
                // Nothing can beat an idle worker.
                return slot;
            }
            if (bestSlot == -1 || load < lowestLoad) {
                lowestLoad = load;
                bestSlot = slot;
            }
        }
        return bestSlot;
    }

    /** @return a uniformly random worker index in {@code [0, executorThreads.length)} */
    private int getRandomSlot() {
        final int workers = this.executorThreads.length;
        return this.random.nextInt(workers);
    }

    /** Waits without a time limit; equivalent to {@code waitCompletion(0)} with the result discarded. */
    @Override
    public void waitCompletion() {
        final long noTimeout = 0L;
        waitCompletion(noTimeout);
    }

    /**
     * Waits until every worker has processed the tasks that were in its queue at the time of the call.
     * A completion marker is appended to each queue and then awaited in turn.
     *
     * @param j maximum time to wait in milliseconds; zero or negative means wait without limit
     * @return {@code true} if all markers completed in time (or there are no workers); {@code false}
     *         on timeout or if the calling thread was interrupted (the interrupt flag is restored)
     */
    @Override
    public boolean waitCompletion(long j) {
        final AsyncThread[] workers = this.executorThreads;
        if (workers == null) {
            return true;
        }

        final DatabaseAsyncAbstractCallbackTask[] markers = new DatabaseAsyncAbstractCallbackTask[workers.length];
        for (int i = 0; i < workers.length; i++) {
            try {
                markers[i] = new DatabaseAsyncCompletion();
                workers[i].queue.put(markers[i]);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        final long timeout = j <= 0 ? Long.MAX_VALUE : j;
        final long beginTime = System.currentTimeMillis();
        long remaining = timeout;

        for (int i = 0; i < markers.length; i++) {
            if (i > 0) {
                // The remaining budget is checked only while there is still something to wait for,
                // so a run where the last marker completed in time is reported as a success.
                remaining = timeout - (System.currentTimeMillis() - beginTime);
                if (remaining < 1) {
                    return false;
                }
            }
            try {
                if (!markers[i].waitCompletion(remaining)) {
                    return false;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /**
     * Schedules a query (the task is built with its boolean flag set to {@code true}) with positional
     * arguments. The worker is chosen round-robin and the call blocks while its queue is full.
     * <p>
     * {@code str} and {@code str2} are passed through to {@link DatabaseAsyncCommand} unchanged;
     * presumably language and query text — confirm against that class.
     */
    @Override
    public void query(String str, String str2, AsyncResultsetCallback asyncResultsetCallback, Object... objArr) {
        final int slot = getSlot((int) this.commandRoundRobinIndex.getAndIncrement());
        final DatabaseAsyncCommand task = new DatabaseAsyncCommand(this.configuration, true, str, str2, objArr, asyncResultsetCallback);
        scheduleTask(slot, task, true, this.backPressurePercentage);
    }

    /**
     * Schedules a query (the task is built with its boolean flag set to {@code true}) with named
     * arguments. The worker is chosen round-robin and the call blocks while its queue is full.
     * <p>
     * {@code str} and {@code str2} are passed through to {@link DatabaseAsyncCommand} unchanged;
     * presumably language and query text — confirm against that class.
     */
    @Override
    public void query(String str, String str2, AsyncResultsetCallback asyncResultsetCallback, Map<String, Object> map) {
        final int slot = getSlot((int) this.commandRoundRobinIndex.getAndIncrement());
        final DatabaseAsyncCommand task = new DatabaseAsyncCommand(this.configuration, true, str, str2, map, asyncResultsetCallback);
        scheduleTask(slot, task, true, this.backPressurePercentage);
    }

    /**
     * Schedules a command (the task is built with its boolean flag set to {@code false}) with positional
     * arguments. The worker is chosen round-robin and the call blocks while its queue is full.
     * <p>
     * {@code str} and {@code str2} are passed through to {@link DatabaseAsyncCommand} unchanged;
     * presumably language and command text — confirm against that class.
     */
    @Override
    public void command(String str, String str2, AsyncResultsetCallback asyncResultsetCallback, Object... objArr) {
        final int slot = getSlot((int) this.commandRoundRobinIndex.getAndIncrement());
        final DatabaseAsyncCommand task = new DatabaseAsyncCommand(this.configuration, false, str, str2, objArr, asyncResultsetCallback);
        scheduleTask(slot, task, true, this.backPressurePercentage);
    }

    /**
     * Schedules a command (the task is built with its boolean flag set to {@code false}) with named
     * arguments. The worker is chosen round-robin and the call blocks while its queue is full.
     * <p>
     * {@code str} and {@code str2} are passed through to {@link DatabaseAsyncCommand} unchanged;
     * presumably language and command text — confirm against that class.
     */
    @Override
    public void command(String str, String str2, AsyncResultsetCallback asyncResultsetCallback, Map<String, Object> map) {
        final int slot = getSlot((int) this.commandRoundRobinIndex.getAndIncrement());
        final DatabaseAsyncCommand task = new DatabaseAsyncCommand(this.configuration, false, str, str2, map, asyncResultsetCallback);
        scheduleTask(slot, task, true, this.backPressurePercentage);
    }

    /** Scans the type in parallel without a per-record error callback; see the four-argument overload. */
    @Override
    public void scanType(String str, boolean z, DocumentCallback documentCallback) {
        final ErrorRecordCallback noErrorCallback = null;
        scanType(str, z, documentCallback, noErrorCallback);
    }

    /**
     * Scans all the buckets of a type in parallel, one scan task per bucket, and blocks until every
     * bucket has been scanned.
     *
     * @param str                 name of the type to scan
     * @param z                   forwarded to {@code getBuckets}; presumably selects polymorphic
     *                            scanning (sub-types included) — confirm against DocumentType
     * @param documentCallback    passed to each bucket scan task
     * @param errorRecordCallback passed to each bucket scan task; may be {@code null}
     * @throws DatabaseOperationException wrapping any failure, including an interruption of the wait
     *                                    (the interrupt flag is restored first)
     */
    @Override
    public void scanType(String str, boolean z, DocumentCallback documentCallback, ErrorRecordCallback errorRecordCallback) {
        try {
            final List<Bucket> buckets = this.database.getSchema().getType(str).getBuckets(z);
            final CountDownLatch pendingBuckets = new CountDownLatch(buckets.size());
            for (Bucket bucket : buckets) {
                scheduleTask(getSlot(bucket.getFileId()), new DatabaseAsyncScanBucket(pendingBuckets, documentCallback, errorRecordCallback, bucket), true, this.backPressurePercentage);
            }
            pendingBuckets.await();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            // The message uses the requested name: looking the type up again here could throw and
            // hide the original failure (e.g. when the type does not exist).
            throw new DatabaseOperationException("Error on executing parallel scan of type '" + str + "'", e);
        }
    }

    /** Schedules the transaction using the retry count configured under {@code TX_RETRIES}. */
    @Override
    public void transaction(BasicDatabase.TransactionScope transactionScope) {
        final int retries = this.database.getConfiguration().getValueAsInteger(GlobalConfiguration.TX_RETRIES);
        transaction(transactionScope, retries);
    }

    /** Schedules the transaction with the given retry count {@code i} and no per-transaction callbacks. */
    @Override
    public void transaction(BasicDatabase.TransactionScope transactionScope, int i) {
        final OkCallback noOk = null;
        final ErrorCallback noError = null;
        transaction(transactionScope, i, noOk, noError);
    }

    /** Schedules the transaction on a worker chosen round-robin through the transaction counter. */
    @Override
    public void transaction(BasicDatabase.TransactionScope transactionScope, int i, OkCallback okCallback, ErrorCallback errorCallback) {
        final int slot = getSlot((int) this.transactionCounter.getAndIncrement());
        transaction(transactionScope, i, okCallback, errorCallback, slot);
    }

    /**
     * Schedules the transaction on a specific worker, blocking while that worker's queue is full.
     *
     * @param transactionScope code to run inside the transaction
     * @param i                retry count handed to the transaction task
     * @param okCallback       handed to the transaction task; may be {@code null}
     * @param errorCallback    handed to the transaction task; may be {@code null}
     * @param i2               worker index; -1 lets {@code scheduleTask} pick the least loaded worker
     */
    @Override
    public void transaction(BasicDatabase.TransactionScope transactionScope, int i, OkCallback okCallback, ErrorCallback errorCallback, int i2) {
        final DatabaseAsyncTransaction task = new DatabaseAsyncTransaction(transactionScope, i, okCallback, errorCallback);
        scheduleTask(i2, task, true, this.backPressurePercentage);
    }

    /** Schedules the creation of the document without an error callback. */
    @Override
    public void createRecord(MutableDocument mutableDocument, NewRecordCallback newRecordCallback) {
        final ErrorCallback noError = null;
        createRecord(mutableDocument, newRecordCallback, noError);
    }

    /**
     * Schedules the creation of a new document. The target bucket is selected by the document's type
     * and the worker is derived from the bucket's file id.
     *
     * @throws IllegalArgumentException if the document already has an identity
     */
    @Override
    public void createRecord(MutableDocument mutableDocument, NewRecordCallback newRecordCallback, ErrorCallback errorCallback) {
        final DocumentType documentType = mutableDocument.getType();
        if (mutableDocument.getIdentity() != null) {
            throw new IllegalArgumentException("Cannot create a new record because it is already persistent");
        }

        final Bucket targetBucket = documentType.getBucketIdByRecord(mutableDocument, false);
        final int slot = getSlot(targetBucket.getFileId());
        final DatabaseAsyncCreateRecord task = new DatabaseAsyncCreateRecord(mutableDocument, targetBucket, newRecordCallback, errorCallback);
        scheduleTask(slot, task, true, this.backPressurePercentage);
    }

    /** Schedules the creation of the record in the bucket named {@code str}, without an error callback. */
    @Override
    public void createRecord(Record record, String str, NewRecordCallback newRecordCallback) {
        final ErrorCallback noError = null;
        createRecord(record, str, newRecordCallback, noError);
    }

    /**
     * Schedules the creation of a new record in the bucket named {@code str}. The worker is derived
     * from the bucket's file id.
     *
     * @throws IllegalArgumentException if the record already has an identity
     */
    @Override
    public void createRecord(Record record, String str, NewRecordCallback newRecordCallback, ErrorCallback errorCallback) {
        // The bucket is resolved before the record is validated, as any lookup failure takes precedence.
        final Bucket targetBucket = this.database.getSchema().getBucketByName(str);
        final int targetSlot = getSlot(targetBucket.getFileId());

        final boolean alreadyPersistent = record.getIdentity() != null;
        if (alreadyPersistent) {
            throw new IllegalArgumentException("Cannot create a new record because it is already persistent");
        }

        final DatabaseAsyncCreateRecord task = new DatabaseAsyncCreateRecord(record, targetBucket, newRecordCallback, errorCallback);
        scheduleTask(targetSlot, task, true, this.backPressurePercentage);
    }

    /** Schedules the update of the document without an error callback. */
    @Override
    public void updateRecord(MutableDocument mutableDocument, UpdatedRecordCallback updatedRecordCallback) {
        final ErrorCallback noError = null;
        updateRecord(mutableDocument, updatedRecordCallback, noError);
    }

    /**
     * Schedules the update of a persistent document on the worker derived from its bucket id.
     *
     * @throws IllegalArgumentException if the document has no identity
     */
    @Override
    public void updateRecord(MutableDocument mutableDocument, UpdatedRecordCallback updatedRecordCallback, ErrorCallback errorCallback) {
        final RID identity = mutableDocument.getIdentity();
        if (identity == null) {
            throw new IllegalArgumentException("Cannot updated a not persistent record");
        }
        final int slot = getSlot(identity.getBucketId());
        final DatabaseAsyncUpdateRecord task = new DatabaseAsyncUpdateRecord(mutableDocument, updatedRecordCallback, errorCallback);
        scheduleTask(slot, task, true, this.backPressurePercentage);
    }

    /** Schedules the deletion of the record without an error callback. */
    @Override
    public void deleteRecord(Record record, DeletedRecordCallback deletedRecordCallback) {
        final ErrorCallback noError = null;
        deleteRecord(record, deletedRecordCallback, noError);
    }

    /**
     * Schedules the deletion of a persistent record on the worker derived from its bucket id.
     *
     * @throws IllegalArgumentException if the record has no identity
     */
    @Override
    public void deleteRecord(Record record, DeletedRecordCallback deletedRecordCallback, ErrorCallback errorCallback) {
        final RID identity = record.getIdentity();
        if (identity == null) {
            throw new IllegalArgumentException("Cannot delete a not persistent record");
        }
        final int slot = getSlot(identity.getBucketId());
        final DatabaseAsyncDeleteRecord task = new DatabaseAsyncDeleteRecord(record, deletedRecordCallback, errorCallback);
        scheduleTask(slot, task, true, this.backPressurePercentage);
    }

    /**
     * Deprecated variant that still takes an explicit bidirectional flag {@code z}; after validating
     * it against the edge type, delegates to the overload without that flag.
     * <p>
     * NOTE(review): the check rejects {@code z == false} on a type that IS bidirectional, while the
     * message says the type "is not bidirectional" — condition and message look inconsistent; confirm
     * the intent before relying on either.
     *
     * @throws IllegalArgumentException if {@code z} is false and the edge type is bidirectional
     */
    @Override
    @Deprecated
    public void newEdge(Vertex vertex, String str, RID rid, boolean z, boolean z2, NewEdgeCallback newEdgeCallback, Object... objArr) {
        if (!z) {
            final EdgeType edgeType = (EdgeType) this.database.getSchema().getType(str);
            if (edgeType.isBidirectional()) {
                throw new IllegalArgumentException("Edge type '" + str + "' is not bidirectional");
            }
        }
        newEdge(vertex, str, rid, z2, newEdgeCallback, objArr);
    }

    /**
     * Schedules the creation of an edge from {@code vertex} to the vertex identified by {@code rid}.
     * <p>
     * When both vertices map to the same worker a single task is scheduled. Otherwise the edge is
     * created on the source vertex's worker and, for bidirectional edge types, a second task is then
     * scheduled on the destination vertex's worker; the user callback is invoked only after the last
     * step, always with the values reported by the first task.
     *
     * @param vertex          source vertex
     * @param str             edge type name
     * @param rid             identity of the destination vertex
     * @param z               flag forwarded to the edge creation task; presumably selects a "light"
     *                        edge — confirm against CreateEdgeAsyncTask
     * @param newEdgeCallback invoked when the edge has been created; may be {@code null}
     * @param objArr          edge properties forwarded to the edge creation task
     * @throws IllegalArgumentException if {@code vertex} or {@code rid} is {@code null}
     */
    @Override
    public void newEdge(Vertex vertex, String str, RID rid, boolean z, NewEdgeCallback newEdgeCallback, Object... objArr) {
        if (vertex == null) {
            throw new IllegalArgumentException("Source vertex is null");
        }
        if (rid == null) {
            throw new IllegalArgumentException("Destination vertex is null");
        }

        final int sourceSlot = getSlot(vertex.getIdentity().getBucketId());
        final int destinationSlot = getSlot(rid.getBucketId());
        final boolean bidirectional = ((EdgeType) this.database.getSchema().getType(str)).isBidirectional();

        if (sourceSlot == destinationSlot) {
            scheduleTask(sourceSlot, new CreateEdgeAsyncTask(vertex, rid, str, objArr, z, newEdgeCallback), true, this.backPressurePercentage);
            return;
        }

        scheduleTask(sourceSlot, new CreateEdgeAsyncTask(vertex, rid, str, objArr, z, (edge, firstFlag, secondFlag) -> {
            if (bidirectional) {
                // Runs on a worker thread: back pressure is disabled (0) for the follow-up task.
                // The inner callback's own arguments are ignored on purpose; their names must differ
                // from the enclosing lambda's parameters, which Java does not allow to be redeclared.
                scheduleTask(destinationSlot, new CreateIncomingEdgeAsyncTask(vertex.getIdentity(), rid, edge, (ignoredEdge, ignoredFirstFlag, ignoredSecondFlag) -> {
                    if (newEdgeCallback != null) {
                        newEdgeCallback.call(edge, firstFlag, secondFlag);
                    }
                }), true, 0);
            } else if (newEdgeCallback != null) {
                newEdgeCallback.call(edge, firstFlag, secondFlag);
            }
        }), true, this.backPressurePercentage);
    }

    /**
     * Single-key convenience variant: wraps each key name and key value into a one-element array and
     * delegates to the array-based overload.
     */
    @Override
    public void newEdgeByKeys(String str, String str2, Object obj, String str3, String str4, Object obj2, boolean z, String str5, boolean z2, boolean z3, NewEdgeCallback newEdgeCallback, Object... objArr) {
        final String[] sourceKeyNames = {str2};
        final Object[] sourceKeyValues = {obj};
        final String[] destinationKeyNames = {str4};
        final Object[] destinationKeyValues = {obj2};
        newEdgeByKeys(str, sourceKeyNames, sourceKeyValues, str3, destinationKeyNames, destinationKeyValues, z, str5, z2, z3, newEdgeCallback, objArr);
    }

    /**
     * Schedules the creation of an edge between two vertices looked up by index keys.
     * <p>
     * Both vertices are looked up synchronously. If both exist the call delegates to {@code newEdge};
     * if one or both are missing and {@code z} is true, a task that also handles the missing
     * vertex/vertices is scheduled; if {@code z} is false a missing vertex is an error.
     *
     * @param str     source vertex type name
     * @param strArr  source key names
     * @param objArr  source key values, same length as {@code strArr}
     * @param str2    destination vertex type name
     * @param strArr2 destination key names
     * @param objArr2 destination key values, same length as {@code strArr2}
     * @param z       whether missing vertices are tolerated
     * @param str3    edge type name
     * @param z2      not used by this implementation
     * @param z3      flag forwarded to the edge creation tasks
     * @param newEdgeCallback callback forwarded to the scheduled task; may be {@code null}
     * @param objArr3 edge properties
     * @throws IllegalArgumentException if a key-name array is null, names and values differ in length,
     *                                  or a vertex is missing while {@code z} is false
     */
    @Override
    public void newEdgeByKeys(String str, String[] strArr, Object[] objArr, String str2, String[] strArr2, Object[] objArr2, boolean z, String str3, boolean z2, boolean z3, NewEdgeCallback newEdgeCallback, Object... objArr3) {
        if (strArr == null) {
            throw new IllegalArgumentException("Source vertex key is null");
        }
        if (strArr.length != objArr.length) {
            throw new IllegalArgumentException("Source vertex key and value arrays have different sizes");
        }
        if (strArr2 == null) {
            throw new IllegalArgumentException("Destination vertex key is null");
        }
        if (strArr2.length != objArr2.length) {
            throw new IllegalArgumentException("Destination vertex key and value arrays have different sizes");
        }

        final IndexCursor sourceCursor = this.database.lookupByKey(str, strArr, objArr);
        final IndexCursor destinationCursor = this.database.lookupByKey(str2, strArr2, objArr2);

        RID sourceRid = null;
        if (sourceCursor.hasNext()) {
            sourceRid = sourceCursor.next().getIdentity();
        }
        RID destinationRid = null;
        if (destinationCursor.hasNext()) {
            destinationRid = destinationCursor.next().getIdentity();
        }

        final boolean sourceFound = sourceRid != null;
        final boolean destinationFound = destinationRid != null;

        if (sourceFound && destinationFound) {
            // Both ends exist: plain edge creation.
            newEdge(sourceRid.asVertex(true), str3, destinationRid, z3, newEdgeCallback, objArr3);
            return;
        }

        if (!sourceFound && !destinationFound) {
            if (!z) {
                throw new IllegalArgumentException("Cannot find source and destination vertices with respectively key " + Arrays.toString(strArr) + "=" + Arrays.toString(objArr) + " and " + Arrays.toString(strArr2) + "=" + Arrays.toString(objArr2));
            }
            scheduleTask(getRandomSlot(), new CreateBothVerticesAndEdgeAsyncTask(str, strArr, objArr, str2, strArr2, objArr2, str3, objArr3, z3, newEdgeCallback), true, this.backPressurePercentage);
            return;
        }

        if (sourceFound) {
            // Only the destination is missing.
            if (!z) {
                throw new IllegalArgumentException("Cannot find destination vertex with key " + Arrays.toString(strArr2) + "=" + Arrays.toString(objArr2));
            }
            scheduleTask(getSlot(sourceRid.getBucketId()), new CreateDestinationVertexAndEdgeAsyncTask(sourceRid, str2, strArr2, objArr2, str3, objArr3, z3, newEdgeCallback), true, this.backPressurePercentage);
            return;
        }

        // Only the source is missing.
        if (!z) {
            throw new IllegalArgumentException("Cannot find source vertex with key " + Arrays.toString(strArr) + "=" + Arrays.toString(objArr));
        }
        scheduleTask(getSlot(destinationRid.getBucketId()), new CreateSourceVertexAndEdgeAsyncTask(str, strArr, objArr, destinationRid, str3, objArr3, z3, newEdgeCallback), true, this.backPressurePercentage);
    }

    /**
     * Asks every worker to stop as soon as possible by raising its {@code forceShutdown} flag, then
     * forgets the workers. Does not wait for them to terminate; queued tasks are not drained.
     */
    @Override
    public void kill() {
        if (this.executorThreads == null) {
            return;
        }
        for (AsyncThread worker : this.executorThreads) {
            worker.forceShutdown = true;
        }
        this.executorThreads = null;
    }

    /** Shuts the workers down gracefully; see {@code shutdownThreads()}. */
    public void close() {
        this.shutdownThreads();
    }

    /** @return the number of workers requested by the last thread (re)creation */
    @Override
    public int getParallelLevel() {
        return parallelLevel;
    }

    /**
     * Changes the number of workers. When the value differs from the current one the existing
     * workers are shut down and a new set is created; otherwise nothing happens.
     */
    @Override
    public void setParallelLevel(int i) {
        if (i == this.parallelLevel) {
            return;
        }
        createThreads(i);
    }

    /** @return the queue fill percentage from which scheduling starts to be slowed down */
    @Override
    public int getBackPressure() {
        return backPressurePercentage;
    }

    /** @param i queue fill percentage from which scheduling is slowed down; 0 or less disables it */
    @Override
    public void setBackPressure(int i) {
        backPressurePercentage = i;
    }

    /** @return the number of executed tasks after which a worker commits its transaction */
    @Override
    public int getCommitEvery() {
        return commitEvery;
    }

    /** @param i number of executed tasks after which a worker commits and reopens its transaction */
    @Override
    public void setCommitEvery(int i) {
        commitEvery = i;
    }

    /** @return the number of worker threads currently held, or 0 when there are none */
    @Override
    public int getThreadCount() {
        return this.executorThreads == null ? 0 : this.executorThreads.length;
    }

    /**
     * Replaces the current workers with {@code i} new ones (at least one): the existing workers are
     * shut down gracefully first, then the new ones are created and started.
     * <p>
     * NOTE(review): {@code parallelLevel} is updated only after the workers have been constructed,
     * while the worker constructor divides the configured queue size by {@code parallelLevel}; the
     * queues are therefore sized with the previous level — confirm whether that is intended.
     */
    private void createThreads(int i) {
        final int workers = Math.max(i, 1);
        shutdownThreads();

        this.executorThreads = new AsyncThread[workers];
        int index = 0;
        while (index < workers) {
            final AsyncThread worker = new AsyncThread(this.database, index);
            this.executorThreads[index] = worker;
            worker.start();
            index++;
        }
        this.parallelLevel = workers;
    }

    /**
     * Gracefully stops the workers: raises every worker's {@code shutdown} flag, then, one worker at a
     * time, enqueues the {@code FORCE_EXIT} sentinel and waits up to 10 seconds for the thread to end.
     * The worker array is dropped in every case, including when the calling thread is interrupted
     * (the interrupt flag is restored).
     */
    private void shutdownThreads() {
        try {
            if (this.executorThreads == null) {
                return;
            }
            for (AsyncThread worker : this.executorThreads) {
                worker.shutdown = true;
            }
            for (AsyncThread worker : this.executorThreads) {
                worker.queue.put(FORCE_EXIT);
                worker.join(10000L);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            this.executorThreads = null;
        }
    }

    /**
     * Invokes the registered success callback, if any. Exceptions thrown by the callback are logged
     * and not propagated.
     */
    @Override
    public void onOk() {
        if (this.onOkCallback == null) {
            return;
        }
        try {
            this.onOkCallback.call();
        } catch (Exception e) {
            LogManager.instance().log((Object) this, Level.SEVERE, "Error on invoking onOk() callback for asynchronous operation %s", (Throwable) e, (Object) this);
        }
    }

    /**
     * Invokes the registered error callback, if any, with the given error. Exceptions thrown by the
     * callback are logged and not propagated.
     *
     * @param th the error to report
     */
    @Override
    public void onError(Throwable th) {
        if (this.onErrorCallback == null) {
            return;
        }
        try {
            this.onErrorCallback.call(th);
        } catch (Exception e) {
            // Log the callback's own failure (it would otherwise be lost) and keep the error that was
            // being reported attached to it. addSuppressed rejects null and self-suppression.
            if (th != null && th != e) {
                e.addSuppressed(th);
            }
            LogManager.instance().log((Object) this, Level.SEVERE, "Error on invoking onError() callback for asynchronous operation %s", (Throwable) e, (Object) this);
        }
    }

    /**
     * Enqueues a task on a worker queue.
     *
     * @param i                 worker index, or -1 to pick the least loaded worker
     * @param databaseAsyncTask task to enqueue
     * @param z                 when true the call blocks until the task is enqueued; when false a
     *                          single non-blocking attempt is made
     * @param i2                back pressure threshold: when greater than 0 and the queue is filled
     *                          to at least this percentage, the caller sleeps before enqueuing
     * @return {@code true} if the task was enqueued; {@code false} only for a non-blocking attempt on
     *         a full queue
     * @throws DatabaseOperationException if the queue is detected as stalled (its head did not change
     *                                    across two timed-out attempts), or if the calling thread is
     *                                    interrupted (the interrupt flag is restored)
     */
    public boolean scheduleTask(int i, DatabaseAsyncTask databaseAsyncTask, boolean z, int i2) {
        // The whole body is guarded: the sleeps and the timed offers are the interruptible calls.
        try {
            if (i == -1) {
                i = getBestSlot();
            }
            final BlockingQueue<DatabaseAsyncTask> targetQueue = this.executorThreads[i].queue;

            if (i2 > 0) {
                final int fullAtPercent = 100 - ((targetQueue.remainingCapacity() * 100) / (targetQueue.remainingCapacity() + targetQueue.size()));
                if (fullAtPercent >= i2) {
                    // Sleep one millisecond per percentage point of queue occupancy.
                    Thread.sleep(fullAtPercent);
                }
            }

            if (!z) {
                final boolean accepted = targetQueue.offer(databaseAsyncTask);
                if (accepted) {
                    this.counterScheduledTasks.incrementAndGet();
                }
                return accepted;
            }

            if (!targetQueue.offer(databaseAsyncTask, this.checkForStalledQueuesMaxDelay, TimeUnit.MILLISECONDS)) {
                // Remember the head of the queue: if it is still there after another full timeout,
                // the worker is making no progress.
                final DatabaseAsyncTask headAtFirstTimeout = targetQueue.peek();
                while (!targetQueue.offer(databaseAsyncTask, this.checkForStalledQueuesMaxDelay, TimeUnit.MILLISECONDS)) {
                    final DatabaseAsyncTask currentHead = targetQueue.peek();
                    if (currentHead != null && currentHead == headAtFirstTimeout) {
                        throw new DatabaseOperationException("Asynchronous queue " + i + " is stalled. This could happen when an asynchronous task schedules more asynchronous tasks");
                    }
                    if (i2 > 0) {
                        Thread.sleep(100 + (4 * (100 - ((targetQueue.remainingCapacity() * 100) / (targetQueue.remainingCapacity() + targetQueue.size())))));
                    }
                }
            }

            this.counterScheduledTasks.incrementAndGet();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DatabaseOperationException("Error on executing asynchronous task " + String.valueOf(databaseAsyncTask), e);
        }
    }

    /**
     * Maps an arbitrary integer to a worker index.
     *
     * @param i any value, negative included; the sign bit is cleared before taking the remainder
     * @return an index in {@code [0, executorThreads.length)}
     */
    public int getSlot(int i) {
        final int nonNegative = i & Integer.MAX_VALUE;
        return nonNegative % this.executorThreads.length;
    }

    /**
     * @return {@code true} if at least one worker is executing a task or has tasks queued;
     *         {@code false} otherwise, including when there are no workers
     */
    @Override
    public boolean isProcessing() {
        if (this.executorThreads != null) {
            for (AsyncThread worker : this.executorThreads) {
                final boolean busy = worker.isExecutingTask() || worker.queue.size() > 0;
                if (busy) {
                    return true;
                }
            }
        }
        return false;
    }
}
