package com.senseidb.indexing;

import com.browseengine.bobo.api.BoboSegmentReader;
import com.senseidb.conf.SenseiSchema;
import com.senseidb.gateway.SenseiGateway;
import com.senseidb.jmx.JmxUtil;
import com.senseidb.metrics.MetricsConstants;
import com.senseidb.plugin.SenseiPluginRegistry;
import com.senseidb.search.node.SenseiIndexingManager;
import com.senseidb.search.plugin.PluggableSearchEngineManager;
import com.senseidb.util.JSONUtil;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.management.StandardMBean;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.lucene.util.BytesRef;
import org.json.JSONObject;
import proj.zoie.api.DataConsumer;
import proj.zoie.api.DataProvider;
import proj.zoie.api.Zoie;
import proj.zoie.api.ZoieException;
import proj.zoie.api.ZoieMultiReader;
import proj.zoie.impl.indexing.StreamDataProvider;
import proj.zoie.impl.indexing.ZoieConfig;
import proj.zoie.mbean.DataProviderAdmin;
import proj.zoie.mbean.DataProviderAdminMBean;

/* loaded from: input_file:com/senseidb/indexing/DefaultStreamingIndexingManager.class */
/**
 * {@link SenseiIndexingManager} that pulls JSON events from a {@link StreamDataProvider} built by
 * the configured {@link SenseiGateway} and routes each event to the Zoie system of the partition
 * chosen by the {@link ShardingStrategy}.
 *
 * <p>Configuration is read from the {@value #CONFIG_PREFIX} subset of the supplied configuration:
 * <ul>
 *   <li>{@code maxpartition.id} (required) - highest partition id; the sharding strategy is given
 *       this value plus one.</li>
 *   <li>{@code eventsPerMin} - passed to the provider as max events per minute, default 40000.</li>
 *   <li>{@code batchSize} - provider batch size, default 1.</li>
 *   <li>{@code maxVolatileTimeInMillis} - provider max volatile time, default
 *       {@link Long#MAX_VALUE}.</li>
 *   <li>{@code eventCreatedTimestampField} - optional name of an event field holding a creation
 *       timestamp in epoch milliseconds; when set, indexing latency is reported.</li>
 * </ul>
 */
public class DefaultStreamingIndexingManager implements SenseiIndexingManager<JSONObject> {
    private static final Logger logger = Logger.getLogger(DefaultStreamingIndexingManager.class);

    /** Prefix of the configuration subset read by this manager. */
    public static final String CONFIG_PREFIX = "sensei.index.manager.default";

    private static final String MAX_PARTITION_ID = "maxpartition.id";
    private static final String EVTS_PER_MIN = "eventsPerMin";
    private static final String BATCH_SIZE = "batchSize";
    private static final String VOLATILE_TIME = "maxVolatileTimeInMillis";
    private static final String EVENT_CREATED_TIMESTAMP_FIELD = "eventCreatedTimestampField";

    // Metrics are registered in the static initializer at the bottom of the class. They stay
    // null if registration fails, so every use must go through a null guard.
    private static Meter ProviderBatchSizeMeter;
    private static Meter EventMeter;
    private static Meter UpdateBatchSizeMeter;
    private static Timer IndexingLatencyTimer;

    private final String _eventCreatedTimestampField;
    private final SenseiSchema _senseiSchema;
    private final Configuration _myconfig;
    private final SenseiGateway<?> _gateway;
    private final ShardingStrategy _shardingStrategy;
    private final Comparator<String> _versionComparator;
    private final PluggableSearchEngineManager pluggableSearchEngineManager;
    private final SenseiPluginRegistry pluginRegistry;
    private StreamDataProvider<JSONObject> _dataProvider = null;
    private String _oldestSinceKey = null;
    private Map<Integer, Zoie<BoboSegmentReader, JSONObject>> _zoieSystemMap = null;

    /** Events collected per partition since the last flush; one entry per hosted partition. */
    private final Map<Integer, LinkedList<DataConsumer.DataEvent<JSONObject>>> _dataCollectorMap =
            new LinkedHashMap<Integer, LinkedList<DataConsumer.DataEvent<JSONObject>>>();

