package com.senseidb.dataprovider.http;

import java.io.IOException;
import java.io.InputStream;
import java.util.Comparator;
import java.util.Iterator;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpVersion;
import org.apache.http.StatusLine;
import org.apache.http.client.entity.GzipDecompressingEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.conn.scheme.SchemeSocketFactory;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.SingleClientConnManager;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.log4j.Logger;
import proj.zoie.api.DataConsumer;
import proj.zoie.impl.indexing.StreamDataProvider;

/* loaded from: input_file:com/senseidb/dataprovider/http/HttpStreamDataProvider.class */
public abstract class HttpStreamDataProvider<D> extends StreamDataProvider<D> implements HttpDataProviderAdminMBean {
    private static final Logger logger = Logger.getLogger(HttpStreamDataProvider.class);
    protected final String _baseUrl;
    private final ClientConnectionManager _httpClientManager;
    private DefaultHttpClient _httpclient;
    public static final int DEFAULT_TIMEOUT_MS = 10000;
    public static final int DEFAULT_RETRYTIME_MS = 5000;
    public static final String DEFAULT_OFFSET_PARAM = "offset";
    public static final String DFEAULT_DATA_PARAM = "data";
    protected final int _fetchSize;
    protected final String _password;
    protected String _offset;
    protected String _initialOffset;
    private final boolean _disableHttps;
    private Iterator<DataConsumer.DataEvent<D>> _currentDataIter;
    private volatile boolean _stopped;
    private int _retryTime;
    private volatile long _httpGetLatency;
    private volatile long _responseParseLatency;

    public HttpStreamDataProvider(Comparator<String> comparator, String str, String str2, int i, String str3, boolean z) {
        super(comparator);
        this._baseUrl = str;
        this._password = str2;
        this._fetchSize = i;
        this._offset = str3;
        this._disableHttps = z;
        this._initialOffset = null;
        this._currentDataIter = null;
        this._stopped = true;
        this._httpGetLatency = 0L;
        this._responseParseLatency = 0L;
        Scheme scheme = new Scheme("http", 80, (SchemeSocketFactory) PlainSocketFactory.getSocketFactory());
        SchemeRegistry schemeRegistry = new SchemeRegistry();
        schemeRegistry.register(scheme);
        BasicHttpParams basicHttpParams = new BasicHttpParams();
        basicHttpParams.setParameter("http.protocol.version", HttpVersion.HTTP_1_1);
        basicHttpParams.setParameter("http.protocol.content-charset", "UTF-8");
        basicHttpParams.setIntParameter("http.connection.timeout", DEFAULT_RETRYTIME_MS);
        basicHttpParams.setIntParameter("http.socket.linger", 0);
        basicHttpParams.setBooleanParameter("http.tcp.nodelay", true);
        basicHttpParams.setIntParameter("http.socket.timeout", DEFAULT_RETRYTIME_MS);
        basicHttpParams.setIntParameter("http.socket.buffer-size", 1048576);
        basicHttpParams.setBooleanParameter("http.socket.reuseaddr", true);
        this._httpClientManager = new SingleClientConnManager(schemeRegistry);
        this._httpclient = new DefaultHttpClient(this._httpClientManager, basicHttpParams);
        if (!this._disableHttps) {
            this._httpclient = HttpsClientDecorator.decorate(this._httpclient);
        }
        this._httpclient.addRequestInterceptor(new HttpRequestInterceptor() { // from class: com.senseidb.dataprovider.http.HttpStreamDataProvider.1
            @Override // org.apache.http.HttpRequestInterceptor
            public void process(HttpRequest httpRequest, HttpContext httpContext) throws HttpException, IOException {
                if (httpRequest.containsHeader("Accept-Encoding")) {
                    return;
                }
                httpRequest.addHeader("Accept-Encoding", "gzip");
            }
        });
        this._httpclient.addResponseInterceptor(new HttpResponseInterceptor() { // from class: com.senseidb.dataprovider.http.HttpStreamDataProvider.2
            @Override // org.apache.http.HttpResponseInterceptor
            public void process(HttpResponse httpResponse, HttpContext httpContext) throws HttpException, IOException {
                Header contentEncoding = httpResponse.getEntity().getContentEncoding();
                if (contentEncoding != null) {
                    for (HeaderElement headerElement : contentEncoding.getElements()) {
                        if (headerElement.getName().equalsIgnoreCase("gzip")) {
                            httpResponse.setEntity(new GzipDecompressingEntity(httpResponse.getEntity()));
                            return;
                        }
                    }
                }
            }
        });
        this._retryTime = DEFAULT_RETRYTIME_MS;
    }

