package org.apache.atlas.repository.patches;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/atlas/repository/patches/ConcurrentPatchProcessor.class */
public abstract class ConcurrentPatchProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentPatchProcessor.class);
    // Configuration key read by the static initializer to obtain the number of worker threads.
    private static final String NUM_WORKERS_PROPERTY = "atlas.patch.numWorkers";
    // Configuration key read by the static initializer to obtain the commit batch size.
    private static final String BATCH_SIZE_PROPERTY = "atlas.patch.batchSize";
    // Configuration key whose value (default 1) is multiplied by 3 to derive the default worker count.
    private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
    // Name prefix handed to the WorkItemManager created in execute().
    private static final String WORKER_NAME_PREFIX = "patchWorkItem";
    // Worker count handed to the WorkItemManager; assigned exactly once by the static initializer.
    public static final int NUM_WORKERS;
    // Batch size handed to the WorkItemManager and used by Consumer.doCommit() as the commit interval;
    // assigned exactly once by the static initializer.
    public static final int BATCH_SIZE;
    // Collaborators copied from the PatchContext in the constructor and exposed through the getters below.
    private final EntityGraphMapper entityGraphMapper;
    private final AtlasGraph graph;
    private final GraphBackedSearchIndexer indexer;
    private final AtlasTypeRegistry typeRegistry;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/atlas/repository/patches/ConcurrentPatchProcessor$Consumer.class */
    public static class Consumer extends WorkItemConsumer<Long> {
        private int MAX_COMMIT_RETRY_COUNT;
        private final AtlasGraph graph;
        private final AtlasTypeRegistry typeRegistry;
        private final AtomicLong counter;
        private final ConcurrentPatchProcessor individualItemProcessor;

        public Consumer(AtlasGraph atlasGraph, AtlasTypeRegistry atlasTypeRegistry, BlockingQueue<Long> blockingQueue, ConcurrentPatchProcessor concurrentPatchProcessor) {
            super(blockingQueue);
            this.MAX_COMMIT_RETRY_COUNT = 3;
            this.graph = atlasGraph;
            this.typeRegistry = atlasTypeRegistry;
            this.counter = new AtomicLong(0L);
            this.individualItemProcessor = concurrentPatchProcessor;
        }

        protected void doCommit() {
            if (this.counter.get() % ConcurrentPatchProcessor.BATCH_SIZE == 0) {
                ConcurrentPatchProcessor.LOG.info("Processed: {}", Long.valueOf(this.counter.get()));
                attemptCommit();
            }
        }

        protected void commitDirty() {
            attemptCommit();
            ConcurrentPatchProcessor.LOG.info("Total: Commit: {}", Long.valueOf(this.counter.get()));
            super.commitDirty();
        }

        private void attemptCommit() {
            for (int i = 1; i <= this.MAX_COMMIT_RETRY_COUNT; i++) {
                try {
                    this.graph.commit();
                    return;
                } catch (Exception e) {
                    ConcurrentPatchProcessor.LOG.error("Commit exception: ", Integer.valueOf(i), e);
                    try {
                        Thread.currentThread();
                        Thread.sleep(300 * i);
                    } catch (InterruptedException e2) {
                        ConcurrentPatchProcessor.LOG.error("Commit exception: Pause: Interrputed!", e2);
                    }
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void processItem(Long l) {
            String typeName;
            AtlasEntityType entityTypeByName;
            this.counter.incrementAndGet();
            AtlasVertex vertex = this.graph.getVertex(Long.toString(l.longValue()));
            if (vertex == null) {
                ConcurrentPatchProcessor.LOG.warn("processItem(vertexId={}): AtlasVertex not found!", l);
                return;
            }
            if (AtlasGraphUtilsV2.isTypeVertex(vertex) || (entityTypeByName = this.typeRegistry.getEntityTypeByName((typeName = AtlasGraphUtilsV2.getTypeName(vertex)))) == null) {
                return;
            }
            try {
                this.individualItemProcessor.processVertexItem(l, vertex, typeName, entityTypeByName);
                doCommit();
            } catch (AtlasBaseException e) {
                ConcurrentPatchProcessor.LOG.error("Error processing: {}", l, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/atlas/repository/patches/ConcurrentPatchProcessor$ConsumerBuilder.class */
    /**
     * Factory handed to the WorkItemManager; each call to {@link #build(BlockingQueue)} creates a
     * {@link Consumer} bound to the same graph, type registry and patch processor.
     */
    public static class ConsumerBuilder implements WorkItemBuilder<Consumer, Long> {
        private final AtlasGraph graph;
        private final AtlasTypeRegistry typeRegistry;
        private final ConcurrentPatchProcessor patchItemProcessor;

        /**
         * @param atlasGraph               graph passed to every consumer built
         * @param atlasTypeRegistry        type registry passed to every consumer built
         * @param concurrentPatchProcessor processor passed to every consumer built
         */
        public ConsumerBuilder(AtlasGraph atlasGraph, AtlasTypeRegistry atlasTypeRegistry, ConcurrentPatchProcessor concurrentPatchProcessor) {
            graph = atlasGraph;
            typeRegistry = atlasTypeRegistry;
            patchItemProcessor = concurrentPatchProcessor;
        }

        /**
         * Creates a new consumer that reads vertex ids from the given queue.
         *
         * @param blockingQueue queue the new consumer reads from
         * @return a freshly constructed consumer
         */
        public Consumer build(BlockingQueue<Long> blockingQueue) {
            Consumer consumer = new Consumer(graph, typeRegistry, blockingQueue, patchItemProcessor);
            return consumer;
        }

        /* renamed from: build, reason: collision with other method in class */
        /** Raw-typed variant that casts the queue and delegates to {@link #build(BlockingQueue)}. */
        public /* bridge */ /* synthetic */ Runnable m120build(BlockingQueue blockingQueue) {
            BlockingQueue<Long> typedQueue = (BlockingQueue<Long>) blockingQueue;
            return build(typedQueue);
        }
    }

    /**
     * Captures the graph, indexer, type registry and entity graph mapper exposed by the patch context.
     *
     * @param patchContext source of the collaborators used by this processor
     */
    public ConcurrentPatchProcessor(PatchContext patchContext) {
        graph = patchContext.getGraph();
        indexer = patchContext.getIndexer();
        typeRegistry = patchContext.getTypeRegistry();
        entityGraphMapper = patchContext.getEntityGraphMapper();
    }

    /** @return the entity graph mapper obtained from the patch context at construction */
    public EntityGraphMapper getEntityGraphMapper() {
        return entityGraphMapper;
    }

    /** @return the graph obtained from the patch context at construction */
    public AtlasGraph getGraph() {
        return graph;
    }

    /** @return the search indexer obtained from the patch context at construction */
    public GraphBackedSearchIndexer getIndexer() {
        return indexer;
    }

    /** @return the type registry obtained from the patch context at construction */
    public AtlasTypeRegistry getTypeRegistry() {
        return typeRegistry;
    }

    /**
     * Runs the patch: first the subclass preparation step, then the concurrent execution step.
     *
     * @throws AtlasBaseException if the preparation step fails
     */
    public void apply() throws AtlasBaseException {
        this.prepareForExecution();
        this.execute();
    }

    /**
     * Subclass hook invoked by {@link #apply()} before any work items are submitted.
     *
     * @throws AtlasBaseException if preparation fails; propagated to the caller of {@code apply()}
     */
    protected abstract void prepareForExecution() throws AtlasBaseException;

    /**
     * Subclass hook invoked by {@code execute()} with a freshly created manager, before the manager
     * is drained and shut down.
     *
     * @param workItemManager manager created by {@code execute()} for this run
     */
    protected abstract void submitVerticesToUpdate(WorkItemManager workItemManager);

    /**
     * Subclass hook invoked by {@code Consumer.processItem} for each vertex that exists, is not a
     * type vertex, and whose type name resolves to an entity type.
     *
     * @param l               id of the vertex, as taken from the work queue
     * @param atlasVertex     vertex loaded from the graph for that id
     * @param str             type name read from the vertex
     * @param atlasEntityType entity type resolved from the type registry for that type name
     * @throws AtlasBaseException if processing fails; the consumer catches and logs it
     */
    protected abstract void processVertexItem(Long l, AtlasVertex atlasVertex, String str, AtlasEntityType atlasEntityType) throws AtlasBaseException;

    /**
     * Creates a work item manager backed by {@link ConsumerBuilder}, lets the subclass submit its
     * vertices, waits for the queue to drain, and always shuts the manager down afterwards.
     *
     * <p>If the thread is interrupted during shutdown, the interruption is logged and the thread's
     * interrupt status is restored so callers further up the stack can react to it.
     */
    private void execute() {
        // NOTE(review): meaning of the trailing 'false' flag is not visible here — confirm against WorkItemManager.
        WorkItemManager workItemManager = new WorkItemManager(new ConsumerBuilder(this.graph, this.typeRegistry, this), WORKER_NAME_PREFIX, BATCH_SIZE, NUM_WORKERS, false);
        try {
            submitVerticesToUpdate(workItemManager);
            workItemManager.drain();
        } finally {
            try {
                workItemManager.shutdown();
            } catch (InterruptedException e) {
                LOG.error("ConcurrentPatchProcessor.execute(): interrupted during WorkItemManager shutdown.", e);
                // Catching InterruptedException clears the flag; re-assert it rather than swallowing it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /*
     * Resolves NUM_WORKERS and BATCH_SIZE once at class-load time.
     *
     * Values come from the application configuration; if the configuration cannot be read, the
     * built-in defaults (3 workers, batch size 300) are kept. Non-positive values are also replaced
     * by the defaults: a batch size of 0 would make the modulo in Consumer.doCommit() throw
     * ArithmeticException.
     */
    static {
        final int defaultNumWorkers = 3;
        final int defaultBatchSize = 300;
        int numWorkers = defaultNumWorkers;
        int batchSize = defaultBatchSize;
        try {
            Configuration configuration = ApplicationProperties.get();
            // Default worker count scales with the configured shard count (1 when unset).
            numWorkers = configuration.getInt(NUM_WORKERS_PROPERTY, configuration.getInt(ATLAS_SOLR_SHARDS, 1) * 3);
            batchSize = configuration.getInt(BATCH_SIZE_PROPERTY, defaultBatchSize);
            LOG.info("ConcurrentPatchProcessor: {}={}, {}={}", new Object[]{NUM_WORKERS_PROPERTY, Integer.valueOf(numWorkers), BATCH_SIZE_PROPERTY, Integer.valueOf(batchSize)});
        } catch (Exception e) {
            LOG.error("Error retrieving configuration.", e);
        }
        if (numWorkers <= 0) {
            LOG.warn("Invalid {}={}; using default {}", new Object[]{NUM_WORKERS_PROPERTY, Integer.valueOf(numWorkers), Integer.valueOf(defaultNumWorkers)});
            numWorkers = defaultNumWorkers;
        }
        if (batchSize <= 0) {
            LOG.warn("Invalid {}={}; using default {}", new Object[]{BATCH_SIZE_PROPERTY, Integer.valueOf(batchSize), Integer.valueOf(defaultBatchSize)});
            batchSize = defaultBatchSize;
        }
        NUM_WORKERS = numWorkers;
        BATCH_SIZE = batchSize;
    }
}