    /**
     * Consumer handed to the data provider. It shards every incoming event, rewrites it if needed
     * and forwards the per-partition batches to the matching Zoie systems.
     */
    private class DataDispatcher implements DataConsumer<JSONObject> {
        private final int _maxPartitionId;
        private final String _uidField;
        private volatile String _currentVersion = null;

        /**
         * @param maxPartitionId value passed to the sharding strategy for every event
         * @param uidField name of the UID field written into synthetic skip events
         */
        public DataDispatcher(int maxPartitionId, String uidField) {
            this._maxPartitionId = maxPartitionId;
            this._uidField = uidField;
        }

        /**
         * Records the time elapsed since the event's creation timestamp, when the timestamp field
         * is configured, the event carries a positive value for it, and the timer is available.
         */
        private void reportIndexingLatency(JSONObject event) {
            if (_eventCreatedTimestampField == null || IndexingLatencyTimer == null) {
                return;
            }
            long createdAt = event.optLong(_eventCreatedTimestampField);
            if (createdAt > 0) {
                IndexingLatencyTimer.update(System.currentTimeMillis() - createdAt, TimeUnit.MILLISECONDS);
            }
        }

        /**
         * Normalises an incoming event into the document that gets indexed.
         *
         * <p>If the event has a nested {@code "data"} object, that object is used and the outer
         * {@code "type"} is copied into it; otherwise the event itself is used. For update events
         * the result is additionally merged over the stored original document.
         *
         * @param event the incoming event
         * @param partition partition the event was sharded to
         * @return the document to index, or {@code null} if the event has to be dropped
         */
        private JSONObject rewriteData(JSONObject event, int partition) {
            String type = event.optString("type", null);
            JSONObject data = event.optJSONObject("data");
            if (data == null) {
                data = event;
            } else if (type != null) {
                try {
                    data.put("type", type);
                } catch (Exception e) {
                    logger.error("Should never happen", e);
                }
            }
            reportIndexingLatency(data);
            if (!SenseiSchema.EVENT_TYPE_UPDATE.equalsIgnoreCase(type)) {
                return data;
            }
            return mergeWithStoredDocument(event, data, partition);
        }