    public void setRetryTime(int i) {
        this._retryTime = i;
    }

    public int getRetryTime() {
        return this._retryTime;
    }

    public void setStartingOffset(String str) {
        this._initialOffset = str;
    }

    protected abstract String buildGetString(String str);

    protected abstract Iterator<DataConsumer.DataEvent<D>> parse(InputStream inputStream) throws Exception;

    private Iterator<DataConsumer.DataEvent<D>> fetchBatch() throws HttpException {
        try {
            try {
                HttpGet httpGet = new HttpGet(buildGetString(this._offset));
                long currentTimeMillis = System.currentTimeMillis();
                HttpResponse execute = this._httpclient.execute(httpGet);
                this._httpGetLatency = System.currentTimeMillis() - currentTimeMillis;
                HttpEntity entity = execute.getEntity();
                StatusLine statusLine = execute.getStatusLine();
                if (statusLine.getStatusCode() >= 400) {
                    try {
                        IOUtils.closeQuietly(entity.getContent());
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                    throw new HttpException(statusLine.getReasonPhrase());
                }
                try {
                    InputStream content = entity.getContent();
                    long currentTimeMillis2 = System.currentTimeMillis();
                    Iterator<DataConsumer.DataEvent<D>> parse = parse(content);
                    this._responseParseLatency = System.currentTimeMillis() - currentTimeMillis2;
                    if (content != null) {
                        IOUtils.closeQuietly(content);
                    }
                    return parse;
                } catch (Exception e2) {
                    logger.error(e2.getMessage(), e2);
                    httpGet.abort();
                    throw new HttpException(e2.getMessage(), e2);
                }
            } catch (IOException e3) {
                throw new HttpException(e3.getMessage(), e3);
            }
        } catch (Throwable th) {
            if (0 != 0) {
                IOUtils.closeQuietly((InputStream) null);
            }
            throw th;
        }
    }

    public DataConsumer.DataEvent<D> next() {
        if (this._stopped) {
            return null;
        }
        if (this._currentDataIter == null || !this._currentDataIter.hasNext()) {
            while (true) {
                if (this._stopped) {
                    break;
                }
                try {
                    Iterator<DataConsumer.DataEvent<D>> fetchBatch = fetchBatch();
                    if (fetchBatch != null && fetchBatch.hasNext()) {
                        this._currentDataIter = fetchBatch;
                        break;
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("no more data");
                    }
                    synchronized (this) {
                        try {
                            wait(this._retryTime);
                        } catch (InterruptedException e) {
                            return null;
                        }
                    }
                    return null;
                } catch (HttpException e2) {
                    logger.error(e2.getMessage(), e2);
                    try {
                        logger.error("retrying in " + this._retryTime + "ms");
                        synchronized (this) {
                            wait(this._retryTime);
                        }
                    } catch (InterruptedException e3) {
                        return null;
                    }
                }
            }
        }
        DataConsumer.DataEvent<D> dataEvent = null;
        if (this._currentDataIter != null && this._currentDataIter.hasNext()) {
            dataEvent = this._currentDataIter.next();
            if (dataEvent != null) {
                this._offset = dataEvent.getVersion();
            }
        }
        return dataEvent;
    }

    public void reset() {
        if (this._initialOffset != null) {
            this._offset = this._initialOffset;
        }
    }

    @Override // com.senseidb.dataprovider.http.HttpDataProviderAdminMBean
    public long getHttpGetLatency() {
        return this._httpGetLatency;
    }

    @Override // com.senseidb.dataprovider.http.HttpDataProviderAdminMBean
    public long getResponseParseLatency() {
        return this._responseParseLatency;
    }

    public void start() {
        super.start();
        this._stopped = false;
    }

    public void stop() {
        synchronized (this) {
            this._stopped = true;
            notifyAll();
        }
        try {
            super.stop();
            if (this._httpClientManager != null) {
                this._httpClientManager.shutdown();
            }
        } catch (Throwable th) {
            if (this._httpClientManager != null) {
                this._httpClientManager.shutdown();
            }
            throw th;
        }
    }
}