        /**
         * Loads the stored source of the document addressed by the update and overlays the
         * update's fields on top of it.
         *
         * @param event the original event, used only for log messages
         * @param update the partial document carrying the UID and the changed fields
         * @param partition partition whose Zoie system is searched for the stored document
         * @return the merged document, or {@code null} if the original cannot be found or any
         *         error occurs (the error is logged)
         */
        private JSONObject mergeWithStoredDocument(JSONObject event, JSONObject update, int partition) {
            Zoie<BoboSegmentReader, JSONObject> zoie = _zoieSystemMap.get(Integer.valueOf(partition));
            if (zoie == null) {
                logger.error("No zoie system for partition " + partition + ", dropping update event: " + event);
                return null;
            }
            try {
                List<ZoieMultiReader<BoboSegmentReader>> readers = zoie.getIndexReaders();
                if (readers == null) {
                    logger.error("Cannot found original doc for and update event: " + event);
                    return null;
                }
                try {
                    long uid = Long.parseLong(update.getString(_senseiSchema.getUidField()));
                    byte[] stored = null;
                    for (ZoieMultiReader<BoboSegmentReader> reader : readers) {
                        BytesRef ref = reader.getStoredValue(uid);
                        if (ref != null) {
                            stored = toByteArray(ref);
                            break;
                        }
                    }
                    byte[] source = stored;
                    if (stored != null && _senseiSchema.isCompressSrcData()) {
                        source = DefaultJsonSchemaInterpreter.decompress(stored);
                    }
                    if (source == null) {
                        // No reader holds a stored value for this UID: nothing to merge into.
                        logger.error("Cannot found original doc for and update event: " + event);
                        return null;
                    }
                    JSONUtil.FastJSONObject merged = new JSONUtil.FastJSONObject(new String(source, "UTF-8"));
                    Iterator<String> keys = update.keys();
                    while (keys.hasNext()) {
                        String key = keys.next();
                        merged.put(key, update.get(key));
                    }
                    return merged;
                } finally {
                    // Hand the readers back exactly once, whatever happened above.
                    zoie.returnIndexReaders(readers);
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return null;
            }
        }

        /**
         * Shards and collects every event of the batch, then flushes the per-partition
         * collections into their Zoie systems.
         *
         * @param collection the batch of events delivered by the data provider
         * @throws ZoieException wrapping any exception raised while dispatching or flushing
         */
        public void consume(Collection<DataConsumer.DataEvent<JSONObject>> collection) throws ZoieException {
            mark(UpdateBatchSizeMeter, collection.size());
            mark(ProviderBatchSizeMeter, _dataProvider.getBatchSize());
            mark(EventMeter, _dataProvider.getEventCount());
            try {
                for (DataConsumer.DataEvent<JSONObject> event : collection) {
                    dispatch(event);
                }
                flushCollectors();
            } catch (Exception e) {
                throw new ZoieException(e.getMessage(), e);
            }
        }

        /**
         * Routes one event to the collector of its partition. Events without data, events for
         * partitions without a collector, and events that {@link #rewriteData} rejects are
         * dropped.
         *
         * @throws Exception whatever the pluggable engine or sharding strategy raises; the caller
         *         wraps it in a {@link ZoieException}
         */
        private void dispatch(DataConsumer.DataEvent<JSONObject> event) throws Exception {
            JSONObject data = event.getData();
            if (data == null) {
                return;
            }
            _currentVersion = event.getVersion();

            boolean hasEngine = pluggableSearchEngineManager != null;
            boolean engineTakesAll = hasEngine && pluggableSearchEngineManager.acceptEventsForAllPartitions();
            if (engineTakesAll) {
                // The engine sees every event and may replace it before sharding.
                data = pluggableSearchEngineManager.update(data, event.getVersion());
            }

            int partition = _shardingStrategy.caculateShard(_maxPartitionId, data);
            LinkedList<DataConsumer.DataEvent<JSONObject>> collector = _dataCollectorMap.get(Integer.valueOf(partition));
            if (collector == null) {
                return;
            }
            if (hasEngine && !engineTakesAll) {
                // The engine only sees events of partitions that have a collector; its result is ignored.
                pluggableSearchEngineManager.update(data, event.getVersion());
            }

            JSONObject rewritten = rewriteData(data, partition);
            if (rewritten == null) {
                return;
            }
            if (rewritten != data) {
                event = new DataConsumer.DataEvent<JSONObject>(rewritten, event.getVersion(), event.isDelete());
            }
            collector.add(event);
        }

        /**
         * Sends each partition's collected events to its Zoie system and installs a fresh, empty
         * collector for every partition in the Zoie map.
         *
         * <p>The last event handed to every Zoie system always carries the current version: an
         * empty collection gets a synthetic skip event, and a trailing event with a different
         * version is re-created with the current one.
         *
         * @throws Exception whatever building the skip event or the Zoie system raises; the
         *         caller wraps it in a {@link ZoieException}
         */
        private void flushCollectors() throws Exception {
            String version = _currentVersion;
            for (Map.Entry<Integer, Zoie<BoboSegmentReader, JSONObject>> entry : _zoieSystemMap.entrySet()) {
                Integer partition = entry.getKey();
                Zoie<BoboSegmentReader, JSONObject> zoie = entry.getValue();
                LinkedList<DataConsumer.DataEvent<JSONObject>> pending = _dataCollectorMap.get(partition);
                if (zoie != null && pending != null) {
                    if (pending.isEmpty()) {
                        JSONUtil.FastJSONObject skip = new JSONUtil.FastJSONObject();
                        skip.put("type", SenseiSchema.EVENT_TYPE_SKIP);
                        skip.put(_uidField, 0L);
                        pending.add(new DataConsumer.DataEvent<JSONObject>(skip, version));
                    } else if (version != null && !version.equals(pending.getLast().getVersion())) {
                        DataConsumer.DataEvent<JSONObject> last = pending.pollLast();
                        pending.add(new DataConsumer.DataEvent<JSONObject>(last.getData(), version, last.isDelete()));
                    }
                    zoie.consume(pending);
                }
                _dataCollectorMap.put(partition, new LinkedList<DataConsumer.DataEvent<JSONObject>>());
            }
        }

        /** @return the version of the most recently dispatched event, or {@code null} if none */
        public String getVersion() {
            return this._currentVersion;
        }

        /** @return the version comparator of the enclosing manager */
        public Comparator<String> getVersionComparator() {
            return _versionComparator;
        }
    }

    /**
     * Creates the manager. No data provider is built until {@link #initialize(Map)} is called.
     *
     * @param senseiSchema schema supplying the UID field name and the source-compression flag
     * @param configuration full configuration; only the {@value #CONFIG_PREFIX} subset is used
     * @param senseiPluginRegistry plugin registry passed on to the gateway
     * @param senseiGateway gateway that builds the data provider; may be {@code null}, in which
     *        case no events flow and Zoie's default version comparator is used
     * @param shardingStrategy strategy mapping an event to a partition
     * @param pluggableSearchEngineManager optional additional consumer of events; may be
     *        {@code null}
     */
    public DefaultStreamingIndexingManager(SenseiSchema senseiSchema, Configuration configuration, SenseiPluginRegistry senseiPluginRegistry, SenseiGateway<?> senseiGateway, ShardingStrategy shardingStrategy, PluggableSearchEngineManager pluggableSearchEngineManager) {
        this._myconfig = configuration.subset(CONFIG_PREFIX);
        this._eventCreatedTimestampField = this._myconfig.getString(EVENT_CREATED_TIMESTAMP_FIELD, (String) null);
        this.pluginRegistry = senseiPluginRegistry;
        this._senseiSchema = senseiSchema;
        this._gateway = senseiGateway;
        this.pluggableSearchEngineManager = pluggableSearchEngineManager;
        if (this._gateway != null) {
            this._versionComparator = this._gateway.getVersionComparator();
        } else {
            this._versionComparator = ZoieConfig.DEFAULT_VERSION_COMPARATOR;
        }
        this._shardingStrategy = shardingStrategy;
    }

    /**
     * Lowers the remembered oldest version to {@code str} when no version is remembered yet or
     * when {@code str} is non-null and compares strictly older than the remembered one. Whenever
     * the remembered version changes and a data provider exists, its starting offset is updated.
     *
     * @param str candidate version; may be {@code null}
     */
    public void updateOldestSinceKey(String str) {
        boolean keepCurrent = this._oldestSinceKey != null
                && (str == null || this._versionComparator.compare(str, this._oldestSinceKey) >= 0);
        if (keepCurrent) {
            return;
        }
        this._oldestSinceKey = str;
        if (this._dataProvider != null) {
            this._dataProvider.setStartingOffset(str);
        }
    }

    /**
     * Registers the hosted Zoie systems, derives the oldest version to resume from, and builds
     * the data provider with a dispatcher attached.
     *
     * @param map Zoie system per hosted partition id
     * @throws Exception if required configuration is missing or the provider cannot be built
     */
    @Override // com.senseidb.search.node.SenseiIndexingManager
    public void initialize(Map<Integer, Zoie<BoboSegmentReader, JSONObject>> map) throws Exception {
        int shardCount = this._myconfig.getInt(MAX_PARTITION_ID) + 1;
        DataDispatcher dispatcher = new DataDispatcher(shardCount, this._senseiSchema.getUidField());
        this._zoieSystemMap = map;
        for (Map.Entry<Integer, Zoie<BoboSegmentReader, JSONObject>> entry : map.entrySet()) {
            updateOldestSinceKey(entry.getValue().getVersion());
            this._dataCollectorMap.put(entry.getKey(), new LinkedList<DataConsumer.DataEvent<JSONObject>>());
        }
        if (this.pluggableSearchEngineManager != null) {
            String engineOldest = this.pluggableSearchEngineManager.getOldestVersion();
            if (engineOldest != null && !"".equals(engineOldest)) {
                updateOldestSinceKey(engineOldest);
            }
        }
        this._dataProvider = buildDataProvider();
        if (this._dataProvider != null) {
            this._dataProvider.setDataConsumer(dispatcher);
        }
    }

    /** @return the data provider built by {@link #initialize(Map)}, or {@code null} if none */
    @Override // com.senseidb.search.node.SenseiIndexingManager
    public DataProvider<JSONObject> getDataProvider() {
        return this._dataProvider;
    }

    /**
     * Asks the gateway for a data provider starting at the oldest known version, applies the
     * throttling settings from the configuration and registers a JMX admin bean for it.
     *
     * @return the configured provider, or {@code null} when no gateway is set
     * @throws ConfigurationException wrapping any failure to build or configure the provider;
     *         a failed JMX registration is only logged
     */
    private StreamDataProvider<JSONObject> buildDataProvider() throws ConfigurationException {
        if (this._gateway == null) {
            return null;
        }
        try {
            StreamDataProvider<JSONObject> provider = this._gateway.buildDataProvider(this._senseiSchema, this._oldestSinceKey, this.pluginRegistry, this._shardingStrategy, this._zoieSystemMap.keySet());
            provider.setMaxEventsPerMinute(this._myconfig.getLong(EVTS_PER_MIN, 40000L));
            provider.setBatchSize(this._myconfig.getInt(BATCH_SIZE, 1));
            provider.setMaxVolatileTime(this._myconfig.getLong(VOLATILE_TIME, Long.MAX_VALUE));
            try {
                JmxUtil.registerMBean(new StandardMBean(new DataProviderAdmin(provider), DataProviderAdminMBean.class), "indexing-manager", "stream-data-provider");
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            return provider;
        } catch (Exception e) {
            throw new ConfigurationException(e.getMessage(), e);
        }
    }

    /**
     * Closes the pluggable search engine manager (if any) and stops the data provider (if any).
     * The provider is stopped even when closing the engine manager throws.
     */
    @Override // com.senseidb.search.node.SenseiIndexingManager
    public void shutdown() {
        try {
            if (this.pluggableSearchEngineManager != null) {
                this.pluggableSearchEngineManager.close();
            }
        } finally {
            if (this._dataProvider != null) {
                this._dataProvider.stop();
            }
        }
    }

    /**
     * Starts the data provider, or logs a warning when none has been configured.
     *
     * @throws Exception declared by the interface
     */
    @Override // com.senseidb.search.node.SenseiIndexingManager
    public void start() throws Exception {
        if (this._dataProvider == null) {
            logger.warn("no data stream configured, no indexing events are flowing.");
            return;
        }
        this._dataProvider.start();
    }

    /**
     * Delegates to {@code syncWithVersion} of every non-null hosted Zoie system in turn.
     *
     * @param j first argument forwarded unchanged to each Zoie system
     * @param str version forwarded unchanged to each Zoie system
     * @throws ZoieException if any Zoie system fails to sync
     */
    @Override // com.senseidb.search.node.SenseiIndexingManager
    public void syncWithVersion(long j, String str) throws ZoieException {
        for (Zoie<BoboSegmentReader, JSONObject> zoie : this._zoieSystemMap.values()) {
            if (zoie != null) {
                zoie.syncWithVersion(j, str);
            }
        }
    }

    /** Marks {@code count} on the meter unless the meter failed to register. */
    private static void mark(Meter meter, long count) {
        if (meter != null) {
            meter.mark(count);
        }
    }

    /**
     * Returns exactly the bytes addressed by {@code ref}. The backing array is returned as-is
     * when the slice covers it completely; otherwise the slice is copied out.
     *
     * @return the referenced bytes, or {@code null} if the reference has no backing array
     */
    private static byte[] toByteArray(BytesRef ref) {
        if (ref.bytes == null) {
            return null;
        }
        if (ref.offset == 0 && ref.length == ref.bytes.length) {
            return ref.bytes;
        }
        byte[] slice = new byte[ref.length];
        System.arraycopy(ref.bytes, ref.offset, slice, 0, ref.length);
        return slice;
    }

    static {
        // Registration failures are logged and leave the affected metrics null.
        try {
            ProviderBatchSizeMeter = Metrics.newMeter(new MetricName(MetricsConstants.Domain, "meter", "provider-batch-size", "indexing-manager"), "provide-batch-size", TimeUnit.SECONDS);
            UpdateBatchSizeMeter = Metrics.newMeter(new MetricName(MetricsConstants.Domain, "meter", "update-batch-size", "indexing-manager"), "update-batch-size", TimeUnit.SECONDS);
            EventMeter = Metrics.newMeter(new MetricName(MetricsConstants.Domain, "meter", "indexing-events", "indexing-manager"), "indexing-events", TimeUnit.SECONDS);
            IndexingLatencyTimer = Metrics.newTimer(new MetricName(MetricsConstants.Domain, "timer", "indexing-latency", "indexing-manager"), TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
